From 5386716ebe6e40c18cef9c4baf10a6cab9a1dfba Mon Sep 17 00:00:00 2001 From: Vaayne Date: Fri, 19 Sep 2025 12:55:51 +0800 Subject: [PATCH] feat(session messages): enhance session message persistence with improved error handling and completion notifications --- .../agents/services/SessionMessageService.ts | 62 ++++++++++++++----- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index b3245edbc2..df9cf0a69f 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -233,30 +233,55 @@ export class SessionMessageService extends BaseService { sessionStream.emit('data', { type: 'error', - error: serializeError(underlyingError) + error: serializeError(underlyingError), + persistScheduled: false }) // Always emit a finish chunk at the end sessionStream.emit('data', { - type: 'finish' + type: 'finish', + persistScheduled: false }) break } case 'complete': { - // Then handle async persistence - this.database.transaction(async (tx) => { - await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId) - await this.persistAssistantMessage({ - tx, - session, - accumulator, - agentSessionId: newAgentSessionId - }) - }) - // Always emit a finish chunk at the end + const completionPayload = event.result ?? accumulator.toModelMessage('assistant') + sessionStream.emit('data', { - type: 'finish' + type: 'complete', + result: completionPayload }) + + try { + const persisted = await this.database.transaction(async (tx) => { + const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId) + const assistantMessage = await this.persistAssistantMessage({ + tx, + session, + accumulator, + agentSessionId: newAgentSessionId + }) + + return { userMessage, assistantMessage } + }) + + sessionStream.emit('data', { + type: 'persisted', + message: persisted.assistantMessage, + userMessage: persisted.userMessage + }) + } catch (persistError) { + sessionStream.emit('data', { + type: 'persist-error', + error: serializeError(persistError) + }) + } finally { + // Always emit a finish chunk at the end + sessionStream.emit('data', { + type: 'finish', + persistScheduled: true + }) + } break } @@ -330,11 +355,11 @@ export class SessionMessageService extends BaseService { session: GetAgentSessionResponse accumulator: ChunkAccumulator agentSessionId: string - }) { + }): Promise { if (!session?.id) { const missingSessionError = new Error('Missing session_id for persisted message') logger.error('error persisting session message', { error: missingSessionError }) - return + throw missingSessionError } const sessionId = session.id @@ -356,10 +381,13 @@ export class SessionMessageService extends BaseService { updated_at: now } - await tx.insert(sessionMessagesTable).values(insertData).returning() + const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning() logger.debug('Success Persisted session message') + + return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity } catch (error) { logger.error('Failed to persist session message', { error }) + throw error } }