diff --git a/src/main/apiServer/routes/chat.ts b/src/main/apiServer/routes/chat.ts index be43d866a4..678d92d115 100644 --- a/src/main/apiServer/routes/chat.ts +++ b/src/main/apiServer/routes/chat.ts @@ -1,15 +1,105 @@ import express, { Request, Response } from 'express' -import OpenAI from 'openai' import { ChatCompletionCreateParams } from 'openai/resources' import { loggerService } from '../../services/LoggerService' -import { chatCompletionService } from '../services/chat-completion' -import { validateModelId } from '../utils' +import { + ChatCompletionModelError, + chatCompletionService, + ChatCompletionValidationError +} from '../services/chat-completion' const logger = loggerService.withContext('ApiServerChatRoutes') const router = express.Router() +interface ErrorResponseBody { + error: { + message: string + type: string + code: string + } +} + +const mapChatCompletionError = (error: unknown): { status: number; body: ErrorResponseBody } => { + if (error instanceof ChatCompletionValidationError) { + logger.warn('Chat completion validation error:', { + errors: error.errors + }) + + return { + status: 400, + body: { + error: { + message: error.errors.join('; '), + type: 'invalid_request_error', + code: 'validation_failed' + } + } + } + } + + if (error instanceof ChatCompletionModelError) { + logger.warn('Chat completion model error:', error.error) + + return { + status: 400, + body: { + error: { + message: error.error.message, + type: 'invalid_request_error', + code: error.error.code + } + } + } + } + + if (error instanceof Error) { + let statusCode = 500 + let errorType = 'server_error' + let errorCode = 'internal_error' + + if (error.message.includes('API key') || error.message.includes('authentication')) { + statusCode = 401 + errorType = 'authentication_error' + errorCode = 'invalid_api_key' + } else if (error.message.includes('rate limit') || error.message.includes('quota')) { + statusCode = 429 + errorType = 'rate_limit_error' + errorCode = 'rate_limit_exceeded' + } else if (error.message.includes('timeout') || error.message.includes('connection')) { + statusCode = 502 + errorType = 'server_error' + errorCode = 'upstream_error' + } + + logger.error('Chat completion error:', { error }) + + return { + status: statusCode, + body: { + error: { + message: error.message || 'Internal server error', + type: errorType, + code: errorCode + } + } + } + } + + logger.error('Chat completion unknown error:', { error }) + + return { + status: 500, + body: { + error: { + message: 'Internal server error', + type: 'server_error', + code: 'internal_error' + } + } + } +} + /** * @swagger * /v1/chat/completions: @@ -60,7 +150,7 @@ const router = express.Router() * type: integer * total_tokens: * type: integer - * text/plain: + * text/event-stream: * schema: * type: string * description: Server-sent events stream (when stream=true) @@ -110,63 +200,22 @@ router.post('/completions', async (req: Request, res: Response) => { temperature: request.temperature }) - // Validate request - const validation = chatCompletionService.validateRequest(request) - if (!validation.isValid) { - return res.status(400).json({ - error: { - message: validation.errors.join('; '), - type: 'invalid_request_error', - code: 'validation_failed' - } - }) - } + const isStreaming = !!request.stream - // Validate model ID and get provider - const modelValidation = await validateModelId(request.model) - if (!modelValidation.valid) { - const error = modelValidation.error! 
- logger.warn(`Model validation failed for '${request.model}':`, error) - return res.status(400).json({ - error: { - message: error.message, - type: 'invalid_request_error', - code: error.code - } - }) - } + if (isStreaming) { + const { stream } = await chatCompletionService.processStreamingCompletion(request) - const provider = modelValidation.provider! - const modelId = modelValidation.modelId! - - logger.info('Model validation successful:', { - provider: provider.id, - providerType: provider.type, - modelId: modelId, - fullModelId: request.model - }) - - // Create OpenAI client - const client = new OpenAI({ - baseURL: provider.apiHost, - apiKey: provider.apiKey - }) - request.model = modelId - - // Handle streaming - if (request.stream) { - const streamResponse = await client.chat.completions.create(request) - - res.setHeader('Content-Type', 'text/plain; charset=utf-8') - res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + res.setHeader('Cache-Control', 'no-cache, no-transform') res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + res.flushHeaders() try { - for await (const chunk of streamResponse as any) { + for await (const chunk of stream) { res.write(`data: ${JSON.stringify(chunk)}\n\n`) } res.write('data: [DONE]\n\n') - res.end() } catch (streamError: any) { logger.error('Stream error:', streamError) res.write( @@ -178,47 +227,17 @@ router.post('/completions', async (req: Request, res: Response) => { } })}\n\n` ) + } finally { res.end() } return } - // Handle non-streaming - const response = await client.chat.completions.create(request) + const { response } = await chatCompletionService.processCompletion(request) return res.json(response) - } catch (error: any) { - logger.error('Chat completion error:', error) - - let statusCode = 500 - let errorType = 'server_error' - let errorCode = 'internal_error' - let errorMessage = 'Internal server error' - - if (error instanceof Error) { - errorMessage = error.message - - if (error.message.includes('API key') || error.message.includes('authentication')) { - statusCode = 401 - errorType = 'authentication_error' - errorCode = 'invalid_api_key' - } else if (error.message.includes('rate limit') || error.message.includes('quota')) { - statusCode = 429 - errorType = 'rate_limit_error' - errorCode = 'rate_limit_exceeded' - } else if (error.message.includes('timeout') || error.message.includes('connection')) { - statusCode = 502 - errorType = 'server_error' - errorCode = 'upstream_error' - } - } - - return res.status(statusCode).json({ - error: { - message: errorMessage, - type: errorType, - code: errorCode - } - }) + } catch (error: unknown) { + const { status, body } = mapChatCompletionError(error) + return res.status(status).json(body) } }) diff --git a/src/main/apiServer/routes/messages.ts b/src/main/apiServer/routes/messages.ts index fac281215a..faee304e4b 100644 --- a/src/main/apiServer/routes/messages.ts +++ b/src/main/apiServer/routes/messages.ts @@ -104,7 +104,7 @@ const router = express.Router() * type: integer * output_tokens: * type: integer - * text/plain: + * text/event-stream: * schema: * type: string * description: Server-sent events stream (when stream=true) @@ -154,18 +154,6 @@ router.post('/', async (req: Request, res: Response) => { temperature: request.temperature }) - // Validate request - const validation = messagesService.validateRequest(request) - if (!validation.isValid) { - return res.status(400).json({ - type: 'error', - error: 
{ - type: 'invalid_request_error', - message: validation.errors.join('; ') - } - }) - } - // Validate model ID and get provider const modelValidation = await validateModelId(request.model) if (!modelValidation.valid) { @@ -203,18 +191,31 @@ router.post('/', async (req: Request, res: Response) => { fullModelId: request.model }) + // Validate request + const validation = messagesService.validateRequest(request) + if (!validation.isValid) { + return res.status(400).json({ + type: 'error', + error: { + type: 'invalid_request_error', + message: validation.errors.join('; ') + } + }) + } + // Handle streaming if (request.stream) { - res.setHeader('Content-Type', 'text/plain; charset=utf-8') - res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + res.setHeader('Cache-Control', 'no-cache, no-transform') res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + res.flushHeaders() try { for await (const chunk of messagesService.processStreamingMessage(request, provider)) { res.write(`data: ${JSON.stringify(chunk)}\n\n`) } res.write('data: [DONE]\n\n') - res.end() } catch (streamError: any) { logger.error('Stream error:', streamError) res.write( @@ -226,6 +227,7 @@ router.post('/', async (req: Request, res: Response) => { } })}\n\n` ) + } finally { res.end() } return @@ -241,9 +243,24 @@ router.post('/', async (req: Request, res: Response) => { let errorType = 'api_error' let errorMessage = 'Internal server error' - if (error instanceof Error) { - errorMessage = error.message + const anthropicStatus = typeof error?.status === 'number' ? error.status : undefined + const anthropicError = error?.error + if (anthropicStatus) { + statusCode = anthropicStatus + } + + if (anthropicError?.type) { + errorType = anthropicError.type + } + + if (anthropicError?.message) { + errorMessage = anthropicError.message + } else if (error instanceof Error && error.message) { + errorMessage = error.message + } + + if (!anthropicStatus && error instanceof Error) { if (error.message.includes('API key') || error.message.includes('authentication')) { statusCode = 401 errorType = 'authentication_error' @@ -263,7 +280,8 @@ router.post('/', async (req: Request, res: Response) => { type: 'error', error: { type: errorType, - message: errorMessage + message: errorMessage, + requestId: error?.request_id } }) } diff --git a/src/main/apiServer/services/chat-completion.ts b/src/main/apiServer/services/chat-completion.ts index ff03b54455..ba88c32343 100644 --- a/src/main/apiServer/services/chat-completion.ts +++ b/src/main/apiServer/services/chat-completion.ts @@ -1,8 +1,9 @@ +import { Provider } from '@types' import OpenAI from 'openai' -import { ChatCompletionCreateParams } from 'openai/resources' +import { ChatCompletionCreateParams, ChatCompletionCreateParamsStreaming } from 'openai/resources' import { loggerService } from '../../services/LoggerService' -import { getProviderByModel, getRealProviderModel, validateProvider } from '../utils' +import { ModelValidationError, validateModelId } from '../utils' const logger = loggerService.withContext('ChatCompletionService') @@ -11,19 +12,120 @@ export interface ValidationResult { errors: string[] } +export class ChatCompletionValidationError extends Error { + constructor(public readonly errors: string[]) { + super(`Request validation failed: ${errors.join('; ')}`) + this.name = 'ChatCompletionValidationError' + } +} + +export class ChatCompletionModelError extends Error { + constructor(public readonly 
error: ModelValidationError) { + super(`Model validation failed: ${error.message}`) + this.name = 'ChatCompletionModelError' + } +} + +export type PrepareRequestResult = + | { status: 'validation_error'; errors: string[] } + | { status: 'model_error'; error: ModelValidationError } + | { + status: 'ok' + provider: Provider + modelId: string + client: OpenAI + providerRequest: ChatCompletionCreateParams + } + export class ChatCompletionService { + async resolveProviderContext(model: string): Promise< + | { ok: false; error: ModelValidationError } + | { ok: true; provider: Provider; modelId: string; client: OpenAI } + > { + const modelValidation = await validateModelId(model) + if (!modelValidation.valid) { + return { + ok: false, + error: modelValidation.error! + } + } + + const provider = modelValidation.provider! + + if (provider.type !== 'openai') { + return { + ok: false, + error: { + type: 'unsupported_provider_type', + message: `Provider '${provider.id}' of type '${provider.type}' is not supported for OpenAI chat completions`, + code: 'unsupported_provider_type' + } + } + } + + const modelId = modelValidation.modelId! + + const client = new OpenAI({ + baseURL: provider.apiHost, + apiKey: provider.apiKey + }) + + return { + ok: true, + provider, + modelId, + client + } + } + + async prepareRequest(request: ChatCompletionCreateParams, stream: boolean): Promise { + const requestValidation = this.validateRequest(request) + if (!requestValidation.isValid) { + return { + status: 'validation_error', + errors: requestValidation.errors + } + } + + const providerContext = await this.resolveProviderContext(request.model!) + if (!providerContext.ok) { + return { + status: 'model_error', + error: providerContext.error + } + } + + const { provider, modelId, client } = providerContext + + logger.info('Model validation successful:', { + provider: provider.id, + providerType: provider.type, + modelId, + fullModelId: request.model + }) + + return { + status: 'ok', + provider, + modelId, + client, + providerRequest: stream + ? 
{ + ...request, + model: modelId, + stream: true as const + } + : { + ...request, + model: modelId, + stream: false as const + } + } + } + validateRequest(request: ChatCompletionCreateParams): ValidationResult { const errors: string[] = [] - // Validate model - if (!request.model) { - errors.push('Model is required') - } else if (typeof request.model !== 'string') { - errors.push('Model must be a string') - } else if (!request.model.includes(':')) { - errors.push('Model must be in format "provider:model_id"') - } - // Validate messages if (!request.messages) { errors.push('Messages array is required') @@ -51,7 +153,11 @@ export class ChatCompletionService { } } - async processCompletion(request: ChatCompletionCreateParams): Promise { + async processCompletion(request: ChatCompletionCreateParams): Promise<{ + provider: Provider + modelId: string + response: OpenAI.Chat.Completions.ChatCompletion + }> { try { logger.info('Processing chat completion request:', { model: request.model, @@ -59,38 +165,16 @@ export class ChatCompletionService { stream: request.stream }) - // Validate request - const validation = this.validateRequest(request) - if (!validation.isValid) { - throw new Error(`Request validation failed: ${validation.errors.join(', ')}`) + const preparation = await this.prepareRequest(request, false) + if (preparation.status === 'validation_error') { + throw new ChatCompletionValidationError(preparation.errors) } - // Get provider for the model - const provider = await getProviderByModel(request.model!) - if (!provider) { - throw new Error(`Provider not found for model: ${request.model}`) + if (preparation.status === 'model_error') { + throw new ChatCompletionModelError(preparation.error) } - // Validate provider - if (!validateProvider(provider)) { - throw new Error(`Provider validation failed for: ${provider.id}`) - } - - // Extract model ID from the full model string - const modelId = getRealProviderModel(request.model) - - // Create OpenAI client for the provider - const client = new OpenAI({ - baseURL: provider.apiHost, - apiKey: provider.apiKey - }) - - // Prepare request with the actual model ID - const providerRequest = { - ...request, - model: modelId, - stream: false - } + const { provider, modelId, client, providerRequest } = preparation logger.debug('Sending request to provider:', { provider: provider.id, @@ -101,54 +185,40 @@ export class ChatCompletionService { const response = (await client.chat.completions.create(providerRequest)) as OpenAI.Chat.Completions.ChatCompletion logger.info('Successfully processed chat completion') - return response + return { + provider, + modelId, + response + } } catch (error: any) { logger.error('Error processing chat completion:', error) throw error } } - async *processStreamingCompletion( + async processStreamingCompletion( request: ChatCompletionCreateParams - ): AsyncIterable { + ): Promise<{ + provider: Provider + modelId: string + stream: AsyncIterable + }> { try { logger.info('Processing streaming chat completion request:', { model: request.model, messageCount: request.messages.length }) - // Validate request - const validation = this.validateRequest(request) - if (!validation.isValid) { - throw new Error(`Request validation failed: ${validation.errors.join(', ')}`) + const preparation = await this.prepareRequest(request, true) + if (preparation.status === 'validation_error') { + throw new ChatCompletionValidationError(preparation.errors) } - // Get provider for the model - const provider = await getProviderByModel(request.model!) 
- if (!provider) { - throw new Error(`Provider not found for model: ${request.model}`) + if (preparation.status === 'model_error') { + throw new ChatCompletionModelError(preparation.error) } - // Validate provider - if (!validateProvider(provider)) { - throw new Error(`Provider validation failed for: ${provider.id}`) - } - - // Extract model ID from the full model string - const modelId = getRealProviderModel(request.model) - - // Create OpenAI client for the provider - const client = new OpenAI({ - baseURL: provider.apiHost, - apiKey: provider.apiKey - }) - - // Prepare streaming request - const streamingRequest = { - ...request, - model: modelId, - stream: true as const - } + const { provider, modelId, client, providerRequest } = preparation logger.debug('Sending streaming request to provider:', { provider: provider.id, @@ -156,13 +226,17 @@ export class ChatCompletionService { apiHost: provider.apiHost }) - const stream = await client.chat.completions.create(streamingRequest) + const streamRequest = providerRequest as ChatCompletionCreateParamsStreaming + const stream = (await client.chat.completions.create(streamRequest)) as AsyncIterable< + OpenAI.Chat.Completions.ChatCompletionChunk + > - for await (const chunk of stream) { - yield chunk + logger.info('Successfully started streaming chat completion') + return { + provider, + modelId, + stream } - - logger.info('Successfully completed streaming chat completion') } catch (error: any) { logger.error('Error processing streaming chat completion:', error) throw error diff --git a/src/main/apiServer/services/messages.ts b/src/main/apiServer/services/messages.ts index a845218802..f3ee894ebb 100644 --- a/src/main/apiServer/services/messages.ts +++ b/src/main/apiServer/services/messages.ts @@ -21,6 +21,14 @@ export class MessagesService { errors.push('Model is required') } + if (!request.max_tokens || request.max_tokens < 1) { + errors.push('max_tokens is required and must be at least 1') + } + + if (!request.messages || !Array.isArray(request.messages) || request.messages.length === 0) { + errors.push('messages is required and must be a non-empty array') + } + return { isValid: errors.length === 0, errors @@ -28,91 +36,69 @@ export class MessagesService { } async processMessage(request: MessageCreateParams, provider: Provider): Promise { - try { - logger.info('Processing Anthropic message request:', { - model: request.model, - messageCount: request.messages.length, - stream: request.stream, - max_tokens: request.max_tokens - }) + logger.info('Processing Anthropic message request:', { + model: request.model, + messageCount: request.messages.length, + stream: request.stream, + max_tokens: request.max_tokens + }) - // Validate request - const validation = this.validateRequest(request) - if (!validation.isValid) { - throw new Error(`Request validation failed: ${validation.errors.join(', ')}`) - } + // Create Anthropic client for the provider + const client = new Anthropic({ + baseURL: provider.apiHost, + apiKey: provider.apiKey + }) - // Create Anthropic client for the provider - const client = new Anthropic({ - baseURL: provider.apiHost, - apiKey: provider.apiKey - }) - - // Prepare request with the actual model ID - const anthropicRequest: MessageCreateParams = { - ...request, - stream: false - } - - logger.debug('Sending request to Anthropic provider:', { - provider: provider.id, - apiHost: provider.apiHost - }) - - const response = await client.messages.create(anthropicRequest) - - logger.info('Successfully processed Anthropic message') - 
return response - } catch (error: any) { - logger.error('Error processing Anthropic message:', error) - throw error + // Prepare request with the actual model ID + const anthropicRequest: MessageCreateParams = { + ...request, + stream: false } + + logger.debug('Sending request to Anthropic provider:', { + provider: provider.id, + apiHost: provider.apiHost + }) + + const response = await client.messages.create(anthropicRequest) + + logger.info('Successfully processed Anthropic message') + return response } async *processStreamingMessage( request: MessageCreateParams, provider: Provider ): AsyncIterable { - try { - logger.info('Processing streaming Anthropic message request:', { - model: request.model, - messageCount: request.messages.length - }) + logger.info('Processing streaming Anthropic message request:', { + model: request.model, + messageCount: request.messages.length + }) - // Validate request - const validation = this.validateRequest(request) - if (!validation.isValid) { - throw new Error(`Request validation failed: ${validation.errors.join(', ')}`) - } + // Create Anthropic client for the provider + const client = new Anthropic({ + baseURL: provider.apiHost, + apiKey: provider.apiKey + }) - // Create Anthropic client for the provider - const client = new Anthropic({ - baseURL: provider.apiHost, - apiKey: provider.apiKey - }) - - // Prepare streaming request - const streamingRequest: MessageCreateParams = { - ...request, - stream: true - } - - logger.debug('Sending streaming request to Anthropic provider:', { - provider: provider.id, - apiHost: provider.apiHost - }) - - const stream = client.messages.stream(streamingRequest) - - for await (const chunk of stream) { - yield chunk - } - - logger.info('Successfully completed streaming Anthropic message') - } catch (error: any) { - logger.error('Error processing streaming Anthropic message:', error) - throw error + // Prepare streaming request + const streamingRequest: MessageCreateParams = { + ...request, + stream: true } + + logger.debug('Sending streaming request to Anthropic provider:', { + provider: provider.id, + apiHost: provider.apiHost + }) + + const stream = client.messages.stream(streamingRequest) + + for await (const chunk of stream) { + yield chunk + } + + logger.info('Successfully completed streaming Anthropic message') } } diff --git a/tests/apis/chat.http b/tests/apis/chat.http new file mode 100644 index 0000000000..3025bb3b9d --- /dev/null +++ b/tests/apis/chat.http @@ -0,0 +1,79 @@ +@host=http://localhost:23333 +@token=cs-sk-af798ed4-7cf5-4fd7-ae4b-df203b164194 +@agent_id=agent_1758092281575_tn9dxio9k + + +### List All Models +GET {{host}}/v1/models +Authorization: Bearer {{token}} + + +### List Models With Filters +GET {{host}}/v1/models?provider=anthropic&limit=5 +Authorization: Bearer {{token}} + + +### OpenAI Chat Completion +POST {{host}}/v1/chat/completions +Authorization: Bearer {{token}} +Content-Type: application/json + +{ + "model": "tokenflux:openai/gpt-5-nano", + "messages": [ + { + "role": "user", + "content": "Explain the theory of relativity in simple terms." + } + ] +} + +### OpenAI Chat Completion with streaming +POST {{host}}/v1/chat/completions +Authorization: Bearer {{token}} +Content-Type: application/json + +{ + "model": "tokenflux:openai/gpt-5-nano", + "stream": true, + "messages": [ + { + "role": "user", + "content": "Explain the theory of relativity in simple terms." 
+ } + ] +} + +### Anthropic Chat Message +POST {{host}}/v1/messages +Authorization: Bearer {{token}} +Content-Type: application/json + +{ + "model": "anthropic:claude-sonnet-4-20250514", + "stream": false, + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": "Explain the theory of relativity in simple terms." + } + ] +} + +### Anthropic Chat Message with streaming +POST {{host}}/v1/messages +Authorization: Bearer {{token}} +Content-Type: application/json + +{ + "model": "anthropic:claude-sonnet-4-20250514", + "stream": true, + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": "Explain the theory of relativity in simple terms." + } + ] +}
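+
+### Validation error example (illustrative sketch, not part of the original request set)
+# Assumes the ChatCompletionValidationError path added in chat-completion.ts:
+# omitting "messages" should yield HTTP 400 with type "invalid_request_error"
+# and code "validation_failed" from mapChatCompletionError.
+POST {{host}}/v1/chat/completions
+Authorization: Bearer {{token}}
+Content-Type: application/json
+
+{
+  "model": "tokenflux:openai/gpt-5-nano"
+}
+
+### Anthropic validation error example (illustrative sketch)
+# Assumes the new max_tokens check in messages.ts; note that model validation runs
+# first in the route, so an unknown model would surface a model error instead.
+# Omitting "max_tokens" should yield HTTP 400 with error type "invalid_request_error".
+POST {{host}}/v1/messages
+Authorization: Bearer {{token}}
+Content-Type: application/json
+
+{
+  "model": "anthropic:claude-sonnet-4-20250514",
+  "messages": [
+    {
+      "role": "user",
+      "content": "Explain the theory of relativity in simple terms."
+    }
+  ]
+}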