From ee5e4204192675fb995aea8544ae8afe0118f9e6 Mon Sep 17 00:00:00 2001 From: kangfenmao Date: Thu, 7 Aug 2025 14:54:22 +0800 Subject: [PATCH] feat(ApiService): enhance chat completion handling with chunk reception - Added onChunkReceived call to fetchChatCompletion to improve response handling. - Removed redundant onChunkReceived call from the completion parameters section in fetchChatCompletion. --- src/renderer/src/services/ApiService.ts | 3 +- src/renderer/src/store/thunk/messageThunk.ts | 543 ------------------- 2 files changed, 2 insertions(+), 544 deletions(-) diff --git a/src/renderer/src/services/ApiService.ts b/src/renderer/src/services/ApiService.ts index d59d242252..35495f3e82 100644 --- a/src/renderer/src/services/ApiService.ts +++ b/src/renderer/src/services/ApiService.ts @@ -437,6 +437,8 @@ export async function fetchChatCompletion({ }) { logger.debug('fetchChatCompletion', messages, assistant) + onChunkReceived({ type: ChunkType.LLM_RESPONSE_CREATED }) + if (assistant.prompt && containsSupportedVariables(assistant.prompt)) { assistant.prompt = await replacePromptVariables(assistant.prompt, assistant.model?.name) } @@ -485,7 +487,6 @@ export async function fetchChatCompletion({ isGenerateImageModel(model) && (isSupportedDisableGenerationModel(model) ? assistant.enableGenerateImage : true) // --- Call AI Completions --- - onChunkReceived({ type: ChunkType.LLM_RESPONSE_CREATED }) const completionsParams: CompletionsParams = { callType: 'chat', diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 1a55e5f2c9..c8284eb31f 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -301,91 +301,6 @@ const fetchAndProcessAssistantResponseImpl = async ( try { dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) - // let accumulatedContent = '' - // let accumulatedThinking = '' - // let lastBlockId: string | null = null - // let lastBlockType: MessageBlockType | null = null - // let currentActiveBlockType: MessageBlockType | null = null - // // 专注于块内部的生命周期处理 - // let initialPlaceholderBlockId: string | null = null - // let citationBlockId: string | null = null - // let mainTextBlockId: string | null = null - // let thinkingBlockId: string | null = null - // let imageBlockId: string | null = null - // let toolBlockId: string | null = null - - // const toolCallIdToBlockIdMap = new Map() - // const notificationService = NotificationService.getInstance() - - /** - * 智能更新策略:根据块类型连续性自动判断使用节流还是立即更新 - * - 连续同类块:使用节流(减少重渲染) - * - 块类型切换:立即更新(确保状态正确) - * @param blockId 块ID - * @param changes 块更新内容 - * @param blockType 块类型 - * @param isComplete 是否完成,如果完成,则需要保存块更新到redux中 - */ - // const smartBlockUpdate = ( - // blockId: string, - // changes: Partial, - // blockType: MessageBlockType, - // isComplete: boolean = false - // ) => { - // const isBlockTypeChanged = currentActiveBlockType !== null && currentActiveBlockType !== blockType - // if (isBlockTypeChanged || isComplete) { - // // 如果块类型改变,则取消上一个块的节流更新,并保存块更新到redux中(尽管有可能被上一个块本身的oncomplete事件的取消节流已经取消了) - // if (isBlockTypeChanged && lastBlockId) { - // cancelThrottledBlockUpdate(lastBlockId) - // } - // // 如果当前块完成,则取消当前块的节流更新,并保存块更新到redux中,避免streaming状态覆盖掉完成状态 - // if (isComplete) { - // cancelThrottledBlockUpdate(blockId) - // } - // dispatch(updateOneBlock({ id: blockId, changes })) - // saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState) - // } else { - // throttledBlockUpdate(blockId, changes) - // } - - // // 更新当前活跃块类型 - // currentActiveBlockType = blockType - // } - - // const handleBlockTransition = async (newBlock: MessageBlock, newBlockType: MessageBlockType) => { - // lastBlockId = newBlock.id - // lastBlockType = newBlockType - // if (newBlockType !== MessageBlockType.MAIN_TEXT) { - // accumulatedContent = '' - // } - // if (newBlockType !== MessageBlockType.THINKING) { - // accumulatedThinking = '' - // } - // dispatch( - // newMessagesActions.updateMessage({ - // topicId, - // messageId: assistantMsgId, - // updates: { blockInstruction: { id: newBlock.id } } - // }) - // ) - // dispatch(upsertOneBlock(newBlock)) - // dispatch( - // newMessagesActions.upsertBlockReference({ - // messageId: assistantMsgId, - // blockId: newBlock.id, - // status: newBlock.status - // }) - // ) - - // const currentState = getState() - // const updatedMessage = currentState.messages.entities[assistantMsgId] - // if (updatedMessage) { - // await saveUpdatesToDB(assistantMsgId, topicId, { blocks: updatedMessage.blocks }, [newBlock]) - // } else { - // console.error(`[handleBlockTransition] Failed to get updated message ${assistantMsgId} from state for DB save.`) - // } - // } - // 创建 BlockManager 实例 const blockManager = new BlockManager({ dispatch, @@ -419,464 +334,6 @@ const fetchAndProcessAssistantResponseImpl = async ( messagesForContext = contextSlice.filter((m) => m && !m.status?.includes('ing')) } - // callbacks = { - // onLLMResponseCreated: async () => { - // const baseBlock = createBaseMessageBlock(assistantMsgId, MessageBlockType.UNKNOWN, { - // status: MessageBlockStatus.PROCESSING - // }) - // initialPlaceholderBlockId = baseBlock.id - // await handleBlockTransition(baseBlock as PlaceholderMessageBlock, MessageBlockType.UNKNOWN) - // }, - // onTextStart: async () => { - // if (initialPlaceholderBlockId) { - // lastBlockType = MessageBlockType.MAIN_TEXT - // const changes = { - // type: MessageBlockType.MAIN_TEXT, - // content: accumulatedContent, - // status: MessageBlockStatus.STREAMING - // } - // smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.MAIN_TEXT, true) - // mainTextBlockId = initialPlaceholderBlockId - // initialPlaceholderBlockId = null - // } else if (!mainTextBlockId) { - // const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, { - // status: MessageBlockStatus.STREAMING - // }) - // mainTextBlockId = newBlock.id - // await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT) - // } - // }, - // onTextChunk: async (text) => { - // const citationBlockSource = citationBlockId - // ? (getState().messageBlocks.entities[citationBlockId] as CitationMessageBlock).response?.source - // : WebSearchSource.WEBSEARCH - // accumulatedContent += text - // if (mainTextBlockId) { - // const blockChanges: Partial = { - // content: accumulatedContent, - // status: MessageBlockStatus.STREAMING, - // citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : [] - // } - // smartBlockUpdate(mainTextBlockId, blockChanges, MessageBlockType.MAIN_TEXT) - // } - // }, - // onTextComplete: async (finalText) => { - // if (mainTextBlockId) { - // const changes = { - // content: finalText, - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(mainTextBlockId, changes, MessageBlockType.MAIN_TEXT, true) - // mainTextBlockId = null - // } else { - // console.warn( - // `[onTextComplete] Received text.complete but last block was not MAIN_TEXT (was ${lastBlockType}) or lastBlockId is null.` - // ) - // } - // }, - // onThinkingStart: async () => { - // if (initialPlaceholderBlockId) { - // lastBlockType = MessageBlockType.THINKING - // const changes = { - // type: MessageBlockType.THINKING, - // content: accumulatedThinking, - // status: MessageBlockStatus.STREAMING, - // thinking_millsec: 0 - // } - // thinkingBlockId = initialPlaceholderBlockId - // initialPlaceholderBlockId = null - // smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING, true) - // } else if (!thinkingBlockId) { - // const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, { - // status: MessageBlockStatus.STREAMING, - // thinking_millsec: 0 - // }) - // thinkingBlockId = newBlock.id - // await handleBlockTransition(newBlock, MessageBlockType.THINKING) - // } - // }, - // onThinkingChunk: async (text, thinking_millsec) => { - // accumulatedThinking += text - // if (thinkingBlockId) { - // const blockChanges: Partial = { - // content: accumulatedThinking, - // status: MessageBlockStatus.STREAMING, - // thinking_millsec: thinking_millsec - // } - // smartBlockUpdate(thinkingBlockId, blockChanges, MessageBlockType.THINKING) - // } - // }, - // onThinkingComplete: (finalText, final_thinking_millsec) => { - // if (thinkingBlockId) { - // const changes = { - // type: MessageBlockType.THINKING, - // content: finalText, - // status: MessageBlockStatus.SUCCESS, - // thinking_millsec: final_thinking_millsec - // } - // smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING, true) - // } else { - // console.warn( - // `[onThinkingComplete] Received thinking.complete but last block was not THINKING (was ${lastBlockType}) or lastBlockId is null.` - // ) - // } - // thinkingBlockId = null - // }, - // onToolCallPending: (toolResponse: MCPToolResponse) => { - // if (initialPlaceholderBlockId) { - // lastBlockType = MessageBlockType.TOOL - // const changes = { - // type: MessageBlockType.TOOL, - // status: MessageBlockStatus.PENDING, - // toolName: toolResponse.tool.name, - // metadata: { rawMcpToolResponse: toolResponse } - // } - // toolBlockId = initialPlaceholderBlockId - // initialPlaceholderBlockId = null - // smartBlockUpdate(toolBlockId, changes, MessageBlockType.TOOL) - // toolCallIdToBlockIdMap.set(toolResponse.id, toolBlockId) - // } else if (toolResponse.status === 'pending') { - // const toolBlock = createToolBlock(assistantMsgId, toolResponse.id, { - // toolName: toolResponse.tool.name, - // status: MessageBlockStatus.PENDING, - // metadata: { rawMcpToolResponse: toolResponse } - // }) - // toolBlockId = toolBlock.id - // handleBlockTransition(toolBlock, MessageBlockType.TOOL) - // toolCallIdToBlockIdMap.set(toolResponse.id, toolBlock.id) - // } else { - // console.warn( - // `[onToolCallPending] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}` - // ) - // } - // }, - // onToolCallInProgress: (toolResponse: MCPToolResponse) => { - // // 根据 toolResponse.id 查找对应的块ID - // const targetBlockId = toolCallIdToBlockIdMap.get(toolResponse.id) - - // if (targetBlockId && toolResponse.status === 'invoking') { - // const changes = { - // status: MessageBlockStatus.PROCESSING, - // metadata: { rawMcpToolResponse: toolResponse } - // } - // smartBlockUpdate(targetBlockId, changes, MessageBlockType.TOOL) - // } else if (!targetBlockId) { - // console.warn( - // `[onToolCallInProgress] No block ID found for tool ID: ${toolResponse.id}. Available mappings:`, - // Array.from(toolCallIdToBlockIdMap.entries()) - // ) - // } else { - // console.warn( - // `[onToolCallInProgress] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}` - // ) - // } - // }, - // onToolCallComplete: (toolResponse: MCPToolResponse) => { - // const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id) - // toolCallIdToBlockIdMap.delete(toolResponse.id) - // if (toolResponse.status === 'done' || toolResponse.status === 'error' || toolResponse.status === 'cancelled') { - // if (!existingBlockId) { - // console.error( - // `[onToolCallComplete] No existing block found for completed/error tool call ID: ${toolResponse.id}. Cannot update.` - // ) - // return - // } - // const finalStatus = - // toolResponse.status === 'done' || toolResponse.status === 'cancelled' - // ? MessageBlockStatus.SUCCESS - // : MessageBlockStatus.ERROR - // const changes: Partial = { - // content: toolResponse.response, - // status: finalStatus, - // metadata: { rawMcpToolResponse: toolResponse } - // } - // if (finalStatus === MessageBlockStatus.ERROR) { - // changes.error = { message: `Tool execution failed/error`, details: toolResponse.response } - // } - // smartBlockUpdate(existingBlockId, changes, MessageBlockType.TOOL, true) - // } else { - // console.warn( - // `[onToolCallComplete] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}` - // ) - // } - // toolBlockId = null - // }, - // onExternalToolInProgress: async () => { - // const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING }) - // citationBlockId = citationBlock.id - // await handleBlockTransition(citationBlock, MessageBlockType.CITATION) - // // saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState) - // }, - // onExternalToolComplete: (externalToolResult: ExternalToolResult) => { - // if (citationBlockId) { - // const changes: Partial = { - // response: externalToolResult.webSearch, - // knowledge: externalToolResult.knowledge, - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(citationBlockId, changes, MessageBlockType.CITATION, true) - // } else { - // console.error('[onExternalToolComplete] citationBlockId is null. Cannot update.') - // } - // }, - // onLLMWebSearchInProgress: async () => { - // if (initialPlaceholderBlockId) { - // lastBlockType = MessageBlockType.CITATION - // citationBlockId = initialPlaceholderBlockId - // const changes = { - // type: MessageBlockType.CITATION, - // status: MessageBlockStatus.PROCESSING - // } - // lastBlockType = MessageBlockType.CITATION - // smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.CITATION) - // initialPlaceholderBlockId = null - // } else { - // const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING }) - // citationBlockId = citationBlock.id - // await handleBlockTransition(citationBlock, MessageBlockType.CITATION) - // } - // }, - // onLLMWebSearchComplete: async (llmWebSearchResult) => { - // const blockId = citationBlockId || initialPlaceholderBlockId - // if (blockId) { - // const changes: Partial = { - // type: MessageBlockType.CITATION, - // response: llmWebSearchResult, - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(blockId, changes, MessageBlockType.CITATION) - - // const state = getState() - // const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId]) - // if (existingMainTextBlocks.length > 0) { - // const existingMainTextBlock = existingMainTextBlocks[0] - // const currentRefs = existingMainTextBlock.citationReferences || [] - // const mainTextChanges = { - // citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }] - // } - // smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT, true) - // } - - // if (initialPlaceholderBlockId) { - // citationBlockId = initialPlaceholderBlockId - // initialPlaceholderBlockId = null - // } - // } else { - // const citationBlock = createCitationBlock( - // assistantMsgId, - // { - // response: llmWebSearchResult - // }, - // { - // status: MessageBlockStatus.SUCCESS - // } - // ) - // citationBlockId = citationBlock.id - // const state = getState() - // const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId]) - // if (existingMainTextBlocks.length > 0) { - // const existingMainTextBlock = existingMainTextBlocks[0] - // const currentRefs = existingMainTextBlock.citationReferences || [] - // const mainTextChanges = { - // citationReferences: [...currentRefs, { citationBlockId, citationBlockSource: llmWebSearchResult.source }] - // } - // smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT, true) - // } - // await handleBlockTransition(citationBlock, MessageBlockType.CITATION) - // } - // }, - // onImageCreated: async () => { - // if (initialPlaceholderBlockId) { - // lastBlockType = MessageBlockType.IMAGE - // const initialChanges: Partial = { - // type: MessageBlockType.IMAGE, - // status: MessageBlockStatus.PENDING - // } - // lastBlockType = MessageBlockType.IMAGE - // imageBlockId = initialPlaceholderBlockId - // initialPlaceholderBlockId = null - // smartBlockUpdate(imageBlockId, initialChanges, MessageBlockType.IMAGE) - // } else if (!imageBlockId) { - // const imageBlock = createImageBlock(assistantMsgId, { - // status: MessageBlockStatus.PENDING - // }) - // imageBlockId = imageBlock.id - // await handleBlockTransition(imageBlock, MessageBlockType.IMAGE) - // } - // }, - // onImageDelta: (imageData) => { - // const imageUrl = imageData.images?.[0] || 'placeholder_image_url' - // if (imageBlockId) { - // const changes: Partial = { - // url: imageUrl, - // metadata: { generateImageResponse: imageData }, - // status: MessageBlockStatus.STREAMING - // } - // smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE, true) - // } - // }, - // onImageGenerated: (imageData) => { - // if (imageBlockId) { - // if (!imageData) { - // const changes: Partial = { - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE) - // } else { - // const imageUrl = imageData.images?.[0] || 'placeholder_image_url' - // const changes: Partial = { - // url: imageUrl, - // metadata: { generateImageResponse: imageData }, - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE, true) - // } - // } else { - // console.error('[onImageGenerated] Last block was not an Image block or ID is missing.') - // } - // imageBlockId = null - // }, - // onError: async (error) => { - // console.dir(error, { depth: null }) - // const isErrorTypeAbort = isAbortError(error) - // let pauseErrorLanguagePlaceholder = '' - // if (isErrorTypeAbort) { - // pauseErrorLanguagePlaceholder = 'pause_placeholder' - // } - - // const serializableError = { - // name: error.name, - // message: pauseErrorLanguagePlaceholder || error.message || formatErrorMessage(error), - // originalMessage: error.message, - // stack: error.stack, - // status: error.status || error.code, - // requestId: error.request_id - // } - // if (!isOnHomePage()) { - // await notificationService.send({ - // id: uuid(), - // type: 'error', - // title: t('notification.assistant'), - // message: serializableError.message, - // silent: false, - // timestamp: Date.now(), - // source: 'assistant' - // }) - // } - // const possibleBlockId = - // mainTextBlockId || - // thinkingBlockId || - // toolBlockId || - // imageBlockId || - // citationBlockId || - // initialPlaceholderBlockId || - // lastBlockId - - // if (possibleBlockId) { - // // 更改上一个block的状态为ERROR - // const changes: Partial = { - // status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR - // } - // smartBlockUpdate(possibleBlockId, changes, lastBlockType!, true) - // } - - // const errorBlock = createErrorBlock(assistantMsgId, serializableError, { status: MessageBlockStatus.SUCCESS }) - // await handleBlockTransition(errorBlock, MessageBlockType.ERROR) - // const messageErrorUpdate = { - // status: isErrorTypeAbort ? AssistantMessageStatus.SUCCESS : AssistantMessageStatus.ERROR - // } - // dispatch(newMessagesActions.updateMessage({ topicId, messageId: assistantMsgId, updates: messageErrorUpdate })) - - // saveUpdatesToDB(assistantMsgId, topicId, messageErrorUpdate, []) - - // EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { - // id: assistantMsgId, - // topicId, - // status: isErrorTypeAbort ? 'pause' : 'error', - // error: error.message - // }) - // }, - // onComplete: async (status: AssistantMessageStatus, response?: Response) => { - // const finalStateOnComplete = getState() - // const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId] - - // if (status === 'success' && finalAssistantMsg) { - // const userMsgId = finalAssistantMsg.askId - // const orderedMsgs = selectMessagesForTopic(finalStateOnComplete, topicId) - // const userMsgIndex = orderedMsgs.findIndex((m) => m.id === userMsgId) - // const contextForUsage = userMsgIndex !== -1 ? orderedMsgs.slice(0, userMsgIndex + 1) : [] - // const finalContextWithAssistant = [...contextForUsage, finalAssistantMsg] - - // const possibleBlockId = - // mainTextBlockId || - // thinkingBlockId || - // toolBlockId || - // imageBlockId || - // citationBlockId || - // initialPlaceholderBlockId || - // lastBlockId - // if (possibleBlockId) { - // const changes: Partial = { - // status: MessageBlockStatus.SUCCESS - // } - // smartBlockUpdate(possibleBlockId, changes, lastBlockType!, true) - // } - - // const endTime = Date.now() - // const duration = endTime - startTime - // const content = getMainTextContent(finalAssistantMsg) - // if (!isOnHomePage() && duration > 60 * 1000) { - // await notificationService.send({ - // id: uuid(), - // type: 'success', - // title: t('notification.assistant'), - // message: content.length > 50 ? content.slice(0, 47) + '...' : content, - // silent: false, - // timestamp: Date.now(), - // source: 'assistant' - // }) - // } - - // // 更新topic的name - // autoRenameTopic(assistant, topicId) - - // if ( - // response && - // (response.usage?.total_tokens === 0 || - // response?.usage?.prompt_tokens === 0 || - // response?.usage?.completion_tokens === 0) - // ) { - // const usage = await estimateMessagesUsage({ assistant, messages: finalContextWithAssistant }) - // response.usage = usage - // } - // // dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) - // } - // if (response && response.metrics) { - // if (response.metrics.completion_tokens === 0 && response.usage?.completion_tokens) { - // response = { - // ...response, - // metrics: { - // ...response.metrics, - // completion_tokens: response.usage.completion_tokens - // } - // } - // } - // } - - // const messageUpdates: Partial = { status, metrics: response?.metrics, usage: response?.usage } - // dispatch( - // newMessagesActions.updateMessage({ - // topicId, - // messageId: assistantMsgId, - // updates: messageUpdates - // }) - // ) - // saveUpdatesToDB(assistantMsgId, topicId, messageUpdates, []) - - // EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { id: assistantMsgId, topicId, status }) - // } - // } - callbacks = createCallbacks({ blockManager, dispatch,