♻️ refactor: centralize agent stream timeouts

This commit is contained in:
Vaayne 2025-09-29 13:06:47 +08:00
parent 9ea3f0842c
commit 3428d15299
5 changed files with 132 additions and 40 deletions

View File

@ -6,15 +6,11 @@ This file provides guidance to AI coding assistants when working with code in th
- **Keep it clear**: Write code that is easy to read, maintain, and explain.
- **Match the house style**: Reuse existing patterns, naming, and conventions.
- **Ship correctness**: Make changes that are robust, tested, and type-safe.
- **Mind performance**: Choose efficient solutions and avoid wasted resources.
- **Search smart**: Prefer `ast-grep` for semantic queries; fall back to `rg`/`grep` when needed.
- **Build with HeroUI**: Use HeroUI for every new UI component; never add `antd` or `styled-components`.
- **Guard quality**: Run `yarn build:check` before you finish or commit; fix formatting or i18n drift first if required.
- **Log centrally**: Route all logging through `loggerService` with the right context—no `console.log`.
- **Research via subagent**: Lean on `subagent` for external docs, APIs, news, and references.
- **Seek review**: Ask a human developer to review substantial changes before merging.
- **Document changes**: Update or create docs for new features or significant edits.
- **Commit in rhythm**: Keep commits small, conventional, and emoji-tagged.
## Development Commands

View File

