♻️ refactor: consolidate Claude Code system message handling and streaming logic

- Unify buildClaudeCodeSystemMessage implementation in shared package
- Refactor MessagesService to provide comprehensive message processing API
- Extract streaming logic, error handling, and header preparation into service methods
- Remove duplicate anthropic config from renderer, use shared implementation
- Update ClaudeCodeService to use append mode for custom instructions
- Improve type safety and request validation in message processing
This commit is contained in:
Vaayne 2025-09-30 23:20:01 +08:00
parent 142ad9e41e
commit 538291c03f
6 changed files with 361 additions and 276 deletions

View File

@ -12,8 +12,19 @@ import Anthropic from '@anthropic-ai/sdk'
import { TextBlockParam } from '@anthropic-ai/sdk/resources'
import { loggerService } from '@logger'
import { Provider } from '@types'
import type { ModelMessage } from 'ai'
const logger = loggerService.withContext('anthropic-sdk')
const defaultClaudeCodeSystemPrompt = `You are Claude Code, Anthropic's official CLI for Claude.`
const defaultClaudeCodeSystem: Array<TextBlockParam> = [
{
type: 'text',
text: defaultClaudeCodeSystemPrompt
}
]
/**
* Creates and configures an Anthropic SDK client based on the provider configuration.
*
@ -44,7 +55,11 @@ const logger = loggerService.withContext('anthropic-sdk')
* const apiKeyClient = getSdkClient(apiKeyProvider);
* ```
*/
export function getSdkClient(provider: Provider, oauthToken?: string | null): Anthropic {
export function getSdkClient(
provider: Provider,
oauthToken?: string | null,
extraHeaders?: Record<string, string | string[]>
): Anthropic {
if (provider.authType === 'oauth') {
if (!oauthToken) {
throw new Error('OAuth token is not available')
@ -68,7 +83,8 @@ export function getSdkClient(provider: Provider, oauthToken?: string | null): An
'x-stainless-os': 'MacOS',
'x-stainless-arch': 'arm64',
'x-stainless-runtime': 'node',
'x-stainless-runtime-version': 'v22.18.0'
'x-stainless-runtime-version': 'v22.18.0',
...extraHeaders
}
})
}
@ -87,7 +103,8 @@ export function getSdkClient(provider: Provider, oauthToken?: string | null): An
defaultHeaders: {
'anthropic-beta': 'output-128k-2025-02-19',
'APP-Code': 'MLTG2087',
...provider.extra_headers
...provider.extra_headers,
...extraHeaders
}
})
}
@ -118,53 +135,36 @@ export function getSdkClient(provider: Provider, oauthToken?: string | null): An
* @param system - Optional user-provided system message (string or TextBlockParam array)
* @returns Combined system message with Claude Code prompt prepended
*
* @example
* ```typescript
* // No system message
* const result1 = buildClaudeCodeSystemMessage();
* // Returns: "You are Claude Code, Anthropic's official CLI for Claude."
*
* // String system message
* const result2 = buildClaudeCodeSystemMessage("You are a helpful assistant.");
* // Returns: [
* // { type: 'text', text: "You are Claude Code, Anthropic's official CLI for Claude." },
* // { type: 'text', text: "You are a helpful assistant." }
* // ]
*
* // Array system message
* const systemArray = [{ type: 'text', text: 'Custom instructions' }];
* const result3 = buildClaudeCodeSystemMessage(systemArray);
* // Returns: Array with Claude Code message prepended
* ```
*/
export function buildClaudeCodeSystemMessage(system?: string | Array<TextBlockParam>): string | Array<TextBlockParam> {
const defaultClaudeCodeSystem = `You are Claude Code, Anthropic's official CLI for Claude.`
export function buildClaudeCodeSystemMessage(system?: string | Array<TextBlockParam>): Array<TextBlockParam> {
if (!system) {
return defaultClaudeCodeSystem
}
if (typeof system === 'string') {
if (system.trim() === defaultClaudeCodeSystem) {
return system
if (system.trim() === defaultClaudeCodeSystemPrompt || system.trim() === '') {
return defaultClaudeCodeSystem
} else {
return [...defaultClaudeCodeSystem, { type: 'text', text: system }]
}
}
if (Array.isArray(system)) {
const firstSystem = system[0]
if (firstSystem.type === 'text' && firstSystem.text.trim() === defaultClaudeCodeSystemPrompt) {
return system
} else {
return [...defaultClaudeCodeSystem, ...system]
}
return [
{
type: 'text',
text: defaultClaudeCodeSystem
},
{
type: 'text',
text: system
}
]
}
if (system[0].text.trim() != defaultClaudeCodeSystem) {
system.unshift({
type: 'text',
text: defaultClaudeCodeSystem
})
}
return system
return defaultClaudeCodeSystem
}
export function buildClaudeCodeSystemModelMessage(system?: string | Array<TextBlockParam>): Array<ModelMessage> {
const textBlocks = buildClaudeCodeSystemMessage(system)
return textBlocks.map((block) => ({
role: 'system',
content: block.text
}))
}

View File

@ -1,9 +1,9 @@
import { MessageCreateParams } from '@anthropic-ai/sdk/resources'
import { loggerService } from '@logger'
import { Provider } from '@types'
import express, { Request, Response } from 'express'
import { Provider } from '../../../renderer/src/types/provider'
import { MessagesService, messagesService } from '../services/messages'
import { messagesService } from '../services/messages'
import { getProviderById, validateModelId } from '../utils'
const logger = loggerService.withContext('ApiServerMessagesRoutes')
@ -11,7 +11,7 @@ const logger = loggerService.withContext('ApiServerMessagesRoutes')
const router = express.Router()
const providerRouter = express.Router({ mergeParams: true })
// Helper functions for shared logic
// Helper function for basic request validation
async function validateRequestBody(req: Request): Promise<{ valid: boolean; error?: any }> {
const request: MessageCreateParams = req.body
@ -31,157 +31,53 @@ async function validateRequestBody(req: Request): Promise<{ valid: boolean; erro
return { valid: true }
}
async function handleStreamingResponse(
res: Response,
request: MessageCreateParams,
provider: Provider,
messagesService: MessagesService
): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache, no-transform')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
res.flushHeaders()
const flushableResponse = res as Response & { flush?: () => void }
const flushStream = () => {
if (typeof flushableResponse.flush !== 'function') {
return
}
try {
flushableResponse.flush()
} catch (flushError: unknown) {
logger.warn('Failed to flush streaming response', {
error: flushError
})
}
}
try {
for await (const chunk of messagesService.processStreamingMessage(request, provider)) {
res.write(`event: ${chunk.type}\n`)
res.write(`data: ${JSON.stringify(chunk)}\n\n`)
flushStream()
}
res.write('data: [DONE]\n\n')
flushStream()
} catch (streamError: any) {
logger.error('Stream error', {
error: streamError,
provider: provider.id,
model: request.model,
apiHost: provider.apiHost,
anthropicApiHost: provider.anthropicApiHost
})
res.write(
`data: ${JSON.stringify({
type: 'error',
error: {
type: 'api_error',
message: 'Stream processing error'
}
})}\n\n`
)
} finally {
res.end()
}
}
function handleErrorResponse(res: Response, error: any): Response {
logger.error('Message processing error', { error })
let statusCode = 500
let errorType = 'api_error'
let errorMessage = 'Internal server error'
const anthropicStatus = typeof error?.status === 'number' ? error.status : undefined
const anthropicError = error?.error
if (anthropicStatus) {
statusCode = anthropicStatus
}
if (anthropicError?.type) {
errorType = anthropicError.type
}
if (anthropicError?.message) {
errorMessage = anthropicError.message
} else if (error instanceof Error && error.message) {
errorMessage = error.message
}
if (!anthropicStatus && error instanceof Error) {
if (error.message.includes('API key') || error.message.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
} else if (error.message.includes('rate limit') || error.message.includes('quota')) {
statusCode = 429
errorType = 'rate_limit_error'
} else if (error.message.includes('timeout') || error.message.includes('connection')) {
statusCode = 502
errorType = 'api_error'
} else if (error.message.includes('validation') || error.message.includes('invalid')) {
statusCode = 400
errorType = 'invalid_request_error'
}
}
return res.status(statusCode).json({
type: 'error',
error: {
type: errorType,
message: errorMessage,
requestId: error?.request_id
}
})
}
async function processMessageRequest(
req: Request,
res: Response,
provider: Provider,
interface HandleMessageProcessingOptions {
req: Request
res: Response
provider: Provider
request: MessageCreateParams
modelId?: string
): Promise<Response | void> {
}
async function handleMessageProcessing({
req,
res,
provider,
request,
modelId
}: HandleMessageProcessingOptions): Promise<void> {
try {
const request: MessageCreateParams = req.body
// Use provided modelId or keep original model
if (modelId) {
request.model = modelId
}
// Validate request
const validation = messagesService.validateRequest(request)
if (!validation.isValid) {
return res.status(400).json({
res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: validation.errors.join('; ')
}
})
}
logger.info('Processing anthropic messages request', {
provider: provider.id,
apiHost: provider.apiHost,
anthropicApiHost: provider.anthropicApiHost,
model: request.model,
stream: request.stream,
thinking: request.thinking
})
// Handle streaming
if (request.stream) {
await handleStreamingResponse(res, request, provider, messagesService)
return
}
// Handle non-streaming
const response = await messagesService.processMessage(request, provider)
return res.json(response)
const extraHeaders = messagesService.prepareHeaders(req.headers)
const { client, anthropicRequest } = await messagesService.processMessage({
provider,
request,
extraHeaders,
modelId
})
if (request.stream) {
await messagesService.handleStreaming(client, anthropicRequest, { response: res }, provider)
return
}
const response = await client.messages.create(anthropicRequest)
res.json(response)
} catch (error: any) {
return handleErrorResponse(res, error)
logger.error('Message processing error', { error })
const { statusCode, errorResponse } = messagesService.transformError(error)
res.status(statusCode).json(errorResponse)
}
}
@ -338,10 +234,11 @@ router.post('/', async (req: Request, res: Response) => {
const provider = modelValidation.provider!
const modelId = modelValidation.modelId!
// Use shared processing function
return await processMessageRequest(req, res, provider, modelId)
return handleMessageProcessing({ req, res, provider, request, modelId })
} catch (error: any) {
return handleErrorResponse(res, error)
logger.error('Message processing error', { error })
const { statusCode, errorResponse } = messagesService.transformError(error)
return res.status(statusCode).json(errorResponse)
}
})
@ -493,10 +390,13 @@ providerRouter.post('/', async (req: Request, res: Response) => {
})
}
// Use shared processing function (no modelId override needed)
return await processMessageRequest(req, res, provider)
const request: MessageCreateParams = req.body
return handleMessageProcessing({ req, res, provider, request })
} catch (error: any) {
return handleErrorResponse(res, error)
logger.error('Message processing error', { error })
const { statusCode, errorResponse } = messagesService.transformError(error)
return res.status(statusCode).json(errorResponse)
}
})

