diff --git a/packages/shared/IpcChannel.ts b/packages/shared/IpcChannel.ts index a2ef66284c..ecfc9b967a 100644 --- a/packages/shared/IpcChannel.ts +++ b/packages/shared/IpcChannel.ts @@ -89,6 +89,9 @@ export enum IpcChannel { // Python Python_Execute = 'python:execute', + // agent messages + AgentMessage_PersistExchange = 'agent-message:persist-exchange', + //copilot Copilot_GetAuthMessage = 'copilot:get-auth-message', Copilot_GetCopilotToken = 'copilot:get-copilot-token', diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 4ae62dcbda..8f05bf8c0c 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,5 +1,4 @@ import { loggerService } from '@logger' -import { AgentStreamEvent } from '@main/services/agents/interfaces/AgentStreamInterface' import { Request, Response } from 'express' import { agentService, sessionMessageService, sessionService } from '../../../../services/agents' @@ -44,7 +43,12 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') const abortController = new AbortController() - const messageStream = sessionMessageService.createSessionMessage(session, messageData, abortController) + const { stream, completion } = await sessionMessageService.createSessionMessage( + session, + messageData, + abortController + ) + const reader = stream.getReader() // Track stream lifecycle so we keep the SSE connection open until persistence finishes let responseEnded = false @@ -61,7 +65,7 @@ export const createMessage = async (req: Request, res: Response): Promise responseEnded = true try { - res.write('data: {"type":"finish"}\n\n') + // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') } catch (writeError) { logger.error('Error writing final sentinel to SSE stream:', { error: writeError as Error }) @@ -92,93 +96,78 @@ export const createMessage = async (req: Request, res: Response): Promise if (responseEnded) return logger.info(`Client disconnected from streaming message for session: ${sessionId}`) responseEnded = true - messageStream.removeAllListeners() abortController.abort('Client disconnected') + reader.cancel('Client disconnected').catch(() => {}) } req.on('close', handleDisconnect) req.on('aborted', handleDisconnect) res.on('close', handleDisconnect) - // Handle stream events - messageStream.on('data', (event: AgentStreamEvent) => { - if (responseEnded) return - + const pumpStream = async () => { try { - switch (event.type) { - case 'chunk': - // Format UIMessageChunk as SSE event following AI SDK protocol - res.write(`data: ${JSON.stringify(event.chunk)}\n\n`) + while (!responseEnded) { + const { done, value } = await reader.read() + if (done) { break + } - case 'error': { - // Send error as AI SDK error chunk - const errorChunk = { + res.write(`data: ${JSON.stringify(value)}\n\n`) + } + + streamFinished = true + finalizeResponse() + } catch (error) { + if (responseEnded) return + logger.error('Error reading agent stream:', { error }) + try { + res.write( + `data: ${JSON.stringify({ type: 'error', - errorText: event.error?.message || 'Stream processing error' - } - res.write(`data: ${JSON.stringify(errorChunk)}\n\n`) - logger.error(`Streaming message error for session: ${sessionId}:`, event.error) - - streamFinished = true - finalizeResponse() - break - } - - case 'complete': { - logger.info(`Streaming message completed for session: ${sessionId}`) - // res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`) - - streamFinished = true - finalizeResponse() - break - } - - case 'cancelled': { - logger.info(`Streaming message cancelled for session: ${sessionId}`) - // res.write(`data: ${JSON.stringify({ type: 'cancelled' })}\n\n`) - streamFinished = true - finalizeResponse() - break - } - - default: - // Handle other event types as generic data - logger.info(`Streaming message event for session: ${sessionId}:`, { event }) - // res.write(`data: ${JSON.stringify(event)}\n\n`) - break - } - } catch (writeError) { - logger.error('Error writing to SSE stream:', { error: writeError }) - if (!responseEnded) { - responseEnded = true - res.end() + error: { + message: (error as Error).message || 'Stream processing error', + type: 'stream_error', + code: 'stream_processing_failed' + } + })}\n\n` + ) + } catch (writeError) { + logger.error('Error writing stream error to SSE:', { error: writeError }) } + responseEnded = true + res.end() } + } + + pumpStream().catch((error) => { + logger.error('Pump stream failure:', { error }) }) - // Handle stream errors - messageStream.on('error', (error: Error) => { - if (responseEnded) return - - logger.error(`Stream error for session: ${sessionId}:`, { error }) - try { - res.write( - `data: ${JSON.stringify({ - type: 'error', - error: { - message: error.message || 'Stream processing error', - type: 'stream_error', - code: 'stream_processing_failed' - } - })}\n\n` - ) - } catch (writeError) { - logger.error('Error writing error to SSE stream:', { error: writeError }) - } - responseEnded = true - res.end() - }) + completion + .then(() => { + streamFinished = true + finalizeResponse() + }) + .catch((error) => { + if (responseEnded) return + logger.error(`Streaming message error for session: ${sessionId}:`, error) + try { + res.write( + `data: ${JSON.stringify({ + type: 'error', + error: { + message: (error as { message?: string })?.message || 'Stream processing error', + type: 'stream_error', + code: 'stream_processing_failed' + } + })}\n\n` + ) + } catch (writeError) { + logger.error('Error writing completion error to SSE stream:', { error: writeError }) + } + responseEnded = true + res.end() + }) // Set a timeout to prevent hanging indefinitely const timeout = setTimeout( @@ -199,6 +188,8 @@ export const createMessage = async (req: Request, res: Response): Promise } catch (writeError) { logger.error('Error writing timeout to SSE stream:', { error: writeError }) } + abortController.abort('stream timeout') + reader.cancel('stream timeout').catch(() => {}) responseEnded = true res.end() } diff --git a/src/main/apiServer/utils/index.ts b/src/main/apiServer/utils/index.ts index bd2a9aa927..6a4a9a8ffe 100644 --- a/src/main/apiServer/utils/index.ts +++ b/src/main/apiServer/utils/index.ts @@ -190,13 +190,15 @@ export async function validateModelId( export function transformModelToOpenAI(model: Model, providers: Provider[]): ApiModel { const provider = providers.find((p) => p.id === model.provider) + const providerDisplayName = provider?.name return { id: `${model.provider}:${model.id}`, object: 'model', name: model.name, created: Math.floor(Date.now() / 1000), - owned_by: model.owned_by || model.provider, + owned_by: model.owned_by || providerDisplayName || model.provider, provider: model.provider, + provider_name: providerDisplayName, provider_type: provider?.type, provider_model_id: model.id } diff --git a/src/main/ipc.ts b/src/main/ipc.ts index 9805b7c6e6..30a02f8bd7 100644 --- a/src/main/ipc.ts +++ b/src/main/ipc.ts @@ -16,6 +16,7 @@ import checkDiskSpace from 'check-disk-space' import { BrowserWindow, dialog, ipcMain, ProxyConfig, session, shell, systemPreferences, webContents } from 'electron' import fontList from 'font-list' +import { agentMessageRepository } from './services/agents/database' import { apiServerService } from './services/ApiServerService' import appService from './services/AppService' import AppUpdater from './services/AppUpdater' @@ -199,6 +200,15 @@ export function registerIpc(mainWindow: BrowserWindow, app: Electron.App) { } }) + ipcMain.handle(IpcChannel.AgentMessage_PersistExchange, async (_event, payload) => { + try { + return await agentMessageRepository.persistExchange(payload) + } catch (error) { + logger.error('Failed to persist agent session messages', error as Error) + throw error + } + }) + //only for mac if (isMac) { ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => { diff --git a/src/main/services/agents/TODO.md b/src/main/services/agents/TODO.md new file mode 100644 index 0000000000..ecd6402327 --- /dev/null +++ b/src/main/services/agents/TODO.md @@ -0,0 +1,35 @@ +# Agents Service Refactor TODO (interface-level) + +- [x] **SessionMessageService.createSessionMessage** + - Replace the current `EventEmitter` that emits `UIMessageChunk` with a readable stream of `TextStreamPart` objects (same shape produced by `/api/messages` in `messageThunk`). + - Update `startSessionMessageStream` to call a new adapter (`claudeToTextStreamPart(chunk)`) that maps Claude Code chunk payloads to `{ type: 'text-delta' | 'tool-call' | ... }` parts used by `AiSdkToChunkAdapter`. + - Add a secondary return value (promise) resolving to the persisted `ModelMessage[]` once streaming completes, so the renderer thunk can await save confirmation. + +- [x] **main -> renderer transport** + - Update the existing SSE handler in `src/main/apiServer/routes/agents/handlers/messages.ts` (e.g., `createMessage`) to forward the new `TextStreamPart` stream over HTTP, preserving the current agent endpoint contract. + - Keep abort handling compatible with the current HTTP server (honor `AbortController` on the request to terminate the stream). + +- [x] **renderer thunk integration** + - Introduce a thin IPC contract (e.g., `AgentMessagePersistence`) surfaced by `src/main/services/agents/database/index.ts` so the renderer thunk can request session-message writes without going through `SessionMessageService`. + - Define explicit entry points on the main side: + - `persistUserMessage({ sessionId, agentSessionId, payload, createdAt?, metadata? })` + - `persistAssistantMessage({ sessionId, agentSessionId, payload, createdAt?, metadata? })` + - `persistExchange({ sessionId, agentSessionId, user, assistant })` which runs the above in a single transaction and returns both records. + - Export these helpers via an `agentMessageRepository` object so both IPC handlers and legacy services share the same persistence path. + - Normalize persisted payloads to `{ message, blocks }` matching the renderer schema instead of AI-SDK `ModelMessage` chunks. + - Extend `messageThunk.sendMessage` to call the agent transport when the topic corresponds to a session, pipe chunks through `createStreamProcessor` + `AiSdkToChunkAdapter`, and invoke the new persistence interface once streaming resolves. + - Replace `useSession().createSessionMessage` optimistic insert with dispatching the thunk so Redux/Dexie persistence happens via the shared save helpers. + +- [x] **persistence alignment** + - Remove `persistUserMessage` / `persistAssistantMessage` calls from `SessionMessageService`; instead expose a `SessionMessageRepository` in `main` that the thunk invokes via existing Dexie helpers. + - On renderer side, persist agent exchanges via IPC after streaming completes, storing `{ message, blocks }` payloads while skipping Dexie writes for agent sessions so the single source of truth remains `session_messages`. + +- [x] **Blocks renderer** + - Replace `AgentSessionMessages` simple `
` render with the shared `Blocks` component (`src/renderer/src/pages/home/Messages/Blocks`) wired to the Redux store. + - Adjust `useSession` to only fetch metadata (e.g., session info) and rely on store selectors for message list. + +- [x] **API client clean-up** + - Remove `AgentApiClient.createMessage` direct POST once thunk is in place; calls should go through renderer thunk -> stream -> final persistence. + +- [ ] **Regression tests** + - Add integration test to assert agent sessions render incremental text the same way as standard assistant messages. diff --git a/src/main/services/agents/database/index.ts b/src/main/services/agents/database/index.ts index 1cc65a19c3..61b3a9ffcc 100644 --- a/src/main/services/agents/database/index.ts +++ b/src/main/services/agents/database/index.ts @@ -9,3 +9,6 @@ // Drizzle ORM schemas export * from './schema' + +// Repository helpers +export * from './sessionMessageRepository' diff --git a/src/main/services/agents/database/sessionMessageRepository.ts b/src/main/services/agents/database/sessionMessageRepository.ts new file mode 100644 index 0000000000..d6a767cf09 --- /dev/null +++ b/src/main/services/agents/database/sessionMessageRepository.ts @@ -0,0 +1,181 @@ +import { loggerService } from '@logger' +import type { + AgentMessageAssistantPersistPayload, + AgentMessagePersistExchangePayload, + AgentMessagePersistExchangeResult, + AgentMessageUserPersistPayload, + AgentPersistedMessage, + AgentSessionMessageEntity +} from '@types' + +import { BaseService } from '../BaseService' +import type { InsertSessionMessageRow } from './schema' +import { sessionMessagesTable } from './schema' + +const logger = loggerService.withContext('AgentMessageRepository') + +type TxClient = any + +export type PersistUserMessageParams = AgentMessageUserPersistPayload & { + sessionId: string + agentSessionId?: string + tx?: TxClient +} + +export type PersistAssistantMessageParams = AgentMessageAssistantPersistPayload & { + sessionId: string + agentSessionId: string + tx?: TxClient +} + +type PersistExchangeParams = AgentMessagePersistExchangePayload & { + tx?: TxClient +} + +type PersistExchangeResult = AgentMessagePersistExchangeResult + +class AgentMessageRepository extends BaseService { + private static instance: AgentMessageRepository | null = null + + static getInstance(): AgentMessageRepository { + if (!AgentMessageRepository.instance) { + AgentMessageRepository.instance = new AgentMessageRepository() + } + + return AgentMessageRepository.instance + } + + private serializeMessage(payload: AgentPersistedMessage): string { + return JSON.stringify(payload) + } + + private serializeMetadata(metadata?: Record): string | undefined { + if (!metadata) { + return undefined + } + + try { + return JSON.stringify(metadata) + } catch (error) { + logger.warn('Failed to serialize session message metadata', error as Error) + return undefined + } + } + + private deserialize(row: any): AgentSessionMessageEntity { + if (!row) return row + + const deserialized = { ...row } + + if (typeof deserialized.content === 'string') { + try { + deserialized.content = JSON.parse(deserialized.content) + } catch (error) { + logger.warn('Failed to parse session message content JSON', error as Error) + } + } + + if (typeof deserialized.metadata === 'string') { + try { + deserialized.metadata = JSON.parse(deserialized.metadata) + } catch (error) { + logger.warn('Failed to parse session message metadata JSON', error as Error) + } + } + + return deserialized + } + + private getWriter(tx?: TxClient): TxClient { + return tx ?? this.database + } + + async persistUserMessage(params: PersistUserMessageParams): Promise { + await AgentMessageRepository.initialize() + this.ensureInitialized() + + const writer = this.getWriter(params.tx) + const now = params.createdAt ?? params.payload.message.createdAt ?? new Date().toISOString() + + const insertData: InsertSessionMessageRow = { + session_id: params.sessionId, + role: params.payload.message.role, + content: this.serializeMessage(params.payload), + agent_session_id: params.agentSessionId ?? '', + metadata: this.serializeMetadata(params.metadata), + created_at: now, + updated_at: now + } + + const [saved] = await writer.insert(sessionMessagesTable).values(insertData).returning() + + return this.deserialize(saved) + } + + async persistAssistantMessage(params: PersistAssistantMessageParams): Promise { + await AgentMessageRepository.initialize() + this.ensureInitialized() + + const writer = this.getWriter(params.tx) + const now = params.createdAt ?? params.payload.message.createdAt ?? new Date().toISOString() + + const insertData: InsertSessionMessageRow = { + session_id: params.sessionId, + role: params.payload.message.role, + content: this.serializeMessage(params.payload), + agent_session_id: params.agentSessionId, + metadata: this.serializeMetadata(params.metadata), + created_at: now, + updated_at: now + } + + const [saved] = await writer.insert(sessionMessagesTable).values(insertData).returning() + + return this.deserialize(saved) + } + + async persistExchange(params: PersistExchangeParams): Promise { + await AgentMessageRepository.initialize() + this.ensureInitialized() + + const { sessionId, agentSessionId, user, assistant } = params + + const result = await this.database.transaction(async (tx) => { + const exchangeResult: PersistExchangeResult = {} + + if (user?.payload) { + if (!user.payload.message?.role) { + throw new Error('User message payload missing role') + } + exchangeResult.userMessage = await this.persistUserMessage({ + sessionId, + agentSessionId, + payload: user.payload, + metadata: user.metadata, + createdAt: user.createdAt, + tx + }) + } + + if (assistant?.payload) { + if (!assistant.payload.message?.role) { + throw new Error('Assistant message payload missing role') + } + exchangeResult.assistantMessage = await this.persistAssistantMessage({ + sessionId, + agentSessionId, + payload: assistant.payload, + metadata: assistant.metadata, + createdAt: assistant.createdAt, + tx + }) + } + + return exchangeResult + }) + + return result + } +} + +export const agentMessageRepository = AgentMessageRepository.getInstance() diff --git a/src/main/services/agents/interfaces/AgentStreamInterface.ts b/src/main/services/agents/interfaces/AgentStreamInterface.ts index 245224248a..1b9c6f136d 100644 --- a/src/main/services/agents/interfaces/AgentStreamInterface.ts +++ b/src/main/services/agents/interfaces/AgentStreamInterface.ts @@ -4,12 +4,12 @@ import { EventEmitter } from 'node:events' import { GetAgentSessionResponse } from '@types' -import { UIMessageChunk } from 'ai' +import type { TextStreamPart } from 'ai' // Generic agent stream event that works with any agent type export interface AgentStreamEvent { type: 'chunk' | 'error' | 'complete' | 'cancelled' - chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption + chunk?: TextStreamPart // Standard AI SDK chunk for UI consumption error?: Error } diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index 77f4133872..dd0d0ebbbe 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -1,5 +1,3 @@ -import { EventEmitter } from 'node:events' - import { loggerService } from '@logger' import type { AgentSessionMessageEntity, @@ -7,29 +5,22 @@ import type { GetAgentSessionResponse, ListOptions } from '@types' -import { ModelMessage, UIMessage, UIMessageChunk } from 'ai' -import { convertToModelMessages, readUIMessageStream } from 'ai' +import { ModelMessage, TextStreamPart } from 'ai' import { desc, eq } from 'drizzle-orm' import { BaseService } from '../BaseService' -import { InsertSessionMessageRow, sessionMessagesTable } from '../database/schema' +import { sessionMessagesTable } from '../database/schema' +import { AgentStreamEvent } from '../interfaces/AgentStreamInterface' import ClaudeCodeService from './claudecode' const logger = loggerService.withContext('SessionMessageService') -// Collapse a UIMessageChunk stream into a final UIMessage, then convert to ModelMessage[] -export async function chunksToModelMessages( - chunkStream: ReadableStream, - priorUiHistory: UIMessage[] = [] -): Promise { - let latest: UIMessage | undefined - - for await (const uiMsg of readUIMessageStream({ stream: chunkStream })) { - latest = uiMsg // each yield is a newer state; keep the last one - } - - const uiMessages = latest ? [...priorUiHistory, latest] : priorUiHistory - return convertToModelMessages(uiMessages) // -> ModelMessage[] +type SessionStreamResult = { + stream: ReadableStream>> + completion: Promise<{ + userMessage?: AgentSessionMessageEntity + assistantMessage?: AgentSessionMessageEntity + }> } // Ensure errors emitted through SSE are serializable @@ -51,71 +42,69 @@ function serializeError(error: unknown): { message: string; name?: string; stack } } -// Chunk accumulator class to collect and reconstruct streaming data -class ChunkAccumulator { - private streamedChunks: UIMessageChunk[] = [] - private agentType: string = 'unknown' +class TextStreamAccumulator { + private textBuffer = '' + private totalText = '' + private readonly toolCalls = new Map() + private readonly toolResults = new Map() - addChunk(chunk: UIMessageChunk): void { - this.streamedChunks.push(chunk) - } - - // Create a ReadableStream from accumulated chunks - createChunkStream(): ReadableStream { - const chunks = [...this.streamedChunks] - - return new ReadableStream({ - start(controller) { - // Enqueue all chunks - for (const chunk of chunks) { - controller.enqueue(chunk) + add(part: TextStreamPart>): void { + switch (part.type) { + case 'text-start': + this.textBuffer = '' + break + case 'text-delta': + if (part.text) { + this.textBuffer += part.text } - controller.close() + break + case 'text-end': { + const blockText = (part.providerMetadata?.text?.value as string | undefined) ?? this.textBuffer + if (blockText) { + this.totalText += blockText + } + this.textBuffer = '' + break } - }) - } - - // Convert accumulated chunks to ModelMessages using chunksToModelMessages - async toModelMessages(priorUiHistory: UIMessage[] = []): Promise { - const chunkStream = this.createChunkStream() - return await chunksToModelMessages(chunkStream, priorUiHistory) + case 'tool-call': + if (part.toolCallId) { + this.toolCalls.set(part.toolCallId, { + toolName: part.toolName, + input: part.input ?? part.args ?? part.providerMetadata?.raw?.input + }) + } + break + case 'tool-result': + if (part.toolCallId) { + this.toolResults.set(part.toolCallId, part.output ?? part.result ?? part.providerMetadata?.raw) + } + break + default: + break + } } toModelMessage(role: ModelMessage['role'] = 'assistant'): ModelMessage { - // Reconstruct the content from chunks - let textContent = '' - const toolCalls: any[] = [] + const content = this.totalText || this.textBuffer || '' - for (const chunk of this.streamedChunks) { - if (chunk.type === 'text-delta' && 'delta' in chunk) { - textContent += chunk.delta - } else if (chunk.type === 'tool-input-available' && 'toolCallId' in chunk && 'toolName' in chunk) { - // Handle tool calls - use tool-input-available chunks - const toolCall = { - toolCallId: chunk.toolCallId, - toolName: chunk.toolName, - args: chunk.input || {} - } - toolCalls.push(toolCall) - } - } + const toolInvocations = Array.from(this.toolCalls.entries()).map(([toolCallId, info]) => ({ + toolCallId, + toolName: info.toolName, + args: info.input, + result: this.toolResults.get(toolCallId) + })) - const message: any = { + const message: Record = { role, - content: textContent + content } - // Add tool invocations if any - if (toolCalls.length > 0) { - message.toolInvocations = toolCalls + if (toolInvocations.length > 0) { + message.toolInvocations = toolInvocations } return message as ModelMessage } - - getAgentType(): string { - return this.agentType - } } export class SessionMessageService extends BaseService { @@ -170,28 +159,21 @@ export class SessionMessageService extends BaseService { return { messages } } - createSessionMessage( + async createSessionMessage( session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest, abortController: AbortController - ): EventEmitter { + ): Promise { this.ensureInitialized() - // Create a new EventEmitter to manage the session message lifecycle - const sessionStream = new EventEmitter() - - // No parent validation needed, start immediately - this.startSessionMessageStream(session, messageData, sessionStream, abortController) - - return sessionStream + return await this.startSessionMessageStream(session, messageData, abortController) } private async startSessionMessageStream( session: GetAgentSessionResponse, req: CreateSessionMessageRequest, - sessionStream: EventEmitter, abortController: AbortController - ): Promise { + ): Promise { const agentSessionId = await this.getLastAgentSessionId(session.id) let newAgentSessionId = '' logger.debug('Session Message stream message data:', { message: req, session_id: agentSessionId }) @@ -202,98 +184,98 @@ export class SessionMessageService extends BaseService { throw new Error('Unsupported agent type for streaming') } - // Create the streaming agent invocation (using invokeStream for streaming) const claudeStream = await this.cc.invoke(req.content, session, abortController, agentSessionId) + const accumulator = new TextStreamAccumulator() - // Use chunk accumulator to manage streaming data - const accumulator = new ChunkAccumulator() + let resolveCompletion!: (value: { + userMessage?: AgentSessionMessageEntity + assistantMessage?: AgentSessionMessageEntity + }) => void + let rejectCompletion!: (reason?: unknown) => void - // Handle agent stream events (agent-agnostic) - claudeStream.on('data', async (event: any) => { - try { - switch (event.type) { - case 'chunk': - // Forward UIMessageChunk directly and collect raw agent messages - if (event.chunk) { - const chunk = event.chunk as UIMessageChunk - if (chunk.type === 'start' && chunk.messageId) { - newAgentSessionId = chunk.messageId + const completion = new Promise<{ + userMessage?: AgentSessionMessageEntity + assistantMessage?: AgentSessionMessageEntity + }>((resolve, reject) => { + resolveCompletion = resolve + rejectCompletion = reject + }) + + let finished = false + + const cleanup = () => { + if (finished) return + finished = true + claudeStream.removeAllListeners() + } + + const stream = new ReadableStream>>({ + start: (controller) => { + claudeStream.on('data', async (event: AgentStreamEvent) => { + if (finished) return + try { + switch (event.type) { + case 'chunk': { + const chunk = event.chunk as TextStreamPart> | undefined + if (!chunk) { + logger.warn('Received agent chunk event without chunk payload') + return + } + + if (chunk.type === 'start' && chunk.messageId) { + newAgentSessionId = chunk.messageId + } + + accumulator.add(chunk) + controller.enqueue(chunk) + break } - accumulator.addChunk(chunk) - sessionStream.emit('data', { - type: 'chunk', - chunk - }) - } else { - logger.warn('Received agent chunk event without chunk payload') - } - break + case 'error': { + const stderrMessage = (event as any)?.data?.stderr as string | undefined + const underlyingError = event.error ?? (stderrMessage ? new Error(stderrMessage) : undefined) + cleanup() + const streamError = underlyingError ?? new Error('Stream error') + controller.error(streamError) + rejectCompletion(serializeError(streamError)) + break + } - case 'error': { - const underlyingError = event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined) + case 'complete': { + cleanup() + controller.close() + resolveCompletion({}) + break + } - sessionStream.emit('data', { - type: 'error', - error: serializeError(underlyingError), - persistScheduled: false - }) - // Always emit a complete chunk at the end - sessionStream.emit('data', { - type: 'complete', - persistScheduled: false - }) - break - } + case 'cancelled': { + cleanup() + controller.close() + resolveCompletion({}) + break + } - case 'complete': { - try { - const persisted = await this.database.transaction(async (tx) => { - const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId) - const assistantMessage = await this.persistAssistantMessage({ - tx, - session, - accumulator, - agentSessionId: newAgentSessionId + default: + logger.warn('Unknown event type from Claude Code service:', { + type: event.type }) - - return { userMessage, assistantMessage } - }) - - sessionStream.emit('data', { - type: 'persisted', - message: persisted.assistantMessage, - userMessage: persisted.userMessage - }) - } catch (persistError) { - sessionStream.emit('data', { - type: 'persist-error', - error: serializeError(persistError) - }) - } finally { - // Always emit a complete chunk at the end - sessionStream.emit('data', { - type: 'complete', - persistScheduled: true - }) + break } - break + } catch (error) { + cleanup() + controller.error(error) + rejectCompletion(serializeError(error)) } - - default: - logger.warn('Unknown event type from Claude Code service:', { - type: event.type - }) - break - } - } catch (error) { - logger.error('Error handling Claude Code stream event:', { error }) - sessionStream.emit('data', { - type: 'error', - error: serializeError(error) }) + }, + cancel: (reason) => { + cleanup() + abortController.abort(typeof reason === 'string' ? reason : 'stream cancelled') + resolveCompletion({}) } }) + + return { stream, completion } } private async getLastAgentSessionId(sessionId: string): Promise { @@ -317,75 +299,6 @@ export class SessionMessageService extends BaseService { } } - async persistUserMessage( - tx: any, - sessionId: string, - prompt: string, - agentSessionId: string - ): Promise { - this.ensureInitialized() - - const now = new Date().toISOString() - const insertData: InsertSessionMessageRow = { - session_id: sessionId, - role: 'user', - content: JSON.stringify({ role: 'user', content: prompt }), - agent_session_id: agentSessionId, - created_at: now, - updated_at: now - } - - const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning() - - return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity - } - - private async persistAssistantMessage({ - tx, - session, - accumulator, - agentSessionId - }: { - tx: any - session: GetAgentSessionResponse - accumulator: ChunkAccumulator - agentSessionId: string - }): Promise { - if (!session?.id) { - const missingSessionError = new Error('Missing session_id for persisted message') - logger.error('error persisting session message', { error: missingSessionError }) - throw missingSessionError - } - - const sessionId = session.id - const now = new Date().toISOString() - - try { - // Use chunksToModelMessages to convert chunks to ModelMessages - const modelMessages = await accumulator.toModelMessages() - // Get the last message (should be the assistant's response) - const modelMessage = - modelMessages.length > 0 ? modelMessages[modelMessages.length - 1] : accumulator.toModelMessage('assistant') - - const insertData: InsertSessionMessageRow = { - session_id: sessionId, - role: 'assistant', - content: JSON.stringify(modelMessage), - agent_session_id: agentSessionId, - created_at: now, - updated_at: now - } - - const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning() - logger.debug('Success Persisted session message') - - return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity - } catch (error) { - logger.error('Failed to persist session message', { error }) - throw error - } - } - private deserializeSessionMessage(data: any): AgentSessionMessageEntity { if (!data) return data diff --git a/src/main/services/agents/services/claudecode/aisdk-stream-protocel.md b/src/main/services/agents/services/claudecode/aisdk-stream-protocel.md deleted file mode 100644 index b00da2a54c..0000000000 --- a/src/main/services/agents/services/claudecode/aisdk-stream-protocel.md +++ /dev/null @@ -1,384 +0,0 @@ -AI SDK UI functions such as `useChat` and `useCompletion` support both text streams and data streams. The stream protocol defines how the data is streamed to the frontend on top of the HTTP protocol. - -This page describes both protocols and how to use them in the backend and frontend. - -You can use this information to develop custom backends and frontends for your use case, e.g., to provide compatible API endpoints that are implemented in a different language such as Python. - -For instance, here's an example using [FastAPI](https://github.com/vercel/ai/tree/main/examples/next-fastapi) as a backend. - -## Text Stream Protocol - -A text stream contains chunks in plain text, that are streamed to the frontend. Each chunk is then appended together to form a full text response. - -Text streams are supported by `useChat`, `useCompletion`, and `useObject`. When you use `useChat` or `useCompletion`, you need to enable text streaming by setting the `streamProtocol` options to `text`. - -You can generate text streams with `streamText` in the backend. When you call `toTextStreamResponse()` on the result object, a streaming HTTP response is returned. - -Text streams only support basic text data. If you need to stream other types of data such as tool calls, use data streams. - -### Text Stream Example - -Here is a Next.js example that uses the text stream protocol: - -app/page.tsx - -```tsx -'use client'; - -import { useChat } from '@ai-sdk/react'; -import { TextStreamChatTransport } from 'ai'; -import { useState } from 'react'; - -export default function Chat() { - const [input, setInput] = useState(''); - const { messages, sendMessage } = useChat({ - transport: new TextStreamChatTransport({ api: '/api/chat' }), - }); - - return ( -
- {messages.map(message => ( -
- {message.role === 'user' ? 'User: ' : 'AI: '} - {message.parts.map((part, i) => { - switch (part.type) { - case 'text': - return
{part.text}
; - } - })} -
- ))} - -
{ - e.preventDefault(); - sendMessage({ text: input }); - setInput(''); - }} - > - setInput(e.currentTarget.value)} - /> -
-
- ); -} -``` - -## Data Stream Protocol - -A data stream follows a special protocol that the AI SDK provides to send information to the frontend. - -The data stream protocol uses Server-Sent Events (SSE) format for improved standardization, keep-alive through ping, reconnect capabilities, and better cache handling. - -The following stream parts are currently supported: - -### Message Start Part - -Indicates the beginning of a new message with metadata. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"start","messageId":"..."} -``` - -### Text Parts - -Text content is streamed using a start/delta/end pattern with unique IDs for each text block. - -#### Text Start Part - -Indicates the beginning of a text block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"text-start","id":"msg_68679a454370819ca74c8eb3d04379630dd1afb72306ca5d"} -``` - -#### Text Delta Part - -Contains incremental text content for the text block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"text-delta","id":"msg_68679a454370819ca74c8eb3d04379630dd1afb72306ca5d","delta":"Hello"} -``` - -#### Text End Part - -Indicates the completion of a text block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"text-end","id":"msg_68679a454370819ca74c8eb3d04379630dd1afb72306ca5d"} -``` - -### Reasoning Parts - -Reasoning content is streamed using a start/delta/end pattern with unique IDs for each reasoning block. - -#### Reasoning Start Part - -Indicates the beginning of a reasoning block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"reasoning-start","id":"reasoning_123"} -``` - -#### Reasoning Delta Part - -Contains incremental reasoning content for the reasoning block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"reasoning-delta","id":"reasoning_123","delta":"This is some reasoning"} -``` - -#### Reasoning End Part - -Indicates the completion of a reasoning block. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"reasoning-end","id":"reasoning_123"} -``` - -### Source Parts - -Source parts provide references to external content sources. - -#### Source URL Part - -References to external URLs. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"source-url","sourceId":"https://example.com","url":"https://example.com"} -``` - -#### Source Document Part - -References to documents or files. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"source-document","sourceId":"https://example.com","mediaType":"file","title":"Title"} -``` - -### File Part - -The file parts contain references to files with their media type. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"file","url":"https://example.com/file.png","mediaType":"image/png"} -``` - -### Data Parts - -Custom data parts allow streaming of arbitrary structured data with type-specific handling. - -Format: Server-Sent Event with JSON object where the type includes a custom suffix - -Example: - -``` -data: {"type":"data-weather","data":{"location":"SF","temperature":100}} -``` - -The `data-*` type pattern allows you to define custom data types that your frontend can handle specifically. - -The error parts are appended to the message as they are received. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"error","errorText":"error message"} -``` - -### Tool Input Start Part - -Indicates the beginning of tool input streaming. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"tool-input-start","toolCallId":"call_fJdQDqnXeGxTmr4E3YPSR7Ar","toolName":"getWeatherInformation"} -``` - -### Tool Input Delta Part - -Incremental chunks of tool input as it's being generated. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"tool-input-delta","toolCallId":"call_fJdQDqnXeGxTmr4E3YPSR7Ar","inputTextDelta":"San Francisco"} -``` - -### Tool Input Available Part - -Indicates that tool input is complete and ready for execution. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"tool-input-available","toolCallId":"call_fJdQDqnXeGxTmr4E3YPSR7Ar","toolName":"getWeatherInformation","input":{"city":"San Francisco"}} -``` - -### Tool Output Available Part - -Contains the result of tool execution. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"tool-output-available","toolCallId":"call_fJdQDqnXeGxTmr4E3YPSR7Ar","output":{"city":"San Francisco","weather":"sunny"}} -``` - -### Start Step Part - -A part indicating the start of a step. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"start-step"} -``` - -### Finish Step Part - -A part indicating that a step (i.e., one LLM API call in the backend) has been completed. - -This part is necessary to correctly process multiple stitched assistant calls, e.g. when calling tools in the backend, and using steps in `useChat` at the same time. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"finish-step"} -``` - -### Finish Message Part - -A part indicating the completion of a message. - -Format: Server-Sent Event with JSON object - -Example: - -``` -data: {"type":"finish"} -``` - -### Stream Termination - -The stream ends with a special `[DONE]` marker. - -Format: Server-Sent Event with literal `[DONE]` - -Example: - -``` -data: [DONE] -``` - -The data stream protocol is supported by `useChat` and `useCompletion` on the frontend and used by default.`useCompletion` only supports the `text` and `data` stream parts. - -On the backend, you can use `toUIMessageStreamResponse()` from the `streamText` result object to return a streaming HTTP response. - -### UI Message Stream Example - -Here is a Next.js example that uses the UI message stream protocol: - -app/page.tsx - -```tsx -'use client'; - -import { useChat } from '@ai-sdk/react'; -import { useState } from 'react'; - -export default function Chat() { - const [input, setInput] = useState(''); - const { messages, sendMessage } = useChat(); - - return ( -
- {messages.map(message => ( -
- {message.role === 'user' ? 'User: ' : 'AI: '} - {message.parts.map((part, i) => { - switch (part.type) { - case 'text': - return
{part.text}
; - } - })} -
- ))} - -
{ - e.preventDefault(); - sendMessage({ text: input }); - setInput(''); - }} - > - setInput(e.currentTarget.value)} - /> -
-
- ); -} -``` diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index fe45f7edd3..df4444b839 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -9,7 +9,7 @@ import { validateModelId } from '@main/apiServer/utils' import { GetAgentSessionResponse } from '../..' import { AgentServiceInterface, AgentStream, AgentStreamEvent } from '../../interfaces/AgentStreamInterface' -import { transformSDKMessageToUIChunk } from './transform' +import { transformSDKMessageToStreamParts } from './transform' const require_ = createRequire(import.meta.url) const logger = loggerService.withContext('ClaudeCodeService') @@ -157,7 +157,7 @@ class ClaudeCodeService implements AgentServiceInterface { } // Transform SDKMessage to UIMessageChunks - const chunks = transformSDKMessageToUIChunk(message) + const chunks = transformSDKMessageToStreamParts(message) for (const chunk of chunks) { stream.emit('data', { type: 'chunk', diff --git a/src/main/services/agents/services/claudecode/map-claude-code-finish-reason.ts b/src/main/services/agents/services/claudecode/map-claude-code-finish-reason.ts new file mode 100644 index 0000000000..04748fbb55 --- /dev/null +++ b/src/main/services/agents/services/claudecode/map-claude-code-finish-reason.ts @@ -0,0 +1,34 @@ +// ported from https://github.com/ben-vargas/ai-sdk-provider-claude-code/blob/main/src/map-claude-code-finish-reason.ts#L22 +import type { LanguageModelV2FinishReason } from '@ai-sdk/provider' + +/** + * Maps Claude Code SDK result subtypes to AI SDK finish reasons. + * + * @param subtype - The result subtype from Claude Code SDK + * @returns The corresponding AI SDK finish reason + * + * @example + * ```typescript + * const finishReason = mapClaudeCodeFinishReason('error_max_turns'); + * // Returns: 'length' + * ``` + * + * @remarks + * Mappings: + * - 'success' -> 'stop' (normal completion) + * - 'error_max_turns' -> 'length' (hit turn limit) + * - 'error_during_execution' -> 'error' (execution error) + * - default -> 'stop' (unknown subtypes treated as normal completion) + */ +export function mapClaudeCodeFinishReason(subtype?: string): LanguageModelV2FinishReason { + switch (subtype) { + case 'success': + return 'stop' + case 'error_max_turns': + return 'length' + case 'error_during_execution': + return 'error' + default: + return 'stop' + } +} diff --git a/src/main/services/agents/services/claudecode/transform.ts b/src/main/services/agents/services/claudecode/transform.ts index 8061fd19a0..34ce05172a 100644 --- a/src/main/services/agents/services/claudecode/transform.ts +++ b/src/main/services/agents/services/claudecode/transform.ts @@ -1,21 +1,34 @@ // This file is used to transform claude code json response to aisdk streaming format +import type { LanguageModelV2Usage } from '@ai-sdk/provider' import { SDKMessage } from '@anthropic-ai/claude-code' import { loggerService } from '@logger' -import { ProviderMetadata, UIMessageChunk } from 'ai' +import type { ProviderMetadata, TextStreamPart } from 'ai' import { v4 as uuidv4 } from 'uuid' +import { mapClaudeCodeFinishReason } from './map-claude-code-finish-reason' + const logger = loggerService.withContext('ClaudeCodeTransform') +type AgentStreamPart = TextStreamPart> + +const contentBlockState = new Map< + string, + { + type: 'text' | 'tool-call' + toolCallId?: string + toolName?: string + input?: string + } +>() + // Helper function to generate unique IDs for text blocks -const generateMessageId = (): string => { - return `msg_${uuidv4().replace(/-/g, '')}` -} +const generateMessageId = (): string => `msg_${uuidv4().replace(/-/g, '')}` // Main transform function -export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] - +export function transformSDKMessageToStreamParts(sdkMessage: SDKMessage): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] + logger.debug('Transforming SDKMessage to stream parts', sdkMessage) switch (sdkMessage.type) { case 'assistant': case 'user': @@ -35,7 +48,6 @@ export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageC break default: - // Handle unknown message types gracefully logger.warn('Unknown SDKMessage type:', { type: (sdkMessage as any).type }) break } @@ -43,36 +55,45 @@ export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageC return chunks } -function sdkMessageToProviderMetadata(message: SDKMessage): ProviderMetadata { - const meta: ProviderMetadata = { - message: message as Record +const sdkMessageToProviderMetadata = (message: SDKMessage): ProviderMetadata => { + return { + anthropic: { + uuid: message.uuid || generateMessageId(), + session_id: message.session_id + }, + raw: message as Record } - return meta } -function generateTextChunks(id: string, text: string, message: SDKMessage): UIMessageChunk[] { +function generateTextChunks(id: string, text: string, message: SDKMessage): AgentStreamPart[] { + const providerMetadata = sdkMessageToProviderMetadata(message) return [ { type: 'text-start', - id + id, + providerMetadata }, { type: 'text-delta', id, - delta: text + text, + providerMetadata }, { type: 'text-end', id, providerMetadata: { - rawMessage: sdkMessageToProviderMetadata(message) + ...providerMetadata, + text: { + value: text + } } } ] } -function handleUserOrAssistantMessage(message: Extract): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] +function handleUserOrAssistantMessage(message: Extract): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] const messageId = message.uuid?.toString() || generateMessageId() // handle normal text content @@ -89,29 +110,25 @@ function handleUserOrAssistantMessage(message: Extract): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] +function handleStreamEvent(message: Extract): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] const event = message.event + const blockKey = `${message.uuid ?? message.session_id ?? 'session'}:${event.index}` switch (event.type) { case 'message_start': @@ -132,69 +150,110 @@ function handleStreamEvent(message: Extract): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] - - if (message.subtype === 'init') { - chunks.push({ - type: 'start', - messageId: message.session_id - }) - - // System initialization - could emit as a data chunk or skip - chunks.push({ - type: 'data-system' as any, - data: { - type: 'init', - session_id: message.session_id, - raw: message - } - }) - } else if (message.subtype === 'compact_boundary') { - chunks.push({ - type: 'data-system' as any, - data: { - type: 'compact_boundary', - metadata: message.compact_metadata, - raw: message - } - }) +function handleSystemMessage(message: Extract): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] + logger.debug('Received system message', { + subtype: message.subtype + }) + switch (message.subtype) { + case 'init': { + chunks.push({ + type: 'start' + }) + } } - - return chunks + return [] } // Handle result messages (completion with usage stats) -function handleResultMessage(message: Extract): UIMessageChunk[] { - const chunks: UIMessageChunk[] = [] +function handleResultMessage(message: Extract): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] - const messageId = message.uuid + let usage: LanguageModelV2Usage | undefined + if ('usage' in message) { + usage = { + inputTokens: + (message.usage.cache_creation_input_tokens ?? 0) + + (message.usage.cache_read_input_tokens ?? 0) + + (message.usage.input_tokens ?? 0), + outputTokens: message.usage.output_tokens ?? 0, + totalTokens: + (message.usage.cache_creation_input_tokens ?? 0) + + (message.usage.cache_read_input_tokens ?? 0) + + (message.usage.input_tokens ?? 0) + + (message.usage.output_tokens ?? 0) + } + } if (message.subtype === 'success') { - // Emit final result data chunks.push({ - type: 'data-result' as any, - id: messageId, - data: message, - transient: true - }) + type: 'finish', + totalUsage: usage, + finishReason: mapClaudeCodeFinishReason(message.subtype), + providerMetadata: { + ...sdkMessageToProviderMetadata(message), + usage: message.usage, + durationMs: message.duration_ms, + costUsd: message.total_cost_usd, + raw: message + } + } as AgentStreamPart) } else { - // Handle error cases chunks.push({ type: 'error', - errorText: `${message.subtype}: Process failed after ${message.num_turns} turns` - }) - } - - // Emit usage and cost data - chunks.push({ - type: 'data-usage' as any, - data: { - cost: message.total_cost_usd, - usage: { - input_tokens: message.usage.input_tokens, - cache_creation_input_tokens: message.usage.cache_creation_input_tokens, - cache_read_input_tokens: message.usage.cache_read_input_tokens, - output_tokens: message.usage.output_tokens, - service_tier: 'standard' + error: { + message: `${message.subtype}: Process failed after ${message.num_turns} turns` } - } - }) + } as AgentStreamPart) + } return chunks } // Convenience function to transform a stream of SDKMessages -export function* transformSDKMessageStream(sdkMessages: SDKMessage[]): Generator { +export function* transformSDKMessageStream(sdkMessages: SDKMessage[]): Generator { for (const sdkMessage of sdkMessages) { - const chunks = transformSDKMessageToUIChunk(sdkMessage) + const chunks = transformSDKMessageToStreamParts(sdkMessage) for (const chunk of chunks) { yield chunk } @@ -297,9 +344,9 @@ export function* transformSDKMessageStream(sdkMessages: SDKMessage[]): Generator // Async version for async iterables export async function* transformSDKMessageStreamAsync( sdkMessages: AsyncIterable -): AsyncGenerator { +): AsyncGenerator { for await (const sdkMessage of sdkMessages) { - const chunks = transformSDKMessageToUIChunk(sdkMessage) + const chunks = transformSDKMessageToStreamParts(sdkMessage) for (const chunk of chunks) { yield chunk } diff --git a/src/renderer/src/aiCore/chunk/AiSdkToChunkAdapter.ts b/src/renderer/src/aiCore/chunk/AiSdkToChunkAdapter.ts index a65c6fe790..d8a3bcca72 100644 --- a/src/renderer/src/aiCore/chunk/AiSdkToChunkAdapter.ts +++ b/src/renderer/src/aiCore/chunk/AiSdkToChunkAdapter.ts @@ -32,16 +32,19 @@ export class AiSdkToChunkAdapter { private accumulate: boolean | undefined private isFirstChunk = true private enableWebSearch: boolean = false + private onSessionUpdate?: (sessionId: string) => void constructor( private onChunk: (chunk: Chunk) => void, mcpTools: MCPTool[] = [], accumulate?: boolean, - enableWebSearch?: boolean + enableWebSearch?: boolean, + onSessionUpdate?: (sessionId: string) => void ) { this.toolCallHandler = new ToolCallChunkHandler(onChunk, mcpTools) this.accumulate = accumulate this.enableWebSearch = enableWebSearch || false + this.onSessionUpdate = onSessionUpdate } /** @@ -108,6 +111,15 @@ export class AiSdkToChunkAdapter { chunk: TextStreamPart, final: { text: string; reasoningContent: string; webSearchResults: AISDKWebSearchResult[]; reasoningId: string } ) { + const sessionId = + (chunk.providerMetadata as any)?.anthropic?.session_id ?? + (chunk.providerMetadata as any)?.anthropic?.sessionId ?? + (chunk.providerMetadata as any)?.raw?.session_id + + if (typeof sessionId === 'string' && sessionId) { + this.onSessionUpdate?.(sessionId) + } + logger.silly(`AI SDK chunk type: ${chunk.type}`, chunk) switch (chunk.type) { // === 文本相关事件 === diff --git a/src/renderer/src/api/agent.ts b/src/renderer/src/api/agent.ts index d3efc2aeeb..088a895ff0 100644 --- a/src/renderer/src/api/agent.ts +++ b/src/renderer/src/api/agent.ts @@ -10,7 +10,6 @@ import { CreateAgentResponse, CreateAgentResponseSchema, CreateSessionForm, - CreateSessionMessageRequest, CreateSessionRequest, CreateSessionResponse, CreateSessionResponseSchema, @@ -225,16 +224,6 @@ export class AgentApiClient { } } - public async createMessage(agentId: string, sessionId: string, content: string): Promise { - const url = this.getSessionMessagesPath(agentId, sessionId) - try { - const payload = { content } satisfies CreateSessionMessageRequest - await this.axios.post(url, payload) - } catch (error) { - throw processError(error, 'Failed to post message.') - } - } - public async getModels(props?: ApiModelsFilter): Promise { const url = this.getModelsPath(props) try { diff --git a/src/renderer/src/components/Popups/agent/AgentModal.tsx b/src/renderer/src/components/Popups/agent/AgentModal.tsx index 31e486079c..269ed2eb10 100644 --- a/src/renderer/src/components/Popups/agent/AgentModal.tsx +++ b/src/renderer/src/components/Popups/agent/AgentModal.tsx @@ -158,6 +158,35 @@ export const AgentModal: React.FC = ({ agent, trigger, isOpen: _isOpen, o })) }, []) + const addAccessiblePath = useCallback(async () => { + try { + const selected = await window.api.file.selectFolder() + if (!selected) { + return + } + setForm((prev) => { + if (prev.accessible_paths.includes(selected)) { + window.toast.warning(t('agent.session.accessible_paths.duplicate')) + return prev + } + return { + ...prev, + accessible_paths: [...prev.accessible_paths, selected] + } + }) + } catch (error) { + logger.error('Failed to select accessible path:', error as Error) + window.toast.error(t('agent.session.accessible_paths.select_failed')) + } + }, [t]) + + const removeAccessiblePath = useCallback((path: string) => { + setForm((prev) => ({ + ...prev, + accessible_paths: prev.accessible_paths.filter((item) => item !== path) + })) + }, []) + const modelOptions = useMemo(() => { // mocked data. not final version return (models ?? []).map((model) => ({ @@ -165,7 +194,8 @@ export const AgentModal: React.FC = ({ agent, trigger, isOpen: _isOpen, o key: model.id, label: model.name, avatar: getModelLogo(model.id), - providerId: model.provider + providerId: model.provider, + providerName: model.provider_name })) satisfies ModelOption[] }, [models]) @@ -197,6 +227,12 @@ export const AgentModal: React.FC = ({ agent, trigger, isOpen: _isOpen, o return } + if (form.accessible_paths.length === 0) { + window.toast.error(t('agent.session.accessible_paths.required')) + loadingRef.current = false + return + } + if (isEditing(agent)) { if (!agent) { throw new Error('Agent is required for editing mode') @@ -207,7 +243,8 @@ export const AgentModal: React.FC = ({ agent, trigger, isOpen: _isOpen, o name: form.name, description: form.description, instructions: form.instructions, - model: form.model + model: form.model, + accessible_paths: [...form.accessible_paths] } satisfies UpdateAgentForm updateAgent(updatePayload) @@ -309,6 +346,34 @@ export const AgentModal: React.FC = ({ agent, trigger, isOpen: _isOpen, o value={form.description ?? ''} onValueChange={onDescChange} /> +
+
+ + {t('agent.session.accessible_paths.label')} + + +
+ {form.accessible_paths.length > 0 ? ( +
+ {form.accessible_paths.map((path) => ( +
+ + {path} + + +
+ ))} +
+ ) : ( +

{t('agent.session.accessible_paths.empty')}

+ )} +