From 3428d15299b86af1e6de000dbef08d7210cb4081 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Mon, 29 Sep 2025 13:06:47 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20centralize=20a?= =?UTF-8?q?gent=20stream=20timeouts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 4 - src/main/apiServer/app.ts | 2 +- src/main/apiServer/config/timeouts.ts | 3 + .../routes/agents/handlers/messages.ts | 99 ++++++++++++------- .../utils/createStreamAbortController.ts | 64 ++++++++++++ 5 files changed, 132 insertions(+), 40 deletions(-) create mode 100644 src/main/apiServer/config/timeouts.ts create mode 100644 src/main/apiServer/utils/createStreamAbortController.ts diff --git a/CLAUDE.md b/CLAUDE.md index bfd724e316..748c48f608 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -6,15 +6,11 @@ This file provides guidance to AI coding assistants when working with code in th - **Keep it clear**: Write code that is easy to read, maintain, and explain. - **Match the house style**: Reuse existing patterns, naming, and conventions. -- **Ship correctness**: Make changes that are robust, tested, and type-safe. -- **Mind performance**: Choose efficient solutions and avoid wasted resources. - **Search smart**: Prefer `ast-grep` for semantic queries; fall back to `rg`/`grep` when needed. - **Build with HeroUI**: Use HeroUI for every new UI component; never add `antd` or `styled-components`. -- **Guard quality**: Run `yarn build:check` before you finish or commit; fix formatting or i18n drift first if required. - **Log centrally**: Route all logging through `loggerService` with the right context—no `console.log`. - **Research via subagent**: Lean on `subagent` for external docs, APIs, news, and references. - **Seek review**: Ask a human developer to review substantial changes before merging. -- **Document changes**: Update or create docs for new features or significant edits. - **Commit in rhythm**: Keep commits small, conventional, and emoji-tagged. ## Development Commands diff --git a/src/main/apiServer/app.ts b/src/main/apiServer/app.ts index fa195549a9..a645e96740 100644 --- a/src/main/apiServer/app.ts +++ b/src/main/apiServer/app.ts @@ -3,6 +3,7 @@ import cors from 'cors' import express from 'express' import { v4 as uuidv4 } from 'uuid' +import { LONG_POLL_TIMEOUT_MS } from './config/timeouts' import { authMiddleware } from './middleware/auth' import { errorHandler } from './middleware/error' import { setupOpenAPIDocumentation } from './middleware/openapi' @@ -14,7 +15,6 @@ import { modelsRoutes } from './routes/models' const logger = loggerService.withContext('ApiServer') -const LONG_POLL_TIMEOUT_MS = 120 * 60_000 // 120 minutes const extendMessagesTimeout: express.RequestHandler = (req, res, next) => { req.setTimeout(LONG_POLL_TIMEOUT_MS) res.setTimeout(LONG_POLL_TIMEOUT_MS) diff --git a/src/main/apiServer/config/timeouts.ts b/src/main/apiServer/config/timeouts.ts new file mode 100644 index 0000000000..2c5e077430 --- /dev/null +++ b/src/main/apiServer/config/timeouts.ts @@ -0,0 +1,3 @@ +export const LONG_POLL_TIMEOUT_MS = 120 * 60_000 // 120 minutes + +export const MESSAGE_STREAM_TIMEOUT_MS = LONG_POLL_TIMEOUT_MS diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 844ac71160..2cc8668e07 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -2,6 +2,8 @@ import { loggerService } from '@logger' import { Request, Response } from 'express' import { agentService, sessionMessageService, sessionService } from '../../../../services/agents' +import { MESSAGE_STREAM_TIMEOUT_MS } from '../../../config/timeouts' +import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '../../../utils/createStreamAbortController' const logger = loggerService.withContext('ApiServerMessagesHandlers') @@ -25,6 +27,8 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { } export const createMessage = async (req: Request, res: Response): Promise => { + let clearAbortTimeout: (() => void) | undefined + try { const { agentId, sessionId } = req.params @@ -42,7 +46,14 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const abortController = new AbortController() + const { + abortController, + registerAbortHandler, + clearAbortTimeout: helperClearAbortTimeout + } = createStreamAbortController({ + timeoutMs: MESSAGE_STREAM_TIMEOUT_MS + }) + clearAbortTimeout = helperClearAbortTimeout const { stream, completion } = await sessionMessageService.createSessionMessage( session, messageData, @@ -54,6 +65,10 @@ export const createMessage = async (req: Request, res: Response): Promise let responseEnded = false let streamFinished = false + const cleanupAbortTimeout = () => { + clearAbortTimeout?.() + } + const finalizeResponse = () => { if (responseEnded) { return @@ -64,6 +79,7 @@ export const createMessage = async (req: Request, res: Response): Promise } responseEnded = true + cleanupAbortTimeout() try { // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') @@ -92,12 +108,51 @@ export const createMessage = async (req: Request, res: Response): Promise * - Clean up event listeners to prevent memory leaks * - Mark the response as ended to prevent further writes */ - const handleDisconnect = () => { + registerAbortHandler((abortReason) => { + cleanupAbortTimeout() + if (responseEnded) return - logger.info('Streaming client disconnected', { agentId, sessionId }) + responseEnded = true + + if (abortReason === STREAM_TIMEOUT_REASON) { + logger.error('Streaming message timeout', { agentId, sessionId }) + try { + res.write( + `data: ${JSON.stringify({ + type: 'error', + error: { + message: 'Stream timeout', + type: 'timeout_error', + code: 'stream_timeout' + } + })}\n\n` + ) + } catch (writeError) { + logger.error('Error writing timeout to SSE stream', { error: writeError }) + } + } else if (abortReason === 'Client disconnected') { + logger.info('Streaming client disconnected', { agentId, sessionId }) + } else { + logger.warn('Streaming aborted', { agentId, sessionId, reason: abortReason }) + } + + reader.cancel(abortReason ?? 'stream aborted').catch(() => {}) + + if (!res.headersSent) { + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + } + + if (!res.writableEnded) { + res.end() + } + }) + + const handleDisconnect = () => { + if (abortController.signal.aborted) return abortController.abort('Client disconnected') - reader.cancel('Client disconnected').catch(() => {}) } req.on('close', handleDisconnect) @@ -135,6 +190,7 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing stream error to SSE', { error: writeError }) } responseEnded = true + cleanupAbortTimeout() res.end() } } @@ -166,41 +222,14 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing completion error to SSE stream', { error: writeError }) } responseEnded = true + cleanupAbortTimeout() res.end() }) - - // Set a timeout to prevent hanging indefinitely - const timeout = setTimeout( - () => { - if (!responseEnded) { - logger.error('Streaming message timeout', { agentId, sessionId }) - try { - res.write( - `data: ${JSON.stringify({ - type: 'error', - error: { - message: 'Stream timeout', - type: 'timeout_error', - code: 'stream_timeout' - } - })}\n\n` - ) - } catch (writeError) { - logger.error('Error writing timeout to SSE stream', { error: writeError }) - } - abortController.abort('stream timeout') - reader.cancel('stream timeout').catch(() => {}) - responseEnded = true - res.end() - } - }, - 10 * 60 * 1000 - ) // 10 minutes timeout - // Clear timeout when response ends - res.on('close', () => clearTimeout(timeout)) - res.on('finish', () => clearTimeout(timeout)) + res.on('close', cleanupAbortTimeout) + res.on('finish', cleanupAbortTimeout) } catch (error: any) { + clearAbortTimeout?.() logger.error('Error in streaming message handler', { error, agentId: req.params.agentId, diff --git a/src/main/apiServer/utils/createStreamAbortController.ts b/src/main/apiServer/utils/createStreamAbortController.ts new file mode 100644 index 0000000000..243ad5b96e --- /dev/null +++ b/src/main/apiServer/utils/createStreamAbortController.ts @@ -0,0 +1,64 @@ +export type StreamAbortHandler = (reason: unknown) => void + +export interface StreamAbortController { + abortController: AbortController + registerAbortHandler: (handler: StreamAbortHandler) => void + clearAbortTimeout: () => void +} + +export const STREAM_TIMEOUT_REASON = 'stream timeout' + +interface CreateStreamAbortControllerOptions { + timeoutMs: number +} + +export const createStreamAbortController = (options: CreateStreamAbortControllerOptions): StreamAbortController => { + const { timeoutMs } = options + const abortController = new AbortController() + const signal = abortController.signal + + let timeoutId: NodeJS.Timeout | undefined + let abortHandler: StreamAbortHandler | undefined + + const clearAbortTimeout = () => { + if (!timeoutId) { + return + } + clearTimeout(timeoutId) + timeoutId = undefined + } + + const handleAbort = () => { + clearAbortTimeout() + + if (!abortHandler) { + return + } + + abortHandler(signal.reason) + } + + signal.addEventListener('abort', handleAbort, { once: true }) + + const registerAbortHandler = (handler: StreamAbortHandler) => { + abortHandler = handler + + if (signal.aborted) { + abortHandler(signal.reason) + } + } + + if (timeoutMs > 0) { + timeoutId = setTimeout(() => { + if (!signal.aborted) { + abortController.abort(STREAM_TIMEOUT_REASON) + } + }, timeoutMs) + } + + return { + abortController, + registerAbortHandler, + clearAbortTimeout + } +}