View File

@ -1,33 +1,93 @@
import Anthropic from '@anthropic-ai/sdk'
import { Message, MessageCreateParams, RawMessageStreamEvent } from '@anthropic-ai/sdk/resources'
import { MessageCreateParams, MessageStreamEvent } from '@anthropic-ai/sdk/resources'
import { loggerService } from '@logger'
import anthropicService from '@main/services/AnthropicService'
import { buildClaudeCodeSystemMessage, getSdkClient } from '@shared/anthropic'
import { Provider } from '@types'
import { Response } from 'express'
const logger = loggerService.withContext('MessagesService')
const EXCLUDED_FORWARD_HEADERS: ReadonlySet<string> = new Set([
'host',
'x-api-key',
'authorization',
'sentry-trace',
'baggage',
'content-length',
'connection'
])
export interface ValidationResult {
isValid: boolean
errors: string[]
}
export interface ErrorResponse {
type: 'error'
error: {
type: string
message: string
requestId?: string
}
}
export interface StreamConfig {
response: Response
onChunk?: (chunk: MessageStreamEvent) => void
onError?: (error: any) => void
onComplete?: () => void
}
export interface ProcessMessageOptions {
provider: Provider
request: MessageCreateParams
extraHeaders?: Record<string, string | string[]>
modelId?: string
}
export interface ProcessMessageResult {
client: Anthropic
anthropicRequest: MessageCreateParams
}
export class MessagesService {
// oxlint-disable-next-line no-unused-vars
validateRequest(request: MessageCreateParams): ValidationResult {
// TODO: Implement comprehensive request validation
const errors: string[] = []
if (!request.model) {
if (!request.model || typeof request.model !== 'string') {
errors.push('Model is required')
}
if (!request.max_tokens || request.max_tokens < 1) {
errors.push('max_tokens is required and must be at least 1')
if (typeof request.max_tokens !== 'number' || !Number.isFinite(request.max_tokens) || request.max_tokens < 1) {
errors.push('max_tokens is required and must be a positive number')
}
if (!request.messages || !Array.isArray(request.messages) || request.messages.length === 0) {
errors.push('messages is required and must be a non-empty array')
} else {
request.messages.forEach((message, index) => {
if (!message || typeof message !== 'object') {
errors.push(`messages[${index}] must be an object`)
return
}
if (!('role' in message) || typeof message.role !== 'string' || message.role.trim().length === 0) {
errors.push(`messages[${index}].role is required`)
}
const content: unknown = message.content
if (content === undefined || content === null) {
errors.push(`messages[${index}].content is required`)
return
}
if (typeof content === 'string' && content.trim().length === 0) {
errors.push(`messages[${index}].content cannot be empty`)
} else if (Array.isArray(content) && content.length === 0) {
errors.push(`messages[${index}].content must include at least one item when using an array`)
}
})
}
return {
@ -36,79 +96,224 @@ export class MessagesService {
}
}
async getClient(provider: Provider): Promise<Anthropic> {
async getClient(provider: Provider, extraHeaders?: Record<string, string | string[]>): Promise<Anthropic> {
// Create Anthropic client for the provider
if (provider.authType === 'oauth') {
const oauthToken = await anthropicService.getValidAccessToken()
return getSdkClient(provider, oauthToken)
return getSdkClient(provider, oauthToken, extraHeaders)
}
return getSdkClient(provider)
return getSdkClient(provider, null, extraHeaders)
}
async processMessage(request: MessageCreateParams, provider: Provider): Promise<Message> {
logger.debug('Preparing Anthropic message request', {
model: request.model,
messageCount: request.messages.length,
stream: request.stream,
maxTokens: request.max_tokens,
provider: provider.id
})
prepareHeaders(headers: Record<string, string | string[] | undefined>): Record<string, string | string[]> {
const extraHeaders: Record<string, string | string[]> = {}
// Create Anthropic client for the provider
const client = await this.getClient(provider)
for (const [key, value] of Object.entries(headers)) {
if (value === undefined) {
continue
}
// Prepare request with the actual model ID
const normalizedKey = key.toLowerCase()
if (EXCLUDED_FORWARD_HEADERS.has(normalizedKey)) {
continue
}
extraHeaders[normalizedKey] = value
}
return extraHeaders
}
createAnthropicRequest(request: MessageCreateParams, provider: Provider, modelId?: string): MessageCreateParams {
const anthropicRequest: MessageCreateParams = {
...request,
stream: false
stream: !!request.stream
}
if (provider.authType === 'oauth') {
anthropicRequest.system = buildClaudeCodeSystemMessage(request.system || '')
// Override model if provided
if (modelId) {
anthropicRequest.model = modelId
}
const response = await client.messages.create(anthropicRequest)
// Add Claude Code system message for OAuth providers
if (provider.type === 'anthropic' && provider.authType === 'oauth') {
anthropicRequest.system = buildClaudeCodeSystemMessage(request.system)
}
logger.info('Anthropic message completed', {
model: request.model,
provider: provider.id
})
return response
return anthropicRequest
}
async *processStreamingMessage(
async handleStreaming(
client: Anthropic,
request: MessageCreateParams,
config: StreamConfig,
provider: Provider
): AsyncIterable<RawMessageStreamEvent> {
logger.debug('Preparing streaming Anthropic message request', {
model: request.model,
messageCount: request.messages.length,
provider: provider.id
): Promise<void> {
const { response, onChunk, onError, onComplete } = config
// Set streaming headers
response.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
response.setHeader('Cache-Control', 'no-cache, no-transform')
response.setHeader('Connection', 'keep-alive')
response.setHeader('X-Accel-Buffering', 'no')
response.flushHeaders()
const flushableResponse = response as Response & { flush?: () => void }
const flushStream = () => {
if (typeof flushableResponse.flush !== 'function') {
return
}
try {
flushableResponse.flush()
} catch (flushError: unknown) {
logger.warn('Failed to flush streaming response', { error: flushError })
}
}
const writeSse = (eventType: string | undefined, payload: unknown) => {
if (response.writableEnded || response.destroyed) {
return
}
if (eventType) {
response.write(`event: ${eventType}\n`)
}
const data = typeof payload === 'string' ? payload : JSON.stringify(payload)
response.write(`data: ${data}\n\n`)
flushStream()
}
try {
const stream = client.messages.stream(request)
for await (const chunk of stream) {
if (response.writableEnded || response.destroyed) {
logger.warn('Streaming response ended before stream completion', {
provider: provider.id,
model: request.model
})
break
}
writeSse(chunk.type, chunk)
if (onChunk) {
onChunk(chunk)
}
}
writeSse(undefined, '[DONE]')
if (onComplete) {
onComplete()
}
} catch (streamError: any) {
logger.error('Stream error', {
error: streamError,
provider: provider.id,
model: request.model,
apiHost: provider.apiHost,
anthropicApiHost: provider.anthropicApiHost
})
writeSse(undefined, {
type: 'error',
error: {
type: 'api_error',
message: 'Stream processing error'
}
})
if (onError) {
onError(streamError)
}
} finally {
if (!response.writableEnded) {
response.end()
}
}
}
transformError(error: any): { statusCode: number; errorResponse: ErrorResponse } {
let statusCode = 500
let errorType = 'api_error'
let errorMessage = 'Internal server error'
const anthropicStatus = typeof error?.status === 'number' ? error.status : undefined
const anthropicError = error?.error
if (anthropicStatus) {
statusCode = anthropicStatus
}
if (anthropicError?.type) {
errorType = anthropicError.type
}
if (anthropicError?.message) {
errorMessage = anthropicError.message
} else if (error instanceof Error && error.message) {
errorMessage = error.message
}
// Infer error type from message if not from Anthropic API
if (!anthropicStatus && error instanceof Error) {
const errorMessageText = error.message ?? ''
if (errorMessageText.includes('API key') || errorMessageText.includes('authentication')) {
statusCode = 401
errorType = 'authentication_error'
} else if (errorMessageText.includes('rate limit') || errorMessageText.includes('quota')) {
statusCode = 429
errorType = 'rate_limit_error'
} else if (errorMessageText.includes('timeout') || errorMessageText.includes('connection')) {
statusCode = 502
errorType = 'api_error'
} else if (errorMessageText.includes('validation') || errorMessageText.includes('invalid')) {
statusCode = 400
errorType = 'invalid_request_error'
}
}
const safeErrorMessage =
typeof errorMessage === 'string' && errorMessage.length > 0 ? errorMessage : 'Internal server error'
return {
statusCode,
errorResponse: {
type: 'error',
error: {
type: errorType,
message: safeErrorMessage,
requestId: error?.request_id
}
}
}
}
async processMessage(options: ProcessMessageOptions): Promise<ProcessMessageResult> {
const { provider, request, extraHeaders, modelId } = options
const client = await this.getClient(provider, extraHeaders)
const anthropicRequest = this.createAnthropicRequest(request, provider, modelId)
const messageCount = Array.isArray(request.messages) ? request.messages.length : 0
logger.info('Processing anthropic messages request', {
provider: provider.id,
apiHost: provider.apiHost,
anthropicApiHost: provider.anthropicApiHost,
model: anthropicRequest.model,
stream: !!anthropicRequest.stream,
// systemPrompt: JSON.stringify(!!request.system),
// messages: JSON.stringify(request.messages),
messageCount,
toolCount: Array.isArray(request.tools) ? request.tools.length : 0
})
// Create Anthropic client for the provider
const client = await this.getClient(provider)
// Prepare streaming request
const streamingRequest: MessageCreateParams = {
...request,
stream: true
// Return client and request for route layer to handle streaming/non-streaming
return {
client,
anthropicRequest
}
if (provider.authType === 'oauth') {
streamingRequest.system = buildClaudeCodeSystemMessage(request.system || '')
}
const stream = client.messages.stream(streamingRequest)
for await (const chunk of stream) {
yield chunk
}
logger.info('Completed streaming Anthropic message', {
model: request.model,
provider: provider.id
})
}
}

View File

@ -106,7 +106,13 @@ class ClaudeCodeService implements AgentServiceInterface {
logger.warn('claude stderr', { chunk })
errorChunks.push(chunk)
},
systemPrompt: session.instructions ? session.instructions : { type: 'preset', preset: 'claude_code' },
systemPrompt: session.instructions
? {
type: 'preset',
preset: 'claude_code',
append: session.instructions
}
: { type: 'preset', preset: 'claude_code' },
settingSources: ['project'],
includePartialMessages: true,
permissionMode: session.configuration?.permission_mode,
@ -136,6 +142,8 @@ class ClaudeCodeService implements AgentServiceInterface {
if (lastAgentSessionId) {
options.resume = lastAgentSessionId
// TODO: use fork session when we support branching sessions
// options.forkSession = true
}
logger.info('Starting Claude Code SDK query', {

View File

@ -14,6 +14,7 @@ import { addSpan, endSpan } from '@renderer/services/SpanManagerService'
import { StartSpanParams } from '@renderer/trace/types/ModelSpanEntity'
import type { Assistant, GenerateImageParams, Model, Provider } from '@renderer/types'
import type { AiSdkModel, StreamTextParams } from '@renderer/types/aiCoreTypes'
import { buildClaudeCodeSystemModelMessage } from '@shared/anthropic'
import { type ImageModel, type LanguageModel, type Provider as AiSdkProvider, wrapLanguageModel } from 'ai'
import AiSdkToChunkAdapter from './chunk/AiSdkToChunkAdapter'
@ -21,7 +22,6 @@ import LegacyAiProvider from './legacy/index'
import { CompletionsParams, CompletionsResult } from './legacy/middleware/schemas'
import { AiSdkMiddlewareConfig, buildAiSdkMiddlewares } from './middleware/AiSdkMiddlewareBuilder'
import { buildPlugins } from './plugins/PluginBuilder'
import { buildClaudeCodeSystemMessage } from './provider/config/anthropic'
import { createAiSdkProvider } from './provider/factory'
import {
getActualProvider,
@ -122,13 +122,9 @@ export default class ModernAiProvider {
}
if (this.actualProvider.id === 'anthropic' && this.actualProvider.authType === 'oauth') {
const claudeCodeSystemMessage = buildClaudeCodeSystemMessage(params.system)
const claudeCodeSystemMessage = buildClaudeCodeSystemModelMessage(params.system)
params.system = undefined // 清除原有system避免重复
if (Array.isArray(params.messages)) {
params.messages = [...claudeCodeSystemMessage, ...params.messages]
} else {
params.messages = claudeCodeSystemMessage
}
params.messages = [...claudeCodeSystemMessage, ...(params.messages || [])]
}
if (config.topicId && getEnableDeveloperMode()) {

View File

@ -1,24 +0,0 @@
import { SystemModelMessage } from 'ai'
export function buildClaudeCodeSystemMessage(system?: string): Array<SystemModelMessage> {
const defaultClaudeCodeSystem = `You are Claude Code, Anthropic's official CLI for Claude.`
if (!system || system.trim() === defaultClaudeCodeSystem) {
return [
{
role: 'system',
content: defaultClaudeCodeSystem
}
]
}
return [
{
role: 'system',
content: defaultClaudeCodeSystem
},
{
role: 'system',
content: system
}
]
}