feat(chat): enhance chat completion error handling and streaming support

feat(messages): improve message validation and add streaming support
feat(tests): add HTTP tests for chat and message endpoints
Vaayne 2025-09-19 16:16:26 +08:00
parent 73380d76df
commit 2cf2f04a70
5 changed files with 432 additions and 256 deletions

View File

@ -1,15 +1,105 @@
import express, { Request, Response } from 'express'
import OpenAI from 'openai'
import { ChatCompletionCreateParams } from 'openai/resources'
import { loggerService } from '../../services/LoggerService'
import { chatCompletionService } from '../services/chat-completion'
import { validateModelId } from '../utils'
import {
ChatCompletionModelError,
chatCompletionService,
ChatCompletionValidationError
} from '../services/chat-completion'
const logger = loggerService.withContext('ApiServerChatRoutes')
const router = express.Router()
interface ErrorResponseBody {
error: {
message: string
type: string
code: string
}
}
const mapChatCompletionError = (error: unknown): { status: number; body: ErrorResponseBody } => {
if (error instanceof ChatCompletionValidationError) {
logger.warn('Chat completion validation error:', {
errors: error.errors
})
return {
status: 400,
body: {
error: {
message: error.errors.join('; '),
type: 'invalid_request_error',
code: 'validation_failed'
}
}
}
}
if (error instanceof ChatCompletionModelError) {
logger.warn('Chat completion model error:', error.error)
return {
status: 400,
body: {
error: {
message: error.error.message,
type: 'invalid_request_error',
code: error.error.code
}
}
}
}
if (error instanceof Error) {
let statusCode = 500
let errorType = 'server_error'
let errorCode = 'internal_error'
if (error.message.includes('API key') || error.message.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
errorCode = 'invalid_api_key'
} else if (error.message.includes('rate limit') || error.message.includes('quota')) {
statusCode = 429
errorType = 'rate_limit_error'
errorCode = 'rate_limit_exceeded'
} else if (error.message.includes('timeout') || error.message.includes('connection')) {
statusCode = 502
errorType = 'server_error'
errorCode = 'upstream_error'
}
logger.error('Chat completion error:', { error })
return {
status: statusCode,
body: {
error: {
message: error.message || 'Internal server error',
type: errorType,
code: errorCode
}
}
}
}
logger.error('Chat completion unknown error:', { error })
return {
status: 500,
body: {
error: {
message: 'Internal server error',
type: 'server_error',
code: 'internal_error'
}
}
}
}
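A minimal sketch of how this mapper behaves for the custom error classes (illustrative only, not part of the diff; the message reuses the 'Model is required' validation error defined in the service further down):

const { status, body } = mapChatCompletionError(new ChatCompletionValidationError(['Model is required']))
// status === 400
// body.error -> { message: 'Model is required', type: 'invalid_request_error', code: 'validation_failed' }
// A ChatCompletionModelError maps the same way but carries the code from model validation,
// and any other Error falls through to the keyword checks (401/429/502) before the 500 fallback.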
/**
* @swagger
* /v1/chat/completions:
@ -60,7 +150,7 @@ const router = express.Router()
* type: integer
* total_tokens:
* type: integer
* text/plain:
* text/event-stream:
* schema:
* type: string
* description: Server-sent events stream (when stream=true)
@ -110,63 +200,22 @@ router.post('/completions', async (req: Request, res: Response) => {
temperature: request.temperature
})
// Validate request
const validation = chatCompletionService.validateRequest(request)
if (!validation.isValid) {
return res.status(400).json({
error: {
message: validation.errors.join('; '),
type: 'invalid_request_error',
code: 'validation_failed'
}
})
}
const isStreaming = !!request.stream
// Validate model ID and get provider
const modelValidation = await validateModelId(request.model)
if (!modelValidation.valid) {
const error = modelValidation.error!
logger.warn(`Model validation failed for '${request.model}':`, error)
return res.status(400).json({
error: {
message: error.message,
type: 'invalid_request_error',
code: error.code
}
})
}
if (isStreaming) {
const { stream } = await chatCompletionService.processStreamingCompletion(request)
const provider = modelValidation.provider!
const modelId = modelValidation.modelId!
logger.info('Model validation successful:', {
provider: provider.id,
providerType: provider.type,
modelId: modelId,
fullModelId: request.model
})
// Create OpenAI client
const client = new OpenAI({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
request.model = modelId
// Handle streaming
if (request.stream) {
const streamResponse = await client.chat.completions.create(request)
res.setHeader('Content-Type', 'text/plain; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache, no-transform')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
res.flushHeaders()
try {
for await (const chunk of streamResponse as any) {
for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`)
}
res.write('data: [DONE]\n\n')
res.end()
} catch (streamError: any) {
logger.error('Stream error:', streamError)
res.write(
@ -178,47 +227,17 @@ router.post('/completions', async (req: Request, res: Response) => {
}
})}\n\n`
)
} finally {
res.end()
}
return
}
// Handle non-streaming
const response = await client.chat.completions.create(request)
const { response } = await chatCompletionService.processCompletion(request)
return res.json(response)
} catch (error: any) {
logger.error('Chat completion error:', error)
let statusCode = 500
let errorType = 'server_error'
let errorCode = 'internal_error'
let errorMessage = 'Internal server error'
if (error instanceof Error) {
errorMessage = error.message
if (error.message.includes('API key') || error.message.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
errorCode = 'invalid_api_key'
} else if (error.message.includes('rate limit') || error.message.includes('quota')) {
statusCode = 429
errorType = 'rate_limit_error'
errorCode = 'rate_limit_exceeded'
} else if (error.message.includes('timeout') || error.message.includes('connection')) {
statusCode = 502
errorType = 'server_error'
errorCode = 'upstream_error'
}
}
return res.status(statusCode).json({
error: {
message: errorMessage,
type: errorType,
code: errorCode
}
})
} catch (error: unknown) {
const { status, body } = mapChatCompletionError(error)
return res.status(status).json(body)
}
})
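For manual verification, a minimal consumer sketch for the SSE format this route now emits: each chunk is written as a "data: <json>" frame and the stream ends with "data: [DONE]". Host, model and request body mirror tests/apis/chat.http; the token placeholder is an assumption you fill in yourself.

// Minimal SSE consumer sketch (Node 18+ fetch); not part of this commit
async function streamChat(): Promise<void> {
  const token = '<api token>' // the @token value from tests/apis/chat.http
  const res = await fetch('http://localhost:23333/v1/chat/completions', {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'tokenflux:openai/gpt-5-nano',
      stream: true,
      messages: [{ role: 'user', content: 'Explain the theory of relativity in simple terms.' }]
    })
  })
  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  for (;;) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    // Split complete "data: <json>\n\n" frames out of the buffer; stop at "[DONE]"
    let idx: number
    while ((idx = buffer.indexOf('\n\n')) !== -1) {
      const frame = buffer.slice(0, idx)
      buffer = buffer.slice(idx + 2)
      if (!frame.startsWith('data: ')) continue
      const payload = frame.slice('data: '.length)
      if (payload === '[DONE]') continue
      const chunk = JSON.parse(payload)
      process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '')
    }
  }
}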

View File

@ -104,7 +104,7 @@ const router = express.Router()
* type: integer
* output_tokens:
* type: integer
* text/plain:
* text/event-stream:
* schema:
* type: string
* description: Server-sent events stream (when stream=true)
@ -154,18 +154,6 @@ router.post('/', async (req: Request, res: Response) => {
temperature: request.temperature
})
// Validate request
const validation = messagesService.validateRequest(request)
if (!validation.isValid) {
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: validation.errors.join('; ')
}
})
}
// Validate model ID and get provider
const modelValidation = await validateModelId(request.model)
if (!modelValidation.valid) {
@ -203,18 +191,31 @@ router.post('/', async (req: Request, res: Response) => {
fullModelId: request.model
})
// Validate request
const validation = messagesService.validateRequest(request)
if (!validation.isValid) {
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: validation.errors.join('; ')
}
})
}
// Handle streaming
if (request.stream) {
res.setHeader('Content-Type', 'text/plain; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache, no-transform')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
res.flushHeaders()
try {
for await (const chunk of messagesService.processStreamingMessage(request, provider)) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`)
}
res.write('data: [DONE]\n\n')
res.end()
} catch (streamError: any) {
logger.error('Stream error:', streamError)
res.write(
@ -226,6 +227,7 @@ router.post('/', async (req: Request, res: Response) => {
}
})}\n\n`
)
} finally {
res.end()
}
return
@ -241,9 +243,24 @@ router.post('/', async (req: Request, res: Response) => {
let errorType = 'api_error'
let errorMessage = 'Internal server error'
if (error instanceof Error) {
errorMessage = error.message
const anthropicStatus = typeof error?.status === 'number' ? error.status : undefined
const anthropicError = error?.error
if (anthropicStatus) {
statusCode = anthropicStatus
}
if (anthropicError?.type) {
errorType = anthropicError.type
}
if (anthropicError?.message) {
errorMessage = anthropicError.message
} else if (error instanceof Error && error.message) {
errorMessage = error.message
}
if (!anthropicStatus && error instanceof Error) {
if (error.message.includes('API key') || error.message.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
@ -263,7 +280,8 @@ router.post('/', async (req: Request, res: Response) => {
type: 'error',
error: {
type: errorType,
message: errorMessage
message: errorMessage,
requestId: error?.request_id
}
})
}
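With this change the handler prefers the upstream Anthropic SDK error fields when present (error.status, error.error.type, error.error.message, error.request_id) and only falls back to the message-keyword checks otherwise. An illustrative error body (values are examples, not captured output):

// HTTP 401
// {
//   "type": "error",
//   "error": {
//     "type": "authentication_error",
//     "message": "invalid API key",
//     "requestId": "req_..."
//   }
// }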

View File

@ -1,8 +1,9 @@
import { Provider } from '@types'
import OpenAI from 'openai'
import { ChatCompletionCreateParams } from 'openai/resources'
import { ChatCompletionCreateParams, ChatCompletionCreateParamsStreaming } from 'openai/resources'
import { loggerService } from '../../services/LoggerService'
import { getProviderByModel, getRealProviderModel, validateProvider } from '../utils'
import { ModelValidationError, validateModelId } from '../utils'
const logger = loggerService.withContext('ChatCompletionService')
@ -11,19 +12,120 @@ export interface ValidationResult {
errors: string[]
}
export class ChatCompletionValidationError extends Error {
constructor(public readonly errors: string[]) {
super(`Request validation failed: ${errors.join('; ')}`)
this.name = 'ChatCompletionValidationError'
}
}
export class ChatCompletionModelError extends Error {
constructor(public readonly error: ModelValidationError) {
super(`Model validation failed: ${error.message}`)
this.name = 'ChatCompletionModelError'
}
}
export type PrepareRequestResult =
| { status: 'validation_error'; errors: string[] }
| { status: 'model_error'; error: ModelValidationError }
| {
status: 'ok'
provider: Provider
modelId: string
client: OpenAI
providerRequest: ChatCompletionCreateParams
}
export class ChatCompletionService {
async resolveProviderContext(model: string): Promise<
| { ok: false; error: ModelValidationError }
| { ok: true; provider: Provider; modelId: string; client: OpenAI }
> {
const modelValidation = await validateModelId(model)
if (!modelValidation.valid) {
return {
ok: false,
error: modelValidation.error!
}
}
const provider = modelValidation.provider!
if (provider.type !== 'openai') {
return {
ok: false,
error: {
type: 'unsupported_provider_type',
message: `Provider '${provider.id}' of type '${provider.type}' is not supported for OpenAI chat completions`,
code: 'unsupported_provider_type'
}
}
}
const modelId = modelValidation.modelId!
const client = new OpenAI({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
return {
ok: true,
provider,
modelId,
client
}
}
async prepareRequest(request: ChatCompletionCreateParams, stream: boolean): Promise<PrepareRequestResult> {
const requestValidation = this.validateRequest(request)
if (!requestValidation.isValid) {
return {
status: 'validation_error',
errors: requestValidation.errors
}
}
const providerContext = await this.resolveProviderContext(request.model!)
if (!providerContext.ok) {
return {
status: 'model_error',
error: providerContext.error
}
}
const { provider, modelId, client } = providerContext
logger.info('Model validation successful:', {
provider: provider.id,
providerType: provider.type,
modelId,
fullModelId: request.model
})
return {
status: 'ok',
provider,
modelId,
client,
providerRequest: stream
? {
...request,
model: modelId,
stream: true as const
}
: {
...request,
model: modelId,
stream: false as const
}
}
}
validateRequest(request: ChatCompletionCreateParams): ValidationResult {
const errors: string[] = []
// Validate model
if (!request.model) {
errors.push('Model is required')
} else if (typeof request.model !== 'string') {
errors.push('Model must be a string')
} else if (!request.model.includes(':')) {
errors.push('Model must be in format "provider:model_id"')
}
// Validate messages
if (!request.messages) {
errors.push('Messages array is required')
@ -51,7 +153,11 @@ export class ChatCompletionService {
}
}
async processCompletion(request: ChatCompletionCreateParams): Promise<OpenAI.Chat.Completions.ChatCompletion> {
async processCompletion(request: ChatCompletionCreateParams): Promise<{
provider: Provider
modelId: string
response: OpenAI.Chat.Completions.ChatCompletion
}> {
try {
logger.info('Processing chat completion request:', {
model: request.model,
@ -59,38 +165,16 @@ export class ChatCompletionService {
stream: request.stream
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
const preparation = await this.prepareRequest(request, false)
if (preparation.status === 'validation_error') {
throw new ChatCompletionValidationError(preparation.errors)
}
// Get provider for the model
const provider = await getProviderByModel(request.model!)
if (!provider) {
throw new Error(`Provider not found for model: ${request.model}`)
if (preparation.status === 'model_error') {
throw new ChatCompletionModelError(preparation.error)
}
// Validate provider
if (!validateProvider(provider)) {
throw new Error(`Provider validation failed for: ${provider.id}`)
}
// Extract model ID from the full model string
const modelId = getRealProviderModel(request.model)
// Create OpenAI client for the provider
const client = new OpenAI({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
// Prepare request with the actual model ID
const providerRequest = {
...request,
model: modelId,
stream: false
}
const { provider, modelId, client, providerRequest } = preparation
logger.debug('Sending request to provider:', {
provider: provider.id,
@ -101,54 +185,40 @@ export class ChatCompletionService {
const response = (await client.chat.completions.create(providerRequest)) as OpenAI.Chat.Completions.ChatCompletion
logger.info('Successfully processed chat completion')
return response
return {
provider,
modelId,
response
}
} catch (error: any) {
logger.error('Error processing chat completion:', error)
throw error
}
}
async *processStreamingCompletion(
async processStreamingCompletion(
request: ChatCompletionCreateParams
): AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk> {
): Promise<{
provider: Provider
modelId: string
stream: AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>
}> {
try {
logger.info('Processing streaming chat completion request:', {
model: request.model,
messageCount: request.messages.length
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
const preparation = await this.prepareRequest(request, true)
if (preparation.status === 'validation_error') {
throw new ChatCompletionValidationError(preparation.errors)
}
// Get provider for the model
const provider = await getProviderByModel(request.model!)
if (!provider) {
throw new Error(`Provider not found for model: ${request.model}`)
if (preparation.status === 'model_error') {
throw new ChatCompletionModelError(preparation.error)
}
// Validate provider
if (!validateProvider(provider)) {
throw new Error(`Provider validation failed for: ${provider.id}`)
}
// Extract model ID from the full model string
const modelId = getRealProviderModel(request.model)
// Create OpenAI client for the provider
const client = new OpenAI({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
// Prepare streaming request
const streamingRequest = {
...request,
model: modelId,
stream: true as const
}
const { provider, modelId, client, providerRequest } = preparation
logger.debug('Sending streaming request to provider:', {
provider: provider.id,
@ -156,13 +226,17 @@ export class ChatCompletionService {
apiHost: provider.apiHost
})
const stream = await client.chat.completions.create(streamingRequest)
const streamRequest = providerRequest as ChatCompletionCreateParamsStreaming
const stream = (await client.chat.completions.create(streamRequest)) as AsyncIterable<
OpenAI.Chat.Completions.ChatCompletionChunk
>
for await (const chunk of stream) {
yield chunk
logger.info('Successfully started streaming chat completion')
return {
provider,
modelId,
stream
}
logger.info('Successfully completed streaming chat completion')
} catch (error: any) {
logger.error('Error processing streaming chat completion:', error)
throw error
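After this refactor the service, not the route, owns request validation, provider resolution and the upstream call, and callers receive the provider context alongside the result. A minimal usage sketch (request values are illustrative, taken from tests/apis/chat.http; chatCompletionService is the instance exported from ../services/chat-completion):

// Non-streaming: throws ChatCompletionValidationError / ChatCompletionModelError on bad input
const { provider, modelId, response } = await chatCompletionService.processCompletion({
  model: 'tokenflux:openai/gpt-5-nano',
  messages: [{ role: 'user', content: 'Hello' }]
})

// Streaming: the service returns the chunk iterable; SSE framing stays in the route
const { stream } = await chatCompletionService.processStreamingCompletion({
  model: 'tokenflux:openai/gpt-5-nano',
  stream: true,
  messages: [{ role: 'user', content: 'Hello' }]
})
for await (const chunk of stream) {
  // chunk is an OpenAI.Chat.Completions.ChatCompletionChunk
}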

View File

@ -21,6 +21,14 @@ export class MessagesService {
errors.push('Model is required')
}
if (!request.max_tokens || request.max_tokens < 1) {
errors.push('max_tokens is required and must be at least 1')
}
if (!request.messages || !Array.isArray(request.messages) || request.messages.length === 0) {
errors.push('messages is required and must be a non-empty array')
}
return {
isValid: errors.length === 0,
errors
@ -28,7 +36,6 @@ export class MessagesService {
}
async processMessage(request: MessageCreateParams, provider: Provider): Promise<Message> {
try {
logger.info('Processing Anthropic message request:', {
model: request.model,
messageCount: request.messages.length,
@ -36,12 +43,6 @@ export class MessagesService {
max_tokens: request.max_tokens
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
}
// Create Anthropic client for the provider
const client = new Anthropic({
baseURL: provider.apiHost,
@ -63,28 +64,17 @@ export class MessagesService {
logger.info('Successfully processed Anthropic message')
return response
} catch (error: any) {
logger.error('Error processing Anthropic message:', error)
throw error
}
}
async *processStreamingMessage(
request: MessageCreateParams,
provider: Provider
): AsyncIterable<RawMessageStreamEvent> {
try {
logger.info('Processing streaming Anthropic message request:', {
model: request.model,
messageCount: request.messages.length
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
}
// Create Anthropic client for the provider
const client = new Anthropic({
baseURL: provider.apiHost,
@ -109,10 +99,6 @@ export class MessagesService {
}
logger.info('Successfully completed streaming Anthropic message')
} catch (error: any) {
logger.error('Error processing streaming Anthropic message:', error)
throw error
}
}
}

tests/apis/chat.http (new file, 79 lines)
View File

@ -0,0 +1,79 @@
@host=http://localhost:23333
@token=cs-sk-af798ed4-7cf5-4fd7-ae4b-df203b164194
@agent_id=agent_1758092281575_tn9dxio9k
### List All Models
GET {{host}}/v1/models
Authorization: Bearer {{token}}
### List Models With Filters
GET {{host}}/v1/models?provider=anthropic&limit=5
Authorization: Bearer {{token}}
### OpenAI Chat Completion
POST {{host}}/v1/chat/completions
Authorization: Bearer {{token}}
Content-Type: application/json
{
"model": "tokenflux:openai/gpt-5-nano",
"messages": [
{
"role": "user",
"content": "Explain the theory of relativity in simple terms."
}
]
}
### OpenAI Chat Completion with streaming
POST {{host}}/v1/chat/completions
Authorization: Bearer {{token}}
Content-Type: application/json
{
"model": "tokenflux:openai/gpt-5-nano",
"stream": true,
"messages": [
{
"role": "user",
"content": "Explain the theory of relativity in simple terms."
}
]
}
### Anthropic Chat Message
POST {{host}}/v1/messages
Authorization: Bearer {{token}}
Content-Type: application/json
{
"model": "anthropic:claude-sonnet-4-20250514",
"stream": false,
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content": "Explain the theory of relativity in simple terms."
}
]
}
### Anthropic Chat Message with streaming
POST {{host}}/v1/messages
Authorization: Bearer {{token}}
Content-Type: application/json
{
"model": "anthropic:claude-sonnet-4-20250514",
"stream": true,
"max_tokens": 1024,
"messages": [
{
"role": "user",
"content": "Explain the theory of relativity in simple terms."
}
]
}