feat: implement robust AbortController for Claude Code agent streams

- Add AbortController support to agent service interface and implementations
- Enhance client disconnect detection with multiple HTTP event listeners (req.close, req.aborted, res.close)
- Implement proper abort error handling in ClaudeCodeService with 'cancelled' event type
- Add comprehensive documentation explaining SSE disconnect detection behavior
- Clean up stream interfaces by removing unused properties and simplifying event structure
- Extend timeout from 5 to 10 minutes for longer-running agent tasks
- Ensure proper resource cleanup and prevent orphaned processes on client disconnect

This enables reliable cancellation of long-running Claude Code processes when clients
disconnect unexpectedly (browser tab close, curl Ctrl+C, network issues, etc.)
This commit is contained in:
Vaayne 2025-09-19 23:42:46 +08:00
parent c426876d0d
commit b282e4d729
4 changed files with 85 additions and 84 deletions

View File

@ -1,4 +1,5 @@
import { loggerService } from '@logger' import { loggerService } from '@logger'
import { AgentStreamEvent } from '@main/services/agents/interfaces/AgentStreamInterface'
import { Request, Response } from 'express' import { Request, Response } from 'express'
import { agentService, sessionMessageService, sessionService } from '../../../../services/agents' import { agentService, sessionMessageService, sessionService } from '../../../../services/agents'
@ -42,13 +43,14 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const messageStream = sessionMessageService.createSessionMessage(session, messageData) const abortController = new AbortController()
const messageStream = sessionMessageService.createSessionMessage(session, messageData, abortController)
// Track stream lifecycle so we keep the SSE connection open until persistence finishes // Track stream lifecycle so we keep the SSE connection open until persistence finishes
let responseEnded = false let responseEnded = false
let streamFinished = false let streamFinished = false
let awaitingPersistence = false let awaitingPersistence = false
let persistenceResolved = false const persistenceResolved = false
const finalizeResponse = () => { const finalizeResponse = () => {
if (responseEnded) { if (responseEnded) {
@ -73,15 +75,39 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.end() res.end()
} }
// Handle client disconnect /**
req.on('close', () => { * Client Disconnect Detection for Server-Sent Events (SSE)
*
* We monitor multiple HTTP events to reliably detect when a client disconnects
* from the streaming response. This is crucial for:
* - Aborting long-running Claude Code processes
* - Cleaning up resources and preventing memory leaks
* - Avoiding orphaned processes
*
* Event Priority & Behavior:
* 1. res.on('close') - Most common for SSE client disconnects (browser tab close, curl Ctrl+C)
* 2. req.on('aborted') - Explicit request abortion
* 3. req.on('close') - Request object closure (less common with SSE)
*
* When any disconnect event fires, we:
* - Abort the Claude Code SDK process via abortController
* - Clean up event listeners to prevent memory leaks
* - Mark the response as ended to prevent further writes
*/
const handleDisconnect = () => {
if (responseEnded) return
logger.info(`Client disconnected from streaming message for session: ${sessionId}`) logger.info(`Client disconnected from streaming message for session: ${sessionId}`)
responseEnded = true responseEnded = true
messageStream.removeAllListeners() messageStream.removeAllListeners()
}) abortController.abort('Client disconnected')
}
req.on('close', handleDisconnect)
req.on('aborted', handleDisconnect)
res.on('close', handleDisconnect)
// Handle stream events // Handle stream events
messageStream.on('data', (event: any) => { messageStream.on('data', (event: AgentStreamEvent) => {
if (responseEnded) return if (responseEnded) return
try { try {
@ -101,12 +127,6 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.error(`Streaming message error for session: ${sessionId}:`, event.error) logger.error(`Streaming message error for session: ${sessionId}:`, event.error)
streamFinished = true streamFinished = true
awaitingPersistence = Boolean(event.persistScheduled)
if (!awaitingPersistence) {
persistenceResolved = true
}
finalizeResponse() finalizeResponse()
break break
} }
@ -121,23 +141,13 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
break break
} }
case 'persisted': case 'cancelled': {
// Send persistence success event logger.info(`Streaming message cancelled for session: ${sessionId}`)
// res.write(`data: ${JSON.stringify(event)}\n\n`) // res.write(`data: ${JSON.stringify({ type: 'cancelled' })}\n\n`)
logger.debug(`Session message persisted for session: ${sessionId}`, { messageId: event.message?.id }) streamFinished = true
persistenceResolved = true
finalizeResponse()
break
case 'persist-error':
// Send persistence error event
// res.write(`data: ${JSON.stringify(event)}\n\n`)
logger.error(`Failed to persist session message for session: ${sessionId}:`, event.error)
persistenceResolved = true
finalizeResponse() finalizeResponse()
break break
}
default: default:
// Handle other event types as generic data // Handle other event types as generic data
@ -199,8 +209,8 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.end() res.end()
} }
}, },
5 * 60 * 1000 10 * 60 * 1000
) // 5 minutes timeout ) // 10 minutes timeout
// Clear timeout when response ends // Clear timeout when response ends
res.on('close', () => clearTimeout(timeout)) res.on('close', () => clearTimeout(timeout))

