feat(claudecode): enhance streaming transform flow

Vaayne 2025-09-25 23:15:30 +08:00
parent a7d6065b08
commit a1d14b9292
4 changed files with 1136 additions and 250 deletions

View File

@@ -0,0 +1,290 @@
import type { SDKMessage } from '@anthropic-ai/claude-code'
import { describe, expect, it } from 'vitest'
import { ClaudeStreamState, transformSDKMessageToStreamParts } from '../transform'
const baseStreamMetadata = {
parent_tool_use_id: null,
session_id: 'session-123'
}
const uuid = (n: number) => `00000000-0000-0000-0000-${n.toString().padStart(12, '0')}`
describe('Claude → AiSDK transform', () => {
it('handles tool call streaming lifecycle', () => {
const state = new ClaudeStreamState()
const parts: ReturnType<typeof transformSDKMessageToStreamParts>[number][] = []
const messages: SDKMessage[] = [
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(1),
event: {
type: 'message_start',
message: {
id: 'msg-start',
type: 'message',
role: 'assistant',
model: 'claude-test',
content: [],
stop_reason: null,
stop_sequence: null,
usage: {}
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(2),
event: {
type: 'content_block_start',
index: 0,
content_block: {
type: 'tool_use',
id: 'tool-1',
name: 'Bash',
input: {}
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(3),
event: {
type: 'content_block_delta',
index: 0,
delta: {
type: 'input_json_delta',
partial_json: '{"command":"ls"}'
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'assistant',
uuid: uuid(4),
message: {
id: 'msg-tool',
type: 'message',
role: 'assistant',
model: 'claude-test',
content: [
{
type: 'tool_use',
id: 'tool-1',
name: 'Bash',
input: {
command: 'ls'
}
}
],
stop_reason: 'tool_use',
stop_sequence: null,
usage: {
input_tokens: 1,
output_tokens: 0
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(5),
event: {
type: 'content_block_stop',
index: 0
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(6),
event: {
type: 'message_delta',
delta: {
stop_reason: 'tool_use',
stop_sequence: null
},
usage: {
input_tokens: 1,
output_tokens: 5
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(7),
event: {
type: 'message_stop'
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'user',
uuid: uuid(8),
message: {
role: 'user',
content: [
{
type: 'tool_result',
tool_use_id: 'tool-1',
content: 'ok',
is_error: false
}
]
}
} as SDKMessage
]
for (const message of messages) {
const transformed = transformSDKMessageToStreamParts(message, state)
for (const part of transformed) {
parts.push(part)
}
}
const types = parts.map((part) => part.type)
expect(types).toEqual([
'start-step',
'tool-input-start',
'tool-input-delta',
'tool-call',
'tool-input-end',
'finish-step',
'tool-result'
])
const finishStep = parts.find((part) => part.type === 'finish-step') as Extract<
(typeof parts)[number],
{ type: 'finish-step' }
>
expect(finishStep.finishReason).toBe('tool-calls')
expect(finishStep.usage).toEqual({ inputTokens: 1, outputTokens: 5, totalTokens: 6 })
const toolResult = parts.find((part) => part.type === 'tool-result') as Extract<
(typeof parts)[number],
{ type: 'tool-result' }
>
expect(toolResult.toolCallId).toBe('tool-1')
expect(toolResult.toolName).toBe('Bash')
expect(toolResult.input).toEqual({ command: 'ls' })
expect(toolResult.output).toBe('ok')
})
it('handles streaming text completion', () => {
const state = new ClaudeStreamState()
const parts: ReturnType<typeof transformSDKMessageToStreamParts>[number][] = []
const messages: SDKMessage[] = [
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(9),
event: {
type: 'message_start',
message: {
id: 'msg-text',
type: 'message',
role: 'assistant',
model: 'claude-text',
content: [],
stop_reason: null,
stop_sequence: null,
usage: {}
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(10),
event: {
type: 'content_block_start',
index: 0,
content_block: {
type: 'text',
text: ''
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(11),
event: {
type: 'content_block_delta',
index: 0,
delta: {
type: 'text_delta',
text: 'Hello'
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(12),
event: {
type: 'content_block_delta',
index: 0,
delta: {
type: 'text_delta',
text: ' world'
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(13),
event: {
type: 'content_block_stop',
index: 0
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(14),
event: {
type: 'message_delta',
delta: {
stop_reason: 'end_turn',
stop_sequence: null
},
usage: {
input_tokens: 2,
output_tokens: 4
}
}
} as unknown as SDKMessage,
{
...baseStreamMetadata,
type: 'stream_event',
uuid: uuid(15),
event: {
type: 'message_stop'
}
} as SDKMessage
]
for (const message of messages) {
const transformed = transformSDKMessageToStreamParts(message, state)
parts.push(...transformed)
}
const types = parts.map((part) => part.type)
expect(types).toEqual(['start-step', 'text-start', 'text-delta', 'text-delta', 'text-end', 'finish-step'])
const finishStep = parts.find((part) => part.type === 'finish-step') as Extract<
(typeof parts)[number],
{ type: 'finish-step' }
>
expect(finishStep.finishReason).toBe('stop')
expect(finishStep.usage).toEqual({ inputTokens: 2, outputTokens: 4, totalTokens: 6 })
})
})

View File

@@ -0,0 +1,241 @@
/**
* Lightweight state container shared by the Claude AiSDK transformer. Anthropic does not send
* deterministic identifiers for intermediate content blocks, so we stitch one together by tracking
* block indices and associated AiSDK ids. This class also keeps:
* - incremental text / reasoning buffers so we can emit only deltas while retaining the full
* aggregate for later tool-call emission;
* - a reverse lookup for tool calls so `tool_result` snapshots can recover their metadata;
* - pending usage + finish reason from `message_delta` events until the corresponding
* `message_stop` arrives.
* Every Claude turn gets its own instance. `resetStep` should be invoked once the finish event has
* been emitted to avoid leaking state into the next turn.
*/
import type { FinishReason, LanguageModelUsage, ProviderMetadata } from 'ai'
/**
* Shared fields for every block that Claude can stream (text, reasoning, tool).
*/
type BaseBlockState = {
id: string
index: number
}
type TextBlockState = BaseBlockState & {
kind: 'text'
text: string
}
type ReasoningBlockState = BaseBlockState & {
kind: 'reasoning'
text: string
redacted: boolean
}
type ToolBlockState = BaseBlockState & {
kind: 'tool'
toolCallId: string
toolName: string
inputBuffer: string
providerMetadata?: ProviderMetadata
resolvedInput?: unknown
}
export type BlockState = TextBlockState | ReasoningBlockState | ToolBlockState
type PendingUsageState = {
usage?: LanguageModelUsage
finishReason?: FinishReason
}
type PendingToolCall = {
toolCallId: string
toolName: string
input: unknown
providerMetadata?: ProviderMetadata
}
/**
* Tracks the lifecycle of Claude streaming blocks (text, thinking, tool calls)
* across individual websocket events. The transformer relies on this class to
* stitch together deltas, manage pending tool inputs/results, and propagate
* usage/finish metadata once Anthropic closes a message.
*/
export class ClaudeStreamState {
private blocksByIndex = new Map<number, BlockState>()
private toolIndexById = new Map<string, number>()
private pendingUsage: PendingUsageState = {}
private pendingToolCalls = new Map<string, PendingToolCall>()
private stepActive = false
/** Marks the beginning of a new AiSDK step. */
beginStep(): void {
this.stepActive = true
}
hasActiveStep(): boolean {
return this.stepActive
}
/** Creates a text block placeholder so future deltas can accumulate into it. */
openTextBlock(index: number, id: string): TextBlockState {
const block: TextBlockState = {
kind: 'text',
id,
index,
text: ''
}
this.blocksByIndex.set(index, block)
return block
}
/** Starts tracking an Anthropic "thinking" block, optionally flagged as redacted. */
openReasoningBlock(index: number, id: string, redacted: boolean): ReasoningBlockState {
const block: ReasoningBlockState = {
kind: 'reasoning',
id,
index,
redacted,
text: ''
}
this.blocksByIndex.set(index, block)
return block
}
/** Caches tool metadata so subsequent input deltas and results can find it. */
openToolBlock(
index: number,
params: { toolCallId: string; toolName: string; providerMetadata?: ProviderMetadata }
): ToolBlockState {
const block: ToolBlockState = {
kind: 'tool',
id: params.toolCallId,
index,
toolCallId: params.toolCallId,
toolName: params.toolName,
inputBuffer: '',
providerMetadata: params.providerMetadata
}
this.blocksByIndex.set(index, block)
this.toolIndexById.set(params.toolCallId, index)
return block
}
getBlock(index: number): BlockState | undefined {
return this.blocksByIndex.get(index)
}
getToolBlockById(toolCallId: string): ToolBlockState | undefined {
const index = this.toolIndexById.get(toolCallId)
if (index === undefined) return undefined
const block = this.blocksByIndex.get(index)
if (!block || block.kind !== 'tool') return undefined
return block
}
/** Appends streamed text to a text block, returning the updated state when present. */
appendTextDelta(index: number, text: string): TextBlockState | undefined {
const block = this.blocksByIndex.get(index)
if (!block || block.kind !== 'text') return undefined
block.text += text
return block
}
/** Appends streamed "thinking" content to the tracked reasoning block. */
appendReasoningDelta(index: number, text: string): ReasoningBlockState | undefined {
const block = this.blocksByIndex.get(index)
if (!block || block.kind !== 'reasoning') return undefined
block.text += text
return block
}
/** Concatenates incremental JSON payloads for tool input blocks. */
appendToolInputDelta(index: number, jsonDelta: string): ToolBlockState | undefined {
const block = this.blocksByIndex.get(index)
if (!block || block.kind !== 'tool') return undefined
block.inputBuffer += jsonDelta
return block
}
/** Records a tool call to be consumed once its result arrives from the user. */
registerToolCall(
toolCallId: string,
payload: { toolName: string; input: unknown; providerMetadata?: ProviderMetadata }
): void {
this.pendingToolCalls.set(toolCallId, {
toolCallId,
toolName: payload.toolName,
input: payload.input,
providerMetadata: payload.providerMetadata
})
}
/** Retrieves and clears the buffered tool call metadata for the given id. */
consumePendingToolCall(toolCallId: string): PendingToolCall | undefined {
const entry = this.pendingToolCalls.get(toolCallId)
if (entry) {
this.pendingToolCalls.delete(toolCallId)
}
return entry
}
/**
* Persists the final input payload for a tool block once the provider signals
* completion so that downstream tool results can reference the original call.
*/
completeToolBlock(toolCallId: string, input: unknown, providerMetadata?: ProviderMetadata): void {
this.registerToolCall(toolCallId, {
toolName: this.getToolBlockById(toolCallId)?.toolName ?? 'unknown',
input,
providerMetadata
})
const block = this.getToolBlockById(toolCallId)
if (block) {
block.resolvedInput = input
}
}
/** Removes a block from the active index map when Claude signals it is done. */
closeBlock(index: number): BlockState | undefined {
const block = this.blocksByIndex.get(index)
if (!block) return undefined
this.blocksByIndex.delete(index)
if (block.kind === 'tool') {
this.toolIndexById.delete(block.toolCallId)
}
return block
}
/** Stores interim usage metrics so they can be emitted with the `finish-step`. */
setPendingUsage(usage?: LanguageModelUsage, finishReason?: FinishReason): void {
if (usage) {
this.pendingUsage.usage = usage
}
if (finishReason) {
this.pendingUsage.finishReason = finishReason
}
}
getPendingUsage(): PendingUsageState {
return { ...this.pendingUsage }
}
/** Clears any accumulated usage values for the next streamed message. */
resetPendingUsage(): void {
this.pendingUsage = {}
}
/** Drops cached block metadata for the currently active message. */
resetBlocks(): void {
this.blocksByIndex.clear()
this.toolIndexById.clear()
}
/** Resets the entire step lifecycle after emitting a terminal frame. */
resetStep(): void {
this.resetBlocks()
this.resetPendingUsage()
this.stepActive = false
}
}
export type { PendingToolCall }
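A rough usage sketch of the state container above, following the event order described in its class comment. The standalone script, the tool id, and the token counts are invented for illustration; only the method calls and the import path (the same one `transform.ts` uses below) come from the commit:

```ts
import { ClaudeStreamState } from './claude-stream-state'

const state = new ClaudeStreamState()

// message_start: the step becomes active.
state.beginStep()

// content_block_start (tool_use at index 0): remember the tool block.
state.openToolBlock(0, { toolCallId: 'tool-1', toolName: 'Bash' })

// content_block_delta (input_json_delta): accumulate the partial JSON.
state.appendToolInputDelta(0, '{"command":')
state.appendToolInputDelta(0, '"ls"}')

// Assistant snapshot with the resolved input: cache it for the later tool_result.
state.completeToolBlock('tool-1', { command: 'ls' })

// content_block_stop: release the block from the active index map.
state.closeBlock(0)

// message_delta: park usage and finish reason until message_stop arrives.
state.setPendingUsage({ inputTokens: 1, outputTokens: 5, totalTokens: 6 }, 'tool-calls')

// message_stop: read the parked values, then reset for the next turn.
const { usage, finishReason } = state.getPendingUsage()
console.log(finishReason, usage) // 'tool-calls' { inputTokens: 1, outputTokens: 5, totalTokens: 6 }

// tool_result from the user: recover the original call metadata.
const pending = state.consumePendingToolCall('tool-1')
console.log(pending?.toolName, pending?.input) // 'Bash' { command: 'ls' }

state.resetStep()
```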

View File

@@ -11,7 +11,7 @@ import { app } from 'electron'
import { GetAgentSessionResponse } from '../..'
import { AgentServiceInterface, AgentStream, AgentStreamEvent } from '../../interfaces/AgentStreamInterface'
import { transformSDKMessageToStreamParts } from './transform'
import { ClaudeStreamState, transformSDKMessageToStreamParts } from './transform'
const require_ = createRequire(import.meta.url)
const logger = loggerService.withContext('ClaudeCodeService')
@@ -92,6 +92,7 @@ class ClaudeCodeService implements AgentServiceInterface {
errorChunks.push(chunk)
},
appendSystemPrompt: session.instructions,
includePartialMessages: true,
permissionMode: session.configuration?.permission_mode,
maxTurns: session.configuration?.max_turns,
allowedTools: session.allowed_tools
@@ -164,6 +165,7 @@ class ClaudeCodeService implements AgentServiceInterface {
let hasCompleted = false
const startTime = Date.now()
const streamState = new ClaudeStreamState()
try {
// Process streaming responses using SDK query
for await (const message of query({
@@ -173,15 +175,21 @@ class ClaudeCodeService implements AgentServiceInterface {
if (hasCompleted) break
jsonOutput.push(message)
logger.silly('claude response', { message })
if (message.type === 'assistant' || message.type === 'user') {
logger.silly('message content', {
message: JSON.stringify({ role: message.message.role, content: message.message.content })
logger.silly('claude response', {
message,
content: JSON.stringify(message.message.content)
})
} else if (message.type === 'stream_event') {
logger.silly('Claude stream event', {
message,
event: JSON.stringify(message.event)
})
}
// Transform SDKMessage to UIMessageChunks
const chunks = transformSDKMessageToStreamParts(message)
const chunks = transformSDKMessageToStreamParts(message, streamState)
for (const chunk of chunks) {
stream.emit('data', {
type: 'chunk',
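For orientation, a condensed sketch of the wiring these hunks set up. The helper name and the `onPart` callback are hypothetical stand-ins for the service's `stream.emit('data', ...)` call (whose payload shape is truncated above); the imports, the per-session `ClaudeStreamState`, and the transform loop mirror the diff:

```ts
import type { SDKMessage } from '@anthropic-ai/claude-code'
import { ClaudeStreamState, transformSDKMessageToStreamParts } from './transform'

// Drives the transformer over the SDK's async message iterator, as the service loop does.
async function pumpClaudeMessages(
  messages: AsyncIterable<SDKMessage>,
  onPart: (part: ReturnType<typeof transformSDKMessageToStreamParts>[number]) => void
): Promise<void> {
  // One shared state instance per session, matching `streamState` in the diff.
  const state = new ClaudeStreamState()
  for await (const message of messages) {
    // With `includePartialMessages: true`, the iterator also yields `stream_event`
    // frames, which is what the streaming transform path consumes.
    for (const part of transformSDKMessageToStreamParts(message, state)) {
      onPart(part)
    }
  }
}
```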

View File

@@ -1,66 +1,82 @@
// This file is used to transform claude code json response to aisdk streaming format
/**
* Translates Anthropic Claude Code streaming messages into the generic AiSDK stream
* parts that the agent runtime understands. The transformer coordinates batched
* text/tool payloads, keeps per-message state using {@link ClaudeStreamState},
* and normalises usage metadata and finish reasons so downstream consumers do
* not need to reason about Anthropic-specific payload shapes.
*
* Stream lifecycle cheatsheet (per Claude turn):
* 1. `stream_event.message_start` → emit `start-step` and mark the state as active.
* 2. `content_block_start` (by index) → open a stateful block and emit one of
* `text-start` | `reasoning-start` | `tool-input-start`.
* 3. `content_block_delta` → append incremental text / reasoning / tool JSON,
* emitting only the delta to minimise UI churn.
* 4. `content_block_stop` → emit the matching `*-end` event and release the block.
* 5. `message_delta` → capture usage + stop reason but defer emission.
* 6. `message_stop` → emit `finish-step` with cached usage & reason, then reset.
* 7. Assistant snapshots with `tool_use` finalise the tool block (`tool-call`).
* 8. User snapshots with `tool_result` emit `tool-result`/`tool-error` using the cached payload.
* 9. Assistant snapshots with plain text (when no stream events were provided) fall back to
* emitting `text-*` parts and a synthetic `finish-step`.
*/
import type { LanguageModelV2Usage } from '@ai-sdk/provider'
import { SDKMessage } from '@anthropic-ai/claude-code'
import type { BetaStopReason } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import { loggerService } from '@logger'
import type { ClaudeCodeRawValue } from '@shared/agents/claudecode/types'
import type { ProviderMetadata, TextStreamPart } from 'ai'
import type { FinishReason, LanguageModelUsage, ProviderMetadata, TextStreamPart } from 'ai'
import { v4 as uuidv4 } from 'uuid'
import { ClaudeStreamState } from './claude-stream-state'
import { mapClaudeCodeFinishReason } from './map-claude-code-finish-reason'
const logger = loggerService.withContext('ClaudeCodeTransform')
type AgentStreamPart = TextStreamPart<Record<string, any>>
type contentBlock =
| {
type: 'text'
}
| {
type: 'tool-call'
toolCallId: string
toolName: string
type ToolUseContent = {
type: 'tool_use'
id: string
name: string
input: unknown
}
const contentBlockState = new Map<string, contentBlock>()
type toolCallBlock = Extract<contentBlock, { type: 'tool-call' }>
// Helper function to generate unique IDs for text blocks
const generateMessageId = (): string => `msg_${uuidv4().replace(/-/g, '')}`
// Main transform function
export function transformSDKMessageToStreamParts(sdkMessage: SDKMessage): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
// logger.silly('Transforming SDKMessage to stream parts', sdkMessage)
switch (sdkMessage.type) {
case 'assistant':
case 'user':
chunks.push(...handleUserOrAssistantMessage(sdkMessage))
break
case 'stream_event':
chunks.push(...handleStreamEvent(sdkMessage))
break
case 'system':
chunks.push(...handleSystemMessage(sdkMessage))
break
case 'result':
chunks.push(...handleResultMessage(sdkMessage))
break
default:
logger.warn('Unknown SDKMessage type:', { type: (sdkMessage as any).type })
break
}
return chunks
}
type ToolResultContent = {
type: 'tool_result'
tool_use_id: string
content: unknown
is_error?: boolean
}
/**
* Maps Anthropic stop reasons to the AiSDK equivalents so higher level
* consumers can treat completion states uniformly across providers.
*/
const finishReasonMapping: Record<BetaStopReason, FinishReason> = {
end_turn: 'stop',
max_tokens: 'length',
stop_sequence: 'stop',
tool_use: 'tool-calls',
pause_turn: 'unknown',
refusal: 'content-filter'
}
const emptyUsage: LanguageModelUsage = {
inputTokens: 0,
outputTokens: 0,
totalTokens: 0
}
/**
* Generates deterministic-ish message identifiers that are compatible with the
* AiSDK text stream contract. Anthropic deltas sometimes omit ids, so we create
* our own to ensure the downstream renderer can stitch chunks together.
*/
const generateMessageId = (): string => `msg_${uuidv4().replace(/-/g, '')}`
/**
* Extracts provider metadata from the raw Claude message so we can surface it
* on every emitted stream part for observability and debugging purposes.
*/
const sdkMessageToProviderMetadata = (message: SDKMessage): ProviderMetadata => {
return {
anthropic: {
@@ -71,250 +87,523 @@ const sdkMessageToProviderMetadata = (message: SDKMessage): ProviderMetadata =>
}
}
function generateTextChunks(id: string, text: string, message: SDKMessage): AgentStreamPart[] {
const providerMetadata = sdkMessageToProviderMetadata(message)
return [
{
type: 'text-start',
id
},
{
type: 'text-delta',
id,
text
},
{
type: 'text-end',
id,
providerMetadata: {
...providerMetadata
/**
* Central entrypoint that receives Claude Code websocket events and converts
* them into AiSDK `TextStreamPart`s. The state machine tracks outstanding
* blocks across calls so that incremental deltas can be correlated correctly.
*/
export function transformSDKMessageToStreamParts(sdkMessage: SDKMessage, state: ClaudeStreamState): AgentStreamPart[] {
switch (sdkMessage.type) {
case 'assistant':
return handleAssistantMessage(sdkMessage, state)
case 'user':
return handleUserMessage(sdkMessage, state)
case 'stream_event':
return handleStreamEvent(sdkMessage, state)
case 'system':
return handleSystemMessage(sdkMessage)
case 'result':
return handleResultMessage(sdkMessage)
default:
logger.warn('Unknown SDKMessage type', { type: (sdkMessage as any).type })
return []
}
}
]
}
function handleUserOrAssistantMessage(message: Extract<SDKMessage, { type: 'assistant' | 'user' }>): AgentStreamPart[] {
/**
* Handles aggregated assistant messages that arrive outside of the streaming
* protocol (e.g. after a tool call finishes). We emit the appropriate
* text/tool events and close the active step once the payload is fully
* processed.
*/
function handleAssistantMessage(
message: Extract<SDKMessage, { type: 'assistant' }>,
state: ClaudeStreamState
): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
const messageId = message.uuid?.toString() || generateMessageId()
const providerMetadata = sdkMessageToProviderMetadata(message)
const content = message.message.content
const isStreamingActive = state.hasActiveStep()
// handle normal text content
if (typeof message.message.content === 'string') {
const textContent = message.message.content
if (textContent) {
chunks.push(...generateTextChunks(messageId, textContent, message))
if (typeof content === 'string') {
if (!content) {
return chunks
}
} else if (Array.isArray(message.message.content)) {
for (const block of message.message.content) {
if (!isStreamingActive) {
state.beginStep()
chunks.push({
type: 'start-step',
request: { body: '' },
warnings: []
})
}
const textId = message.uuid?.toString() || generateMessageId()
chunks.push({
type: 'text-start',
id: textId,
providerMetadata
})
chunks.push({
type: 'text-delta',
id: textId,
text: content,
providerMetadata
})
chunks.push({
type: 'text-end',
id: textId,
providerMetadata
})
return finalizeNonStreamingStep(message, state, chunks)
}
if (!Array.isArray(content)) {
return chunks
}
const textBlocks: string[] = []
for (const block of content) {
switch (block.type) {
case 'text':
chunks.push(...generateTextChunks(messageId, block.text, message))
if (!isStreamingActive) {
textBlocks.push(block.text)
}
break
case 'tool_use':
handleAssistantToolUse(block as ToolUseContent, providerMetadata, state, chunks)
break
default:
logger.warn('Unhandled assistant content block', { type: (block as any).type })
break
}
}
if (!isStreamingActive && textBlocks.length > 0) {
const id = message.uuid?.toString() || generateMessageId()
state.beginStep()
chunks.push({
type: 'start-step',
request: { body: '' },
warnings: []
})
chunks.push({
type: 'text-start',
id,
providerMetadata
})
chunks.push({
type: 'text-delta',
id,
text: textBlocks.join(''),
providerMetadata
})
chunks.push({
type: 'text-end',
id,
providerMetadata
})
return finalizeNonStreamingStep(message, state, chunks)
}
return chunks
}
/**
* Registers tool invocations with the stream state so that later tool results
* can be matched with the originating call.
*/
function handleAssistantToolUse(
block: ToolUseContent,
providerMetadata: ProviderMetadata,
state: ClaudeStreamState,
chunks: AgentStreamPart[]
): void {
chunks.push({
type: 'tool-call',
toolCallId: block.id,
toolName: block.name,
input: block.input,
providerExecuted: true,
providerMetadata
})
state.completeToolBlock(block.id, block.input, providerMetadata)
}
/**
* Emits the terminating `finish-step` frame for non-streamed responses and
* clears the currently active step in the state tracker.
*/
function finalizeNonStreamingStep(
message: Extract<SDKMessage, { type: 'assistant' }>,
state: ClaudeStreamState,
chunks: AgentStreamPart[]
): AgentStreamPart[] {
const usage = calculateUsageFromMessage(message)
const finishReason = inferFinishReason(message.message.stop_reason)
chunks.push({
type: 'finish-step',
response: {
id: message.uuid,
timestamp: new Date(),
modelId: message.message.model ?? ''
},
usage: usage ?? emptyUsage,
finishReason,
providerMetadata: sdkMessageToProviderMetadata(message)
})
contentBlockState.set(block.id, {
type: 'tool-call',
toolCallId: block.id,
toolName: block.name,
input: block.input
state.resetStep()
return chunks
}
/**
* Converts user-originated websocket frames (text, tool results, etc.) into
* the AiSDK format. Tool results are matched back to pending tool calls via the
* shared `ClaudeStreamState` instance.
*/
function handleUserMessage(
message: Extract<SDKMessage, { type: 'user' }>,
state: ClaudeStreamState
): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
const providerMetadata = sdkMessageToProviderMetadata(message)
const content = message.message.content
if (typeof content === 'string') {
if (!content) {
return chunks
}
const id = message.uuid?.toString() || generateMessageId()
chunks.push({
type: 'text-start',
id,
providerMetadata
})
break
case 'tool_result': {
logger.silly('Handling tool result:', { block, content: contentBlockState })
const hasToolCall = contentBlockState.has(block.tool_use_id)
const toolCall = contentBlockState.get(block.tool_use_id) as toolCallBlock
chunks.push({
type: 'text-delta',
id,
text: content,
providerMetadata
})
chunks.push({
type: 'text-end',
id,
providerMetadata
})
return chunks
}
if (!Array.isArray(content)) {
return chunks
}
for (const block of content) {
if (block.type === 'tool_result') {
const toolResult = block as ToolResultContent
const pendingCall = state.consumePendingToolCall(toolResult.tool_use_id)
if (toolResult.is_error) {
chunks.push({
type: 'tool-error',
toolCallId: toolResult.tool_use_id,
toolName: pendingCall?.toolName ?? 'unknown',
input: pendingCall?.input,
error: toolResult.content,
providerExecuted: true
} as AgentStreamPart)
} else {
chunks.push({
type: 'tool-result',
toolCallId: block.tool_use_id,
toolName: hasToolCall ? toolCall.toolName : 'Unknown',
input: hasToolCall ? toolCall.input : '',
output: block.content
toolCallId: toolResult.tool_use_id,
toolName: pendingCall?.toolName ?? 'unknown',
input: pendingCall?.input,
output: toolResult.content,
providerExecuted: true
})
break
}
default:
logger.warn('Unknown content block type in user/assistant message:', {
type: block.type
} else if (block.type === 'text') {
const id = message.uuid?.toString() || generateMessageId()
chunks.push({
type: 'text-start',
id,
providerMetadata
})
chunks.push({
type: 'raw',
rawValue: block
type: 'text-delta',
id,
text: (block as { text: string }).text,
providerMetadata
})
break
}
chunks.push({
type: 'text-end',
id,
providerMetadata
})
} else {
logger.warn('Unhandled user content block', { type: (block as any).type })
}
}
return chunks
}
// Handle stream events (real-time streaming)
function handleStreamEvent(message: Extract<SDKMessage, { type: 'stream_event' }>): AgentStreamPart[] {
/**
* Handles the fine-grained real-time streaming protocol where Anthropic emits
* discrete events for message lifecycle, content blocks, and usage deltas.
*/
function handleStreamEvent(
message: Extract<SDKMessage, { type: 'stream_event' }>,
state: ClaudeStreamState
): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
const event = message.event
const blockKey = `${message.uuid ?? message.session_id ?? 'session'}:${event.type}`
logger.silly('Handling stream event:', { event })
const providerMetadata = sdkMessageToProviderMetadata(message)
const { event } = message
switch (event.type) {
case 'message_start':
// No specific UI chunk needed for message start in this protocol
state.beginStep()
chunks.push({
type: 'start-step',
request: { body: '' },
warnings: []
})
break
case 'content_block_start':
switch (event.content_block.type) {
handleContentBlockStart(event.index, event.content_block, providerMetadata, state, chunks)
break
case 'content_block_delta':
handleContentBlockDelta(event.index, event.delta, providerMetadata, state, chunks)
break
case 'content_block_stop': {
const block = state.closeBlock(event.index)
if (!block) {
logger.warn('Received content_block_stop for unknown index', { index: event.index })
break
}
switch (block.kind) {
case 'text':
chunks.push({
type: 'text-end',
id: block.id,
providerMetadata
})
break
case 'reasoning':
chunks.push({
type: 'reasoning-end',
id: block.id,
providerMetadata
})
break
case 'tool':
chunks.push({
type: 'tool-input-end',
id: block.toolCallId,
providerMetadata
})
break
default:
break
}
break
}
case 'message_delta': {
const finishReason = event.delta.stop_reason
? mapStopReason(event.delta.stop_reason as BetaStopReason)
: undefined
const usage = convertUsage(event.usage)
state.setPendingUsage(usage, finishReason)
break
}
case 'message_stop': {
const pending = state.getPendingUsage()
chunks.push({
type: 'finish-step',
response: {
id: message.uuid,
timestamp: new Date(),
modelId: ''
},
usage: pending.usage ?? emptyUsage,
finishReason: pending.finishReason ?? 'stop',
providerMetadata
})
state.resetStep()
break
}
default:
logger.warn('Unknown stream event type', { type: (event as any).type })
break
}
return chunks
}
/**
* Opens the appropriate block type when Claude starts streaming a new content
* section so later deltas know which logical entity to append to.
*/
function handleContentBlockStart(
index: number,
contentBlock: any,
providerMetadata: ProviderMetadata,
state: ClaudeStreamState,
chunks: AgentStreamPart[]
): void {
switch (contentBlock.type) {
case 'text': {
contentBlockState.set(blockKey, { type: 'text' })
const block = state.openTextBlock(index, generateMessageId())
chunks.push({
type: 'text-start',
id: String(event.index),
providerMetadata: {
...sdkMessageToProviderMetadata(message),
anthropic: {
uuid: message.uuid,
session_id: message.session_id,
content_block_index: event.index
}
id: block.id,
providerMetadata
})
break
}
case 'thinking':
case 'redacted_thinking': {
const block = state.openReasoningBlock(index, generateMessageId(), contentBlock.type === 'redacted_thinking')
chunks.push({
type: 'reasoning-start',
id: block.id,
providerMetadata
})
break
}
case 'tool_use': {
contentBlockState.set(event.content_block.id, {
type: 'tool-call',
toolCallId: event.content_block.id,
toolName: event.content_block.name,
input: ''
const block = state.openToolBlock(index, {
toolCallId: contentBlock.id,
toolName: contentBlock.name,
providerMetadata
})
chunks.push({
type: 'tool-call',
toolCallId: event.content_block.id,
toolName: event.content_block.name,
input: event.content_block.input,
providerExecuted: true,
providerMetadata: sdkMessageToProviderMetadata(message)
type: 'tool-input-start',
id: block.toolCallId,
toolName: block.toolName,
providerMetadata
})
break
}
}
break
case 'content_block_delta':
switch (event.delta.type) {
case 'text_delta': {
chunks.push({
type: 'text-delta',
id: String(event.index),
text: event.delta.text,
providerMetadata: {
...sdkMessageToProviderMetadata(message),
anthropic: {
uuid: message.uuid,
session_id: message.session_id,
content_block_index: event.index
}
}
})
break
}
// case 'thinking_delta': {
// chunks.push({
// type: 'reasoning-delta',
// id: String(event.index),
// text: event.delta.thinking,
// });
// break
// }
// case 'signature_delta': {
// if (blockType === 'thinking') {
// chunks.push({
// type: 'reasoning-delta',
// id: String(event.index),
// text: '',
// providerMetadata: {
// ...sdkMessageToProviderMetadata(message),
// anthropic: {
// uuid: message.uuid,
// session_id: message.session_id,
// content_block_index: event.index,
// signature: event.delta.signature
// }
// }
// })
// }
// break
// }
case 'input_json_delta': {
const contentBlock = contentBlockState.get(blockKey)
if (contentBlock && contentBlock.type === 'tool-call') {
contentBlockState.set(blockKey, {
...contentBlock,
input: `${contentBlock.input ?? ''}${event.delta.partial_json ?? ''}`
})
}
break
}
}
break
case 'content_block_stop':
{
const contentBlock = contentBlockState.get(blockKey)
if (contentBlock?.type === 'text') {
chunks.push({
type: 'text-end',
id: String(event.index)
})
}
contentBlockState.delete(blockKey)
}
break
case 'message_delta':
// Handle usage updates or other message-level deltas
break
case 'message_stop':
// This could signal the end of the message
break
default:
logger.warn('Unknown stream event type:', { type: (event as any).type })
logger.warn('Unhandled content_block_start type', { type: contentBlock.type })
break
}
return chunks
}
// Handle system messages
/**
* Applies incremental deltas to the active block (text, thinking, tool input)
* and emits the translated AiSDK chunk immediately.
*/
function handleContentBlockDelta(
index: number,
delta: any,
providerMetadata: ProviderMetadata,
state: ClaudeStreamState,
chunks: AgentStreamPart[]
): void {
switch (delta.type) {
case 'text_delta': {
const block = state.appendTextDelta(index, delta.text)
if (!block) {
logger.warn('Received text_delta for unknown block', { index })
return
}
chunks.push({
type: 'text-delta',
id: block.id,
text: block.text,
providerMetadata
})
break
}
case 'thinking_delta': {
const block = state.appendReasoningDelta(index, delta.thinking)
if (!block) {
logger.warn('Received thinking_delta for unknown block', { index })
return
}
chunks.push({
type: 'reasoning-delta',
id: block.id,
text: delta.thinking,
providerMetadata
})
break
}
case 'signature_delta': {
const block = state.getBlock(index)
if (block && block.kind === 'reasoning') {
chunks.push({
type: 'reasoning-delta',
id: block.id,
text: '',
providerMetadata
})
}
break
}
case 'input_json_delta': {
const block = state.appendToolInputDelta(index, delta.partial_json)
if (!block) {
logger.warn('Received input_json_delta for unknown block', { index })
return
}
chunks.push({
type: 'tool-input-delta',
id: block.toolCallId,
delta: block.inputBuffer,
providerMetadata
})
break
}
default:
logger.warn('Unhandled content_block_delta type', { type: delta.type })
break
}
}
/**
* System messages currently only deliver the session bootstrap payload. We
* forward it as both a `start` marker and a raw snapshot for diagnostics.
*/
function handleSystemMessage(message: Extract<SDKMessage, { type: 'system' }>): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
switch (message.subtype) {
case 'init': {
if (message.subtype === 'init') {
chunks.push({
type: 'start'
})
const rawValue: ClaudeCodeRawValue = {
chunks.push({
type: 'raw',
rawValue: {
type: 'init',
session_id: message.session_id,
slash_commands: message.slash_commands,
tools: message.tools,
raw: message
}
chunks.push({
type: 'raw',
rawValue
})
}
}
return chunks
}
// Handle result messages (completion with usage stats)
/**
* Terminal result messages arrive once the Claude Code session concludes.
* Successful runs yield a `finish` frame with aggregated usage metrics, while
* failures are surfaced as `error` frames.
*/
function handleResultMessage(message: Extract<SDKMessage, { type: 'result' }>): AgentStreamPart[] {
const chunks: AgentStreamPart[] = []
let usage: LanguageModelV2Usage | undefined
let usage: LanguageModelUsage | undefined
if ('usage' in message) {
usage = {
inputTokens: message.usage.input_tokens ?? 0,
@@ -322,10 +611,11 @@ function handleResultMessage(message: Extract<SDKMessage, { type: 'result' }>):
totalTokens: (message.usage.input_tokens ?? 0) + (message.usage.output_tokens ?? 0)
}
}
if (message.subtype === 'success') {
chunks.push({
type: 'finish',
totalUsage: usage,
totalUsage: usage ?? emptyUsage,
finishReason: mapClaudeCodeFinishReason(message.subtype),
providerMetadata: {
...sdkMessageToProviderMetadata(message),
@@ -345,3 +635,60 @@ function handleResultMessage(message: Extract<SDKMessage, { type: 'result' }>):
}
return chunks
}
/**
* Normalises usage payloads so the caller always receives numeric values even
* when the provider omits certain fields.
*/
function convertUsage(
usage?: {
input_tokens?: number | null
output_tokens?: number | null
} | null
): LanguageModelUsage | undefined {
if (!usage) {
return undefined
}
const inputTokens = usage.input_tokens ?? 0
const outputTokens = usage.output_tokens ?? 0
return {
inputTokens,
outputTokens,
totalTokens: inputTokens + outputTokens
}
}
/**
* Anthropic-only wrapper around {@link finishReasonMapping} that defaults to
* `unknown` to avoid surprising downstream consumers when new stop reasons are
* introduced.
*/
function mapStopReason(reason: BetaStopReason): FinishReason {
return finishReasonMapping[reason] ?? 'unknown'
}
/**
* Extracts token accounting details from an assistant message, if available.
*/
function calculateUsageFromMessage(
message: Extract<SDKMessage, { type: 'assistant' }>
): LanguageModelUsage | undefined {
const usage = message.message.usage
if (!usage) return undefined
return {
inputTokens: usage.input_tokens ?? 0,
outputTokens: usage.output_tokens ?? 0,
totalTokens: (usage.input_tokens ?? 0) + (usage.output_tokens ?? 0)
}
}
/**
* Converts Anthropic stop reasons into AiSDK finish reasons, falling back to a
* generic `stop` if the provider omits the detail entirely.
*/
function inferFinishReason(stopReason: BetaStopReason | null | undefined): FinishReason {
if (!stopReason) return 'stop'
return mapStopReason(stopReason)
}
export { ClaudeStreamState }
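To make the emitted contract concrete, here is a small hypothetical consumer over the part types this transformer produces. The `render` helper is illustrative only; the `AgentStreamPart` alias copies the one declared in this file, and the handled cases match parts emitted above:

```ts
import type { TextStreamPart } from 'ai'

type AgentStreamPart = TextStreamPart<Record<string, any>>

// Illustrative renderer: handles a subset of the part types emitted by
// transformSDKMessageToStreamParts and ignores the rest.
function render(part: AgentStreamPart): void {
  switch (part.type) {
    case 'start-step':
      console.log('--- step started ---')
      break
    case 'text-delta':
      process.stdout.write(part.text)
      break
    case 'tool-call':
      console.log(`\ntool ${part.toolName} called with`, part.input)
      break
    case 'tool-result':
      console.log(`tool ${part.toolCallId} returned`, part.output)
      break
    case 'finish-step':
      console.log('\nstep finished:', part.finishReason, part.usage)
      break
    default:
      break
  }
}
```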