From 61aae7376af8a61ccbff5bf2b232657d4d8903b4 Mon Sep 17 00:00:00 2001 From: SuYao Date: Thu, 8 Jan 2026 17:55:15 +0800 Subject: [PATCH] fix: add dispose method to prevent abort listener leak (#12269) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: add dispose method to prevent abort listener leak Add dispose() method to StreamAbortController that explicitly removes the abort event listener when stream ends normally. Previously, the listener would only be removed when abort was triggered ({ once: true }), but if the stream completed normally without abort, the listener would remain attached until garbage collection. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * chore: format code 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --------- Co-authored-by: Claude --- .../routes/agents/handlers/messages.ts | 34 +++++++++---------- .../utils/createStreamAbortController.ts | 13 ++++++- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 1b547abba8..abec51ec01 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,6 +1,10 @@ import { loggerService } from '@logger' import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts' -import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '@main/apiServer/utils/createStreamAbortController' +import { + createStreamAbortController, + STREAM_TIMEOUT_REASON, + type StreamAbortController +} from '@main/apiServer/utils/createStreamAbortController' import { agentService, sessionMessageService, sessionService } from '@main/services/agents' import type { Request, Response } from 'express' @@ -26,7 +30,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { } export const createMessage = async (req: Request, res: Response): Promise => { - let clearAbortTimeout: (() => void) | undefined + let streamController: StreamAbortController | undefined try { const { agentId, sessionId } = req.params @@ -45,14 +49,10 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const { - abortController, - registerAbortHandler, - clearAbortTimeout: helperClearAbortTimeout - } = createStreamAbortController({ + streamController = createStreamAbortController({ timeoutMs: MESSAGE_STREAM_TIMEOUT_MS }) - clearAbortTimeout = helperClearAbortTimeout + const { abortController, registerAbortHandler, dispose } = streamController const { stream, completion } = await sessionMessageService.createSessionMessage( session, messageData, @@ -64,8 +64,8 @@ export const createMessage = async (req: Request, res: Response): Promise let responseEnded = false let streamFinished = false - const cleanupAbortTimeout = () => { - clearAbortTimeout?.() + const cleanup = () => { + dispose() } const finalizeResponse = () => { @@ -78,7 +78,7 @@ export const createMessage = async (req: Request, res: Response): Promise } responseEnded = true - cleanupAbortTimeout() + cleanup() try { // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') @@ -108,7 +108,7 @@ export const createMessage = async (req: Request, res: Response): Promise * - Mark the response as ended to prevent further writes */ registerAbortHandler((abortReason) => { - cleanupAbortTimeout() + cleanup() if (responseEnded) return @@ -189,7 +189,7 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing stream error to SSE', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() } } @@ -221,14 +221,14 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing completion error to SSE stream', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() }) // Clear timeout when response ends - res.on('close', cleanupAbortTimeout) - res.on('finish', cleanupAbortTimeout) + res.on('close', cleanup) + res.on('finish', cleanup) } catch (error: any) { - clearAbortTimeout?.() + streamController?.dispose() logger.error('Error in streaming message handler', { error, agentId: req.params.agentId, diff --git a/src/main/apiServer/utils/createStreamAbortController.ts b/src/main/apiServer/utils/createStreamAbortController.ts index 243ad5b96e..e07b9a31f0 100644 --- a/src/main/apiServer/utils/createStreamAbortController.ts +++ b/src/main/apiServer/utils/createStreamAbortController.ts @@ -4,6 +4,7 @@ export interface StreamAbortController { abortController: AbortController registerAbortHandler: (handler: StreamAbortHandler) => void clearAbortTimeout: () => void + dispose: () => void } export const STREAM_TIMEOUT_REASON = 'stream timeout' @@ -40,6 +41,15 @@ export const createStreamAbortController = (options: CreateStreamAbortController signal.addEventListener('abort', handleAbort, { once: true }) + let disposed = false + + const dispose = () => { + if (disposed) return + disposed = true + clearAbortTimeout() + signal.removeEventListener('abort', handleAbort) + } + const registerAbortHandler = (handler: StreamAbortHandler) => { abortHandler = handler @@ -59,6 +69,7 @@ export const createStreamAbortController = (options: CreateStreamAbortController return { abortController, registerAbortHandler, - clearAbortTimeout + clearAbortTimeout, + dispose } }