feat: Implement direct processing for Anthropic SDK and refactor message handling

This commit is contained in:
suyao 2025-11-27 20:43:47 +08:00
parent ccfb9423e0
commit f225fbe3e3
No known key found for this signature in database
4 changed files with 141 additions and 16 deletions

View File

@ -16,6 +16,20 @@ import type { ModelMessage } from 'ai'
const logger = loggerService.withContext('anthropic-sdk')
/**
* Context for Anthropic SDK client creation.
* This allows the shared module to be used in different environments
* by providing environment-specific implementations.
*/
export interface AnthropicSdkContext {
/**
* Custom fetch function to use for HTTP requests.
* In Electron main process, this should be `net.fetch`.
* In other environments, can use the default fetch or a custom implementation.
*/
fetch?: typeof globalThis.fetch
}
const defaultClaudeCodeSystemPrompt = `You are Claude Code, Anthropic's official CLI for Claude.`
const defaultClaudeCodeSystem: Array<TextBlockParam> = [
@ -58,8 +72,11 @@ const defaultClaudeCodeSystem: Array<TextBlockParam> = [
export function getSdkClient(
provider: Provider,
oauthToken?: string | null,
extraHeaders?: Record<string, string | string[]>
extraHeaders?: Record<string, string | string[]>,
context?: AnthropicSdkContext
): Anthropic {
const customFetch = context?.fetch
if (provider.authType === 'oauth') {
if (!oauthToken) {
throw new Error('OAuth token is not available')
@ -85,7 +102,8 @@ export function getSdkClient(
'x-stainless-runtime': 'node',
'x-stainless-runtime-version': 'v22.18.0',
...extraHeaders
}
},
fetch: customFetch
})
}
let baseURL =
@ -110,7 +128,8 @@ export function getSdkClient(
'APP-Code': 'MLTG2087',
...provider.extra_headers,
...extraHeaders
}
},
fetch: customFetch
})
}
@ -122,7 +141,8 @@ export function getSdkClient(
defaultHeaders: {
'anthropic-beta': 'output-128k-2025-02-19',
...provider.extra_headers
}
},
fetch: customFetch
})
}

View File

