feat(messages): add messages route and service for handling Anthropic messages

This commit is contained in:
Vaayne 2025-09-19 13:58:00 +08:00
parent 7fc676bbc3
commit 38076babcf
4 changed files with 401 additions and 7 deletions

View File

@ -9,6 +9,7 @@ import { setupOpenAPIDocumentation } from './middleware/openapi'
import { agentsRoutes } from './routes/agents'
import { chatRoutes } from './routes/chat'
import { mcpRoutes } from './routes/mcp'
import { messagesRoutes } from './routes/messages'
import { modelsRoutes } from './routes/models'
const logger = loggerService.withContext('ApiServer')
@ -114,6 +115,7 @@ apiRouter.use(express.json())
// Mount routes
apiRouter.use('/chat', chatRoutes)
apiRouter.use('/mcps', mcpRoutes)
apiRouter.use('/messages', messagesRoutes)
apiRouter.use('/models', modelsRoutes)
apiRouter.use('/agents', agentsRoutes)
app.use('/v1', apiRouter)

View File

@ -0,0 +1,272 @@
import { MessageCreateParams } from '@anthropic-ai/sdk/resources'
import express, { Request, Response } from 'express'
import { loggerService } from '../../services/LoggerService'
import { messagesService } from '../services/messages'
import { validateModelId } from '../utils'
const logger = loggerService.withContext('ApiServerMessagesRoutes')
const router = express.Router()
/**
* @swagger
* /v1/messages:
* post:
* summary: Create message
* description: Create a message response using Anthropic's API format
* tags: [Messages]
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* required:
* - model
* - max_tokens
* - messages
* properties:
* model:
* type: string
* description: Model ID in format "provider:model_id"
* example: "my-anthropic:claude-3-5-sonnet-20241022"
* max_tokens:
* type: integer
* minimum: 1
* description: Maximum number of tokens to generate
* example: 1024
* messages:
* type: array
* items:
* type: object
* properties:
* role:
* type: string
* enum: [user, assistant]
* content:
* oneOf:
* - type: string
* - type: array
* system:
* type: string
* description: System message
* temperature:
* type: number
* minimum: 0
* maximum: 1
* description: Sampling temperature
* top_p:
* type: number
* minimum: 0
* maximum: 1
* description: Nucleus sampling
* top_k:
* type: integer
* minimum: 0
* description: Top-k sampling
* stream:
* type: boolean
* description: Whether to stream the response
* tools:
* type: array
* description: Available tools for the model
* responses:
* 200:
* description: Message response
* content:
* application/json:
* schema:
* type: object
* properties:
* id:
* type: string
* type:
* type: string
* example: message
* role:
* type: string
* example: assistant
* content:
* type: array
* items:
* type: object
* model:
* type: string
* stop_reason:
* type: string
* stop_sequence:
* type: string
* usage:
* type: object
* properties:
* input_tokens:
* type: integer
* output_tokens:
* type: integer
* text/plain:
* schema:
* type: string
* description: Server-sent events stream (when stream=true)
* 400:
* description: Bad request
* content:
* application/json:
* schema:
* type: object
* properties:
* type:
* type: string
* example: error
* error:
* type: object
* properties:
* type:
* type: string
* message:
* type: string
* 401:
* description: Unauthorized
* 429:
* description: Rate limit exceeded
* 500:
* description: Internal server error
*/
router.post('/', async (req: Request, res: Response) => {
try {
const request: MessageCreateParams = req.body
if (!request) {
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: 'Request body is required'
}
})
}
logger.info('Anthropic message request:', {
model: request.model,
messageCount: request.messages?.length || 0,
stream: request.stream,
max_tokens: request.max_tokens,
temperature: request.temperature
})
// Validate request
const validation = messagesService.validateRequest(request)
if (!validation.isValid) {
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: validation.errors.join('; ')
}
})
}
// Validate model ID and get provider
const modelValidation = await validateModelId(request.model)
if (!modelValidation.valid) {
const error = modelValidation.error!
logger.warn(`Model validation failed for '${request.model}':`, error)
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: error.message
}
})
}
const provider = modelValidation.provider!
// Ensure provider is Anthropic type
if (provider.type !== 'anthropic') {
return res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: `Invalid provider type '${provider.type}' for messages endpoint. Expected 'anthropic' provider.`
}
})
}
const modelId = modelValidation.modelId!
request.model = modelId
logger.info('Model validation successful:', {
provider: provider.id,
providerType: provider.type,
modelId: modelId,
fullModelId: request.model
})
// Handle streaming
if (request.stream) {
res.setHeader('Content-Type', 'text/plain; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
try {
for await (const chunk of messagesService.processStreamingMessage(request, provider)) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`)
}
res.write('data: [DONE]\n\n')
res.end()
} catch (streamError: any) {
logger.error('Stream error:', streamError)
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
type: 'api_error',
message: 'Stream processing error'
}
})}\n\n`
)
res.end()
}
return
}
// Handle non-streaming
const response = await messagesService.processMessage(request, provider)
return res.json(response)
} catch (error: any) {
logger.error('Anthropic message error:', error)
let statusCode = 500
let errorType = 'api_error'
let errorMessage = 'Internal server error'
if (error instanceof Error) {
errorMessage = error.message
if (error.message.includes('API key') || error.message.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
} else if (error.message.includes('rate limit') || error.message.includes('quota')) {
statusCode = 429
errorType = 'rate_limit_error'
} else if (error.message.includes('timeout') || error.message.includes('connection')) {
statusCode = 502
errorType = 'api_error'
} else if (error.message.includes('validation') || error.message.includes('invalid')) {
statusCode = 400
errorType = 'invalid_request_error'
}
}
return res.status(statusCode).json({
type: 'error',
error: {
type: errorType,
message: errorMessage
}
})
}
})
export { router as messagesRoutes }

