fix(Anthropic): content truncation (#7942)

* fix(Anthropic): content truncation

* feat: add start event and fix content truncation

* fix(gemini): start events

* revert: index.tsx

* revert(messageThunk): error block

* fix: ci

* chore: remove unused log
SuYao 2025-07-10 16:58:35 +08:00 committed by GitHub
parent 3350c3e2e5
commit 3afa81eb5d
12 changed files with 213 additions and 229 deletions
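
Taken together, the change replaces per-block COMPLETE bookkeeping inside the provider clients with explicit TEXT_START / THINKING_START events; the middlewares now synthesize the COMPLETE chunks when a block ends. A minimal sketch of the gating pattern the clients share, using simplified stand-in types (the real ChunkType enum and chunk interfaces live in @renderer/types/chunk):

// Simplified stand-ins for the renderer's chunk types (illustrative only).
type SketchChunk =
  | { type: 'text.start' }
  | { type: 'text.delta'; text: string }

// Emit a single START event before the first delta, mirroring the
// isFirstTextChunk gating added to the provider clients below.
function createTextStartGate(): TransformStream<string, SketchChunk> {
  let isFirstTextChunk = true
  return new TransformStream<string, SketchChunk>({
    transform(text, controller) {
      if (isFirstTextChunk) {
        controller.enqueue({ type: 'text.start' })
        isFirstTextChunk = false
      }
      controller.enqueue({ type: 'text.delta', text })
    }
  })
}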

View File

@ -49,10 +49,10 @@ import {
LLMWebSearchCompleteChunk,
LLMWebSearchInProgressChunk,
MCPToolCreatedChunk,
TextCompleteChunk,
TextDeltaChunk,
ThinkingCompleteChunk,
ThinkingDeltaChunk
TextStartChunk,
ThinkingDeltaChunk,
ThinkingStartChunk
} from '@renderer/types/chunk'
import { type Message } from '@renderer/types/newMessage'
import {
@ -519,7 +519,6 @@ export class AnthropicAPIClient extends BaseApiClient<
return () => {
let accumulatedJson = ''
const toolCalls: Record<number, ToolUseBlock> = {}
const ChunkIdTypeMap: Record<number, ChunkType> = {}
return {
async transform(rawChunk: AnthropicSdkRawChunk, controller: TransformStreamDefaultController<GenericChunk>) {
switch (rawChunk.type) {
@ -615,16 +614,16 @@ export class AnthropicAPIClient extends BaseApiClient<
break
}
case 'text': {
if (!ChunkIdTypeMap[rawChunk.index]) {
ChunkIdTypeMap[rawChunk.index] = ChunkType.TEXT_DELTA // use TEXT_DELTA to mark a text block
}
controller.enqueue({
type: ChunkType.TEXT_START
} as TextStartChunk)
break
}
case 'thinking':
case 'redacted_thinking': {
if (!ChunkIdTypeMap[rawChunk.index]) {
ChunkIdTypeMap[rawChunk.index] = ChunkType.THINKING_DELTA // use THINKING_DELTA to mark a thinking block
}
controller.enqueue({
type: ChunkType.THINKING_START
} as ThinkingStartChunk)
break
}
}
@ -661,15 +660,6 @@ export class AnthropicAPIClient extends BaseApiClient<
break
}
case 'content_block_stop': {
if (ChunkIdTypeMap[rawChunk.index] === ChunkType.TEXT_DELTA) {
controller.enqueue({
type: ChunkType.TEXT_COMPLETE
} as TextCompleteChunk)
} else if (ChunkIdTypeMap[rawChunk.index] === ChunkType.THINKING_DELTA) {
controller.enqueue({
type: ChunkType.THINKING_COMPLETE
} as ThinkingCompleteChunk)
}
const toolCall = toolCalls[rawChunk.index]
if (toolCall) {
try {

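A hypothetical condensed view of the dispatch these Anthropic hunks leave behind: START events are emitted when a content block opens, and content_block_stop no longer needs the per-index ChunkIdTypeMap to emit COMPLETE events. The event names come from the Anthropic streaming API; the emit callback is a stand-in for controller.enqueue:

function onContentBlockStart(
  blockType: 'text' | 'thinking' | 'redacted_thinking',
  emit: (type: string) => void
): void {
  switch (blockType) {
    case 'text':
      emit('text.start')
      break
    case 'thinking':
    case 'redacted_thinking':
      emit('thinking.start')
      break
  }
}
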
View File

@ -41,7 +41,7 @@ import {
ToolCallResponse,
WebSearchSource
} from '@renderer/types'
import { ChunkType, LLMWebSearchCompleteChunk } from '@renderer/types/chunk'
import { ChunkType, LLMWebSearchCompleteChunk, TextStartChunk, ThinkingStartChunk } from '@renderer/types/chunk'
import { Message } from '@renderer/types/newMessage'
import {
GeminiOptions,
@ -547,20 +547,34 @@ export class GeminiAPIClient extends BaseApiClient<
}
getResponseChunkTransformer(): ResponseChunkTransformer<GeminiSdkRawChunk> {
const toolCalls: FunctionCall[] = []
let isFirstTextChunk = true
let isFirstThinkingChunk = true
return () => ({
async transform(chunk: GeminiSdkRawChunk, controller: TransformStreamDefaultController<GenericChunk>) {
const toolCalls: FunctionCall[] = []
if (chunk.candidates && chunk.candidates.length > 0) {
for (const candidate of chunk.candidates) {
if (candidate.content) {
candidate.content.parts?.forEach((part) => {
const text = part.text || ''
if (part.thought) {
if (isFirstThinkingChunk) {
controller.enqueue({
type: ChunkType.THINKING_START
} as ThinkingStartChunk)
isFirstThinkingChunk = false
}
controller.enqueue({
type: ChunkType.THINKING_DELTA,
text: text
})
} else if (part.text) {
if (isFirstTextChunk) {
controller.enqueue({
type: ChunkType.TEXT_START
} as TextStartChunk)
isFirstTextChunk = false
}
controller.enqueue({
type: ChunkType.TEXT_DELTA,
text: text

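A condensed, self-contained sketch of the Gemini routing above: parts flagged thought feed the thinking channel, parts with text feed the text channel, and each channel emits its START event exactly once. In the real transformer the two flags live in the closure and persist across chunks; the Part shape here is a simplified stand-in for the SDK type:

interface Part {
  thought?: boolean
  text?: string
}

function transformParts(parts: Part[], emit: (e: { type: string; text?: string }) => void): void {
  let isFirstThinkingChunk = true
  let isFirstTextChunk = true
  for (const part of parts) {
    const text = part.text || ''
    if (part.thought) {
      if (isFirstThinkingChunk) {
        emit({ type: 'thinking.start' })
        isFirstThinkingChunk = false
      }
      emit({ type: 'thinking.delta', text })
    } else if (part.text) {
      if (isFirstTextChunk) {
        emit({ type: 'text.start' })
        isFirstTextChunk = false
      }
      emit({ type: 'text.delta', text })
    }
  }
}
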
View File

@ -31,7 +31,7 @@ import {
ToolCallResponse,
WebSearchSource
} from '@renderer/types'
import { ChunkType } from '@renderer/types/chunk'
import { ChunkType, TextStartChunk, ThinkingStartChunk } from '@renderer/types/chunk'
import { Message } from '@renderer/types/newMessage'
import {
OpenAISdkMessageParam,
@ -659,6 +659,8 @@ export class OpenAIAPIClient extends OpenAIBaseClient<
isFinished = true
}
let isFirstThinkingChunk = true
let isFirstTextChunk = true
return (context: ResponseChunkTransformerContext) => ({
async transform(chunk: OpenAISdkRawChunk, controller: TransformStreamDefaultController<GenericChunk>) {
// Continuously update usage info
@ -699,6 +701,12 @@ export class OpenAIAPIClient extends OpenAIBaseClient<
// @ts-ignore - reasoning_content is not in standard OpenAI types but some providers use it
const reasoningText = contentSource.reasoning_content || contentSource.reasoning
if (reasoningText) {
if (isFirstThinkingChunk) {
controller.enqueue({
type: ChunkType.THINKING_START
} as ThinkingStartChunk)
isFirstThinkingChunk = false
}
controller.enqueue({
type: ChunkType.THINKING_DELTA,
text: reasoningText
@ -707,6 +715,12 @@ export class OpenAIAPIClient extends OpenAIBaseClient<
// Handle text content
if (contentSource.content) {
if (isFirstTextChunk) {
controller.enqueue({
type: ChunkType.TEXT_START
} as TextStartChunk)
isFirstTextChunk = false
}
controller.enqueue({
type: ChunkType.TEXT_DELTA,
text: contentSource.content

View File

@ -424,6 +424,8 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
const outputItems: OpenAI.Responses.ResponseOutputItem[] = []
let hasBeenCollectedToolCalls = false
let hasReasoningSummary = false
let isFirstThinkingChunk = true
let isFirstTextChunk = true
return () => ({
async transform(chunk: OpenAIResponseSdkRawChunk, controller: TransformStreamDefaultController<GenericChunk>) {
// Process the chunk
@ -435,6 +437,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
switch (output.type) {
case 'message':
if (output.content[0].type === 'output_text') {
if (isFirstTextChunk) {
controller.enqueue({
type: ChunkType.TEXT_START
})
isFirstTextChunk = false
}
controller.enqueue({
type: ChunkType.TEXT_DELTA,
text: output.content[0].text
@ -451,6 +459,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
}
break
case 'reasoning':
if (isFirstThinkingChunk) {
controller.enqueue({
type: ChunkType.THINKING_START
})
isFirstThinkingChunk = false
}
controller.enqueue({
type: ChunkType.THINKING_DELTA,
text: output.summary.map((s) => s.text).join('\n')
@ -510,6 +524,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
hasReasoningSummary = true
break
case 'response.reasoning_summary_text.delta':
if (isFirstThinkingChunk) {
controller.enqueue({
type: ChunkType.THINKING_START
})
isFirstThinkingChunk = false
}
controller.enqueue({
type: ChunkType.THINKING_DELTA,
text: chunk.delta
@ -535,6 +555,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
})
break
case 'response.output_text.delta': {
if (isFirstTextChunk) {
controller.enqueue({
type: ChunkType.TEXT_START
})
isFirstTextChunk = false
}
controller.enqueue({
type: ChunkType.TEXT_DELTA,
text: chunk.delta

View File

@ -1,5 +1,5 @@
import Logger from '@renderer/config/logger'
import { ChunkType, TextCompleteChunk, TextDeltaChunk } from '@renderer/types/chunk'
import { ChunkType, TextDeltaChunk } from '@renderer/types/chunk'
import { CompletionsParams, CompletionsResult, GenericChunk } from '../schemas'
import { CompletionsContext, CompletionsMiddleware } from '../types'
@ -38,7 +38,6 @@ export const TextChunkMiddleware: CompletionsMiddleware =
// State shared across chunks
let accumulatedTextContent = ''
let hasTextCompleteEventEnqueue = false
const enhancedTextStream = resultFromUpstream.pipeThrough(
new TransformStream<GenericChunk, GenericChunk>({
transform(chunk: GenericChunk, controller) {
@ -53,18 +52,7 @@ export const TextChunkMiddleware: CompletionsMiddleware =
// Create a new chunk containing the processed text
controller.enqueue(chunk)
} else if (chunk.type === ChunkType.TEXT_COMPLETE) {
const textChunk = chunk as TextCompleteChunk
controller.enqueue({
...textChunk,
text: accumulatedTextContent
})
if (params.onResponse) {
params.onResponse(accumulatedTextContent, true)
}
hasTextCompleteEventEnqueue = true
accumulatedTextContent = ''
} else if (accumulatedTextContent && !hasTextCompleteEventEnqueue) {
} else if (accumulatedTextContent && chunk.type !== ChunkType.TEXT_START) {
if (chunk.type === ChunkType.LLM_RESPONSE_COMPLETE) {
const finalText = accumulatedTextContent
ctx._internal.customState!.accumulatedText = finalText
@ -89,7 +77,6 @@ export const TextChunkMiddleware: CompletionsMiddleware =
})
controller.enqueue(chunk)
}
hasTextCompleteEventEnqueue = true
accumulatedTextContent = ''
} else {
// Pass through all other chunk types

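With upstream TEXT_COMPLETE events gone, the middleware itself closes the text block: the first chunk that is neither TEXT_DELTA nor TEXT_START flushes the accumulated text. A stripped-down sketch of that rule (the real middleware also special-cases LLM_RESPONSE_COMPLETE, stores the final text in customState, and calls params.onResponse):

type FlushChunk = { type: string; text?: string }

function makeTextFlusher(emit: (c: FlushChunk) => void) {
  let accumulated = ''
  return (chunk: FlushChunk): void => {
    if (chunk.type === 'text.delta') {
      accumulated += chunk.text || ''
      emit(chunk)
    } else if (accumulated && chunk.type !== 'text.start') {
      // The first non-delta chunk closes the block with the full text.
      emit({ type: 'text.complete', text: accumulated })
      accumulated = ''
      emit(chunk)
    } else {
      emit(chunk)
    }
  }
}
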
View File

@ -65,17 +65,7 @@ export const ThinkChunkMiddleware: CompletionsMiddleware =
thinking_millsec: thinkingStartTime > 0 ? Date.now() - thinkingStartTime : 0
}
controller.enqueue(enhancedChunk)
} else if (chunk.type === ChunkType.THINKING_COMPLETE) {
const thinkingCompleteChunk = chunk as ThinkingCompleteChunk
controller.enqueue({
...thinkingCompleteChunk,
text: accumulatedThinkingContent,
thinking_millsec: thinkingStartTime > 0 ? Date.now() - thinkingStartTime : 0
})
hasThinkingContent = false
accumulatedThinkingContent = ''
thinkingStartTime = 0
} else if (hasThinkingContent && thinkingStartTime > 0) {
} else if (hasThinkingContent && thinkingStartTime > 0 && chunk.type !== ChunkType.THINKING_START) {
// When any non-THINKING_DELTA chunk arrives while thinking content has accumulated, emit THINKING_COMPLETE
const thinkingCompleteChunk: ThinkingCompleteChunk = {
type: ChunkType.THINKING_COMPLETE,

View File

@ -79,6 +79,7 @@ function createToolUseExtractionTransform(
toolCounter += toolUseResponses.length
if (toolUseResponses.length > 0) {
controller.enqueue({ type: ChunkType.TEXT_COMPLETE, text: '' })
// Emit the MCP_TOOL_CREATED chunk
const mcpToolCreatedChunk: MCPToolCreatedChunk = {
type: ChunkType.MCP_TOOL_CREATED,

View File

@ -19,8 +19,6 @@ interface Props {
role: Message['role']
}
const toolUseRegex = /<tool_use>([\s\S]*?)<\/tool_use>/g
const MainTextBlock: React.FC<Props> = ({ block, citationBlockId, role, mentions = [] }) => {
// Use the passed citationBlockId directly in the selector
const { renderInputMessageAsMarkdown } = useSettings()
@ -38,10 +36,6 @@ const MainTextBlock: React.FC<Props> = ({ block, citationBlockId, role, mentions
return withCitationTags(block.content, rawCitations, sourceType)
}, [block.content, block.citationReferences, citationBlockId, rawCitations])
const ignoreToolUse = useMemo(() => {
return processedContent.replace(toolUseRegex, '')
}, [processedContent])
return (
<>
{/* Render mentions associated with the message */}
@ -57,7 +51,7 @@ const MainTextBlock: React.FC<Props> = ({ block, citationBlockId, role, mentions
{block.content}
</p>
) : (
<Markdown block={{ ...block, content: ignoreToolUse }} />
<Markdown block={{ ...block, content: processedContent }} />
)}
</>
)

View File

@ -261,51 +261,6 @@ describe('MainTextBlock', () => {
})
describe('content processing', () => {
it('should filter tool_use tags from content', () => {
const testCases = [
{
name: 'single tool_use tag',
content: 'Before <tool_use>tool content</tool_use> after',
expectsFiltering: true
},
{
name: 'multiple tool_use tags',
content: 'Start <tool_use>tool1</tool_use> middle <tool_use>tool2</tool_use> end',
expectsFiltering: true
},
{
name: 'multiline tool_use',
content: `Text before
<tool_use>
multiline
tool content
</tool_use>
text after`,
expectsFiltering: true
},
{
name: 'malformed tool_use',
content: 'Before <tool_use>unclosed tag',
expectsFiltering: false // Should preserve malformed tags
}
]
testCases.forEach(({ content, expectsFiltering }) => {
const block = createMainTextBlock({ content })
const { unmount } = renderMainTextBlock({ block, role: 'assistant' })
const renderedContent = getRenderedMarkdown()
expect(renderedContent).toBeInTheDocument()
if (expectsFiltering) {
// Check that tool_use content is not visible to user
expect(screen.queryByText(/tool content|tool1|tool2|multiline/)).not.toBeInTheDocument()
}
unmount()
})
})
it('should process content through format utilities', () => {
const block = createMainTextBlock({
content: 'Content to process',

View File

@ -8,10 +8,14 @@ import { AssistantMessageStatus } from '@renderer/types/newMessage'
export interface StreamProcessorCallbacks {
// LLM response created
onLLMResponseCreated?: () => void
// Text content start
onTextStart?: () => void
// Text content chunk received
onTextChunk?: (text: string) => void
// Full text content received
onTextComplete?: (text: string) => void
// thinking content start
onThinkingStart?: () => void
// Thinking/reasoning content chunk received (e.g., from Claude)
onThinkingChunk?: (text: string, thinking_millsec?: number) => void
onThinkingComplete?: (text: string, thinking_millsec?: number) => void
@ -54,6 +58,10 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
if (callbacks.onLLMResponseCreated) callbacks.onLLMResponseCreated()
break
}
case ChunkType.TEXT_START: {
if (callbacks.onTextStart) callbacks.onTextStart()
break
}
case ChunkType.TEXT_DELTA: {
if (callbacks.onTextChunk) callbacks.onTextChunk(data.text)
break
@ -62,6 +70,10 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
if (callbacks.onTextComplete) callbacks.onTextComplete(data.text)
break
}
case ChunkType.THINKING_START: {
if (callbacks.onThinkingStart) callbacks.onThinkingStart()
break
}
case ChunkType.THINKING_DELTA: {
if (callbacks.onThinkingChunk) callbacks.onThinkingChunk(data.text, data.thinking_millsec)
break

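A hedged usage sketch of the extended callbacks; the handler bodies are illustrative, and createStreamProcessor is assumed to return a per-chunk handler, as the switch above suggests:

const processChunk = createStreamProcessor({
  onTextStart: () => console.log('text block opened'),
  onTextChunk: (text) => console.log('text delta:', text),
  onTextComplete: (text) => console.log('text done:', text),
  onThinkingStart: () => console.log('thinking block opened'),
  onThinkingChunk: (text, thinking_millsec) => console.log('thinking delta:', text, thinking_millsec)
})
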
View File

@ -41,7 +41,7 @@ import {
createTranslationBlock,
resetAssistantMessage
} from '@renderer/utils/messageUtils/create'
import { getMainTextContent } from '@renderer/utils/messageUtils/find'
import { findMainTextBlocks, getMainTextContent } from '@renderer/utils/messageUtils/find'
import { getTopicQueue } from '@renderer/utils/queue'
import { waitForTopicQueue } from '@renderer/utils/queue'
import { isOnHomePage } from '@renderer/utils/window'
@ -226,31 +226,6 @@ export const cleanupMultipleBlocks = (dispatch: AppDispatch, blockIds: string[])
}
}
// // Change: throttled update of a single block's content/status to the DB (Text/Thinking chunks only)
// export const throttledBlockDbUpdate = throttle(
// async (blockId: string, blockChanges: Partial<MessageBlock>) => {
// // Check if blockId is valid before attempting update
// if (!blockId) {
// console.warn('[DB Throttle Block Update] Attempted to update with null/undefined blockId. Skipping.')
// return
// }
// const state = store.getState()
// const block = state.messageBlocks.entities[blockId]
// throttle is async and may run only after the complete event has fired
// if (
// blockChanges.status === MessageBlockStatus.STREAMING &&
// (block?.status === MessageBlockStatus.SUCCESS || block?.status === MessageBlockStatus.ERROR)
// )
// return
// try {
// } catch (error) {
// console.error(`[DB Throttle Block Update] Failed for block ${blockId}:`, error)
// }
// },
// 300, // throttle interval, adjustable
// { leading: false, trailing: true }
// )
// New: generic, non-throttled function for saving message and block updates to the DB
const saveUpdatesToDB = async (
messageId: string,
@ -351,9 +326,9 @@ const fetchAndProcessAssistantResponseImpl = async (
let accumulatedContent = ''
let accumulatedThinking = ''
// Dedicated to managing UI focus and block transitions
let lastBlockId: string | null = null
let lastBlockType: MessageBlockType | null = null
let currentActiveBlockType: MessageBlockType | null = null
// Dedicated to lifecycle handling inside a block
let initialPlaceholderBlockId: string | null = null
let citationBlockId: string | null = null
@ -365,6 +340,28 @@ const fetchAndProcessAssistantResponseImpl = async (
const toolCallIdToBlockIdMap = new Map<string, string>()
const notificationService = NotificationService.getInstance()
/**
 * Smart block update: route changes through the throttled updater by default,
 * - on a block-type switch, cancel the previous block's pending throttled update and write the change synchronously (dispatch + save to DB)
 * - otherwise use the throttled update to limit DB writes
 */
const smartBlockUpdate = (blockId: string, changes: Partial<MessageBlock>, blockType: MessageBlockType) => {
const isBlockTypeChanged = currentActiveBlockType !== null && currentActiveBlockType !== blockType
if (isBlockTypeChanged) {
if (lastBlockId && lastBlockId !== blockId) {
cancelThrottledBlockUpdate(lastBlockId)
}
dispatch(updateOneBlock({ id: blockId, changes }))
saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState)
} else {
throttledBlockUpdate(blockId, changes)
}
// Track the currently active block type
currentActiveBlockType = blockType
}
const handleBlockTransition = async (newBlock: MessageBlock, newBlockType: MessageBlockType) => {
lastBlockId = newBlock.id
lastBlockType = newBlockType
@ -428,6 +425,25 @@ const fetchAndProcessAssistantResponseImpl = async (
initialPlaceholderBlockId = baseBlock.id
await handleBlockTransition(baseBlock as PlaceholderMessageBlock, MessageBlockType.UNKNOWN)
},
onTextStart: async () => {
if (initialPlaceholderBlockId) {
lastBlockType = MessageBlockType.MAIN_TEXT
const changes = {
type: MessageBlockType.MAIN_TEXT,
content: accumulatedContent,
status: MessageBlockStatus.STREAMING
}
smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.MAIN_TEXT)
mainTextBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
} else if (!mainTextBlockId) {
const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, {
status: MessageBlockStatus.STREAMING
})
mainTextBlockId = newBlock.id
await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT)
}
},
onTextChunk: async (text) => {
const citationBlockSource = citationBlockId
? (getState().messageBlocks.entities[citationBlockId] as CitationMessageBlock).response?.source
@ -435,31 +451,11 @@ const fetchAndProcessAssistantResponseImpl = async (
accumulatedContent += text
if (mainTextBlockId) {
const blockChanges: Partial<MessageBlock> = {
content: accumulatedContent,
status: MessageBlockStatus.STREAMING
}
throttledBlockUpdate(mainTextBlockId, blockChanges)
} else if (initialPlaceholderBlockId) {
// Convert the placeholder block into a main text block
const initialChanges: Partial<MessageBlock> = {
type: MessageBlockType.MAIN_TEXT,
content: accumulatedContent,
status: MessageBlockStatus.STREAMING,
citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : []
}
mainTextBlockId = initialPlaceholderBlockId
// Clear the placeholder block
initialPlaceholderBlockId = null
lastBlockType = MessageBlockType.MAIN_TEXT
dispatch(updateOneBlock({ id: mainTextBlockId, changes: initialChanges }))
saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
} else {
const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, {
status: MessageBlockStatus.STREAMING,
citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : []
})
mainTextBlockId = newBlock.id // set the ID immediately to avoid a race condition
await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT)
smartBlockUpdate(mainTextBlockId, blockChanges, MessageBlockType.MAIN_TEXT)
}
},
onTextComplete: async (finalText) => {
@ -468,18 +464,35 @@ const fetchAndProcessAssistantResponseImpl = async (
content: finalText,
status: MessageBlockStatus.SUCCESS
}
cancelThrottledBlockUpdate(mainTextBlockId)
dispatch(updateOneBlock({ id: mainTextBlockId, changes }))
saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
if (!assistant.enableWebSearch) {
mainTextBlockId = null
}
smartBlockUpdate(mainTextBlockId, changes, MessageBlockType.MAIN_TEXT)
mainTextBlockId = null
} else {
console.warn(
`[onTextComplete] Received text.complete but last block was not MAIN_TEXT (was ${lastBlockType}) or lastBlockId is null.`
)
}
},
onThinkingStart: async () => {
if (initialPlaceholderBlockId) {
lastBlockType = MessageBlockType.THINKING
const changes = {
type: MessageBlockType.THINKING,
content: accumulatedThinking,
status: MessageBlockStatus.STREAMING,
thinking_millsec: 0
}
thinkingBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING)
} else if (!thinkingBlockId) {
const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, {
status: MessageBlockStatus.STREAMING,
thinking_millsec: 0
})
thinkingBlockId = newBlock.id
await handleBlockTransition(newBlock, MessageBlockType.THINKING)
}
},
onThinkingChunk: async (text, thinking_millsec) => {
accumulatedThinking += text
if (thinkingBlockId) {
@ -488,26 +501,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.STREAMING,
thinking_millsec: thinking_millsec
}
throttledBlockUpdate(thinkingBlockId, blockChanges)
} else if (initialPlaceholderBlockId) {
// First chunk for this block: Update type and status immediately
lastBlockType = MessageBlockType.THINKING
const initialChanges: Partial<MessageBlock> = {
type: MessageBlockType.THINKING,
content: accumulatedThinking,
status: MessageBlockStatus.STREAMING
}
thinkingBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
dispatch(updateOneBlock({ id: thinkingBlockId, changes: initialChanges }))
saveUpdatedBlockToDB(thinkingBlockId, assistantMsgId, topicId, getState)
} else {
const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, {
status: MessageBlockStatus.STREAMING,
thinking_millsec: 0
})
thinkingBlockId = newBlock.id // set the ID immediately to avoid a race condition
await handleBlockTransition(newBlock, MessageBlockType.THINKING)
smartBlockUpdate(thinkingBlockId, blockChanges, MessageBlockType.THINKING)
}
},
onThinkingComplete: (finalText, final_thinking_millsec) => {
@ -518,9 +512,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.SUCCESS,
thinking_millsec: final_thinking_millsec
}
cancelThrottledBlockUpdate(thinkingBlockId)
dispatch(updateOneBlock({ id: thinkingBlockId, changes }))
saveUpdatedBlockToDB(thinkingBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING)
} else {
console.warn(
`[onThinkingComplete] Received thinking.complete but last block was not THINKING (was ${lastBlockType}) or lastBlockId is null.`
@ -539,8 +531,7 @@ const fetchAndProcessAssistantResponseImpl = async (
}
toolBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
dispatch(updateOneBlock({ id: toolBlockId, changes }))
saveUpdatedBlockToDB(toolBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(toolBlockId, changes, MessageBlockType.TOOL)
toolCallIdToBlockIdMap.set(toolResponse.id, toolBlockId)
} else if (toolResponse.status === 'pending') {
const toolBlock = createToolBlock(assistantMsgId, toolResponse.id, {
@ -566,8 +557,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.PROCESSING,
metadata: { rawMcpToolResponse: toolResponse }
}
dispatch(updateOneBlock({ id: targetBlockId, changes }))
saveUpdatedBlockToDB(targetBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(targetBlockId, changes, MessageBlockType.TOOL)
} else if (!targetBlockId) {
console.warn(
`[onToolCallInProgress] No block ID found for tool ID: ${toolResponse.id}. Available mappings:`,
@ -601,9 +591,7 @@ const fetchAndProcessAssistantResponseImpl = async (
if (finalStatus === MessageBlockStatus.ERROR) {
changes.error = { message: `Tool execution failed/error`, details: toolResponse.response }
}
cancelThrottledBlockUpdate(existingBlockId)
dispatch(updateOneBlock({ id: existingBlockId, changes }))
saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(existingBlockId, changes, MessageBlockType.TOOL)
} else {
console.warn(
`[onToolCallComplete] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}`
@ -624,8 +612,7 @@ const fetchAndProcessAssistantResponseImpl = async (
knowledge: externalToolResult.knowledge,
status: MessageBlockStatus.SUCCESS
}
dispatch(updateOneBlock({ id: citationBlockId, changes }))
saveUpdatedBlockToDB(citationBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(citationBlockId, changes, MessageBlockType.CITATION)
} else {
console.error('[onExternalToolComplete] citationBlockId is null. Cannot update.')
}
@ -639,8 +626,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.PROCESSING
}
lastBlockType = MessageBlockType.CITATION
dispatch(updateOneBlock({ id: initialPlaceholderBlockId, changes }))
saveUpdatedBlockToDB(initialPlaceholderBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.CITATION)
initialPlaceholderBlockId = null
} else {
const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
@ -656,22 +642,19 @@ const fetchAndProcessAssistantResponseImpl = async (
response: llmWebSearchResult,
status: MessageBlockStatus.SUCCESS
}
dispatch(updateOneBlock({ id: blockId, changes }))
saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState)
smartBlockUpdate(blockId, changes, MessageBlockType.CITATION)
if (mainTextBlockId) {
const state = getState()
const existingMainTextBlock = state.messageBlocks.entities[mainTextBlockId]
if (existingMainTextBlock && existingMainTextBlock.type === MessageBlockType.MAIN_TEXT) {
const currentRefs = existingMainTextBlock.citationReferences || []
const mainTextChanges = {
citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }]
}
dispatch(updateOneBlock({ id: mainTextBlockId, changes: mainTextChanges }))
saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
const state = getState()
const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
if (existingMainTextBlocks.length > 0) {
const existingMainTextBlock = existingMainTextBlocks[0]
const currentRefs = existingMainTextBlock.citationReferences || []
const mainTextChanges = {
citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }]
}
mainTextBlockId = null
smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT)
}
if (initialPlaceholderBlockId) {
citationBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
@ -687,21 +670,15 @@ const fetchAndProcessAssistantResponseImpl = async (
}
)
citationBlockId = citationBlock.id
if (mainTextBlockId) {
const state = getState()
const existingMainTextBlock = state.messageBlocks.entities[mainTextBlockId]
if (existingMainTextBlock && existingMainTextBlock.type === MessageBlockType.MAIN_TEXT) {
const currentRefs = existingMainTextBlock.citationReferences || []
const mainTextChanges = {
citationReferences: [
...currentRefs,
{ citationBlockId, citationBlockSource: llmWebSearchResult.source }
]
}
dispatch(updateOneBlock({ id: mainTextBlockId, changes: mainTextChanges }))
saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
const state = getState()
const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
if (existingMainTextBlocks.length > 0) {
const existingMainTextBlock = existingMainTextBlocks[0]
const currentRefs = existingMainTextBlock.citationReferences || []
const mainTextChanges = {
citationReferences: [...currentRefs, { citationBlockId, citationBlockSource: llmWebSearchResult.source }]
}
mainTextBlockId = null
smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT)
}
await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
}
@ -716,8 +693,7 @@ const fetchAndProcessAssistantResponseImpl = async (
lastBlockType = MessageBlockType.IMAGE
imageBlockId = initialPlaceholderBlockId
initialPlaceholderBlockId = null
dispatch(updateOneBlock({ id: imageBlockId, changes: initialChanges }))
saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(imageBlockId, initialChanges, MessageBlockType.IMAGE)
} else if (!imageBlockId) {
const imageBlock = createImageBlock(assistantMsgId, {
status: MessageBlockStatus.STREAMING
@ -734,8 +710,7 @@ const fetchAndProcessAssistantResponseImpl = async (
metadata: { generateImageResponse: imageData },
status: MessageBlockStatus.STREAMING
}
dispatch(updateOneBlock({ id: imageBlockId, changes }))
saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
}
},
onImageGenerated: (imageData) => {
@ -744,8 +719,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<ImageMessageBlock> = {
status: MessageBlockStatus.SUCCESS
}
dispatch(updateOneBlock({ id: imageBlockId, changes }))
saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
} else {
const imageUrl = imageData.images?.[0] || 'placeholder_image_url'
const changes: Partial<ImageMessageBlock> = {
@ -753,8 +727,7 @@ const fetchAndProcessAssistantResponseImpl = async (
metadata: { generateImageResponse: imageData },
status: MessageBlockStatus.SUCCESS
}
dispatch(updateOneBlock({ id: imageBlockId, changes }))
saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
}
} else {
console.error('[onImageGenerated] Last block was not an Image block or ID is missing.')
@ -802,9 +775,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = {
status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
}
cancelThrottledBlockUpdate(possibleBlockId)
dispatch(updateOneBlock({ id: possibleBlockId, changes }))
saveUpdatedBlockToDB(possibleBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(possibleBlockId, changes, MessageBlockType.MAIN_TEXT)
}
const errorBlock = createErrorBlock(assistantMsgId, serializableError, { status: MessageBlockStatus.SUCCESS })
@ -846,9 +817,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = {
status: MessageBlockStatus.SUCCESS
}
cancelThrottledBlockUpdate(possibleBlockId)
dispatch(updateOneBlock({ id: possibleBlockId, changes }))
saveUpdatedBlockToDB(possibleBlockId, assistantMsgId, topicId, getState)
smartBlockUpdate(possibleBlockId, changes, lastBlockType!)
}
const endTime = Date.now()

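The truncation fix itself is concentrated in smartBlockUpdate above: on a block-type switch, the pending throttled write for the previous block is canceled and the change is written synchronously, so a late-firing throttled update can no longer overwrite, and thereby truncate, a block that has already moved on. A condensed sketch of that decision rule, with throttle/cancel/write stand-ins as illustrative parameters (in the real thunk, lastBlockId is maintained by handleBlockTransition):

type Changes = Record<string, unknown>

function makeSmartBlockUpdate(
  throttledUpdate: (blockId: string, changes: Changes) => void,
  cancelThrottled: (blockId: string) => void,
  writeNow: (blockId: string, changes: Changes) => void
) {
  let activeType: string | null = null
  let lastBlockId: string | null = null
  return (blockId: string, changes: Changes, blockType: string) => {
    const typeChanged = activeType !== null && activeType !== blockType
    if (typeChanged) {
      // Flush synchronously on a type switch; drop the stale throttled write.
      if (lastBlockId && lastBlockId !== blockId) cancelThrottled(lastBlockId)
      writeNow(blockId, changes)
    } else {
      throttledUpdate(blockId, changes)
    }
    activeType = blockType
    lastBlockId = blockId
  }
}
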
View File

@ -19,13 +19,16 @@ export enum ChunkType {
EXTERNEL_TOOL_COMPLETE = 'externel_tool_complete',
LLM_RESPONSE_CREATED = 'llm_response_created',
LLM_RESPONSE_IN_PROGRESS = 'llm_response_in_progress',
TEXT_START = 'text.start',
TEXT_DELTA = 'text.delta',
TEXT_COMPLETE = 'text.complete',
AUDIO_START = 'audio.start',
AUDIO_DELTA = 'audio.delta',
AUDIO_COMPLETE = 'audio.complete',
IMAGE_CREATED = 'image.created',
IMAGE_DELTA = 'image.delta',
IMAGE_COMPLETE = 'image.complete',
THINKING_START = 'thinking.start',
THINKING_DELTA = 'thinking.delta',
THINKING_COMPLETE = 'thinking.complete',
LLM_WEB_SEARCH_IN_PROGRESS = 'llm_websearch_in_progress',
@ -56,6 +59,18 @@ export interface LLMResponseInProgressChunk {
response?: Response
type: ChunkType.LLM_RESPONSE_IN_PROGRESS
}
export interface TextStartChunk {
/**
* The type of the chunk
*/
type: ChunkType.TEXT_START
/**
* The ID of the chunk
*/
chunk_id?: number
}
export interface TextDeltaChunk {
/**
* The text content of the chunk
@ -90,6 +105,13 @@ export interface TextCompleteChunk {
type: ChunkType.TEXT_COMPLETE
}
export interface AudioStartChunk {
/**
* The type of the chunk
*/
type: ChunkType.AUDIO_START
}
export interface AudioDeltaChunk {
/**
* A chunk of Base64 encoded audio data
@ -140,6 +162,13 @@ export interface ImageCompleteChunk {
image?: { type: 'url' | 'base64'; images: string[] }
}
export interface ThinkingStartChunk {
/**
* The type of the chunk
*/
type: ChunkType.THINKING_START
}
export interface ThinkingDeltaChunk {
/**
* The text content of the chunk
@ -365,13 +394,16 @@ export type Chunk =
| ExternalToolCompleteChunk // External tool call complete (external tools include web search, knowledge bases, and MCP servers)
| LLMResponseCreatedChunk // LLM response created; returns the block type about to be created
| LLMResponseInProgressChunk // LLM response in progress
| TextStartChunk // Text generation started
| TextDeltaChunk // Text generation in progress
| TextCompleteChunk // Text generation complete
| AudioStartChunk // Audio generation started
| AudioDeltaChunk // Audio generation in progress
| AudioCompleteChunk // Audio generation complete
| ImageCreatedChunk // Image created
| ImageDeltaChunk // Image generation in progress
| ImageCompleteChunk // Image generation complete
| ThinkingStartChunk // Thinking generation started
| ThinkingDeltaChunk // Thinking generation in progress
| ThinkingCompleteChunk // Thinking generation complete
| LLMWebSearchInProgressChunk // LLM built-in web search in progress; no distinct signal
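
For reference, a small consumer of the extended union, showing the START -> DELTA -> COMPLETE lifecycle every content channel now follows (text and thinking shown; audio mirrors them). Only names visible in this file are used:

import { Chunk, ChunkType } from '@renderer/types/chunk'

function describeChunk(chunk: Chunk): string {
  switch (chunk.type) {
    case ChunkType.TEXT_START:
      return 'text block opened'
    case ChunkType.TEXT_DELTA:
      return `text delta: ${chunk.text}`
    case ChunkType.THINKING_START:
      return 'thinking block opened'
    case ChunkType.THINKING_DELTA:
      return `thinking delta: ${chunk.text}`
    default:
      return chunk.type
  }
}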