diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 1b547abba8..abec51ec01 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,6 +1,10 @@ import { loggerService } from '@logger' import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts' -import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '@main/apiServer/utils/createStreamAbortController' +import { + createStreamAbortController, + STREAM_TIMEOUT_REASON, + type StreamAbortController +} from '@main/apiServer/utils/createStreamAbortController' import { agentService, sessionMessageService, sessionService } from '@main/services/agents' import type { Request, Response } from 'express' @@ -26,7 +30,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { } export const createMessage = async (req: Request, res: Response): Promise => { - let clearAbortTimeout: (() => void) | undefined + let streamController: StreamAbortController | undefined try { const { agentId, sessionId } = req.params @@ -45,14 +49,10 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const { - abortController, - registerAbortHandler, - clearAbortTimeout: helperClearAbortTimeout - } = createStreamAbortController({ + streamController = createStreamAbortController({ timeoutMs: MESSAGE_STREAM_TIMEOUT_MS }) - clearAbortTimeout = helperClearAbortTimeout + const { abortController, registerAbortHandler, dispose } = streamController const { stream, completion } = await sessionMessageService.createSessionMessage( session, messageData, @@ -64,8 +64,8 @@ export const createMessage = async (req: Request, res: Response): Promise let responseEnded = false let streamFinished = false - const cleanupAbortTimeout = () => { - clearAbortTimeout?.() + const cleanup = () => { + dispose() } const finalizeResponse = () => { @@ -78,7 +78,7 @@ export const createMessage = async (req: Request, res: Response): Promise } responseEnded = true - cleanupAbortTimeout() + cleanup() try { // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') @@ -108,7 +108,7 @@ export const createMessage = async (req: Request, res: Response): Promise * - Mark the response as ended to prevent further writes */ registerAbortHandler((abortReason) => { - cleanupAbortTimeout() + cleanup() if (responseEnded) return @@ -189,7 +189,7 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing stream error to SSE', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() } } @@ -221,14 +221,14 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing completion error to SSE stream', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() }) // Clear timeout when response ends - res.on('close', cleanupAbortTimeout) - res.on('finish', cleanupAbortTimeout) + res.on('close', cleanup) + res.on('finish', cleanup) } catch (error: any) { - clearAbortTimeout?.() + streamController?.dispose() logger.error('Error in streaming message handler', { error, agentId: req.params.agentId, diff --git a/src/main/apiServer/utils/createStreamAbortController.ts b/src/main/apiServer/utils/createStreamAbortController.ts index 243ad5b96e..e07b9a31f0 100644 --- a/src/main/apiServer/utils/createStreamAbortController.ts +++ b/src/main/apiServer/utils/createStreamAbortController.ts @@ -4,6 +4,7 @@ export interface StreamAbortController { abortController: AbortController registerAbortHandler: (handler: StreamAbortHandler) => void clearAbortTimeout: () => void + dispose: () => void } export const STREAM_TIMEOUT_REASON = 'stream timeout' @@ -40,6 +41,15 @@ export const createStreamAbortController = (options: CreateStreamAbortController signal.addEventListener('abort', handleAbort, { once: true }) + let disposed = false + + const dispose = () => { + if (disposed) return + disposed = true + clearAbortTimeout() + signal.removeEventListener('abort', handleAbort) + } + const registerAbortHandler = (handler: StreamAbortHandler) => { abortHandler = handler @@ -59,6 +69,7 @@ export const createStreamAbortController = (options: CreateStreamAbortController return { abortController, registerAbortHandler, - clearAbortTimeout + clearAbortTimeout, + dispose } }