From b282e4d72986591c6d80f894e324e507ca6d2951 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Fri, 19 Sep 2025 23:42:46 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=20feat:=20implement=20robust=20Ab?= =?UTF-8?q?ortController=20for=20Claude=20Code=20agent=20streams?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add AbortController support to agent service interface and implementations - Enhance client disconnect detection with multiple HTTP event listeners (req.close, req.aborted, res.close) - Implement proper abort error handling in ClaudeCodeService with 'cancelled' event type - Add comprehensive documentation explaining SSE disconnect detection behavior - Clean up stream interfaces by removing unused properties and simplifying event structure - Extend timeout from 5 to 10 minutes for longer-running agent tasks - Ensure proper resource cleanup and prevent orphaned processes on client disconnect This enables reliable cancellation of long-running Claude Code processes when clients disconnect unexpectedly (browser tab close, curl Ctrl+C, network issues, etc.) --- .../routes/agents/handlers/messages.ts | 68 +++++++++------- .../agents/interfaces/AgentStreamInterface.ts | 11 ++- .../agents/services/SessionMessageService.ts | 13 +++- .../agents/services/claudecode/index.ts | 77 ++++++++----------- 4 files changed, 85 insertions(+), 84 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index bd1ef5bc42..41a3bf44d6 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,4 +1,5 @@ import { loggerService } from '@logger' +import { AgentStreamEvent } from '@main/services/agents/interfaces/AgentStreamInterface' import { Request, Response } from 'express' import { agentService, sessionMessageService, sessionService } from '../../../../services/agents' @@ -42,13 +43,14 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const messageStream = sessionMessageService.createSessionMessage(session, messageData) + const abortController = new AbortController() + const messageStream = sessionMessageService.createSessionMessage(session, messageData, abortController) // Track stream lifecycle so we keep the SSE connection open until persistence finishes let responseEnded = false let streamFinished = false let awaitingPersistence = false - let persistenceResolved = false + const persistenceResolved = false const finalizeResponse = () => { if (responseEnded) { @@ -73,15 +75,39 @@ export const createMessage = async (req: Request, res: Response): Promise res.end() } - // Handle client disconnect - req.on('close', () => { + /** + * Client Disconnect Detection for Server-Sent Events (SSE) + * + * We monitor multiple HTTP events to reliably detect when a client disconnects + * from the streaming response. This is crucial for: + * - Aborting long-running Claude Code processes + * - Cleaning up resources and preventing memory leaks + * - Avoiding orphaned processes + * + * Event Priority & Behavior: + * 1. res.on('close') - Most common for SSE client disconnects (browser tab close, curl Ctrl+C) + * 2. req.on('aborted') - Explicit request abortion + * 3. req.on('close') - Request object closure (less common with SSE) + * + * When any disconnect event fires, we: + * - Abort the Claude Code SDK process via abortController + * - Clean up event listeners to prevent memory leaks + * - Mark the response as ended to prevent further writes + */ + const handleDisconnect = () => { + if (responseEnded) return logger.info(`Client disconnected from streaming message for session: ${sessionId}`) responseEnded = true messageStream.removeAllListeners() - }) + abortController.abort('Client disconnected') + } + + req.on('close', handleDisconnect) + req.on('aborted', handleDisconnect) + res.on('close', handleDisconnect) // Handle stream events - messageStream.on('data', (event: any) => { + messageStream.on('data', (event: AgentStreamEvent) => { if (responseEnded) return try { @@ -101,12 +127,6 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error(`Streaming message error for session: ${sessionId}:`, event.error) streamFinished = true - awaitingPersistence = Boolean(event.persistScheduled) - - if (!awaitingPersistence) { - persistenceResolved = true - } - finalizeResponse() break } @@ -121,23 +141,13 @@ export const createMessage = async (req: Request, res: Response): Promise break } - case 'persisted': - // Send persistence success event - // res.write(`data: ${JSON.stringify(event)}\n\n`) - logger.debug(`Session message persisted for session: ${sessionId}`, { messageId: event.message?.id }) - - persistenceResolved = true - finalizeResponse() - break - - case 'persist-error': - // Send persistence error event - // res.write(`data: ${JSON.stringify(event)}\n\n`) - logger.error(`Failed to persist session message for session: ${sessionId}:`, event.error) - - persistenceResolved = true + case 'cancelled': { + logger.info(`Streaming message cancelled for session: ${sessionId}`) + // res.write(`data: ${JSON.stringify({ type: 'cancelled' })}\n\n`) + streamFinished = true finalizeResponse() break + } default: // Handle other event types as generic data @@ -199,8 +209,8 @@ export const createMessage = async (req: Request, res: Response): Promise res.end() } }, - 5 * 60 * 1000 - ) // 5 minutes timeout + 10 * 60 * 1000 + ) // 10 minutes timeout // Clear timeout when response ends res.on('close', () => clearTimeout(timeout)) diff --git a/src/main/services/agents/interfaces/AgentStreamInterface.ts b/src/main/services/agents/interfaces/AgentStreamInterface.ts index 2cb82c4942..245224248a 100644 --- a/src/main/services/agents/interfaces/AgentStreamInterface.ts +++ b/src/main/services/agents/interfaces/AgentStreamInterface.ts @@ -8,11 +8,9 @@ import { UIMessageChunk } from 'ai' // Generic agent stream event that works with any agent type export interface AgentStreamEvent { - type: 'chunk' | 'error' | 'complete' + type: 'chunk' | 'error' | 'complete' | 'cancelled' chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption - rawAgentMessage?: any // Agent-specific raw message (SDKMessage for Claude Code, different for other agents) error?: Error - agentResult?: any // Agent-specific result data } // Agent stream interface that all agents should implement @@ -24,5 +22,10 @@ export interface AgentStream extends EventEmitter { // Base agent service interface export interface AgentServiceInterface { - invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise + invoke( + prompt: string, + session: GetAgentSessionResponse, + abortController: AbortController, + lastAgentSessionId?: string + ): Promise } diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index f83bff5756..e50c4e8ffc 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -170,14 +170,18 @@ export class SessionMessageService extends BaseService { return { messages } } - createSessionMessage(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter { + createSessionMessage( + session: GetAgentSessionResponse, + messageData: CreateSessionMessageRequest, + abortController: AbortController + ): EventEmitter { this.ensureInitialized() // Create a new EventEmitter to manage the session message lifecycle const sessionStream = new EventEmitter() // No parent validation needed, start immediately - this.startSessionMessageStream(session, messageData, sessionStream) + this.startSessionMessageStream(session, messageData, sessionStream, abortController) return sessionStream } @@ -185,7 +189,8 @@ export class SessionMessageService extends BaseService { private async startSessionMessageStream( session: GetAgentSessionResponse, req: CreateSessionMessageRequest, - sessionStream: EventEmitter + sessionStream: EventEmitter, + abortController: AbortController ): Promise { const agentSessionId = await this.getLastAgentSessionId(session.id) let newAgentSessionId = '' @@ -198,7 +203,7 @@ export class SessionMessageService extends BaseService { } // Create the streaming agent invocation (using invokeStream for streaming) - const claudeStream = await this.cc.invoke(req.content, session, agentSessionId) + const claudeStream = await this.cc.invoke(req.content, session, abortController, agentSessionId) // Use chunk accumulator to manage streaming data const accumulator = new ChunkAccumulator() diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index 7e76c5417f..fe45f7edd3 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -14,15 +14,6 @@ import { transformSDKMessageToUIChunk } from './transform' const require_ = createRequire(import.meta.url) const logger = loggerService.withContext('ClaudeCodeService') -interface ClaudeCodeResult { - success: boolean - stdout: string - stderr: string - jsonOutput: any[] - error?: Error - exitCode?: number -} - class ClaudeCodeStream extends EventEmitter implements AgentStream { declare emit: (event: 'data', data: AgentStreamEvent) => boolean declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this @@ -37,7 +28,12 @@ class ClaudeCodeService implements AgentServiceInterface { this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js') } - async invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise { + async invoke( + prompt: string, + session: GetAgentSessionResponse, + abortController: AbortController, + lastAgentSessionId?: string + ): Promise { const aiStream = new ClaudeCodeStream() // Validate session accessible paths and make sure it exists as a directory @@ -76,6 +72,7 @@ class ClaudeCodeService implements AgentServiceInterface { // Build SDK options from parameters const options: Options = { + abortController, cwd, pathToClaudeCodeExecutable: this.claudeExecutablePath, stderr: (chunk: string) => { @@ -164,8 +161,7 @@ class ClaudeCodeService implements AgentServiceInterface { for (const chunk of chunks) { stream.emit('data', { type: 'chunk', - chunk, - rawAgentMessage: message + chunk }) } } @@ -179,57 +175,44 @@ class ClaudeCodeService implements AgentServiceInterface { messageCount: jsonOutput.length }) - const result: ClaudeCodeResult = { - success: true, - stdout: '', - stderr: '', - jsonOutput, - exitCode: 0 - } - // Emit completion event stream.emit('data', { - type: 'complete', - agentResult: { - ...result, - rawSDKMessages: jsonOutput, - agentType: 'claude-code' - } + type: 'complete' }) } catch (error) { if (hasCompleted) return hasCompleted = true const duration = Date.now() - startTime + + // Check if this is an abort error + const errorObj = error as any + const isAborted = + errorObj?.name === 'AbortError' || + errorObj?.message?.includes('aborted') || + options.abortController?.signal.aborted + + if (isAborted) { + logger.info('SDK query aborted by client disconnect', { duration }) + // Simply cleanup and return - don't emit error events + stream.emit('data', { + type: 'cancelled', + error: new Error('Request aborted by client') + }) + return + } + + // Original error handling for non-abort errors logger.error('SDK query error:', { - error: error instanceof Error ? error.message : String(error), + error: errorObj instanceof Error ? errorObj.message : String(errorObj), duration, messageCount: jsonOutput.length }) - const result: ClaudeCodeResult = { - success: false, - stdout: '', - stderr: error instanceof Error ? error.message : String(error), - jsonOutput, - error: error instanceof Error ? error : new Error(String(error)), - exitCode: 1 - } - // Emit error event stream.emit('data', { type: 'error', - error: error instanceof Error ? error : new Error(String(error)) - }) - - // Emit completion with error result - stream.emit('data', { - type: 'complete', - agentResult: { - ...result, - rawSDKMessages: jsonOutput, - agentType: 'claude-code' - } + error: errorObj instanceof Error ? errorObj : new Error(String(errorObj)) }) } } From d56521260c6334047073052952df0270979c7d48 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Sat, 20 Sep 2025 00:13:26 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20perf:=20add=20caching?= =?UTF-8?q?=20layer=20for=20MCP=20servers=20and=20providers=20data=20acces?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract getServersFromRedux to shared utility getMCPServersFromRedux - Implement 5-minute TTL cache for MCP servers and providers - Reduce redundant Redux store queries in API server - Improve response times for frequently accessed data --- src/main/apiServer/services/mcp.ts | 31 +++--------------------------- src/main/apiServer/utils/index.ts | 17 +++++++++++++++- src/main/apiServer/utils/mcp.ts | 30 ++++++++++++++++++++++++----- 3 files changed, 44 insertions(+), 34 deletions(-) diff --git a/src/main/apiServer/services/mcp.ts b/src/main/apiServer/services/mcp.ts index 99f1732114..f18e662f2e 100644 --- a/src/main/apiServer/services/mcp.ts +++ b/src/main/apiServer/services/mcp.ts @@ -13,8 +13,7 @@ import { Request, Response } from 'express' import { IncomingMessage, ServerResponse } from 'http' import { loggerService } from '../../services/LoggerService' -import { reduxService } from '../../services/ReduxService' -import { getMcpServerById } from '../utils/mcp' +import { getMcpServerById, getMCPServersFromRedux } from '../utils/mcp' const logger = loggerService.withContext('MCPApiService') const transports: Record = {} @@ -57,34 +56,10 @@ class MCPApiService extends EventEmitter { this.transport.onmessage = this.onMessage } - /** - * Get servers directly from Redux store - */ - private async getServersFromRedux(): Promise { - try { - logger.silly('Getting servers from Redux store') - - // Try to get from cache first (faster) - const cachedServers = reduxService.selectSync('state.mcp.servers') - if (cachedServers && Array.isArray(cachedServers)) { - logger.silly(`Found ${cachedServers.length} servers in Redux cache`) - return cachedServers - } - - // If cache is not available, get fresh data - const servers = await reduxService.select('state.mcp.servers') - logger.silly(`Fetched ${servers?.length || 0} servers from Redux store`) - return servers || [] - } catch (error: any) { - logger.error('Failed to get servers from Redux:', error) - return [] - } - } - // get all activated servers async getAllServers(req: Request): Promise { try { - const servers = await this.getServersFromRedux() + const servers = await getMCPServersFromRedux() logger.silly(`Returning ${servers.length} servers`) const resp: McpServersResp = { servers: {} @@ -111,7 +86,7 @@ class MCPApiService extends EventEmitter { async getServerById(id: string): Promise { try { logger.silly(`getServerById called with id: ${id}`) - const servers = await this.getServersFromRedux() + const servers = await getMCPServersFromRedux() const server = servers.find((s) => s.id === id) if (!server) { logger.warn(`Server with id ${id} not found`) diff --git a/src/main/apiServer/utils/index.ts b/src/main/apiServer/utils/index.ts index f1e0d68454..bd2a9aa927 100644 --- a/src/main/apiServer/utils/index.ts +++ b/src/main/apiServer/utils/index.ts @@ -1,12 +1,24 @@ +import { CacheService } from '@main/services/CacheService' import { loggerService } from '@main/services/LoggerService' import { reduxService } from '@main/services/ReduxService' import { ApiModel, Model, Provider } from '@types' const logger = loggerService.withContext('ApiServerUtils') +// Cache configuration +const PROVIDERS_CACHE_KEY = 'api-server:providers' +const PROVIDERS_CACHE_TTL = 5 * 60 * 1000 // 5 minutes + export async function getAvailableProviders(): Promise { try { - // Wait for store to be ready before accessing providers + // Try to get from cache first (faster) + const cachedSupportedProviders = CacheService.get(PROVIDERS_CACHE_KEY) + if (cachedSupportedProviders) { + logger.debug(`Found ${cachedSupportedProviders.length} supported providers (from cache)`) + return cachedSupportedProviders + } + + // If cache is not available, get fresh data from Redux const providers = await reduxService.select('state.llm.providers') if (!providers || !Array.isArray(providers)) { logger.warn('No providers found in Redux store, returning empty array') @@ -18,6 +30,9 @@ export async function getAvailableProviders(): Promise { (p: Provider) => p.enabled && (p.type === 'openai' || p.type === 'anthropic') ) + // Cache the filtered results + CacheService.set(PROVIDERS_CACHE_KEY, supportedProviders, PROVIDERS_CACHE_TTL) + logger.info(`Filtered to ${supportedProviders.length} supported providers from ${providers.length} total providers`) return supportedProviders diff --git a/src/main/apiServer/utils/mcp.ts b/src/main/apiServer/utils/mcp.ts index 1ebe06ba68..380b8f1d99 100644 --- a/src/main/apiServer/utils/mcp.ts +++ b/src/main/apiServer/utils/mcp.ts @@ -1,3 +1,4 @@ +import { CacheService } from '@main/services/CacheService' import mcpService from '@main/services/MCPService' import { Server } from '@modelcontextprotocol/sdk/server/index.js' import { CallToolRequestSchema, ListToolsRequestSchema, ListToolsResult } from '@modelcontextprotocol/sdk/types.js' @@ -8,6 +9,10 @@ import { reduxService } from '../../services/ReduxService' const logger = loggerService.withContext('MCPApiService') +// Cache configuration +const MCP_SERVERS_CACHE_KEY = 'api-server:mcp-servers' +const MCP_SERVERS_CACHE_TTL = 5 * 60 * 1000 // 5 minutes + const cachedServers: Record = {} async function handleListToolsRequest(request: any, extra: any): Promise { @@ -33,18 +38,33 @@ async function handleCallToolRequest(request: any, extra: any): Promise { } async function getMcpServerConfigById(id: string): Promise { - const servers = await getServersFromRedux() + const servers = await getMCPServersFromRedux() return servers.find((s) => s.id === id || s.name === id) } /** * Get servers directly from Redux store */ -async function getServersFromRedux(): Promise { +export async function getMCPServersFromRedux(): Promise { try { + logger.silly('Getting servers from Redux store') + + // Try to get from cache first (faster) + const cachedServers = CacheService.get(MCP_SERVERS_CACHE_KEY) + if (cachedServers) { + logger.silly(`Found ${cachedServers.length} servers (from cache)`) + return cachedServers + } + + // If cache is not available, get fresh data from Redux const servers = await reduxService.select('state.mcp.servers') - logger.silly(`Fetched ${servers?.length || 0} servers from Redux store`) - return servers || [] + const serverList = servers || [] + + // Cache the results + CacheService.set(MCP_SERVERS_CACHE_KEY, serverList, MCP_SERVERS_CACHE_TTL) + + logger.silly(`Fetched ${serverList.length} servers from Redux store`) + return serverList } catch (error: any) { logger.error('Failed to get servers from Redux:', error) return [] @@ -54,7 +74,7 @@ async function getServersFromRedux(): Promise { export async function getMcpServerById(id: string): Promise { const server = cachedServers[id] if (!server) { - const servers = await getServersFromRedux() + const servers = await getMCPServersFromRedux() const mcpServer = servers.find((s) => s.id === id || s.name === id) if (!mcpServer) { throw new Error(`Server not found: ${id}`) From 17df1db120902a6d9a8352adf203efaa9ca204e1 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Sat, 20 Sep 2025 00:14:02 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20simplify?= =?UTF-8?q?=20streaming=20message=20lifecycle=20management?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused persistence tracking variables in message handler - Simplify finalizeResponse logic by removing unnecessary checks - Change 'finish' event type to 'complete' for consistency - Add debug logging for streaming events - Clean up dead code and improve readability --- .../apiServer/routes/agents/handlers/messages.ts | 10 ++-------- .../agents/services/SessionMessageService.ts | 15 ++++----------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 41a3bf44d6..4ae62dcbda 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -49,8 +49,6 @@ export const createMessage = async (req: Request, res: Response): Promise // Track stream lifecycle so we keep the SSE connection open until persistence finishes let responseEnded = false let streamFinished = false - let awaitingPersistence = false - const persistenceResolved = false const finalizeResponse = () => { if (responseEnded) { @@ -61,10 +59,6 @@ export const createMessage = async (req: Request, res: Response): Promise return } - if (awaitingPersistence && !persistenceResolved) { - return - } - responseEnded = true try { res.write('data: {"type":"finish"}\n\n') @@ -136,7 +130,6 @@ export const createMessage = async (req: Request, res: Response): Promise // res.write(`data: ${JSON.stringify({ type: 'complete', result: event.result })}\n\n`) streamFinished = true - awaitingPersistence = true finalizeResponse() break } @@ -151,7 +144,8 @@ export const createMessage = async (req: Request, res: Response): Promise default: // Handle other event types as generic data - res.write(`data: ${JSON.stringify(event)}\n\n`) + logger.info(`Streaming message event for session: ${sessionId}:`, { event }) + // res.write(`data: ${JSON.stringify(event)}\n\n`) break } } catch (writeError) { diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index e50c4e8ffc..77f4133872 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -238,22 +238,15 @@ export class SessionMessageService extends BaseService { error: serializeError(underlyingError), persistScheduled: false }) - // Always emit a finish chunk at the end + // Always emit a complete chunk at the end sessionStream.emit('data', { - type: 'finish', + type: 'complete', persistScheduled: false }) break } case 'complete': { - const completionPayload = event.result ?? accumulator.toModelMessage('assistant') - - sessionStream.emit('data', { - type: 'complete', - result: completionPayload - }) - try { const persisted = await this.database.transaction(async (tx) => { const userMessage = await this.persistUserMessage(tx, session.id, req.content, newAgentSessionId) @@ -278,9 +271,9 @@ export class SessionMessageService extends BaseService { error: serializeError(persistError) }) } finally { - // Always emit a finish chunk at the end + // Always emit a complete chunk at the end sessionStream.emit('data', { - type: 'finish', + type: 'complete', persistScheduled: true }) }