View File

@ -0,0 +1,120 @@
import Anthropic from '@anthropic-ai/sdk'
import { Message, MessageCreateParams, RawMessageStreamEvent } from '@anthropic-ai/sdk/resources'
import { Provider } from '@types'
import { loggerService } from '../../services/LoggerService'
const logger = loggerService.withContext('MessagesService')
export interface ValidationResult {
isValid: boolean
errors: string[]
}
export class MessagesService {
// oxlint-disable-next-line no-unused-vars
validateRequest(request: MessageCreateParams): ValidationResult {
// TODO: Implement comprehensive request validation
const errors: string[] = []
if (!request.model) {
errors.push('Model is required')
}
return {
isValid: errors.length === 0,
errors
}
}
async processMessage(request: MessageCreateParams, provider: Provider): Promise<Message> {
try {
logger.info('Processing Anthropic message request:', {
model: request.model,
messageCount: request.messages.length,
stream: request.stream,
max_tokens: request.max_tokens
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
}
// Create Anthropic client for the provider
const client = new Anthropic({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
// Prepare request with the actual model ID
const anthropicRequest: MessageCreateParams = {
...request,
stream: false
}
logger.debug('Sending request to Anthropic provider:', {
provider: provider.id,
apiHost: provider.apiHost
})
const response = await client.messages.create(anthropicRequest)
logger.info('Successfully processed Anthropic message')
return response
} catch (error: any) {
logger.error('Error processing Anthropic message:', error)
throw error
}
}
async *processStreamingMessage(
request: MessageCreateParams,
provider: Provider
): AsyncIterable<RawMessageStreamEvent> {
try {
logger.info('Processing streaming Anthropic message request:', {
model: request.model,
messageCount: request.messages.length
})
// Validate request
const validation = this.validateRequest(request)
if (!validation.isValid) {
throw new Error(`Request validation failed: ${validation.errors.join(', ')}`)
}
// Create Anthropic client for the provider
const client = new Anthropic({
baseURL: provider.apiHost,
apiKey: provider.apiKey
})
// Prepare streaming request
const streamingRequest: MessageCreateParams = {
...request,
stream: true
}
logger.debug('Sending streaming request to Anthropic provider:', {
provider: provider.id,
apiHost: provider.apiHost
})
const stream = client.messages.stream(streamingRequest)
for await (const chunk of stream) {
yield chunk
}
logger.info('Successfully completed streaming Anthropic message')
} catch (error: any) {
logger.error('Error processing streaming Anthropic message:', error)
throw error
}
}
}
// Export singleton instance
export const messagesService = new MessagesService()

View File

@ -23,12 +23,12 @@ export async function getAvailableProviders(): Promise<Provider[]> {
return []
}
// Only support OpenAI type providers for API server
const openAIProviders = providers.filter((p: Provider) => p.enabled && p.type === 'openai')
// Support OpenAI and Anthropic type providers for API server
const supportedProviders = providers.filter((p: Provider) => p.enabled && (p.type === 'openai' || p.type === 'anthropic'))
logger.info(`Filtered to ${openAIProviders.length} OpenAI providers from ${providers.length} total providers`)
logger.info(`Filtered to ${supportedProviders.length} supported providers from ${providers.length} total providers`)
return openAIProviders
return supportedProviders
} catch (error: any) {
logger.error('Failed to get providers from Redux store:', error)
return []
@ -215,10 +215,10 @@ export function validateProvider(provider: Provider): boolean {
return false
}
// Only support OpenAI type providers
if (provider.type !== 'openai') {
// Support OpenAI and Anthropic type providers
if (provider.type !== 'openai' && provider.type !== 'anthropic') {
logger.debug(
`Provider type '${provider.type}' not supported, only 'openai' type is currently supported: ${provider.id}`
`Provider type '${provider.type}' not supported, only 'openai' and 'anthropic' types are currently supported: ${provider.id}`
)
return false
}