From c426876d0d009a9c7f79bb9ac0cf53b2e66e45a2 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Fri, 19 Sep 2025 22:36:19 +0800 Subject: [PATCH] feat(transform): refactor message handling to unify user and assistant processing --- .../routes/agents/handlers/messages.ts | 8 +- .../agents/services/claudecode/transform.ts | 168 +++++++----------- 2 files changed, 65 insertions(+), 111 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 4364868dac..bd1ef5bc42 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -42,7 +42,6 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const messageStream = sessionMessageService.createSessionMessage(session, messageData) // Track stream lifecycle so we keep the SSE connection open until persistence finishes @@ -66,6 +65,7 @@ export const createMessage = async (req: Request, res: Response): Promise responseEnded = true try { + res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') } catch (writeError) { logger.error('Error writing final sentinel to SSE stream:', { error: writeError as Error }) @@ -113,7 +113,7 @@ export const createMessage = async (req: Request, res: Response): Promise case 'complete': { logger.info(`Streaming message completed for session: ${sessionId}`) - res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`) + // res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`) streamFinished = true awaitingPersistence = true @@ -123,7 +123,7 @@ export const createMessage = async (req: Request, res: Response): Promise case 'persisted': // Send persistence success event - res.write(`data: ${JSON.stringify(event)}\n\n`) + // res.write(`data: ${JSON.stringify(event)}\n\n`) logger.debug(`Session message persisted for session: ${sessionId}`, { messageId: event.message?.id }) persistenceResolved = true @@ -132,7 +132,7 @@ export const createMessage = async (req: Request, res: Response): Promise case 'persist-error': // Send persistence error event - res.write(`data: ${JSON.stringify(event)}\n\n`) + // res.write(`data: ${JSON.stringify(event)}\n\n`) logger.error(`Failed to persist session message for session: ${sessionId}:`, event.error) persistenceResolved = true diff --git a/src/main/services/agents/services/claudecode/transform.ts b/src/main/services/agents/services/claudecode/transform.ts index e3ea8ceef0..8061fd19a0 100644 --- a/src/main/services/agents/services/claudecode/transform.ts +++ b/src/main/services/agents/services/claudecode/transform.ts @@ -1,7 +1,6 @@ // This file is used to transform claude code json response to aisdk streaming format import { SDKMessage } from '@anthropic-ai/claude-code' -import { MessageParam } from '@anthropic-ai/sdk/resources' import { loggerService } from '@logger' import { ProviderMetadata, UIMessageChunk } from 'ai' import { v4 as uuidv4 } from 'uuid' @@ -13,42 +12,14 @@ const generateMessageId = (): string => { return `msg_${uuidv4().replace(/-/g, '')}` } -// Helper function to extract text content from Anthropic messages -const extractTextContent = (message: MessageParam): string => { - if (typeof message.content === 'string') { - return message.content - } - - if (Array.isArray(message.content)) { - return message.content - .filter((block) => block.type === 'text') - .map((block) => ('text' in block ? block.text : '')) - .join('') - } - - return '' -} - -// Helper function to extract tool calls from assistant messages -const extractToolCalls = (message: any): any[] => { - if (!message.content || !Array.isArray(message.content)) { - return [] - } - - return message.content.filter((block: any) => block.type === 'tool_use') -} - // Main transform function export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageChunk[] { const chunks: UIMessageChunk[] = [] switch (sdkMessage.type) { case 'assistant': - chunks.push(...handleAssistantMessage(sdkMessage)) - break - case 'user': - chunks.push(...handleUserMessage(sdkMessage)) + chunks.push(...handleUserOrAssistantMessage(sdkMessage)) break case 'stream_event': @@ -79,89 +50,72 @@ function sdkMessageToProviderMetadata(message: SDKMessage): ProviderMetadata { return meta } -// Handle assistant messages -function handleAssistantMessage(message: Extract): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] - const messageId = message.uuid - - // Extract text content - const textContent = extractTextContent(message.message as MessageParam) - if (textContent) { - chunks.push( - { - type: 'text-start', - id: messageId - }, - { - type: 'text-delta', - id: messageId, - delta: textContent - }, - { - type: 'text-end', - id: messageId, - providerMetadata: { - rawMessage: sdkMessageToProviderMetadata(message) - } +function generateTextChunks(id: string, text: string, message: SDKMessage): UIMessageChunk[] { + return [ + { + type: 'text-start', + id + }, + { + type: 'text-delta', + id, + delta: text + }, + { + type: 'text-end', + id, + providerMetadata: { + rawMessage: sdkMessageToProviderMetadata(message) } - ) - } - - // Handle tool calls - const toolCalls = extractToolCalls(message.message) - for (const toolCall of toolCalls) { - chunks.push({ - type: 'tool-input-available', - toolCallId: toolCall.id, - toolName: toolCall.name, - input: toolCall.input, - providerExecuted: true - }) - } - - return chunks + } + ] } -// Handle user messages -function handleUserMessage(message: Extract): UIMessageChunk[] { +function handleUserOrAssistantMessage(message: Extract): UIMessageChunk[] { const chunks: UIMessageChunk[] = [] - const messageId = generateMessageId() + const messageId = message.uuid?.toString() || generateMessageId() - const textContent = extractTextContent(message.message) - if (textContent) { - chunks.push( - { - type: 'text-start', - id: messageId, - providerMetadata: { - anthropic: { - session_id: message.session_id, - role: 'user' - } - } - }, - { - type: 'text-delta', - id: messageId, - delta: textContent, - providerMetadata: { - anthropic: { - session_id: message.session_id, - role: 'user' - } - } - }, - { - type: 'text-end', - id: messageId, - providerMetadata: { - anthropic: { - session_id: message.session_id, - role: 'user' - } - } + // handle normal text content + if (typeof message.message.content === 'string') { + const textContent = message.message.content + if (textContent) { + chunks.push(...generateTextChunks(messageId, textContent, message)) + } + } else if (Array.isArray(message.message.content)) { + for (const block of message.message.content) { + switch (block.type) { + case 'text': + chunks.push(...generateTextChunks(messageId, block.text, message)) + break + case 'tool_use': + chunks.push({ + type: 'tool-input-available', + toolCallId: block.id, + toolName: block.name, + input: block.input, + providerExecuted: true, + providerMetadata: { + rawMessage: sdkMessageToProviderMetadata(message) + } + }) + break + case 'tool_result': + chunks.push({ + type: 'tool-output-available', + toolCallId: block.tool_use_id, + output: block.content, + providerExecuted: true, + dynamic: false, + preliminary: false + }) + break + default: + logger.warn('Unknown content block type in user/assistant message:', { + type: (block as any).type + }) + break } - ) + } } return chunks