♻️ refactor: simplify streaming message lifecycle management

- Remove unused persistence tracking variables in message handler
- Simplify finalizeResponse logic by removing unnecessary checks
- Change 'finish' event type to 'complete' for consistency
- Add debug logging for streaming events
- Clean up dead code and improve readability
This commit is contained in:
Vaayne 2025-09-20 00:14:02 +08:00
parent d56521260c
commit 17df1db120
2 changed files with 6 additions and 19 deletions

View File

@ -49,8 +49,6 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
// Track stream lifecycle so we keep the SSE connection open until persistence finishes // Track stream lifecycle so we keep the SSE connection open until persistence finishes
let responseEnded = false let responseEnded = false
let streamFinished = false let streamFinished = false
let awaitingPersistence = false
const persistenceResolved = false
const finalizeResponse = () => { const finalizeResponse = () => {
if (responseEnded) { if (responseEnded) {
@ -61,10 +59,6 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
return return
} }
if (awaitingPersistence && !persistenceResolved) {
return
}
responseEnded = true responseEnded = true
try { try {
res.write('data: {"type":"finish"}\n\n') res.write('data: {"type":"finish"}\n\n')
@ -136,7 +130,6 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
// res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`) // res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`)
streamFinished = true streamFinished = true
awaitingPersistence = true
finalizeResponse() finalizeResponse()
break break
} }
@ -151,7 +144,8 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
default: default:
// Handle other event types as generic data // Handle other event types as generic data
res.write(`data: ${JSON.stringify(event)}\n\n`) logger.info(`Streaming message event for session: ${sessionId}:`, { event })
// res.write(`data: ${JSON.stringify(event)}\n\n`)
break break
} }
} catch (writeError) { } catch (writeError) {

View File

@ -238,22 +238,15 @@ export class SessionMessageService extends BaseService {
error: serializeError(underlyingError), error: serializeError(underlyingError),
persistScheduled: false persistScheduled: false
}) })
// Always emit a finish chunk at the end // Always emit a complete chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'finish', type: 'complete',
persistScheduled: false persistScheduled: false
}) })
break break
} }
case 'complete': { case 'complete': {
const completionPayload = event.result ?? accumulator.toModelMessage('assistant')
sessionStream.emit('data', {
type: 'complete',
result: completionPayload
})
try { try {
const persisted = await this.database.transaction(async (tx) => { const persisted = await this.database.transaction(async (tx) => {
const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId) const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId)
@ -278,9 +271,9 @@ export class SessionMessageService extends BaseService {
error: serializeError(persistError) error: serializeError(persistError)
}) })
} finally { } finally {
// Always emit a finish chunk at the end // Always emit a complete chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'finish', type: 'complete',
persistScheduled: true persistScheduled: true
}) })
} }