@ -8,6 +8,17 @@ import { messagesService } from '../services/messages'
import { generateUnifiedMessage, streamUnifiedMessages } from '../services/unified-messages'
import { getProviderById, validateModelId } from '../utils'
/**
* Check if provider should use direct Anthropic SDK
*
* A provider is considered "Anthropic-compatible" if:
* 1. It's a native Anthropic provider (type === 'anthropic'), OR
* 2. It has anthropicApiHost configured (aggregated providers routing to Anthropic-compatible endpoints)
*/
function shouldUseDirectAnthropic(provider: Provider): boolean {
return provider.type === 'anthropic' || !!(provider.anthropicApiHost && provider.anthropicApiHost.trim())
}
const logger = loggerService.withContext('ApiServerMessagesRoutes')
const router = express.Router()
@ -41,12 +52,70 @@ interface HandleMessageProcessingOptions {
}
/**
* Handle message processing using unified AI SDK
* All providers (including Anthropic) are handled through AI SDK:
* - Anthropic providers use @ai-sdk/anthropic which outputs native Anthropic SSE
* - Other providers use their respective AI SDK adapters, with output converted to Anthropic SSE
* Handle message processing using direct Anthropic SDK
* Used for providers with anthropicApiHost or native Anthropic providers
* This bypasses AI SDK conversion and uses native Anthropic protocol
*/
async function handleMessageProcessing({
async function handleDirectAnthropicProcessing({
res,
provider,
request,
modelId,
extraHeaders
}: HandleMessageProcessingOptions & { extraHeaders?: Record<string, string | string[]> }): Promise<void> {
const actualModelId = modelId || request.model
logger.info('Processing message via direct Anthropic SDK', {
providerId: provider.id,
providerType: provider.type,
modelId: actualModelId,
stream: !!request.stream,
anthropicApiHost: provider.anthropicApiHost
})
try {
// Validate request
const validation = messagesService.validateRequest(request)
if (!validation.isValid) {
res.status(400).json({
type: 'error',
error: {
type: 'invalid_request_error',
message: validation.errors.join('; ')
}
})
return
}
// Process message using messagesService (native Anthropic SDK)
const { client, anthropicRequest } = await messagesService.processMessage({
provider,
request,
extraHeaders,
modelId: actualModelId
})
if (request.stream) {
// Use native Anthropic streaming
await messagesService.handleStreaming(client, anthropicRequest, { response: res }, provider)
} else {
// Use native Anthropic non-streaming
const response = await client.messages.create(anthropicRequest)
res.json(response)
}
} catch (error: any) {
logger.error('Direct Anthropic processing error', { error })
const { statusCode, errorResponse } = messagesService.transformError(error)
res.status(statusCode).json(errorResponse)
}
}
/**
* Handle message processing using unified AI SDK
* Used for non-Anthropic providers that need format conversion
* - Uses AI SDK adapters with output converted to Anthropic SSE format
*/
async function handleUnifiedProcessing({
res,
provider,
request,
@ -93,12 +162,31 @@ async function handleMessageProcessing({
res.json(response)
}
} catch (error: any) {
logger.error('Message processing error', { error })
logger.error('Unified processing error', { error })
const { statusCode, errorResponse } = messagesService.transformError(error)
res.status(statusCode).json(errorResponse)
}
}
/**
* Handle message processing - routes to appropriate handler based on provider
*
* Routing logic:
* - Providers with anthropicApiHost OR type 'anthropic': Direct Anthropic SDK (no conversion)
* - Other providers: Unified AI SDK with Anthropic SSE conversion
*/
async function handleMessageProcessing({
res,
provider,
request,
modelId
}: HandleMessageProcessingOptions): Promise<void> {
if (shouldUseDirectAnthropic(provider)) {
return handleDirectAnthropicProcessing({ res, provider, request, modelId })
}
return handleUnifiedProcessing({ res, provider, request, modelId })
}
/**
* @swagger
* /v1/messages:

View File

@ -4,6 +4,7 @@ import { loggerService } from '@logger'
import anthropicService from '@main/services/AnthropicService'
import { buildClaudeCodeSystemMessage, getSdkClient } from '@shared/anthropic'
import type { Provider } from '@types'
import { net } from 'electron'
import type { Response } from 'express'
const logger = loggerService.withContext('MessagesService')
@ -98,11 +99,30 @@ export class MessagesService {
async getClient(provider: Provider, extraHeaders?: Record<string, string | string[]>): Promise<Anthropic> {
// Create Anthropic client for the provider
// Wrap net.fetch to handle compatibility issues:
// 1. net.fetch expects string URLs, not Request objects
// 2. net.fetch doesn't support 'agent' option from Node.js http module
const electronFetch: typeof globalThis.fetch = async (input: URL | RequestInfo, init?: RequestInit) => {
const url = typeof input === 'string' ? input : input instanceof URL ? input.toString() : input.url
// Remove unsupported options for Electron's net.fetch
if (init) {
const initWithAgent = init as RequestInit & { agent?: unknown }
delete initWithAgent.agent
const headers = new Headers(initWithAgent.headers)
if (headers.has('content-length')) {
headers.delete('content-length')
}
initWithAgent.headers = headers
return net.fetch(url, initWithAgent)
}
return net.fetch(url)
}
const context = { fetch: electronFetch }
if (provider.authType === 'oauth') {
const oauthToken = await anthropicService.getValidAccessToken()
return getSdkClient(provider, oauthToken, extraHeaders)
return getSdkClient(provider, oauthToken, extraHeaders, context)
}
return getSdkClient(provider, null, extraHeaders)
return getSdkClient(provider, null, extraHeaders, context)
}
prepareHeaders(headers: Record<string, string | string[] | undefined>): Record<string, string | string[]> {

View File

@ -42,10 +42,6 @@ export interface UnifiedStreamConfig {
onComplete?: () => void
}
// ============================================================================
// Provider Factory
// ============================================================================
/**
* Main process format context for formatProviderApiHost
* Unlike renderer, main process doesn't have direct access to store getters, so use reduxService cache
@ -338,6 +334,7 @@ function convertAnthropicToAiMessages(params: MessageCreateParams): ModelMessage
/**
* Stream a message request using AI SDK and convert to Anthropic SSE format
*/
// TODO: 使用ai-core executor集成中间件和transformstream进来
export async function streamUnifiedMessages(config: UnifiedStreamConfig): Promise<void> {
const { response, provider, modelId, params, onError, onComplete } = config