@ -3,6 +3,7 @@ import cors from 'cors'
import express from 'express'
import { v4 as uuidv4 } from 'uuid'
import { LONG_POLL_TIMEOUT_MS } from './config/timeouts'
import { authMiddleware } from './middleware/auth'
import { errorHandler } from './middleware/error'
import { setupOpenAPIDocumentation } from './middleware/openapi'
@ -14,7 +15,6 @@ import { modelsRoutes } from './routes/models'
const logger = loggerService.withContext('ApiServer')
const LONG_POLL_TIMEOUT_MS = 120 * 60_000 // 120 minutes
const extendMessagesTimeout: express.RequestHandler = (req, res, next) => {
req.setTimeout(LONG_POLL_TIMEOUT_MS)
res.setTimeout(LONG_POLL_TIMEOUT_MS)

View File

@ -0,0 +1,3 @@
export const LONG_POLL_TIMEOUT_MS = 120 * 60_000 // 120 minutes
export const MESSAGE_STREAM_TIMEOUT_MS = LONG_POLL_TIMEOUT_MS

View File

@ -2,6 +2,8 @@ import { loggerService } from '@logger'
import { Request, Response } from 'express'
import { agentService, sessionMessageService, sessionService } from '../../../../services/agents'
import { MESSAGE_STREAM_TIMEOUT_MS } from '../../../config/timeouts'
import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '../../../utils/createStreamAbortController'
const logger = loggerService.withContext('ApiServerMessagesHandlers')
@ -25,6 +27,8 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => {
}
export const createMessage = async (req: Request, res: Response): Promise<void> => {
let clearAbortTimeout: (() => void) | undefined
try {
const { agentId, sessionId } = req.params
@ -42,7 +46,14 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const abortController = new AbortController()
const {
abortController,
registerAbortHandler,
clearAbortTimeout: helperClearAbortTimeout
} = createStreamAbortController({
timeoutMs: MESSAGE_STREAM_TIMEOUT_MS
})
clearAbortTimeout = helperClearAbortTimeout
const { stream, completion } = await sessionMessageService.createSessionMessage(
session,
messageData,
@ -54,6 +65,10 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
let responseEnded = false
let streamFinished = false
const cleanupAbortTimeout = () => {
clearAbortTimeout?.()
}
const finalizeResponse = () => {
if (responseEnded) {
return
@ -64,6 +79,7 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
}
responseEnded = true
cleanupAbortTimeout()
try {
// res.write('data: {"type":"finish"}\n\n')
res.write('data: [DONE]\n\n')
@ -92,12 +108,51 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
* - Clean up event listeners to prevent memory leaks
* - Mark the response as ended to prevent further writes
*/
const handleDisconnect = () => {
registerAbortHandler((abortReason) => {
cleanupAbortTimeout()
if (responseEnded) return
logger.info('Streaming client disconnected', { agentId, sessionId })
responseEnded = true
if (abortReason === STREAM_TIMEOUT_REASON) {
logger.error('Streaming message timeout', { agentId, sessionId })
try {
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
message: 'Stream timeout',
type: 'timeout_error',
code: 'stream_timeout'
}
})}\n\n`
)
} catch (writeError) {
logger.error('Error writing timeout to SSE stream', { error: writeError })
}
} else if (abortReason === 'Client disconnected') {
logger.info('Streaming client disconnected', { agentId, sessionId })
} else {
logger.warn('Streaming aborted', { agentId, sessionId, reason: abortReason })
}
reader.cancel(abortReason ?? 'stream aborted').catch(() => {})
if (!res.headersSent) {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
}
if (!res.writableEnded) {
res.end()
}
})
const handleDisconnect = () => {
if (abortController.signal.aborted) return
abortController.abort('Client disconnected')
reader.cancel('Client disconnected').catch(() => {})
}
req.on('close', handleDisconnect)
@ -135,6 +190,7 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.error('Error writing stream error to SSE', { error: writeError })
}
responseEnded = true
cleanupAbortTimeout()
res.end()
}
}
@ -166,41 +222,14 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.error('Error writing completion error to SSE stream', { error: writeError })
}
responseEnded = true
cleanupAbortTimeout()
res.end()
})
// Set a timeout to prevent hanging indefinitely
const timeout = setTimeout(
() => {
if (!responseEnded) {
logger.error('Streaming message timeout', { agentId, sessionId })
try {
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
message: 'Stream timeout',
type: 'timeout_error',
code: 'stream_timeout'
}
})}\n\n`
)
} catch (writeError) {
logger.error('Error writing timeout to SSE stream', { error: writeError })
}
abortController.abort('stream timeout')
reader.cancel('stream timeout').catch(() => {})
responseEnded = true
res.end()
}
},
10 * 60 * 1000
) // 10 minutes timeout
// Clear timeout when response ends
res.on('close', () => clearTimeout(timeout))
res.on('finish', () => clearTimeout(timeout))
res.on('close', cleanupAbortTimeout)
res.on('finish', cleanupAbortTimeout)
} catch (error: any) {
clearAbortTimeout?.()
logger.error('Error in streaming message handler', {
error,
agentId: req.params.agentId,

View File

@ -0,0 +1,64 @@
export type StreamAbortHandler = (reason: unknown) => void
export interface StreamAbortController {
abortController: AbortController
registerAbortHandler: (handler: StreamAbortHandler) => void
clearAbortTimeout: () => void
}
export const STREAM_TIMEOUT_REASON = 'stream timeout'
interface CreateStreamAbortControllerOptions {
timeoutMs: number
}
export const createStreamAbortController = (options: CreateStreamAbortControllerOptions): StreamAbortController => {
const { timeoutMs } = options
const abortController = new AbortController()
const signal = abortController.signal
let timeoutId: NodeJS.Timeout | undefined
let abortHandler: StreamAbortHandler | undefined
const clearAbortTimeout = () => {
if (!timeoutId) {
return
}
clearTimeout(timeoutId)
timeoutId = undefined
}
const handleAbort = () => {
clearAbortTimeout()
if (!abortHandler) {
return
}
abortHandler(signal.reason)
}
signal.addEventListener('abort', handleAbort, { once: true })
const registerAbortHandler = (handler: StreamAbortHandler) => {
abortHandler = handler
if (signal.aborted) {
abortHandler(signal.reason)
}
}
if (timeoutMs > 0) {
timeoutId = setTimeout(() => {
if (!signal.aborted) {
abortController.abort(STREAM_TIMEOUT_REASON)
}
}, timeoutMs)
}
return {
abortController,
registerAbortHandler,
clearAbortTimeout
}
}