From dea4bad4cab8d3d1fae087d2413759c383a14dc9 Mon Sep 17 00:00:00 2001 From: suyao Date: Sun, 4 Jan 2026 17:29:13 +0800 Subject: [PATCH] fix: add dispose method to prevent abort listener leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add dispose() method to StreamAbortController that explicitly removes the abort event listener when stream ends normally. Previously, the listener would only be removed when abort was triggered ({ once: true }), but if the stream completed normally without abort, the listener would remain attached until garbage collection. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../routes/agents/handlers/messages.ts | 34 +++++++++---------- .../utils/createStreamAbortController.ts | 13 ++++++- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 1b547abba8..abec51ec01 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,6 +1,10 @@ import { loggerService } from '@logger' import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts' -import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '@main/apiServer/utils/createStreamAbortController' +import { + createStreamAbortController, + STREAM_TIMEOUT_REASON, + type StreamAbortController +} from '@main/apiServer/utils/createStreamAbortController' import { agentService, sessionMessageService, sessionService } from '@main/services/agents' import type { Request, Response } from 'express' @@ -26,7 +30,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { } export const createMessage = async (req: Request, res: Response): Promise => { - let clearAbortTimeout: (() => void) | undefined + let streamController: StreamAbortController | undefined try { const { agentId, sessionId } = req.params @@ -45,14 +49,10 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const { - abortController, - registerAbortHandler, - clearAbortTimeout: helperClearAbortTimeout - } = createStreamAbortController({ + streamController = createStreamAbortController({ timeoutMs: MESSAGE_STREAM_TIMEOUT_MS }) - clearAbortTimeout = helperClearAbortTimeout + const { abortController, registerAbortHandler, dispose } = streamController const { stream, completion } = await sessionMessageService.createSessionMessage( session, messageData, @@ -64,8 +64,8 @@ export const createMessage = async (req: Request, res: Response): Promise let responseEnded = false let streamFinished = false - const cleanupAbortTimeout = () => { - clearAbortTimeout?.() + const cleanup = () => { + dispose() } const finalizeResponse = () => { @@ -78,7 +78,7 @@ export const createMessage = async (req: Request, res: Response): Promise } responseEnded = true - cleanupAbortTimeout() + cleanup() try { // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') @@ -108,7 +108,7 @@ export const createMessage = async (req: Request, res: Response): Promise * - Mark the response as ended to prevent further writes */ registerAbortHandler((abortReason) => { - cleanupAbortTimeout() + cleanup() if (responseEnded) return @@ -189,7 +189,7 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing stream error to SSE', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() } } @@ -221,14 +221,14 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing completion error to SSE stream', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() }) // Clear timeout when response ends - res.on('close', cleanupAbortTimeout) - res.on('finish', cleanupAbortTimeout) + res.on('close', cleanup) + res.on('finish', cleanup) } catch (error: any) { - clearAbortTimeout?.() + streamController?.dispose() logger.error('Error in streaming message handler', { error, agentId: req.params.agentId, diff --git a/src/main/apiServer/utils/createStreamAbortController.ts b/src/main/apiServer/utils/createStreamAbortController.ts index 243ad5b96e..e07b9a31f0 100644 --- a/src/main/apiServer/utils/createStreamAbortController.ts +++ b/src/main/apiServer/utils/createStreamAbortController.ts @@ -4,6 +4,7 @@ export interface StreamAbortController { abortController: AbortController registerAbortHandler: (handler: StreamAbortHandler) => void clearAbortTimeout: () => void + dispose: () => void } export const STREAM_TIMEOUT_REASON = 'stream timeout' @@ -40,6 +41,15 @@ export const createStreamAbortController = (options: CreateStreamAbortController signal.addEventListener('abort', handleAbort, { once: true }) + let disposed = false + + const dispose = () => { + if (disposed) return + disposed = true + clearAbortTimeout() + signal.removeEventListener('abort', handleAbort) + } + const registerAbortHandler = (handler: StreamAbortHandler) => { abortHandler = handler @@ -59,6 +69,7 @@ export const createStreamAbortController = (options: CreateStreamAbortController return { abortController, registerAbortHandler, - clearAbortTimeout + clearAbortTimeout, + dispose } }