View File

@ -8,11 +8,9 @@ import { UIMessageChunk } from 'ai'
// Generic agent stream event that works with any agent type // Generic agent stream event that works with any agent type
export interface AgentStreamEvent { export interface AgentStreamEvent {
type: 'chunk' | 'error' | 'complete' type: 'chunk' | 'error' | 'complete' | 'cancelled'
chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption
rawAgentMessage?: any // Agent-specific raw message (SDKMessage for Claude Code, different for other agents)
error?: Error error?: Error
agentResult?: any // Agent-specific result data
} }
// Agent stream interface that all agents should implement // Agent stream interface that all agents should implement
@ -24,5 +22,10 @@ export interface AgentStream extends EventEmitter {
// Base agent service interface // Base agent service interface
export interface AgentServiceInterface { export interface AgentServiceInterface {
invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise<AgentStream> invoke(
prompt: string,
session: GetAgentSessionResponse,
abortController: AbortController,
lastAgentSessionId?: string
): Promise<AgentStream>
} }

View File

@ -170,14 +170,18 @@ export class SessionMessageService extends BaseService {
return { messages } return { messages }
} }
createSessionMessage(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter { createSessionMessage(
session: GetAgentSessionResponse,
messageData: CreateSessionMessageRequest,
abortController: AbortController
): EventEmitter {
this.ensureInitialized() this.ensureInitialized()
// Create a new EventEmitter to manage the session message lifecycle // Create a new EventEmitter to manage the session message lifecycle
const sessionStream = new EventEmitter() const sessionStream = new EventEmitter()
// No parent validation needed, start immediately // No parent validation needed, start immediately
this.startSessionMessageStream(session, messageData, sessionStream) this.startSessionMessageStream(session, messageData, sessionStream, abortController)
return sessionStream return sessionStream
} }
@ -185,7 +189,8 @@ export class SessionMessageService extends BaseService {
private async startSessionMessageStream( private async startSessionMessageStream(
session: GetAgentSessionResponse, session: GetAgentSessionResponse,
req: CreateSessionMessageRequest, req: CreateSessionMessageRequest,
sessionStream: EventEmitter sessionStream: EventEmitter,
abortController: AbortController
): Promise<void> { ): Promise<void> {
const agentSessionId = await this.getLastAgentSessionId(session.id) const agentSessionId = await this.getLastAgentSessionId(session.id)
let newAgentSessionId = '' let newAgentSessionId = ''
@ -198,7 +203,7 @@ export class SessionMessageService extends BaseService {
} }
// Create the streaming agent invocation (using invokeStream for streaming) // Create the streaming agent invocation (using invokeStream for streaming)
const claudeStream = await this.cc.invoke(req.content, session, agentSessionId) const claudeStream = await this.cc.invoke(req.content, session, abortController, agentSessionId)
// Use chunk accumulator to manage streaming data // Use chunk accumulator to manage streaming data
const accumulator = new ChunkAccumulator() const accumulator = new ChunkAccumulator()

View File

@ -14,15 +14,6 @@ import { transformSDKMessageToUIChunk } from './transform'
const require_ = createRequire(import.meta.url) const require_ = createRequire(import.meta.url)
const logger = loggerService.withContext('ClaudeCodeService') const logger = loggerService.withContext('ClaudeCodeService')
interface ClaudeCodeResult {
success: boolean
stdout: string
stderr: string
jsonOutput: any[]
error?: Error
exitCode?: number
}
class ClaudeCodeStream extends EventEmitter implements AgentStream { class ClaudeCodeStream extends EventEmitter implements AgentStream {
declare emit: (event: 'data', data: AgentStreamEvent) => boolean declare emit: (event: 'data', data: AgentStreamEvent) => boolean
declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this
@ -37,7 +28,12 @@ class ClaudeCodeService implements AgentServiceInterface {
this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js') this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js')
} }
async invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise<AgentStream> { async invoke(
prompt: string,
session: GetAgentSessionResponse,
abortController: AbortController,
lastAgentSessionId?: string
): Promise<AgentStream> {
const aiStream = new ClaudeCodeStream() const aiStream = new ClaudeCodeStream()
// Validate session accessible paths and make sure it exists as a directory // Validate session accessible paths and make sure it exists as a directory
@ -76,6 +72,7 @@ class ClaudeCodeService implements AgentServiceInterface {
// Build SDK options from parameters // Build SDK options from parameters
const options: Options = { const options: Options = {
abortController,
cwd, cwd,
pathToClaudeCodeExecutable: this.claudeExecutablePath, pathToClaudeCodeExecutable: this.claudeExecutablePath,
stderr: (chunk: string) => { stderr: (chunk: string) => {
@ -164,8 +161,7 @@ class ClaudeCodeService implements AgentServiceInterface {
for (const chunk of chunks) { for (const chunk of chunks) {
stream.emit('data', { stream.emit('data', {
type: 'chunk', type: 'chunk',
chunk, chunk
rawAgentMessage: message
}) })
} }
} }
@ -179,57 +175,44 @@ class ClaudeCodeService implements AgentServiceInterface {
messageCount: jsonOutput.length messageCount: jsonOutput.length
}) })
const result: ClaudeCodeResult = {
success: true,
stdout: '',
stderr: '',
jsonOutput,
exitCode: 0
}
// Emit completion event // Emit completion event
stream.emit('data', { stream.emit('data', {
type: 'complete', type: 'complete'
agentResult: {
...result,
rawSDKMessages: jsonOutput,
agentType: 'claude-code'
}
}) })
} catch (error) { } catch (error) {
if (hasCompleted) return if (hasCompleted) return
hasCompleted = true hasCompleted = true
const duration = Date.now() - startTime const duration = Date.now() - startTime
// Check if this is an abort error
const errorObj = error as any
const isAborted =
errorObj?.name === 'AbortError' ||
errorObj?.message?.includes('aborted') ||
options.abortController?.signal.aborted
if (isAborted) {
logger.info('SDK query aborted by client disconnect', { duration })
// Simply cleanup and return - don't emit error events
stream.emit('data', {
type: 'cancelled',
error: new Error('Request aborted by client')
})
return
}
// Original error handling for non-abort errors
logger.error('SDK query error:', { logger.error('SDK query error:', {
error: error instanceof Error ? error.message : String(error), error: errorObj instanceof Error ? errorObj.message : String(errorObj),
duration, duration,
messageCount: jsonOutput.length messageCount: jsonOutput.length
}) })
const result: ClaudeCodeResult = {
success: false,
stdout: '',
stderr: error instanceof Error ? error.message : String(error),
jsonOutput,
error: error instanceof Error ? error : new Error(String(error)),
exitCode: 1
}
// Emit error event // Emit error event
stream.emit('data', { stream.emit('data', {
type: 'error', type: 'error',
error: error instanceof Error ? error : new Error(String(error)) error: errorObj instanceof Error ? errorObj : new Error(String(errorObj))
})
// Emit completion with error result
stream.emit('data', {
type: 'complete',
agentResult: {
...result,
rawSDKMessages: jsonOutput,
agentType: 'claude-code'
}
}) })
} }
} }