feat(session messages): enhance session message persistence with improved error handling and completion notifications

This commit is contained in:
Vaayne 2025-09-19 12:55:51 +08:00
parent da3cd62486
commit 5386716ebe

View File

@ -233,30 +233,55 @@ export class SessionMessageService extends BaseService {
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'error', type: 'error',
error: serializeError(underlyingError) error: serializeError(underlyingError),
persistScheduled: false
}) })
// Always emit a finish chunk at the end // Always emit a finish chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'finish' type: 'finish',
persistScheduled: false
}) })
break break
} }
case 'complete': { case 'complete': {
// Then handle async persistence const completionPayload = event.result ?? accumulator.toModelMessage('assistant')
this.database.transaction(async (tx) => {
await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId)
await this.persistAssistantMessage({
tx,
session,
accumulator,
agentSessionId: newAgentSessionId
})
})
// Always emit a finish chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'finish' type: 'complete',
result: completionPayload
}) })
try {
const persisted = await this.database.transaction(async (tx) => {
const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId)
const assistantMessage = await this.persistAssistantMessage({
tx,
session,
accumulator,
agentSessionId: newAgentSessionId
})
return { userMessage, assistantMessage }
})
sessionStream.emit('data', {
type: 'persisted',
message: persisted.assistantMessage,
userMessage: persisted.userMessage
})
} catch (persistError) {
sessionStream.emit('data', {
type: 'persist-error',
error: serializeError(persistError)
})
} finally {
// Always emit a finish chunk at the end
sessionStream.emit('data', {
type: 'finish',
persistScheduled: true
})
}
break break
} }
@ -330,11 +355,11 @@ export class SessionMessageService extends BaseService {
session: GetAgentSessionResponse session: GetAgentSessionResponse
accumulator: ChunkAccumulator accumulator: ChunkAccumulator
agentSessionId: string agentSessionId: string
}) { }): Promise<AgentSessionMessageEntity> {
if (!session?.id) { if (!session?.id) {
const missingSessionError = new Error('Missing session_id for persisted message') const missingSessionError = new Error('Missing session_id for persisted message')
logger.error('error persisting session message', { error: missingSessionError }) logger.error('error persisting session message', { error: missingSessionError })
return throw missingSessionError
} }
const sessionId = session.id const sessionId = session.id
@ -356,10 +381,13 @@ export class SessionMessageService extends BaseService {
updated_at: now updated_at: now
} }
await tx.insert(sessionMessagesTable).values(insertData).returning() const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning()
logger.debug('Success Persisted session message') logger.debug('Success Persisted session message')
return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity
} catch (error) { } catch (error) {
logger.error('Failed to persist session message', { error }) logger.error('Failed to persist session message', { error })
throw error
} }
} }