From 49add96dc06dfb3688bfe4d67a75a76206075083 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Thu, 18 Sep 2025 17:44:06 +0800 Subject: [PATCH 1/2] feat(database): add agent_session_id to session_messages table and update related services --- .../drizzle/0001_woozy_captain_flint.sql | 1 + .../database/drizzle/meta/0001_snapshot.json | 339 ++++++++++++++++++ resources/database/drizzle/meta/_journal.json | 7 + .../routes/agents/handlers/messages.ts | 6 +- .../agents/database/schema/messages.schema.ts | 1 + .../agents/services/SessionMessageService.ts | 177 +++------ .../agents/services/claudecode/index.ts | 18 +- src/renderer/src/types/agent.ts | 1 + 8 files changed, 415 insertions(+), 135 deletions(-) create mode 100644 resources/database/drizzle/0001_woozy_captain_flint.sql create mode 100644 resources/database/drizzle/meta/0001_snapshot.json diff --git a/resources/database/drizzle/0001_woozy_captain_flint.sql b/resources/database/drizzle/0001_woozy_captain_flint.sql new file mode 100644 index 0000000000..f80f483c72 --- /dev/null +++ b/resources/database/drizzle/0001_woozy_captain_flint.sql @@ -0,0 +1 @@ +ALTER TABLE `session_messages` ADD `agent_session_id` text DEFAULT ''; \ No newline at end of file diff --git a/resources/database/drizzle/meta/0001_snapshot.json b/resources/database/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000000..df409871c7 --- /dev/null +++ b/resources/database/drizzle/meta/0001_snapshot.json @@ -0,0 +1,339 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "dabab6db-a2cd-4e96-b06e-6cb87d445a87", + "prevId": "35efb412-0230-4767-9c76-7b7c4d40369f", + "tables": { + "agents": { + "name": "agents", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "accessible_paths": { + "name": "accessible_paths", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "instructions": { + "name": "instructions", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "plan_model": { + "name": "plan_model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "small_model": { + "name": "small_model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "mcps": { + "name": "mcps", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "allowed_tools": { + "name": "allowed_tools", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "configuration": { + "name": "configuration", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "session_messages": { + "name": "session_messages", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "agent_session_id": { + "name": "agent_session_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "''" + }, + "metadata": { + "name": "metadata", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "migrations": { + "name": "migrations", + "columns": { + "version": { + "name": "version", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "tag": { + "name": "tag", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "executed_at": { + "name": "executed_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "sessions": { + "name": "sessions", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "agent_type": { + "name": "agent_type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "agent_id": { + "name": "agent_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "accessible_paths": { + "name": "accessible_paths", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "instructions": { + "name": "instructions", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "plan_model": { + "name": "plan_model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "small_model": { + "name": "small_model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "mcps": { + "name": "mcps", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "allowed_tools": { + "name": "allowed_tools", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "configuration": { + "name": "configuration", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/resources/database/drizzle/meta/_journal.json b/resources/database/drizzle/meta/_journal.json index 74624cded7..8648e01703 100644 --- a/resources/database/drizzle/meta/_journal.json +++ b/resources/database/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1758091173882, "tag": "0000_confused_wendigo", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1758187378775, + "tag": "0001_woozy_captain_flint", + "breakpoints": true } ] } diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 72f286415a..0f731fe089 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -35,9 +35,6 @@ export const createMessage = async (req: Request, res: Response): Promise logger.info(`Creating streaming message for session: ${sessionId}`) logger.debug('Streaming message data:', messageData) - // Step 1: Save user message first - const userMessage = await sessionMessageService.saveUserMessage(sessionId, messageData.content) - // Set SSE headers res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Cache-Control', 'no-cache') @@ -45,7 +42,8 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const messageStream = sessionMessageService.createSessionMessage(session, messageData, userMessage.id) + + const messageStream = sessionMessageService.createSessionMessage(session, messageData) // Track stream lifecycle so we keep the SSE connection open until persistence finishes let responseEnded = false diff --git a/src/main/services/agents/database/schema/messages.schema.ts b/src/main/services/agents/database/schema/messages.schema.ts index d2754d6ec4..d14c755014 100644 --- a/src/main/services/agents/database/schema/messages.schema.ts +++ b/src/main/services/agents/database/schema/messages.schema.ts @@ -8,6 +8,7 @@ export const sessionMessagesTable = sqliteTable('session_messages', { session_id: text('session_id').notNull(), role: text('role').notNull(), // 'user', 'agent', 'system', 'tool' content: text('content').notNull(), // JSON structured data + agent_session_id: text('agent_session_id').default(''), metadata: text('metadata'), // JSON metadata (optional) created_at: text('created_at').notNull(), updated_at: text('updated_at').notNull() diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index acd7cb1ab9..81cb330024 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -33,17 +33,6 @@ export async function chunksToModelMessages( return convertToModelMessages(uiMessages) // -> ModelMessage[] } -// Utility function to normalize content to ModelMessage -function normalizeModelMessage(content: string | ModelMessage): ModelMessage { - if (typeof content === 'string') { - return { - role: 'user', - content: content - } - } - return content -} - // Ensure errors emitted through SSE are serializable function serializeError(error: unknown): { message: string; name?: string; stack?: string } { if (error instanceof Error) { @@ -63,52 +52,15 @@ function serializeError(error: unknown): { message: string; name?: string; stack } } -// Interface for persistence context -interface PersistContext { - session: GetAgentSessionResponse - accumulator: ChunkAccumulator - userMessageId: number -} - // Chunk accumulator class to collect and reconstruct streaming data class ChunkAccumulator { private streamedChunks: UIMessageChunk[] = [] - private rawAgentMessages: any[] = [] - private agentResult: any = null private agentType: string = 'unknown' - private uniqueIds: Set = new Set() addChunk(chunk: UIMessageChunk): void { this.streamedChunks.push(chunk) } - addRawMessage(message: any): void { - if (message.uuid && this.uniqueIds.has(message.uuid)) { - // Duplicate message based on uuid; skip adding - return - } - if (message.uuid) { - this.uniqueIds.add(message.uuid) - } - this.rawAgentMessages.push(message) - } - - setAgentResult(result: any): void { - this.agentResult = result - if (result?.agentType) { - this.agentType = result.agentType - } - } - - buildStructuredContent() { - return { - aiSDKChunks: this.streamedChunks, - rawAgentMessages: this.rawAgentMessages, - agentResult: this.agentResult, - agentType: this.agentType - } - } - // Create a ReadableStream from accumulated chunks createChunkStream(): ReadableStream { const chunks = [...this.streamedChunks] @@ -162,14 +114,6 @@ class ChunkAccumulator { return message as ModelMessage } - getChunkCount(): number { - return this.streamedChunks.length - } - - getRawMessageCount(): number { - return this.rawAgentMessages.length - } - getAgentType(): string { return this.agentType } @@ -235,68 +179,65 @@ export class SessionMessageService extends BaseService { return { messages, total } } - async saveUserMessage(sessionId: string, content: ModelMessage | string): Promise { + async saveUserMessage( + tx: any, + sessionId: string, + prompt: string, + agentSessionId: string + ): Promise { this.ensureInitialized() const now = new Date().toISOString() - const userContent: ModelMessage = normalizeModelMessage(content) - const insertData: InsertSessionMessageRow = { session_id: sessionId, role: 'user', - content: JSON.stringify(userContent), - metadata: JSON.stringify({ - timestamp: now, - source: 'api' - }), + content: prompt, + agent_session_id: agentSessionId, created_at: now, updated_at: now } - const [saved] = await this.database.insert(sessionMessagesTable).values(insertData).returning() + const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning() return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity } - createSessionMessage( - session: GetAgentSessionResponse, - messageData: CreateSessionMessageRequest, - userMessageId: number - ): EventEmitter { + createSessionMessage(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter { this.ensureInitialized() // Create a new EventEmitter to manage the session message lifecycle const sessionStream = new EventEmitter() // No parent validation needed, start immediately - this.startSessionMessageStream(session, messageData, sessionStream, userMessageId) + this.startSessionMessageStream(session, messageData, sessionStream) return sessionStream } - private startSessionMessageStream( + private async startSessionMessageStream( session: GetAgentSessionResponse, req: CreateSessionMessageRequest, - sessionStream: EventEmitter, - userMessageId: number - ): void { + sessionStream: EventEmitter + ): Promise { const previousMessages = session.messages || [] - let session_id: string = '' + let agentSessionId: string = '' if (previousMessages.length > 0) { - session_id = previousMessages[0].session_id + agentSessionId = previousMessages[previousMessages.length - 1].agent_session_id } - logger.debug('Session Message stream message data:', { message: req, session_id }) + logger.debug('Session Message stream message data:', { message: req, session_id: agentSessionId }) if (session.agent_type !== 'claude-code') { + // TODO: Implement support for other agent types logger.error('Unsupported agent type for streaming:', { agent_type: session.agent_type }) throw new Error('Unsupported agent type for streaming') } + let newAgentSessionId = '' // Create the streaming agent invocation (using invokeStream for streaming) - const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], session_id, { - permissionMode: (session.configuration?.permissionMode as PermissionMode) || 'default', - maxTurns: (session.configuration?.maxTurns as number) || 10 + const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], agentSessionId, { + permissionMode: session.configuration?.permission_mode, + maxTurns: session.configuration?.max_turns }) // Use chunk accumulator to manage streaming data @@ -310,12 +251,10 @@ export class SessionMessageService extends BaseService { // Forward UIMessageChunk directly and collect raw agent messages if (event.chunk) { const chunk = event.chunk as UIMessageChunk - accumulator.addChunk(chunk) - - // Collect raw agent message if available (agent-agnostic) - if (event.rawAgentMessage) { - accumulator.addRawMessage(event.rawAgentMessage) + if (chunk.type === 'start' && chunk.messageId) { + newAgentSessionId = chunk.messageId } + accumulator.addChunk(chunk) sessionStream.emit('data', { type: 'chunk', @@ -328,27 +267,10 @@ export class SessionMessageService extends BaseService { case 'error': { const underlyingError = event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined) - const persistScheduled = accumulator.getChunkCount() > 0 - - if (persistScheduled) { - // Try to save partial state with error metadata when possible - accumulator.setAgentResult({ - error: serializeError(underlyingError), - agentType: 'claude-code', - incomplete: true - }) - - void this.persistSessionMessageAsync({ - session, - accumulator, - userMessageId - }) - } sessionStream.emit('data', { type: 'error', - error: serializeError(underlyingError), - persistScheduled + error: serializeError(underlyingError) }) // Always emit a finish chunk at the end sessionStream.emit('data', { @@ -358,19 +280,15 @@ export class SessionMessageService extends BaseService { } case 'complete': { - // Extract additional raw agent messages from agentResult if available - if (event.agentResult?.rawSDKMessages) { - event.agentResult.rawSDKMessages.forEach((msg: any) => accumulator.addRawMessage(msg)) - } - - // Set the agent result in the accumulator - accumulator.setAgentResult(event.agentResult) - // Then handle async persistence - void this.persistSessionMessageAsync({ - session, - accumulator, - userMessageId + this.database.transaction(async (tx) => { + await this.saveUserMessage(tx, session.id, req.content, newAgentSessionId) + await this.persistSessionMessageAsync({ + tx, + session, + accumulator, + agentSessionId: newAgentSessionId + }) }) // Always emit a finish chunk at the end sessionStream.emit('data', { @@ -395,7 +313,17 @@ export class SessionMessageService extends BaseService { }) } - private async persistSessionMessageAsync({ session, accumulator, userMessageId }: PersistContext) { + private async persistSessionMessageAsync({ + tx, + session, + accumulator, + agentSessionId + }: { + tx: any + session: GetAgentSessionResponse + accumulator: ChunkAccumulator + agentSessionId: string + }) { if (!session?.id) { const missingSessionError = new Error('Missing session_id for persisted message') logger.error('error persisting session message', { error: missingSessionError }) @@ -404,7 +332,6 @@ export class SessionMessageService extends BaseService { const sessionId = session.id const now = new Date().toISOString() - const structured = accumulator.buildStructuredContent() try { // Use chunksToModelMessages to convert chunks to ModelMessages @@ -413,24 +340,16 @@ export class SessionMessageService extends BaseService { const modelMessage = modelMessages.length > 0 ? modelMessages[modelMessages.length - 1] : accumulator.toModelMessage('assistant') - const metadata = { - userMessageId, - chunkCount: accumulator.getChunkCount(), - rawMessageCount: accumulator.getRawMessageCount(), - agentType: accumulator.getAgentType(), - completedAt: now - } - const insertData: InsertSessionMessageRow = { session_id: sessionId, role: 'assistant', - content: JSON.stringify({ modelMessage, ...structured }), - metadata: JSON.stringify(metadata), + content: JSON.stringify(modelMessage), + agent_session_id: agentSessionId, created_at: now, updated_at: now } - await this.database.insert(sessionMessagesTable).values(insertData).returning() + await tx.insert(sessionMessagesTable).values(insertData).returning() logger.debug('Success Persisted session message') } catch (error) { logger.error('Failed to persist session message', { error }) diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index 02362a85a6..e1a7971fdb 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -53,7 +53,7 @@ class ClaudeCodeService implements AgentServiceInterface { logger.info('Starting Claude Code SDK query', { prompt, - options: { cwd, maxTurns: options.maxTurns, permissionMode: options.permissionMode } + options }) // Start async processing @@ -62,6 +62,20 @@ class ClaudeCodeService implements AgentServiceInterface { return aiStream } + private async *userMessages(prompt: string) { + { + yield { + type: 'user' as const, + parent_tool_use_id: null, + session_id: '', + message: { + role: 'user' as const, + content: prompt + } + } + } + } + /** * Process SDK query and emit stream events */ @@ -73,7 +87,7 @@ class ClaudeCodeService implements AgentServiceInterface { try { // Process streaming responses using SDK query for await (const message of query({ - prompt, + prompt: this.userMessages(prompt), options })) { if (hasCompleted) break diff --git a/src/renderer/src/types/agent.ts b/src/renderer/src/types/agent.ts index d29e334953..51671ab27b 100644 --- a/src/renderer/src/types/agent.ts +++ b/src/renderer/src/types/agent.ts @@ -97,6 +97,7 @@ export const AgentSessionMessageEntitySchema = z.object({ // manual defined. may not synced with ai sdk definition role: z.enum(['assistant', 'user', 'system', 'tool']), // 'assistant' | 'user' | 'system' | 'tool' content: modelMessageSchema, + agent_session_id: z.string(), // agent session id, use to resume agent session metadata: z.record(z.string(), z.any()).optional(), // Additional metadata (optional) created_at: z.iso.datetime(), // ISO timestamp updated_at: z.iso.datetime() // ISO timestamp From 984c28d4bebe10d56cf13d5f94a7f5a83fcd0e79 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Thu, 18 Sep 2025 17:51:45 +0800 Subject: [PATCH 2/2] refactor(SessionMessageService): remove unused PermissionMode import --- src/main/services/agents/services/SessionMessageService.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index 81cb330024..362ab4539d 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -1,6 +1,5 @@ import { EventEmitter } from 'node:events' -import { PermissionMode } from '@anthropic-ai/claude-code' import { loggerService } from '@logger' import type { AgentSessionMessageEntity,