diff --git a/src/renderer/src/aiCore/clients/anthropic/AnthropicAPIClient.ts b/src/renderer/src/aiCore/clients/anthropic/AnthropicAPIClient.ts
index c946f114fe..93176a9566 100644
--- a/src/renderer/src/aiCore/clients/anthropic/AnthropicAPIClient.ts
+++ b/src/renderer/src/aiCore/clients/anthropic/AnthropicAPIClient.ts
@@ -49,10 +49,10 @@ import {
   LLMWebSearchCompleteChunk,
   LLMWebSearchInProgressChunk,
   MCPToolCreatedChunk,
-  TextCompleteChunk,
   TextDeltaChunk,
-  ThinkingCompleteChunk,
-  ThinkingDeltaChunk
+  TextStartChunk,
+  ThinkingDeltaChunk,
+  ThinkingStartChunk
 } from '@renderer/types/chunk'
 import { type Message } from '@renderer/types/newMessage'
 import {
@@ -519,7 +519,6 @@ export class AnthropicAPIClient extends BaseApiClient<
     return () => {
       let accumulatedJson = ''
       const toolCalls: Record = {}
-      const ChunkIdTypeMap: Record<number, ChunkType> = {}
       return {
         async transform(rawChunk: AnthropicSdkRawChunk, controller: TransformStreamDefaultController) {
           switch (rawChunk.type) {
@@ -615,16 +614,16 @@ export class AnthropicAPIClient extends BaseApiClient<
               break
             }
             case 'text': {
-              if (!ChunkIdTypeMap[rawChunk.index]) {
-                ChunkIdTypeMap[rawChunk.index] = ChunkType.TEXT_DELTA // use TEXT_DELTA to stand for the text block
-              }
+              controller.enqueue({
+                type: ChunkType.TEXT_START
+              } as TextStartChunk)
               break
             }
             case 'thinking':
             case 'redacted_thinking': {
-              if (!ChunkIdTypeMap[rawChunk.index]) {
-                ChunkIdTypeMap[rawChunk.index] = ChunkType.THINKING_DELTA // use THINKING_DELTA to stand for the thinking block
-              }
+              controller.enqueue({
+                type: ChunkType.THINKING_START
+              } as ThinkingStartChunk)
               break
             }
           }
@@ -661,15 +660,6 @@ export class AnthropicAPIClient extends BaseApiClient<
           break
         }
         case 'content_block_stop': {
-          if (ChunkIdTypeMap[rawChunk.index] === ChunkType.TEXT_DELTA) {
-            controller.enqueue({
-              type: ChunkType.TEXT_COMPLETE
-            } as TextCompleteChunk)
-          } else if (ChunkIdTypeMap[rawChunk.index] === ChunkType.THINKING_DELTA) {
-            controller.enqueue({
-              type: ChunkType.THINKING_COMPLETE
-            } as ThinkingCompleteChunk)
-          }
          const toolCall = toolCalls[rawChunk.index]
          if (toolCall) {
            try {
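Note: with this change the Anthropic transformer emits a `*_START` marker when a content block opens and no longer emits `TEXT_COMPLETE`/`THINKING_COMPLETE` on `content_block_stop`; closing the block becomes the middleware's job. A minimal sketch of the resulting protocol, using simplified local types rather than the app's real `Chunk` union:

```ts
// Sketch only: the ChunkType string values mirror src/renderer/src/types/chunk.ts,
// but RawEvent is an invented, simplified stand-in for the SDK's raw stream events.
type RawEvent =
  | { type: 'content_block_start'; blockType: 'text' | 'thinking' }
  | { type: 'content_block_delta'; text: string }

type OutChunk = { type: 'text.start' } | { type: 'thinking.start' } | { type: 'text.delta'; text: string }

const toChunks = new TransformStream<RawEvent, OutChunk>({
  transform(event, controller) {
    if (event.type === 'content_block_start') {
      // Emit the START marker as soon as the block opens; no per-index map,
      // and no *_COMPLETE here — downstream middleware synthesizes that.
      controller.enqueue(event.blockType === 'text' ? { type: 'text.start' } : { type: 'thinking.start' })
    } else {
      controller.enqueue({ type: 'text.delta', text: event.text })
    }
  }
})
```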
diff --git a/src/renderer/src/aiCore/clients/gemini/GeminiAPIClient.ts b/src/renderer/src/aiCore/clients/gemini/GeminiAPIClient.ts
index d32564d962..bcf7c0d592 100644
--- a/src/renderer/src/aiCore/clients/gemini/GeminiAPIClient.ts
+++ b/src/renderer/src/aiCore/clients/gemini/GeminiAPIClient.ts
@@ -41,7 +41,7 @@ import {
   ToolCallResponse,
   WebSearchSource
 } from '@renderer/types'
-import { ChunkType, LLMWebSearchCompleteChunk } from '@renderer/types/chunk'
+import { ChunkType, LLMWebSearchCompleteChunk, TextStartChunk, ThinkingStartChunk } from '@renderer/types/chunk'
 import { Message } from '@renderer/types/newMessage'
 import {
   GeminiOptions,
@@ -547,20 +547,34 @@ export class GeminiAPIClient extends BaseApiClient<
   }

   getResponseChunkTransformer(): ResponseChunkTransformer {
+    const toolCalls: FunctionCall[] = []
+    let isFirstTextChunk = true
+    let isFirstThinkingChunk = true
     return () => ({
       async transform(chunk: GeminiSdkRawChunk, controller: TransformStreamDefaultController) {
-        const toolCalls: FunctionCall[] = []
         if (chunk.candidates && chunk.candidates.length > 0) {
           for (const candidate of chunk.candidates) {
             if (candidate.content) {
               candidate.content.parts?.forEach((part) => {
                 const text = part.text || ''
                 if (part.thought) {
+                  if (isFirstThinkingChunk) {
+                    controller.enqueue({
+                      type: ChunkType.THINKING_START
+                    } as ThinkingStartChunk)
+                    isFirstThinkingChunk = false
+                  }
                   controller.enqueue({
                     type: ChunkType.THINKING_DELTA,
                     text: text
                   })
                 } else if (part.text) {
+                  if (isFirstTextChunk) {
+                    controller.enqueue({
+                      type: ChunkType.TEXT_START
+                    } as TextStartChunk)
+                    isFirstTextChunk = false
+                  }
                   controller.enqueue({
                     type: ChunkType.TEXT_DELTA,
                     text: text
diff --git a/src/renderer/src/aiCore/clients/openai/OpenAIApiClient.ts b/src/renderer/src/aiCore/clients/openai/OpenAIApiClient.ts
index c1994dcb95..e3ccc8edd0 100644
--- a/src/renderer/src/aiCore/clients/openai/OpenAIApiClient.ts
+++ b/src/renderer/src/aiCore/clients/openai/OpenAIApiClient.ts
@@ -31,7 +31,7 @@ import {
   ToolCallResponse,
   WebSearchSource
 } from '@renderer/types'
-import { ChunkType } from '@renderer/types/chunk'
+import { ChunkType, TextStartChunk, ThinkingStartChunk } from '@renderer/types/chunk'
 import { Message } from '@renderer/types/newMessage'
 import {
   OpenAISdkMessageParam,
@@ -659,6 +659,8 @@ export class OpenAIAPIClient extends OpenAIBaseClient<
       isFinished = true
     }

+    let isFirstThinkingChunk = true
+    let isFirstTextChunk = true
     return (context: ResponseChunkTransformerContext) => ({
       async transform(chunk: OpenAISdkRawChunk, controller: TransformStreamDefaultController) {
         // keep the usage info up to date
@@ -699,6 +701,12 @@ export class OpenAIAPIClient extends OpenAIBaseClient<
           // @ts-ignore - reasoning_content is not in standard OpenAI types but some providers use it
           const reasoningText = contentSource.reasoning_content || contentSource.reasoning
           if (reasoningText) {
+            if (isFirstThinkingChunk) {
+              controller.enqueue({
+                type: ChunkType.THINKING_START
+              } as ThinkingStartChunk)
+              isFirstThinkingChunk = false
+            }
             controller.enqueue({
               type: ChunkType.THINKING_DELTA,
               text: reasoningText
@@ -707,6 +715,12 @@ export class OpenAIAPIClient extends OpenAIBaseClient<

         // handle text content
         if (contentSource.content) {
+          if (isFirstTextChunk) {
+            controller.enqueue({
+              type: ChunkType.TEXT_START
+            } as TextStartChunk)
+            isFirstTextChunk = false
+          }
           controller.enqueue({
             type: ChunkType.TEXT_DELTA,
             text: contentSource.content
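The Gemini and OpenAI transformers guard the START emission with `isFirstTextChunk`/`isFirstThinkingChunk` latches so the marker fires exactly once per stream. If the pattern keeps spreading, it could be factored into a tiny helper; a hypothetical sketch (`once` is not part of this diff):

```ts
// Hypothetical helper, not in the PR: run a callback at most once.
function once(fn: () => void): () => void {
  let fired = false
  return () => {
    if (!fired) {
      fired = true
      fn()
    }
  }
}

// Possible usage inside a transformer, assuming the controller is captured in start():
// const emitTextStart = once(() => controller.enqueue({ type: ChunkType.TEXT_START }))
// ...then call emitTextStart() before every TEXT_DELTA enqueue.
```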
diff --git a/src/renderer/src/aiCore/clients/openai/OpenAIResponseAPIClient.ts b/src/renderer/src/aiCore/clients/openai/OpenAIResponseAPIClient.ts
index 2af0b8376f..898e7eec44 100644
--- a/src/renderer/src/aiCore/clients/openai/OpenAIResponseAPIClient.ts
+++ b/src/renderer/src/aiCore/clients/openai/OpenAIResponseAPIClient.ts
@@ -424,6 +424,8 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
     const outputItems: OpenAI.Responses.ResponseOutputItem[] = []
     let hasBeenCollectedToolCalls = false
     let hasReasoningSummary = false
+    let isFirstThinkingChunk = true
+    let isFirstTextChunk = true
     return () => ({
       async transform(chunk: OpenAIResponseSdkRawChunk, controller: TransformStreamDefaultController) {
         // process the chunk
@@ -435,6 +437,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
             switch (output.type) {
               case 'message':
                 if (output.content[0].type === 'output_text') {
+                  if (isFirstTextChunk) {
+                    controller.enqueue({
+                      type: ChunkType.TEXT_START
+                    })
+                    isFirstTextChunk = false
+                  }
                   controller.enqueue({
                     type: ChunkType.TEXT_DELTA,
                     text: output.content[0].text
@@ -451,6 +459,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
                 }
                 break
               case 'reasoning':
+                if (isFirstThinkingChunk) {
+                  controller.enqueue({
+                    type: ChunkType.THINKING_START
+                  })
+                  isFirstThinkingChunk = false
+                }
                 controller.enqueue({
                   type: ChunkType.THINKING_DELTA,
                   text: output.summary.map((s) => s.text).join('\n')
@@ -510,6 +524,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
                 hasReasoningSummary = true
                 break
               case 'response.reasoning_summary_text.delta':
+                if (isFirstThinkingChunk) {
+                  controller.enqueue({
+                    type: ChunkType.THINKING_START
+                  })
+                  isFirstThinkingChunk = false
+                }
                 controller.enqueue({
                   type: ChunkType.THINKING_DELTA,
                   text: chunk.delta
@@ -535,6 +555,12 @@ export class OpenAIResponseAPIClient extends OpenAIBaseClient<
                 })
                 break
               case 'response.output_text.delta': {
+                if (isFirstTextChunk) {
+                  controller.enqueue({
+                    type: ChunkType.TEXT_START
+                  })
+                  isFirstTextChunk = false
+                }
                 controller.enqueue({
                   type: ChunkType.TEXT_DELTA,
                   text: chunk.delta
diff --git a/src/renderer/src/aiCore/middleware/core/TextChunkMiddleware.ts b/src/renderer/src/aiCore/middleware/core/TextChunkMiddleware.ts
index 3905d52058..0affc6b382 100644
--- a/src/renderer/src/aiCore/middleware/core/TextChunkMiddleware.ts
+++ b/src/renderer/src/aiCore/middleware/core/TextChunkMiddleware.ts
@@ -1,5 +1,5 @@
 import Logger from '@renderer/config/logger'
-import { ChunkType, TextCompleteChunk, TextDeltaChunk } from '@renderer/types/chunk'
+import { ChunkType, TextDeltaChunk } from '@renderer/types/chunk'

 import { CompletionsParams, CompletionsResult, GenericChunk } from '../schemas'
 import { CompletionsContext, CompletionsMiddleware } from '../types'
@@ -38,7 +38,6 @@ export const TextChunkMiddleware: CompletionsMiddleware =

         // state kept across chunks
         let accumulatedTextContent = ''
-        let hasTextCompleteEventEnqueue = false
         const enhancedTextStream = resultFromUpstream.pipeThrough(
           new TransformStream({
             transform(chunk: GenericChunk, controller) {
@@ -53,18 +52,7 @@ export const TextChunkMiddleware: CompletionsMiddleware =

                 // create a new chunk carrying the processed text
                 controller.enqueue(chunk)
-              } else if (chunk.type === ChunkType.TEXT_COMPLETE) {
-                const textChunk = chunk as TextCompleteChunk
-                controller.enqueue({
-                  ...textChunk,
-                  text: accumulatedTextContent
-                })
-                if (params.onResponse) {
-                  params.onResponse(accumulatedTextContent, true)
-                }
-                hasTextCompleteEventEnqueue = true
-                accumulatedTextContent = ''
-              } else if (accumulatedTextContent && !hasTextCompleteEventEnqueue) {
+              } else if (accumulatedTextContent && chunk.type !== ChunkType.TEXT_START) {
                 if (chunk.type === ChunkType.LLM_RESPONSE_COMPLETE) {
                   const finalText = accumulatedTextContent
                   ctx._internal.customState!.accumulatedText = finalText
@@ -89,7 +77,6 @@ export const TextChunkMiddleware: CompletionsMiddleware =
                   })
                   controller.enqueue(chunk)
                 }
-                hasTextCompleteEventEnqueue = true
                 accumulatedTextContent = ''
               } else {
                 // pass other chunk types straight through
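With the clients no longer emitting `TEXT_COMPLETE`, `TextChunkMiddleware` flushes its accumulator whenever a chunk arrives that is neither `TEXT_DELTA` nor `TEXT_START`. A self-contained sketch of that flush rule (types simplified; the real middleware also treats `LLM_RESPONSE_COMPLETE` specially and calls `params.onResponse`):

```ts
type C =
  | { type: 'text.start' }
  | { type: 'text.delta'; text: string }
  | { type: 'text.complete'; text: string }
  | { type: 'llm_response_complete' }

function textFlush(): TransformStream<C, C> {
  let acc = ''
  return new TransformStream<C, C>({
    transform(chunk, controller) {
      if (chunk.type === 'text.delta') {
        acc += chunk.text
        controller.enqueue(chunk)
      } else if (acc && chunk.type !== 'text.start') {
        // Any other chunk type closes the open text block:
        // synthesize TEXT_COMPLETE with the accumulated text first.
        controller.enqueue({ type: 'text.complete', text: acc })
        acc = ''
        controller.enqueue(chunk)
      } else {
        controller.enqueue(chunk)
      }
    }
  })
}
```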
diff --git a/src/renderer/src/aiCore/middleware/core/ThinkChunkMiddleware.ts b/src/renderer/src/aiCore/middleware/core/ThinkChunkMiddleware.ts
index dccdde7f10..22eaabe96d 100644
--- a/src/renderer/src/aiCore/middleware/core/ThinkChunkMiddleware.ts
+++ b/src/renderer/src/aiCore/middleware/core/ThinkChunkMiddleware.ts
@@ -65,17 +65,7 @@ export const ThinkChunkMiddleware: CompletionsMiddleware =
                   thinking_millsec: thinkingStartTime > 0 ? Date.now() - thinkingStartTime : 0
                 }
                 controller.enqueue(enhancedChunk)
-              } else if (chunk.type === ChunkType.THINKING_COMPLETE) {
-                const thinkingCompleteChunk = chunk as ThinkingCompleteChunk
-                controller.enqueue({
-                  ...thinkingCompleteChunk,
-                  text: accumulatedThinkingContent,
-                  thinking_millsec: thinkingStartTime > 0 ? Date.now() - thinkingStartTime : 0
-                })
-                hasThinkingContent = false
-                accumulatedThinkingContent = ''
-                thinkingStartTime = 0
-              } else if (hasThinkingContent && thinkingStartTime > 0) {
+              } else if (hasThinkingContent && thinkingStartTime > 0 && chunk.type !== ChunkType.THINKING_START) {
                 // on any non-THINKING_DELTA chunk, if thinking content has accumulated, emit THINKING_COMPLETE
                 const thinkingCompleteChunk: ThinkingCompleteChunk = {
                   type: ChunkType.THINKING_COMPLETE,
diff --git a/src/renderer/src/aiCore/middleware/feat/ToolUseExtractionMiddleware.ts b/src/renderer/src/aiCore/middleware/feat/ToolUseExtractionMiddleware.ts
index b53d7348f1..3e606f6683 100644
--- a/src/renderer/src/aiCore/middleware/feat/ToolUseExtractionMiddleware.ts
+++ b/src/renderer/src/aiCore/middleware/feat/ToolUseExtractionMiddleware.ts
@@ -79,6 +79,7 @@ function createToolUseExtractionTransform(
           toolCounter += toolUseResponses.length

           if (toolUseResponses.length > 0) {
+            controller.enqueue({ type: ChunkType.TEXT_COMPLETE, text: '' })
             // emit the MCP_TOOL_CREATED chunk
             const mcpToolCreatedChunk: MCPToolCreatedChunk = {
               type: ChunkType.MCP_TOOL_CREATED,
diff --git a/src/renderer/src/pages/home/Messages/Blocks/MainTextBlock.tsx b/src/renderer/src/pages/home/Messages/Blocks/MainTextBlock.tsx
index 524fcd4160..0f0d52907d 100644
--- a/src/renderer/src/pages/home/Messages/Blocks/MainTextBlock.tsx
+++ b/src/renderer/src/pages/home/Messages/Blocks/MainTextBlock.tsx
@@ -19,8 +19,6 @@ interface Props {
   role: Message['role']
 }

-const toolUseRegex = /<tool_use>([\s\S]*?)<\/tool_use>/g
-
 const MainTextBlock: React.FC<Props> = ({ block, citationBlockId, role, mentions = [] }) => {
   // Use the passed citationBlockId directly in the selector
   const { renderInputMessageAsMarkdown } = useSettings()
@@ -38,10 +36,6 @@ const MainTextBlock: React.FC = ({ block, citationBlockId, role, mentions
     return withCitationTags(block.content, rawCitations, sourceType)
   }, [block.content, block.citationReferences, citationBlockId, rawCitations])

-  const ignoreToolUse = useMemo(() => {
-    return processedContent.replace(toolUseRegex, '')
-  }, [processedContent])
-
   return (
     <>
       {/* Render mentions associated with the message */}
@@ -57,7 +51,7 @@ const MainTextBlock: React.FC = ({ block, citationBlockId, role, mentions
           {block.content}

         ) : (
-
+
         )}
   )
diff --git a/src/renderer/src/pages/home/Messages/Blocks/__tests__/MainTextBlock.test.tsx b/src/renderer/src/pages/home/Messages/Blocks/__tests__/MainTextBlock.test.tsx
index 551e0d9371..4683aae9bb 100644
--- a/src/renderer/src/pages/home/Messages/Blocks/__tests__/MainTextBlock.test.tsx
+++ b/src/renderer/src/pages/home/Messages/Blocks/__tests__/MainTextBlock.test.tsx
@@ -261,51 +261,6 @@ describe('MainTextBlock', () => {
   })

   describe('content processing', () => {
-    it('should filter tool_use tags from content', () => {
-      const testCases = [
-        {
-          name: 'single tool_use tag',
-          content: 'Before <tool_use>tool content</tool_use> after',
-          expectsFiltering: true
-        },
-        {
-          name: 'multiple tool_use tags',
-          content: 'Start <tool_use>tool1</tool_use> middle <tool_use>tool2</tool_use> end',
-          expectsFiltering: true
-        },
-        {
-          name: 'multiline tool_use',
-          content: `Text before
-<tool_use>
- multiline
- tool content
-</tool_use>
-text after`,
-          expectsFiltering: true
-        },
-        {
-          name: 'malformed tool_use',
-          content: 'Before <tool_use>unclosed tag',
-          expectsFiltering: false // Should preserve malformed tags
-        }
-      ]
-
-      testCases.forEach(({ content, expectsFiltering }) => {
-        const block = createMainTextBlock({ content })
-        const { unmount } = renderMainTextBlock({ block, role: 'assistant' })
-
-        const renderedContent = getRenderedMarkdown()
-        expect(renderedContent).toBeInTheDocument()
-
-        if (expectsFiltering) {
-          // Check that tool_use content is not visible to user
-          expect(screen.queryByText(/tool content|tool1|tool2|multiline/)).not.toBeInTheDocument()
-        }
-
-        unmount()
-      })
-    })
-
     it('should process content through format utilities', () => {
       const block = createMainTextBlock({
         content: 'Content to process',
diff --git a/src/renderer/src/services/StreamProcessingService.ts b/src/renderer/src/services/StreamProcessingService.ts
index 6c166ca6a9..c6afa85e39 100644
--- a/src/renderer/src/services/StreamProcessingService.ts
+++ b/src/renderer/src/services/StreamProcessingService.ts
@@ -8,10 +8,14 @@ import { AssistantMessageStatus } from '@renderer/types/newMessage'
 export interface StreamProcessorCallbacks {
   // LLM response created
   onLLMResponseCreated?: () => void
+  // Text content start
+  onTextStart?: () => void
   // Text content chunk received
   onTextChunk?: (text: string) => void
   // Full text content received
   onTextComplete?: (text: string) => void
+  // Thinking content start
+  onThinkingStart?: () => void
   // Thinking/reasoning content chunk received (e.g., from Claude)
   onThinkingChunk?: (text: string, thinking_millsec?: number) => void
   onThinkingComplete?: (text: string, thinking_millsec?: number) => void
@@ -54,6 +58,10 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
         if (callbacks.onLLMResponseCreated) callbacks.onLLMResponseCreated()
         break
       }
+      case ChunkType.TEXT_START: {
+        if (callbacks.onTextStart) callbacks.onTextStart()
+        break
+      }
       case ChunkType.TEXT_DELTA: {
         if (callbacks.onTextChunk) callbacks.onTextChunk(data.text)
         break
@@ -62,6 +70,10 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
         if (callbacks.onTextComplete) callbacks.onTextComplete(data.text)
         break
       }
+      case ChunkType.THINKING_START: {
+        if (callbacks.onThinkingStart) callbacks.onThinkingStart()
+        break
+      }
       case ChunkType.THINKING_DELTA: {
         if (callbacks.onThinkingChunk) callbacks.onThinkingChunk(data.text, data.thinking_millsec)
         break
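Consumers opt into the new lifecycle events through the two added callbacks. A usage sketch, assuming `createStreamProcessor` returns a function that is invoked once per incoming chunk (as the switch above suggests):

```ts
import { createStreamProcessor } from '@renderer/services/StreamProcessingService'

const process = createStreamProcessor({
  onTextStart: () => console.log('text block opened'),
  onTextChunk: (text) => console.log('text delta:', text),
  onTextComplete: (text) => console.log('text done:', text),
  onThinkingStart: () => console.log('thinking block opened'),
  onThinkingChunk: (text, ms) => console.log('thinking delta:', text, ms)
})

// for await (const chunk of stream) process(chunk)
```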
diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts
index 996044bb24..63ca59fa43 100644
--- a/src/renderer/src/store/thunk/messageThunk.ts
+++ b/src/renderer/src/store/thunk/messageThunk.ts
@@ -41,7 +41,7 @@ import {
   createTranslationBlock,
   resetAssistantMessage
 } from '@renderer/utils/messageUtils/create'
-import { getMainTextContent } from '@renderer/utils/messageUtils/find'
+import { findMainTextBlocks, getMainTextContent } from '@renderer/utils/messageUtils/find'
 import { getTopicQueue } from '@renderer/utils/queue'
 import { waitForTopicQueue } from '@renderer/utils/queue'
 import { isOnHomePage } from '@renderer/utils/window'
@@ -226,31 +226,6 @@ export const cleanupMultipleBlocks = (dispatch: AppDispatch, blockIds: string[])
   }
 }

-// // Change: throttled write of a single block's content/status to the database (Text/Thinking chunks only)
-// export const throttledBlockDbUpdate = throttle(
-//   async (blockId: string, blockChanges: Partial<MessageBlock>) => {
-//     // Check if blockId is valid before attempting update
-//     if (!blockId) {
-//       console.warn('[DB Throttle Block Update] Attempted to update with null/undefined blockId. Skipping.')
-//       return
-//     }
-//     const state = store.getState()
-//     const block = state.messageBlocks.entities[blockId]
-//     // throttle is async and may only run after the complete event has fired
-//     if (
-//       blockChanges.status === MessageBlockStatus.STREAMING &&
-//       (block?.status === MessageBlockStatus.SUCCESS || block?.status === MessageBlockStatus.ERROR)
-//     )
-//       return
-//     try {
-//     } catch (error) {
-//       console.error(`[DB Throttle Block Update] Failed for block ${blockId}:`, error)
-//     }
-//   },
-//   300, // throttle interval, tunable
-//   { leading: false, trailing: true }
-// )
-
 // New: generic, non-throttled helper that saves message and block updates to the database
 const saveUpdatesToDB = async (
   messageId: string,
@@ -351,9 +326,9 @@ const fetchAndProcessAssistantResponseImpl = async (
     let accumulatedContent = ''
     let accumulatedThinking = ''
-    // dedicated to managing UI focus and block switching
     let lastBlockId: string | null = null
     let lastBlockType: MessageBlockType | null = null
+    let currentActiveBlockType: MessageBlockType | null = null
     // dedicated to the lifecycle inside a single block
     let initialPlaceholderBlockId: string | null = null
     let citationBlockId: string | null = null
@@ -365,6 +340,28 @@ const fetchAndProcessAssistantResponseImpl = async (
     const toolCallIdToBlockIdMap = new Map()
     const notificationService = NotificationService.getInstance()

+    /**
+     * Smart update strategy: choose throttled vs. immediate updates based on block-type continuity
+     * - consecutive chunks of the same block type: throttle (fewer re-renders)
+     * - block type switch: update immediately (keeps state correct)
+     */
+    const smartBlockUpdate = (blockId: string, changes: Partial<MessageBlock>, blockType: MessageBlockType) => {
+      const isBlockTypeChanged = currentActiveBlockType !== null && currentActiveBlockType !== blockType
+
+      if (isBlockTypeChanged) {
+        if (lastBlockId && lastBlockId !== blockId) {
+          cancelThrottledBlockUpdate(lastBlockId)
+        }
+        dispatch(updateOneBlock({ id: blockId, changes }))
+        saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState)
+      } else {
+        throttledBlockUpdate(blockId, changes)
+      }
+
+      // remember the currently active block type
+      currentActiveBlockType = blockType
+    }
+
     const handleBlockTransition = async (newBlock: MessageBlock, newBlockType: MessageBlockType) => {
       lastBlockId = newBlock.id
       lastBlockType = newBlockType
@@ -428,6 +425,25 @@
         initialPlaceholderBlockId = baseBlock.id
         await handleBlockTransition(baseBlock as PlaceholderMessageBlock, MessageBlockType.UNKNOWN)
       },
+      onTextStart: async () => {
+        if (initialPlaceholderBlockId) {
+          lastBlockType = MessageBlockType.MAIN_TEXT
+          const changes = {
+            type: MessageBlockType.MAIN_TEXT,
+            content: accumulatedContent,
+            status: MessageBlockStatus.STREAMING
+          }
+          smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.MAIN_TEXT)
+          mainTextBlockId = initialPlaceholderBlockId
+          initialPlaceholderBlockId = null
+        } else if (!mainTextBlockId) {
+          const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, {
+            status: MessageBlockStatus.STREAMING
+          })
+          mainTextBlockId = newBlock.id
+          await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT)
+        }
+      },
       onTextChunk: async (text) => {
         const citationBlockSource = citationBlockId
           ? (getState().messageBlocks.entities[citationBlockId] as CitationMessageBlock).response?.source
           : WebSearchSource.WEBSEARCH
         accumulatedContent += text
         if (mainTextBlockId) {
           const blockChanges: Partial<MessageBlock> = {
-            content: accumulatedContent,
-            status: MessageBlockStatus.STREAMING
-          }
-          throttledBlockUpdate(mainTextBlockId, blockChanges)
-        } else if (initialPlaceholderBlockId) {
-          // convert the placeholder block into a main text block
-          const initialChanges: Partial<MessageBlock> = {
-            type: MessageBlockType.MAIN_TEXT,
             content: accumulatedContent,
             status: MessageBlockStatus.STREAMING,
             citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : []
           }
-          mainTextBlockId = initialPlaceholderBlockId
-          // clean up the placeholder block
-          initialPlaceholderBlockId = null
-          lastBlockType = MessageBlockType.MAIN_TEXT
-          dispatch(updateOneBlock({ id: mainTextBlockId, changes: initialChanges }))
-          saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
-        } else {
-          const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, {
-            status: MessageBlockStatus.STREAMING,
-            citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : []
-          })
-          mainTextBlockId = newBlock.id // set the ID right away to avoid race conditions
-          await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT)
+          smartBlockUpdate(mainTextBlockId, blockChanges, MessageBlockType.MAIN_TEXT)
         }
       },
       onTextComplete: async (finalText) => {
@@ -468,18 +464,35 @@
           content: finalText,
           status: MessageBlockStatus.SUCCESS
         }
-          cancelThrottledBlockUpdate(mainTextBlockId)
-          dispatch(updateOneBlock({ id: mainTextBlockId, changes }))
-          saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
-          if (!assistant.enableWebSearch) {
-            mainTextBlockId = null
-          }
+          smartBlockUpdate(mainTextBlockId, changes, MessageBlockType.MAIN_TEXT)
+          mainTextBlockId = null
         } else {
           console.warn(
             `[onTextComplete] Received text.complete but last block was not MAIN_TEXT (was ${lastBlockType}) or lastBlockId is null.`
           )
         }
       },
+      onThinkingStart: async () => {
+        if (initialPlaceholderBlockId) {
+          lastBlockType = MessageBlockType.THINKING
+          const changes = {
+            type: MessageBlockType.THINKING,
+            content: accumulatedThinking,
+            status: MessageBlockStatus.STREAMING,
+            thinking_millsec: 0
+          }
+          thinkingBlockId = initialPlaceholderBlockId
+          initialPlaceholderBlockId = null
+          smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING)
+        } else if (!thinkingBlockId) {
+          const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, {
+            status: MessageBlockStatus.STREAMING,
+            thinking_millsec: 0
+          })
+          thinkingBlockId = newBlock.id
+          await handleBlockTransition(newBlock, MessageBlockType.THINKING)
+        }
+      },
       onThinkingChunk: async (text, thinking_millsec) => {
         accumulatedThinking += text
         if (thinkingBlockId) {
           const blockChanges: Partial<MessageBlock> = {
             content: accumulatedThinking,
             status: MessageBlockStatus.STREAMING,
             thinking_millsec: thinking_millsec
           }
-          throttledBlockUpdate(thinkingBlockId, blockChanges)
-        } else if (initialPlaceholderBlockId) {
-          // First chunk for this block: Update type and status immediately
-          lastBlockType = MessageBlockType.THINKING
-          const initialChanges: Partial<MessageBlock> = {
-            type: MessageBlockType.THINKING,
-            content: accumulatedThinking,
-            status: MessageBlockStatus.STREAMING
-          }
-          thinkingBlockId = initialPlaceholderBlockId
-          initialPlaceholderBlockId = null
-          dispatch(updateOneBlock({ id: thinkingBlockId, changes: initialChanges }))
-          saveUpdatedBlockToDB(thinkingBlockId, assistantMsgId, topicId, getState)
-        } else {
-          const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, {
-            status: MessageBlockStatus.STREAMING,
-            thinking_millsec: 0
-          })
-          thinkingBlockId = newBlock.id // set the ID right away to avoid race conditions
-          await handleBlockTransition(newBlock, MessageBlockType.THINKING)
+          smartBlockUpdate(thinkingBlockId, blockChanges, MessageBlockType.THINKING)
         }
       },
       onThinkingComplete: (finalText, final_thinking_millsec) => {
@@ -518,9 +512,7 @@
           status: MessageBlockStatus.SUCCESS,
           thinking_millsec: final_thinking_millsec
         }
-          cancelThrottledBlockUpdate(thinkingBlockId)
-          dispatch(updateOneBlock({ id: thinkingBlockId, changes }))
-          saveUpdatedBlockToDB(thinkingBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING)
         } else {
           console.warn(
             `[onThinkingComplete] Received thinking.complete but last block was not THINKING (was ${lastBlockType}) or lastBlockId is null.`
           )
@@ -539,8 +531,7 @@
           }
           toolBlockId = initialPlaceholderBlockId
           initialPlaceholderBlockId = null
-          dispatch(updateOneBlock({ id: toolBlockId, changes }))
-          saveUpdatedBlockToDB(toolBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(toolBlockId, changes, MessageBlockType.TOOL)
           toolCallIdToBlockIdMap.set(toolResponse.id, toolBlockId)
         } else if (toolResponse.status === 'pending') {
           const toolBlock = createToolBlock(assistantMsgId, toolResponse.id, {
@@ -566,8 +557,7 @@
             status: MessageBlockStatus.PROCESSING,
             metadata: { rawMcpToolResponse: toolResponse }
           }
-          dispatch(updateOneBlock({ id: targetBlockId, changes }))
-          saveUpdatedBlockToDB(targetBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(targetBlockId, changes, MessageBlockType.TOOL)
         } else if (!targetBlockId) {
           console.warn(
             `[onToolCallInProgress] No block ID found for tool ID: ${toolResponse.id}. Available mappings:`,
@@ -601,9 +591,7 @@
           if (finalStatus === MessageBlockStatus.ERROR) {
             changes.error = { message: `Tool execution failed/error`, details: toolResponse.response }
           }
-          cancelThrottledBlockUpdate(existingBlockId)
-          dispatch(updateOneBlock({ id: existingBlockId, changes }))
-          saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(existingBlockId, changes, MessageBlockType.TOOL)
         } else {
           console.warn(
             `[onToolCallComplete] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}`
           )
@@ -624,8 +612,7 @@
             knowledge: externalToolResult.knowledge,
             status: MessageBlockStatus.SUCCESS
           }
-          dispatch(updateOneBlock({ id: citationBlockId, changes }))
-          saveUpdatedBlockToDB(citationBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(citationBlockId, changes, MessageBlockType.CITATION)
         } else {
           console.error('[onExternalToolComplete] citationBlockId is null. Cannot update.')
         }
@@ -639,8 +626,7 @@
             status: MessageBlockStatus.PROCESSING
           }
           lastBlockType = MessageBlockType.CITATION
-          dispatch(updateOneBlock({ id: initialPlaceholderBlockId, changes }))
-          saveUpdatedBlockToDB(initialPlaceholderBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.CITATION)
           initialPlaceholderBlockId = null
         } else {
           const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
@@ -656,22 +642,19 @@
             response: llmWebSearchResult,
             status: MessageBlockStatus.SUCCESS
           }
-          dispatch(updateOneBlock({ id: blockId, changes }))
-          saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(blockId, changes, MessageBlockType.CITATION)

-          if (mainTextBlockId) {
-            const state = getState()
-            const existingMainTextBlock = state.messageBlocks.entities[mainTextBlockId]
-            if (existingMainTextBlock && existingMainTextBlock.type === MessageBlockType.MAIN_TEXT) {
-              const currentRefs = existingMainTextBlock.citationReferences || []
-              const mainTextChanges = {
-                citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }]
-              }
-              dispatch(updateOneBlock({ id: mainTextBlockId, changes: mainTextChanges }))
-              saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
+          const state = getState()
+          const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
+          if (existingMainTextBlocks.length > 0) {
+            const existingMainTextBlock = existingMainTextBlocks[0]
+            const currentRefs = existingMainTextBlock.citationReferences || []
+            const mainTextChanges = {
+              citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }]
             }
-            mainTextBlockId = null
+            smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT)
           }
+
           if (initialPlaceholderBlockId) {
             citationBlockId = initialPlaceholderBlockId
             initialPlaceholderBlockId = null
@@ -687,21 +670,15 @@
             }
           )
           citationBlockId = citationBlock.id
-          if (mainTextBlockId) {
-            const state = getState()
-            const existingMainTextBlock = state.messageBlocks.entities[mainTextBlockId]
-            if (existingMainTextBlock && existingMainTextBlock.type === MessageBlockType.MAIN_TEXT) {
-              const currentRefs = existingMainTextBlock.citationReferences || []
-              const mainTextChanges = {
-                citationReferences: [
-                  ...currentRefs,
-                  { citationBlockId, citationBlockSource: llmWebSearchResult.source }
-                ]
-              }
-              dispatch(updateOneBlock({ id: mainTextBlockId, changes: mainTextChanges }))
-              saveUpdatedBlockToDB(mainTextBlockId, assistantMsgId, topicId, getState)
+          const state = getState()
+          const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
+          if (existingMainTextBlocks.length > 0) {
+            const existingMainTextBlock = existingMainTextBlocks[0]
+            const currentRefs = existingMainTextBlock.citationReferences || []
+            const mainTextChanges = {
+              citationReferences: [...currentRefs, { citationBlockId, citationBlockSource: llmWebSearchResult.source }]
             }
-            mainTextBlockId = null
+            smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT)
           }
           await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
         }
@@ -716,8 +693,7 @@
           lastBlockType = MessageBlockType.IMAGE
           imageBlockId = initialPlaceholderBlockId
           initialPlaceholderBlockId = null
-          dispatch(updateOneBlock({ id: imageBlockId, changes: initialChanges }))
-          saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(imageBlockId, initialChanges, MessageBlockType.IMAGE)
         } else if (!imageBlockId) {
           const imageBlock = createImageBlock(assistantMsgId, {
             status: MessageBlockStatus.STREAMING
@@ -734,8 +710,7 @@
             metadata: { generateImageResponse: imageData },
             status: MessageBlockStatus.STREAMING
           }
-          dispatch(updateOneBlock({ id: imageBlockId, changes }))
-          saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
         }
       },
       onImageGenerated: (imageData) => {
@@ -744,8 +719,7 @@
           const changes: Partial<MessageBlock> = {
             status: MessageBlockStatus.SUCCESS
           }
-          dispatch(updateOneBlock({ id: imageBlockId, changes }))
-          saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
         } else {
           const imageUrl = imageData.images?.[0] || 'placeholder_image_url'
           const changes: Partial<MessageBlock> = {
             url: imageUrl,
             metadata: { generateImageResponse: imageData },
             status: MessageBlockStatus.SUCCESS
           }
-          dispatch(updateOneBlock({ id: imageBlockId, changes }))
-          saveUpdatedBlockToDB(imageBlockId, assistantMsgId, topicId, getState)
+          smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
         }
       } else {
         console.error('[onImageGenerated] Last block was not an Image block or ID is missing.')
       }
@@ -802,9 +775,7 @@
         const changes: Partial<MessageBlock> = {
           status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
         }
-        cancelThrottledBlockUpdate(possibleBlockId)
-        dispatch(updateOneBlock({ id: possibleBlockId, changes }))
-        saveUpdatedBlockToDB(possibleBlockId, assistantMsgId, topicId, getState)
+        smartBlockUpdate(possibleBlockId, changes, MessageBlockType.MAIN_TEXT)
       }

       const errorBlock = createErrorBlock(assistantMsgId, serializableError, { status: MessageBlockStatus.SUCCESS })
@@ -846,9 +817,7 @@
         const changes: Partial<MessageBlock> = {
           status: MessageBlockStatus.SUCCESS
         }
-        cancelThrottledBlockUpdate(possibleBlockId)
-        dispatch(updateOneBlock({ id: possibleBlockId, changes }))
-        saveUpdatedBlockToDB(possibleBlockId, assistantMsgId, topicId, getState)
+        smartBlockUpdate(possibleBlockId, changes, lastBlockType!)
       }

       const endTime = Date.now()
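The core of the messageThunk change is `smartBlockUpdate`: keep throttling while consecutive updates stay on the same block type, write through immediately on a type switch. A standalone sketch of just that decision (the real helper also cancels the previous block's pending throttled write via `cancelThrottledBlockUpdate` and persists through Redux and the DB):

```ts
type BlockType = 'main_text' | 'thinking' | 'tool' | 'citation' | 'image'
type Changes = Record<string, unknown>

function makeSmartUpdate(
  immediate: (id: string, changes: Changes) => void,
  throttled: (id: string, changes: Changes) => void
) {
  let activeType: BlockType | null = null
  return (id: string, changes: Changes, type: BlockType) => {
    const typeChanged = activeType !== null && activeType !== type
    if (typeChanged) {
      immediate(id, changes) // block type switched: flush immediately so state is never stale
    } else {
      throttled(id, changes) // same type still streaming: throttle to cut re-renders
    }
    activeType = type
  }
}
```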
diff --git a/src/renderer/src/types/chunk.ts b/src/renderer/src/types/chunk.ts
index c5e84a4673..1fdbbdae6f 100644
--- a/src/renderer/src/types/chunk.ts
+++ b/src/renderer/src/types/chunk.ts
@@ -19,13 +19,16 @@ export enum ChunkType {
   EXTERNEL_TOOL_COMPLETE = 'externel_tool_complete',
   LLM_RESPONSE_CREATED = 'llm_response_created',
   LLM_RESPONSE_IN_PROGRESS = 'llm_response_in_progress',
+  TEXT_START = 'text.start',
   TEXT_DELTA = 'text.delta',
   TEXT_COMPLETE = 'text.complete',
+  AUDIO_START = 'audio.start',
   AUDIO_DELTA = 'audio.delta',
   AUDIO_COMPLETE = 'audio.complete',
   IMAGE_CREATED = 'image.created',
   IMAGE_DELTA = 'image.delta',
   IMAGE_COMPLETE = 'image.complete',
+  THINKING_START = 'thinking.start',
   THINKING_DELTA = 'thinking.delta',
   THINKING_COMPLETE = 'thinking.complete',
   LLM_WEB_SEARCH_IN_PROGRESS = 'llm_websearch_in_progress',
@@ -56,6 +59,18 @@ export interface LLMResponseInProgressChunk {
   response?: Response
   type: ChunkType.LLM_RESPONSE_IN_PROGRESS
 }
+
+export interface TextStartChunk {
+  /**
+   * The type of the chunk
+   */
+  type: ChunkType.TEXT_START
+
+  /**
+   * The ID of the chunk
+   */
+  chunk_id?: number
+}
 export interface TextDeltaChunk {
   /**
    * The text content of the chunk
@@ -90,6 +105,13 @@ export interface TextCompleteChunk {
   type: ChunkType.TEXT_COMPLETE
 }

+export interface AudioStartChunk {
+  /**
+   * The type of the chunk
+   */
+  type: ChunkType.AUDIO_START
+}
+
 export interface AudioDeltaChunk {
   /**
    * A chunk of Base64 encoded audio data
@@ -140,6 +162,13 @@ export interface ImageCompleteChunk {
   image?: { type: 'url' | 'base64'; images: string[] }
 }

+export interface ThinkingStartChunk {
+  /**
+   * The type of the chunk
+   */
+  type: ChunkType.THINKING_START
+}
+
 export interface ThinkingDeltaChunk {
   /**
    * The text content of the chunk
@@ -365,13 +394,16 @@ export type Chunk =
   | ExternalToolCompleteChunk // external tool call finished; external tools cover web search, knowledge bases and MCP servers
   | LLMResponseCreatedChunk // LLM response created, carries the type of the block about to be created
   | LLMResponseInProgressChunk // LLM response in progress
+  | TextStartChunk // text generation started
   | TextDeltaChunk // text generation in progress
   | TextCompleteChunk // text generation finished
+  | AudioStartChunk // audio generation started
   | AudioDeltaChunk // audio generation in progress
   | AudioCompleteChunk // audio generation finished
   | ImageCreatedChunk // image created
   | ImageDeltaChunk // image generation in progress
   | ImageCompleteChunk // image generation finished
+  | ThinkingStartChunk // thinking generation started
   | ThinkingDeltaChunk // thinking generation in progress
   | ThinkingCompleteChunk // thinking generation finished
   | LLMWebSearchInProgressChunk // LLM built-in web search in progress, no distinctive payload
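Since `Chunk` stays a discriminated union on `type`, consumers get exhaustive narrowing for the new members; `TextStartChunk` is the only `*_START` chunk that carries an extra field (`chunk_id?`). A small consumer sketch against the real types:

```ts
import { Chunk, ChunkType } from '@renderer/types/chunk'

function describe(chunk: Chunk): string {
  switch (chunk.type) {
    case ChunkType.TEXT_START:
      // chunk narrows to TextStartChunk, so chunk_id is visible here
      return chunk.chunk_id !== undefined ? `text block #${chunk.chunk_id} opened` : 'text block opened'
    case ChunkType.THINKING_START:
      return 'thinking block opened'
    case ChunkType.TEXT_DELTA:
    case ChunkType.THINKING_DELTA:
      // both delta chunks carry a text payload
      return `${chunk.type}: +${chunk.text.length} chars`
    default:
      return chunk.type
  }
}
```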