From a1d14b9292d17755270db9b4c6cd6e85f15f3c58 Mon Sep 17 00:00:00 2001 From: Vaayne Date: Thu, 25 Sep 2025 23:15:30 +0800 Subject: [PATCH] feat(claudecode): enhance streaming transform flow --- .../claudecode/__tests__/transform.test.ts | 290 ++++++ .../claudecode/claude-stream-state.ts | 241 +++++ .../agents/services/claudecode/index.ts | 18 +- .../agents/services/claudecode/transform.ts | 837 +++++++++++++----- 4 files changed, 1136 insertions(+), 250 deletions(-) create mode 100644 src/main/services/agents/services/claudecode/__tests__/transform.test.ts create mode 100644 src/main/services/agents/services/claudecode/claude-stream-state.ts diff --git a/src/main/services/agents/services/claudecode/__tests__/transform.test.ts b/src/main/services/agents/services/claudecode/__tests__/transform.test.ts new file mode 100644 index 0000000000..413acfa968 --- /dev/null +++ b/src/main/services/agents/services/claudecode/__tests__/transform.test.ts @@ -0,0 +1,290 @@ +import type { SDKMessage } from '@anthropic-ai/claude-code' +import { describe, expect, it } from 'vitest' + +import { ClaudeStreamState, transformSDKMessageToStreamParts } from '../transform' + +const baseStreamMetadata = { + parent_tool_use_id: null, + session_id: 'session-123' +} + +const uuid = (n: number) => `00000000-0000-0000-0000-${n.toString().padStart(12, '0')}` + +describe('Claude → AiSDK transform', () => { + it('handles tool call streaming lifecycle', () => { + const state = new ClaudeStreamState() + const parts: ReturnType[number][] = [] + + const messages: SDKMessage[] = [ + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(1), + event: { + type: 'message_start', + message: { + id: 'msg-start', + type: 'message', + role: 'assistant', + model: 'claude-test', + content: [], + stop_reason: null, + stop_sequence: null, + usage: {} + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(2), + event: { + type: 'content_block_start', + index: 0, + content_block: { + type: 'tool_use', + id: 'tool-1', + name: 'Bash', + input: {} + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(3), + event: { + type: 'content_block_delta', + index: 0, + delta: { + type: 'input_json_delta', + partial_json: '{"command":"ls"}' + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'assistant', + uuid: uuid(4), + message: { + id: 'msg-tool', + type: 'message', + role: 'assistant', + model: 'claude-test', + content: [ + { + type: 'tool_use', + id: 'tool-1', + name: 'Bash', + input: { + command: 'ls' + } + } + ], + stop_reason: 'tool_use', + stop_sequence: null, + usage: { + input_tokens: 1, + output_tokens: 0 + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(5), + event: { + type: 'content_block_stop', + index: 0 + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(6), + event: { + type: 'message_delta', + delta: { + stop_reason: 'tool_use', + stop_sequence: null + }, + usage: { + input_tokens: 1, + output_tokens: 5 + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(7), + event: { + type: 'message_stop' + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'user', + uuid: uuid(8), + message: { + role: 'user', + content: [ + { + type: 'tool_result', + tool_use_id: 'tool-1', + content: 'ok', + is_error: false + } + ] + } + } as SDKMessage + ] + + for 
(const message of messages) { + const transformed = transformSDKMessageToStreamParts(message, state) + for (const part of transformed) { + parts.push(part) + } + } + + const types = parts.map((part) => part.type) + expect(types).toEqual([ + 'start-step', + 'tool-input-start', + 'tool-input-delta', + 'tool-call', + 'tool-input-end', + 'finish-step', + 'tool-result' + ]) + + const finishStep = parts.find((part) => part.type === 'finish-step') as Extract< + (typeof parts)[number], + { type: 'finish-step' } + > + expect(finishStep.finishReason).toBe('tool-calls') + expect(finishStep.usage).toEqual({ inputTokens: 1, outputTokens: 5, totalTokens: 6 }) + + const toolResult = parts.find((part) => part.type === 'tool-result') as Extract< + (typeof parts)[number], + { type: 'tool-result' } + > + expect(toolResult.toolCallId).toBe('tool-1') + expect(toolResult.toolName).toBe('Bash') + expect(toolResult.input).toEqual({ command: 'ls' }) + expect(toolResult.output).toBe('ok') + }) + + it('handles streaming text completion', () => { + const state = new ClaudeStreamState() + const parts: ReturnType[number][] = [] + + const messages: SDKMessage[] = [ + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(9), + event: { + type: 'message_start', + message: { + id: 'msg-text', + type: 'message', + role: 'assistant', + model: 'claude-text', + content: [], + stop_reason: null, + stop_sequence: null, + usage: {} + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(10), + event: { + type: 'content_block_start', + index: 0, + content_block: { + type: 'text', + text: '' + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(11), + event: { + type: 'content_block_delta', + index: 0, + delta: { + type: 'text_delta', + text: 'Hello' + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(12), + event: { + type: 'content_block_delta', + index: 0, + delta: { + type: 'text_delta', + text: ' world' + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(13), + event: { + type: 'content_block_stop', + index: 0 + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(14), + event: { + type: 'message_delta', + delta: { + stop_reason: 'end_turn', + stop_sequence: null + }, + usage: { + input_tokens: 2, + output_tokens: 4 + } + } + } as unknown as SDKMessage, + { + ...baseStreamMetadata, + type: 'stream_event', + uuid: uuid(15), + event: { + type: 'message_stop' + } + } as SDKMessage + ] + + for (const message of messages) { + const transformed = transformSDKMessageToStreamParts(message, state) + parts.push(...transformed) + } + + const types = parts.map((part) => part.type) + expect(types).toEqual(['start-step', 'text-start', 'text-delta', 'text-delta', 'text-end', 'finish-step']) + + const finishStep = parts.find((part) => part.type === 'finish-step') as Extract< + (typeof parts)[number], + { type: 'finish-step' } + > + expect(finishStep.finishReason).toBe('stop') + expect(finishStep.usage).toEqual({ inputTokens: 2, outputTokens: 4, totalTokens: 6 }) + }) +}) diff --git a/src/main/services/agents/services/claudecode/claude-stream-state.ts b/src/main/services/agents/services/claudecode/claude-stream-state.ts new file mode 100644 index 0000000000..078f048ce8 --- /dev/null +++ b/src/main/services/agents/services/claudecode/claude-stream-state.ts @@ -0,0 +1,241 @@ +/** + * 
Lightweight state container shared by the Claude → AiSDK transformer. Anthropic does not send + * deterministic identifiers for intermediate content blocks, so we stitch one together by tracking + * block indices and associated AiSDK ids. This class also keeps: + * • incremental text / reasoning buffers so we can emit only deltas while retaining the full + * aggregate for later tool-call emission; + * • a reverse lookup for tool calls so `tool_result` snapshots can recover their metadata; + * • pending usage + finish reason from `message_delta` events until the corresponding + * `message_stop` arrives. + * Every Claude turn gets its own instance. `resetStep` should be invoked once the finish event has + * been emitted to avoid leaking state into the next turn. + */ +import type { FinishReason, LanguageModelUsage, ProviderMetadata } from 'ai' + +/** + * Shared fields for every block that Claude can stream (text, reasoning, tool). + */ +type BaseBlockState = { + id: string + index: number +} + +type TextBlockState = BaseBlockState & { + kind: 'text' + text: string +} + +type ReasoningBlockState = BaseBlockState & { + kind: 'reasoning' + text: string + redacted: boolean +} + +type ToolBlockState = BaseBlockState & { + kind: 'tool' + toolCallId: string + toolName: string + inputBuffer: string + providerMetadata?: ProviderMetadata + resolvedInput?: unknown +} + +export type BlockState = TextBlockState | ReasoningBlockState | ToolBlockState + +type PendingUsageState = { + usage?: LanguageModelUsage + finishReason?: FinishReason +} + +type PendingToolCall = { + toolCallId: string + toolName: string + input: unknown + providerMetadata?: ProviderMetadata +} + +/** + * Tracks the lifecycle of Claude streaming blocks (text, thinking, tool calls) + * across individual websocket events. The transformer relies on this class to + * stitch together deltas, manage pending tool inputs/results, and propagate + * usage/finish metadata once Anthropic closes a message. + */ +export class ClaudeStreamState { + private blocksByIndex = new Map() + private toolIndexById = new Map() + private pendingUsage: PendingUsageState = {} + private pendingToolCalls = new Map() + private stepActive = false + + /** Marks the beginning of a new AiSDK step. */ + beginStep(): void { + this.stepActive = true + } + + hasActiveStep(): boolean { + return this.stepActive + } + + /** Creates a text block placeholder so future deltas can accumulate into it. */ + openTextBlock(index: number, id: string): TextBlockState { + const block: TextBlockState = { + kind: 'text', + id, + index, + text: '' + } + this.blocksByIndex.set(index, block) + return block + } + + /** Starts tracking an Anthropic "thinking" block, optionally flagged as redacted. */ + openReasoningBlock(index: number, id: string, redacted: boolean): ReasoningBlockState { + const block: ReasoningBlockState = { + kind: 'reasoning', + id, + index, + redacted, + text: '' + } + this.blocksByIndex.set(index, block) + return block + } + + /** Caches tool metadata so subsequent input deltas and results can find it. 
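+ *
+ * @example
+ * // Illustrative sketch only; ids mirror the Bash fixture in the transform tests,
+ * // and `state` is any ClaudeStreamState instance.
+ * state.openToolBlock(0, { toolCallId: 'tool-1', toolName: 'Bash' })
+ * state.appendToolInputDelta(0, '{"command":"ls"}')
+ * state.closeBlock(0)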
*/ + openToolBlock( + index: number, + params: { toolCallId: string; toolName: string; providerMetadata?: ProviderMetadata } + ): ToolBlockState { + const block: ToolBlockState = { + kind: 'tool', + id: params.toolCallId, + index, + toolCallId: params.toolCallId, + toolName: params.toolName, + inputBuffer: '', + providerMetadata: params.providerMetadata + } + this.blocksByIndex.set(index, block) + this.toolIndexById.set(params.toolCallId, index) + return block + } + + getBlock(index: number): BlockState | undefined { + return this.blocksByIndex.get(index) + } + + getToolBlockById(toolCallId: string): ToolBlockState | undefined { + const index = this.toolIndexById.get(toolCallId) + if (index === undefined) return undefined + const block = this.blocksByIndex.get(index) + if (!block || block.kind !== 'tool') return undefined + return block + } + + /** Appends streamed text to a text block, returning the updated state when present. */ + appendTextDelta(index: number, text: string): TextBlockState | undefined { + const block = this.blocksByIndex.get(index) + if (!block || block.kind !== 'text') return undefined + block.text += text + return block + } + + /** Appends streamed "thinking" content to the tracked reasoning block. */ + appendReasoningDelta(index: number, text: string): ReasoningBlockState | undefined { + const block = this.blocksByIndex.get(index) + if (!block || block.kind !== 'reasoning') return undefined + block.text += text + return block + } + + /** Concatenates incremental JSON payloads for tool input blocks. */ + appendToolInputDelta(index: number, jsonDelta: string): ToolBlockState | undefined { + const block = this.blocksByIndex.get(index) + if (!block || block.kind !== 'tool') return undefined + block.inputBuffer += jsonDelta + return block + } + + /** Records a tool call to be consumed once its result arrives from the user. */ + registerToolCall( + toolCallId: string, + payload: { toolName: string; input: unknown; providerMetadata?: ProviderMetadata } + ): void { + this.pendingToolCalls.set(toolCallId, { + toolCallId, + toolName: payload.toolName, + input: payload.input, + providerMetadata: payload.providerMetadata + }) + } + + /** Retrieves and clears the buffered tool call metadata for the given id. */ + consumePendingToolCall(toolCallId: string): PendingToolCall | undefined { + const entry = this.pendingToolCalls.get(toolCallId) + if (entry) { + this.pendingToolCalls.delete(toolCallId) + } + return entry + } + + /** + * Persists the final input payload for a tool block once the provider signals + * completion so that downstream tool results can reference the original call. + */ + completeToolBlock(toolCallId: string, input: unknown, providerMetadata?: ProviderMetadata): void { + this.registerToolCall(toolCallId, { + toolName: this.getToolBlockById(toolCallId)?.toolName ?? 'unknown', + input, + providerMetadata + }) + const block = this.getToolBlockById(toolCallId) + if (block) { + block.resolvedInput = input + } + } + + /** Removes a block from the active index map when Claude signals it is done. */ + closeBlock(index: number): BlockState | undefined { + const block = this.blocksByIndex.get(index) + if (!block) return undefined + this.blocksByIndex.delete(index) + if (block.kind === 'tool') { + this.toolIndexById.delete(block.toolCallId) + } + return block + } + + /** Stores interim usage metrics so they can be emitted with the `finish-step`. 
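+ * Claude reports usage on `message_delta`, but the matching `finish-step` is only emitted on
+ * `message_stop`, so the values are parked here in between.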
*/ + setPendingUsage(usage?: LanguageModelUsage, finishReason?: FinishReason): void { + if (usage) { + this.pendingUsage.usage = usage + } + if (finishReason) { + this.pendingUsage.finishReason = finishReason + } + } + + getPendingUsage(): PendingUsageState { + return { ...this.pendingUsage } + } + + /** Clears any accumulated usage values for the next streamed message. */ + resetPendingUsage(): void { + this.pendingUsage = {} + } + + /** Drops cached block metadata for the currently active message. */ + resetBlocks(): void { + this.blocksByIndex.clear() + this.toolIndexById.clear() + } + + /** Resets the entire step lifecycle after emitting a terminal frame. */ + resetStep(): void { + this.resetBlocks() + this.resetPendingUsage() + this.stepActive = false + } +} + +export type { PendingToolCall } diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index a20b639daa..cc72b3c5a7 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -11,7 +11,7 @@ import { app } from 'electron' import { GetAgentSessionResponse } from '../..' import { AgentServiceInterface, AgentStream, AgentStreamEvent } from '../../interfaces/AgentStreamInterface' -import { transformSDKMessageToStreamParts } from './transform' +import { ClaudeStreamState, transformSDKMessageToStreamParts } from './transform' const require_ = createRequire(import.meta.url) const logger = loggerService.withContext('ClaudeCodeService') @@ -92,6 +92,7 @@ class ClaudeCodeService implements AgentServiceInterface { errorChunks.push(chunk) }, appendSystemPrompt: session.instructions, + includePartialMessages: true, permissionMode: session.configuration?.permission_mode, maxTurns: session.configuration?.max_turns, allowedTools: session.allowed_tools @@ -164,6 +165,7 @@ class ClaudeCodeService implements AgentServiceInterface { let hasCompleted = false const startTime = Date.now() + const streamState = new ClaudeStreamState() try { // Process streaming responses using SDK query for await (const message of query({ @@ -173,15 +175,21 @@ class ClaudeCodeService implements AgentServiceInterface { if (hasCompleted) break jsonOutput.push(message) - logger.silly('claude response', { message }) + if (message.type === 'assistant' || message.type === 'user') { - logger.silly('message content', { - message: JSON.stringify({ role: message.message.role, content: message.message.content }) + logger.silly('claude response', { + message, + content: JSON.stringify(message.message.content) + }) + } else if (message.type === 'stream_event') { + logger.silly('Claude stream event', { + message, + event: JSON.stringify(message.event) }) } // Transform SDKMessage to UIMessageChunks - const chunks = transformSDKMessageToStreamParts(message) + const chunks = transformSDKMessageToStreamParts(message, streamState) for (const chunk of chunks) { stream.emit('data', { type: 'chunk', diff --git a/src/main/services/agents/services/claudecode/transform.ts b/src/main/services/agents/services/claudecode/transform.ts index 782d71c0d4..9badef411c 100644 --- a/src/main/services/agents/services/claudecode/transform.ts +++ b/src/main/services/agents/services/claudecode/transform.ts @@ -1,66 +1,82 @@ -// This file is used to transform claude code json response to aisdk streaming format +/** + * Translates Anthropic Claude Code streaming messages into the generic AiSDK stream + * parts that the agent runtime understands. 
The transformer coordinates batched + * text/tool payloads, keeps per-message state using {@link ClaudeStreamState}, + * and normalises usage metadata and finish reasons so downstream consumers do + * not need to reason about Anthropic-specific payload shapes. + * + * Stream lifecycle cheatsheet (per Claude turn): + * 1. `stream_event.message_start` → emit `start-step` and mark the state as active. + * 2. `content_block_start` (by index) → open a stateful block; emits one of + * `text-start` | `reasoning-start` | `tool-input-start`. + * 3. `content_block_delta` → append incremental text / reasoning / tool JSON, + * emitting only the delta to minimise UI churn. + * 4. `content_block_stop` → emit the matching `*-end` event and release the block. + * 5. `message_delta` → capture usage + stop reason but defer emission. + * 6. `message_stop` → emit `finish-step` with cached usage & reason, then reset. + * 7. Assistant snapshots with `tool_use` finalise the tool block (`tool-call`). + * 8. User snapshots with `tool_result` emit `tool-result`/`tool-error` using the cached payload. + * 9. Assistant snapshots with plain text (when no stream events were provided) fall back to + * emitting `text-*` parts and a synthetic `finish-step`. + */ -import type { LanguageModelV2Usage } from '@ai-sdk/provider' import { SDKMessage } from '@anthropic-ai/claude-code' +import type { BetaStopReason } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs' import { loggerService } from '@logger' -import type { ClaudeCodeRawValue } from '@shared/agents/claudecode/types' -import type { ProviderMetadata, TextStreamPart } from 'ai' +import type { FinishReason, LanguageModelUsage, ProviderMetadata, TextStreamPart } from 'ai' import { v4 as uuidv4 } from 'uuid' +import { ClaudeStreamState } from './claude-stream-state' import { mapClaudeCodeFinishReason } from './map-claude-code-finish-reason' const logger = loggerService.withContext('ClaudeCodeTransform') type AgentStreamPart = TextStreamPart> -type contentBlock = - | { - type: 'text' - } - | { - type: 'tool-call' - toolCallId: string - toolName: string - input: unknown - } - -const contentBlockState = new Map() - -type toolCallBlock = Extract - -// Helper function to generate unique IDs for text blocks -const generateMessageId = (): string => `msg_${uuidv4().replace(/-/g, '')}` - -// Main transform function -export function transformSDKMessageToStreamParts(sdkMessage: SDKMessage): AgentStreamPart[] { - const chunks: AgentStreamPart[] = [] - // logger.silly('Transforming SDKMessage to stream parts', sdkMessage) - switch (sdkMessage.type) { - case 'assistant': - case 'user': - chunks.push(...handleUserOrAssistantMessage(sdkMessage)) - break - - case 'stream_event': - chunks.push(...handleStreamEvent(sdkMessage)) - break - - case 'system': - chunks.push(...handleSystemMessage(sdkMessage)) - break - - case 'result': - chunks.push(...handleResultMessage(sdkMessage)) - break - - default: - logger.warn('Unknown SDKMessage type:', { type: (sdkMessage as any).type }) - break - } - - return chunks +type ToolUseContent = { + type: 'tool_use' + id: string + name: string + input: unknown } +type ToolResultContent = { + type: 'tool_result' + tool_use_id: string + content: unknown + is_error?: boolean +} + +/** + * Maps Anthropic stop reasons to the AiSDK equivalents so higher level + * consumers can treat completion states uniformly across providers. 
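+ * For example, Anthropic's `tool_use` stop reason becomes the AiSDK `tool-calls` finish reason,
+ * while `refusal` maps to `content-filter`.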
+ */ +const finishReasonMapping: Record = { + end_turn: 'stop', + max_tokens: 'length', + stop_sequence: 'stop', + tool_use: 'tool-calls', + pause_turn: 'unknown', + refusal: 'content-filter' +} + +const emptyUsage: LanguageModelUsage = { + inputTokens: 0, + outputTokens: 0, + totalTokens: 0 +} + +/** + * Generates deterministic-ish message identifiers that are compatible with the + * AiSDK text stream contract. Anthropic deltas sometimes omit ids, so we create + * our own to ensure the downstream renderer can stitch chunks together. + */ +const generateMessageId = (): string => `msg_${uuidv4().replace(/-/g, '')}` + +/** + * Extracts provider metadata from the raw Claude message so we can surface it + * on every emitted stream part for observability and debugging purposes. + */ const sdkMessageToProviderMetadata = (message: SDKMessage): ProviderMetadata => { return { anthropic: { @@ -71,250 +87,523 @@ const sdkMessageToProviderMetadata = (message: SDKMessage): ProviderMetadata => } } -function generateTextChunks(id: string, text: string, message: SDKMessage): AgentStreamPart[] { - const providerMetadata = sdkMessageToProviderMetadata(message) - return [ - { - type: 'text-start', - id - }, - { - type: 'text-delta', - id, - text - }, - { - type: 'text-end', - id, - providerMetadata: { - ...providerMetadata - } - } - ] +/** + * Central entrypoint that receives Claude Code websocket events and converts + * them into AiSDK `TextStreamPart`s. The state machine tracks outstanding + * blocks across calls so that incremental deltas can be correlated correctly. + */ +export function transformSDKMessageToStreamParts(sdkMessage: SDKMessage, state: ClaudeStreamState): AgentStreamPart[] { + switch (sdkMessage.type) { + case 'assistant': + return handleAssistantMessage(sdkMessage, state) + case 'user': + return handleUserMessage(sdkMessage, state) + case 'stream_event': + return handleStreamEvent(sdkMessage, state) + case 'system': + return handleSystemMessage(sdkMessage) + case 'result': + return handleResultMessage(sdkMessage) + default: + logger.warn('Unknown SDKMessage type', { type: (sdkMessage as any).type }) + return [] + } } -function handleUserOrAssistantMessage(message: Extract): AgentStreamPart[] { +/** + * Handles aggregated assistant messages that arrive outside of the streaming + * protocol (e.g. after a tool call finishes). We emit the appropriate + * text/tool events and close the active step once the payload is fully + * processed. 
+ */ +function handleAssistantMessage( + message: Extract, + state: ClaudeStreamState +): AgentStreamPart[] { const chunks: AgentStreamPart[] = [] - const messageId = message.uuid?.toString() || generateMessageId() + const providerMetadata = sdkMessageToProviderMetadata(message) + const content = message.message.content + const isStreamingActive = state.hasActiveStep() - // handle normal text content - if (typeof message.message.content === 'string') { - const textContent = message.message.content - if (textContent) { - chunks.push(...generateTextChunks(messageId, textContent, message)) + if (typeof content === 'string') { + if (!content) { + return chunks } - } else if (Array.isArray(message.message.content)) { - for (const block of message.message.content) { - switch (block.type) { - case 'text': - chunks.push(...generateTextChunks(messageId, block.text, message)) - break - case 'tool_use': - chunks.push({ - type: 'tool-call', - toolCallId: block.id, - toolName: block.name, - input: block.input, - providerExecuted: true, - providerMetadata: sdkMessageToProviderMetadata(message) - }) - contentBlockState.set(block.id, { - type: 'tool-call', - toolCallId: block.id, - toolName: block.name, - input: block.input - }) - break - case 'tool_result': { - logger.silly('Handling tool result:', { block, content: contentBlockState }) - const hasToolCall = contentBlockState.has(block.tool_use_id) - const toolCall = contentBlockState.get(block.tool_use_id) as toolCallBlock - chunks.push({ - type: 'tool-result', - toolCallId: block.tool_use_id, - toolName: hasToolCall ? toolCall.toolName : 'Unknown', - input: hasToolCall ? toolCall.input : '', - output: block.content - }) - break + + if (!isStreamingActive) { + state.beginStep() + chunks.push({ + type: 'start-step', + request: { body: '' }, + warnings: [] + }) + } + + const textId = message.uuid?.toString() || generateMessageId() + chunks.push({ + type: 'text-start', + id: textId, + providerMetadata + }) + chunks.push({ + type: 'text-delta', + id: textId, + text: content, + providerMetadata + }) + chunks.push({ + type: 'text-end', + id: textId, + providerMetadata + }) + return finalizeNonStreamingStep(message, state, chunks) + } + + if (!Array.isArray(content)) { + return chunks + } + + const textBlocks: string[] = [] + + for (const block of content) { + switch (block.type) { + case 'text': + if (!isStreamingActive) { + textBlocks.push(block.text) } - default: - logger.warn('Unknown content block type in user/assistant message:', { - type: block.type - }) - chunks.push({ - type: 'raw', - rawValue: block - }) - break + break + case 'tool_use': + handleAssistantToolUse(block as ToolUseContent, providerMetadata, state, chunks) + break + default: + logger.warn('Unhandled assistant content block', { type: (block as any).type }) + break + } + } + + if (!isStreamingActive && textBlocks.length > 0) { + const id = message.uuid?.toString() || generateMessageId() + state.beginStep() + chunks.push({ + type: 'start-step', + request: { body: '' }, + warnings: [] + }) + chunks.push({ + type: 'text-start', + id, + providerMetadata + }) + chunks.push({ + type: 'text-delta', + id, + text: textBlocks.join(''), + providerMetadata + }) + chunks.push({ + type: 'text-end', + id, + providerMetadata + }) + return finalizeNonStreamingStep(message, state, chunks) + } + + return chunks +} + +/** + * Registers tool invocations with the stream state so that later tool results + * can be matched with the originating call. 
+ */ +function handleAssistantToolUse( + block: ToolUseContent, + providerMetadata: ProviderMetadata, + state: ClaudeStreamState, + chunks: AgentStreamPart[] +): void { + chunks.push({ + type: 'tool-call', + toolCallId: block.id, + toolName: block.name, + input: block.input, + providerExecuted: true, + providerMetadata + }) + state.completeToolBlock(block.id, block.input, providerMetadata) +} + +/** + * Emits the terminating `finish-step` frame for non-streamed responses and + * clears the currently active step in the state tracker. + */ +function finalizeNonStreamingStep( + message: Extract, + state: ClaudeStreamState, + chunks: AgentStreamPart[] +): AgentStreamPart[] { + const usage = calculateUsageFromMessage(message) + const finishReason = inferFinishReason(message.message.stop_reason) + chunks.push({ + type: 'finish-step', + response: { + id: message.uuid, + timestamp: new Date(), + modelId: message.message.model ?? '' + }, + usage: usage ?? emptyUsage, + finishReason, + providerMetadata: sdkMessageToProviderMetadata(message) + }) + state.resetStep() + return chunks +} + +/** + * Converts user-originated websocket frames (text, tool results, etc.) into + * the AiSDK format. Tool results are matched back to pending tool calls via the + * shared `ClaudeStreamState` instance. + */ +function handleUserMessage( + message: Extract, + state: ClaudeStreamState +): AgentStreamPart[] { + const chunks: AgentStreamPart[] = [] + const providerMetadata = sdkMessageToProviderMetadata(message) + const content = message.message.content + + if (typeof content === 'string') { + if (!content) { + return chunks + } + + const id = message.uuid?.toString() || generateMessageId() + chunks.push({ + type: 'text-start', + id, + providerMetadata + }) + chunks.push({ + type: 'text-delta', + id, + text: content, + providerMetadata + }) + chunks.push({ + type: 'text-end', + id, + providerMetadata + }) + return chunks + } + + if (!Array.isArray(content)) { + return chunks + } + + for (const block of content) { + if (block.type === 'tool_result') { + const toolResult = block as ToolResultContent + const pendingCall = state.consumePendingToolCall(toolResult.tool_use_id) + if (toolResult.is_error) { + chunks.push({ + type: 'tool-error', + toolCallId: toolResult.tool_use_id, + toolName: pendingCall?.toolName ?? 'unknown', + input: pendingCall?.input, + error: toolResult.content, + providerExecuted: true + } as AgentStreamPart) + } else { + chunks.push({ + type: 'tool-result', + toolCallId: toolResult.tool_use_id, + toolName: pendingCall?.toolName ?? 'unknown', + input: pendingCall?.input, + output: toolResult.content, + providerExecuted: true + }) } + } else if (block.type === 'text') { + const id = message.uuid?.toString() || generateMessageId() + chunks.push({ + type: 'text-start', + id, + providerMetadata + }) + chunks.push({ + type: 'text-delta', + id, + text: (block as { text: string }).text, + providerMetadata + }) + chunks.push({ + type: 'text-end', + id, + providerMetadata + }) + } else { + logger.warn('Unhandled user content block', { type: (block as any).type }) } } return chunks } -// Handle stream events (real-time streaming) -function handleStreamEvent(message: Extract): AgentStreamPart[] { +/** + * Handles the fine-grained real-time streaming protocol where Anthropic emits + * discrete events for message lifecycle, content blocks, and usage deltas. 
+ */ +function handleStreamEvent( + message: Extract, + state: ClaudeStreamState +): AgentStreamPart[] { const chunks: AgentStreamPart[] = [] - const event = message.event - const blockKey = `${message.uuid ?? message.session_id ?? 'session'}:${event.type}` - logger.silly('Handling stream event:', { event }) + const providerMetadata = sdkMessageToProviderMetadata(message) + const { event } = message + switch (event.type) { case 'message_start': - // No specific UI chunk needed for message start in this protocol + state.beginStep() + chunks.push({ + type: 'start-step', + request: { body: '' }, + warnings: [] + }) break case 'content_block_start': - switch (event.content_block.type) { - case 'text': { - contentBlockState.set(blockKey, { type: 'text' }) - chunks.push({ - type: 'text-start', - id: String(event.index), - providerMetadata: { - ...sdkMessageToProviderMetadata(message), - anthropic: { - uuid: message.uuid, - session_id: message.session_id, - content_block_index: event.index - } - } - }) - break - } - case 'tool_use': { - contentBlockState.set(event.content_block.id, { - type: 'tool-call', - toolCallId: event.content_block.id, - toolName: event.content_block.name, - input: '' - }) - chunks.push({ - type: 'tool-call', - toolCallId: event.content_block.id, - toolName: event.content_block.name, - input: event.content_block.input, - providerExecuted: true, - providerMetadata: sdkMessageToProviderMetadata(message) - }) - break - } - } - break - case 'content_block_delta': - switch (event.delta.type) { - case 'text_delta': { - chunks.push({ - type: 'text-delta', - id: String(event.index), - text: event.delta.text, - providerMetadata: { - ...sdkMessageToProviderMetadata(message), - anthropic: { - uuid: message.uuid, - session_id: message.session_id, - content_block_index: event.index - } - } - }) - break - } - // case 'thinking_delta': { - // chunks.push({ - // type: 'reasoning-delta', - // id: String(event.index), - // text: event.delta.thinking, - // }); - // break - // } - // case 'signature_delta': { - // if (blockType === 'thinking') { - // chunks.push({ - // type: 'reasoning-delta', - // id: String(event.index), - // text: '', - // providerMetadata: { - // ...sdkMessageToProviderMetadata(message), - // anthropic: { - // uuid: message.uuid, - // session_id: message.session_id, - // content_block_index: event.index, - // signature: event.delta.signature - // } - // } - // }) - // } - // break - // } - case 'input_json_delta': { - const contentBlock = contentBlockState.get(blockKey) - if (contentBlock && contentBlock.type === 'tool-call') { - contentBlockState.set(blockKey, { - ...contentBlock, - input: `${contentBlock.input ?? ''}${event.delta.partial_json ?? 
''}` - }) - } - break - } - } + handleContentBlockStart(event.index, event.content_block, providerMetadata, state, chunks) break - case 'content_block_stop': - { - const contentBlock = contentBlockState.get(blockKey) - if (contentBlock?.type === 'text') { + case 'content_block_delta': + handleContentBlockDelta(event.index, event.delta, providerMetadata, state, chunks) + break + + case 'content_block_stop': { + const block = state.closeBlock(event.index) + if (!block) { + logger.warn('Received content_block_stop for unknown index', { index: event.index }) + break + } + + switch (block.kind) { + case 'text': chunks.push({ type: 'text-end', - id: String(event.index) + id: block.id, + providerMetadata }) - } - contentBlockState.delete(blockKey) + break + case 'reasoning': + chunks.push({ + type: 'reasoning-end', + id: block.id, + providerMetadata + }) + break + case 'tool': + chunks.push({ + type: 'tool-input-end', + id: block.toolCallId, + providerMetadata + }) + break + default: + break } break - case 'message_delta': - // Handle usage updates or other message-level deltas + } + + case 'message_delta': { + const finishReason = event.delta.stop_reason + ? mapStopReason(event.delta.stop_reason as BetaStopReason) + : undefined + const usage = convertUsage(event.usage) + state.setPendingUsage(usage, finishReason) break - case 'message_stop': - // This could signal the end of the message + } + + case 'message_stop': { + const pending = state.getPendingUsage() + chunks.push({ + type: 'finish-step', + response: { + id: message.uuid, + timestamp: new Date(), + modelId: '' + }, + usage: pending.usage ?? emptyUsage, + finishReason: pending.finishReason ?? 'stop', + providerMetadata + }) + state.resetStep() break + } + default: - logger.warn('Unknown stream event type:', { type: (event as any).type }) + logger.warn('Unknown stream event type', { type: (event as any).type }) break } return chunks } -// Handle system messages +/** + * Opens the appropriate block type when Claude starts streaming a new content + * section so later deltas know which logical entity to append to. + */ +function handleContentBlockStart( + index: number, + contentBlock: any, + providerMetadata: ProviderMetadata, + state: ClaudeStreamState, + chunks: AgentStreamPart[] +): void { + switch (contentBlock.type) { + case 'text': { + const block = state.openTextBlock(index, generateMessageId()) + chunks.push({ + type: 'text-start', + id: block.id, + providerMetadata + }) + break + } + case 'thinking': + case 'redacted_thinking': { + const block = state.openReasoningBlock(index, generateMessageId(), contentBlock.type === 'redacted_thinking') + chunks.push({ + type: 'reasoning-start', + id: block.id, + providerMetadata + }) + break + } + case 'tool_use': { + const block = state.openToolBlock(index, { + toolCallId: contentBlock.id, + toolName: contentBlock.name, + providerMetadata + }) + chunks.push({ + type: 'tool-input-start', + id: block.toolCallId, + toolName: block.toolName, + providerMetadata + }) + break + } + default: + logger.warn('Unhandled content_block_start type', { type: contentBlock.type }) + break + } +} + +/** + * Applies incremental deltas to the active block (text, thinking, tool input) + * and emits the translated AiSDK chunk immediately. 
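+ * For example, the 'Hello' / ' world' fixture in the transform tests produces two `text-delta`
+ * parts that share the id issued by the matching `text-start`.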
+ */ +function handleContentBlockDelta( + index: number, + delta: any, + providerMetadata: ProviderMetadata, + state: ClaudeStreamState, + chunks: AgentStreamPart[] +): void { + switch (delta.type) { + case 'text_delta': { + const block = state.appendTextDelta(index, delta.text) + if (!block) { + logger.warn('Received text_delta for unknown block', { index }) + return + } + chunks.push({ + type: 'text-delta', + id: block.id, + text: block.text, + providerMetadata + }) + break + } + case 'thinking_delta': { + const block = state.appendReasoningDelta(index, delta.thinking) + if (!block) { + logger.warn('Received thinking_delta for unknown block', { index }) + return + } + chunks.push({ + type: 'reasoning-delta', + id: block.id, + text: delta.thinking, + providerMetadata + }) + break + } + case 'signature_delta': { + const block = state.getBlock(index) + if (block && block.kind === 'reasoning') { + chunks.push({ + type: 'reasoning-delta', + id: block.id, + text: '', + providerMetadata + }) + } + break + } + case 'input_json_delta': { + const block = state.appendToolInputDelta(index, delta.partial_json) + if (!block) { + logger.warn('Received input_json_delta for unknown block', { index }) + return + } + chunks.push({ + type: 'tool-input-delta', + id: block.toolCallId, + delta: block.inputBuffer, + providerMetadata + }) + break + } + default: + logger.warn('Unhandled content_block_delta type', { type: delta.type }) + break + } +} + +/** + * System messages currently only deliver the session bootstrap payload. We + * forward it as both a `start` marker and a raw snapshot for diagnostics. + */ function handleSystemMessage(message: Extract): AgentStreamPart[] { const chunks: AgentStreamPart[] = [] - switch (message.subtype) { - case 'init': { - chunks.push({ - type: 'start' - }) - const rawValue: ClaudeCodeRawValue = { + if (message.subtype === 'init') { + chunks.push({ + type: 'start' + }) + chunks.push({ + type: 'raw', + rawValue: { type: 'init', session_id: message.session_id, slash_commands: message.slash_commands, tools: message.tools, raw: message } - chunks.push({ - type: 'raw', - rawValue - }) - } + }) } return chunks } -// Handle result messages (completion with usage stats) +/** + * Terminal result messages arrive once the Claude Code session concludes. + * Successful runs yield a `finish` frame with aggregated usage metrics, while + * failures are surfaced as `error` frames. + */ function handleResultMessage(message: Extract): AgentStreamPart[] { const chunks: AgentStreamPart[] = [] - let usage: LanguageModelV2Usage | undefined + let usage: LanguageModelUsage | undefined if ('usage' in message) { usage = { inputTokens: message.usage.input_tokens ?? 0, @@ -322,10 +611,11 @@ function handleResultMessage(message: Extract): totalTokens: (message.usage.input_tokens ?? 0) + (message.usage.output_tokens ?? 0) } } + if (message.subtype === 'success') { chunks.push({ type: 'finish', - totalUsage: usage, + totalUsage: usage ?? emptyUsage, finishReason: mapClaudeCodeFinishReason(message.subtype), providerMetadata: { ...sdkMessageToProviderMetadata(message), @@ -345,3 +635,60 @@ function handleResultMessage(message: Extract): } return chunks } + +/** + * Normalises usage payloads so the caller always receives numeric values even + * when the provider omits certain fields. 
+ */ +function convertUsage( + usage?: { + input_tokens?: number | null + output_tokens?: number | null + } | null +): LanguageModelUsage | undefined { + if (!usage) { + return undefined + } + const inputTokens = usage.input_tokens ?? 0 + const outputTokens = usage.output_tokens ?? 0 + return { + inputTokens, + outputTokens, + totalTokens: inputTokens + outputTokens + } +} + +/** + * Anthropic-only wrapper around {@link finishReasonMapping} that defaults to + * `unknown` to avoid surprising downstream consumers when new stop reasons are + * introduced. + */ +function mapStopReason(reason: BetaStopReason): FinishReason { + return finishReasonMapping[reason] ?? 'unknown' +} + +/** + * Extracts token accounting details from an assistant message, if available. + */ +function calculateUsageFromMessage( + message: Extract +): LanguageModelUsage | undefined { + const usage = message.message.usage + if (!usage) return undefined + return { + inputTokens: usage.input_tokens ?? 0, + outputTokens: usage.output_tokens ?? 0, + totalTokens: (usage.input_tokens ?? 0) + (usage.output_tokens ?? 0) + } +} + +/** + * Converts Anthropic stop reasons into AiSDK finish reasons, falling back to a + * generic `stop` if the provider omits the detail entirely. + */ +function inferFinishReason(stopReason: BetaStopReason | null | undefined): FinishReason { + if (!stopReason) return 'stop' + return mapStopReason(stopReason) +} + +export { ClaudeStreamState }