fix: add dispose method to prevent abort listener leak (#12269)

* fix: add dispose method to prevent abort listener leak

Add dispose() method to StreamAbortController that explicitly removes
the abort event listener when stream ends normally. Previously, the
listener would only be removed when abort was triggered ({ once: true }),
but if the stream completed normally without abort, the listener would
remain attached until garbage collection.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: format code

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
SuYao 2026-01-08 17:55:15 +08:00 committed by GitHub
parent 74e1d0887d
commit 61aae7376a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 18 deletions

View File

@ -1,6 +1,10 @@
import { loggerService } from '@logger' import { loggerService } from '@logger'
import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts' import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts'
import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '@main/apiServer/utils/createStreamAbortController' import {
createStreamAbortController,
STREAM_TIMEOUT_REASON,
type StreamAbortController
} from '@main/apiServer/utils/createStreamAbortController'
import { agentService, sessionMessageService, sessionService } from '@main/services/agents' import { agentService, sessionMessageService, sessionService } from '@main/services/agents'
import type { Request, Response } from 'express' import type { Request, Response } from 'express'
@ -26,7 +30,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => {
} }
export const createMessage = async (req: Request, res: Response): Promise<void> => { export const createMessage = async (req: Request, res: Response): Promise<void> => {
let clearAbortTimeout: (() => void) | undefined let streamController: StreamAbortController | undefined
try { try {
const { agentId, sessionId } = req.params const { agentId, sessionId } = req.params
@ -45,14 +49,10 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const { streamController = createStreamAbortController({
abortController,
registerAbortHandler,
clearAbortTimeout: helperClearAbortTimeout
} = createStreamAbortController({
timeoutMs: MESSAGE_STREAM_TIMEOUT_MS timeoutMs: MESSAGE_STREAM_TIMEOUT_MS
}) })
clearAbortTimeout = helperClearAbortTimeout const { abortController, registerAbortHandler, dispose } = streamController
const { stream, completion } = await sessionMessageService.createSessionMessage( const { stream, completion } = await sessionMessageService.createSessionMessage(
session, session,
messageData, messageData,
@ -64,8 +64,8 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
let responseEnded = false let responseEnded = false
let streamFinished = false let streamFinished = false
const cleanupAbortTimeout = () => { const cleanup = () => {
clearAbortTimeout?.() dispose()
} }
const finalizeResponse = () => { const finalizeResponse = () => {
@ -78,7 +78,7 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
} }
responseEnded = true responseEnded = true
cleanupAbortTimeout() cleanup()
try { try {
// res.write('data: {"type":"finish"}\n\n') // res.write('data: {"type":"finish"}\n\n')
res.write('data: [DONE]\n\n') res.write('data: [DONE]\n\n')
@ -108,7 +108,7 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
* - Mark the response as ended to prevent further writes * - Mark the response as ended to prevent further writes
*/ */
registerAbortHandler((abortReason) => { registerAbortHandler((abortReason) => {
cleanupAbortTimeout() cleanup()
if (responseEnded) return if (responseEnded) return
@ -189,7 +189,7 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.error('Error writing stream error to SSE', { error: writeError }) logger.error('Error writing stream error to SSE', { error: writeError })
} }
responseEnded = true responseEnded = true
cleanupAbortTimeout() cleanup()
res.end() res.end()
} }
} }
@ -221,14 +221,14 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.error('Error writing completion error to SSE stream', { error: writeError }) logger.error('Error writing completion error to SSE stream', { error: writeError })
} }
responseEnded = true responseEnded = true
cleanupAbortTimeout() cleanup()
res.end() res.end()
}) })
// Clear timeout when response ends // Clear timeout when response ends
res.on('close', cleanupAbortTimeout) res.on('close', cleanup)
res.on('finish', cleanupAbortTimeout) res.on('finish', cleanup)
} catch (error: any) { } catch (error: any) {
clearAbortTimeout?.() streamController?.dispose()
logger.error('Error in streaming message handler', { logger.error('Error in streaming message handler', {
error, error,
agentId: req.params.agentId, agentId: req.params.agentId,

View File

@ -4,6 +4,7 @@ export interface StreamAbortController {
abortController: AbortController abortController: AbortController
registerAbortHandler: (handler: StreamAbortHandler) => void registerAbortHandler: (handler: StreamAbortHandler) => void
clearAbortTimeout: () => void clearAbortTimeout: () => void
dispose: () => void
} }
export const STREAM_TIMEOUT_REASON = 'stream timeout' export const STREAM_TIMEOUT_REASON = 'stream timeout'
@ -40,6 +41,15 @@ export const createStreamAbortController = (options: CreateStreamAbortController
signal.addEventListener('abort', handleAbort, { once: true }) signal.addEventListener('abort', handleAbort, { once: true })
let disposed = false
const dispose = () => {
if (disposed) return
disposed = true
clearAbortTimeout()
signal.removeEventListener('abort', handleAbort)
}
const registerAbortHandler = (handler: StreamAbortHandler) => { const registerAbortHandler = (handler: StreamAbortHandler) => {
abortHandler = handler abortHandler = handler
@ -59,6 +69,7 @@ export const createStreamAbortController = (options: CreateStreamAbortController
return { return {
abortController, abortController,
registerAbortHandler, registerAbortHandler,
clearAbortTimeout clearAbortTimeout,
dispose
} }
} }