From 6560369b98f93a1ef6b682eac1710244539dbd1e Mon Sep 17 00:00:00 2001 From: SuYao Date: Thu, 17 Jul 2025 16:09:43 +0800 Subject: [PATCH] refactor(ActionUtils): streamline message processing logic (#8226) * refactor(ActionUtils): streamline message processing logic - Removed unnecessary content accumulation for thinking and text blocks. - Updated handling of message chunks to directly use incoming text for updates. - Improved state management for thinking and text blocks during streaming. - Enhanced the logic for creating and updating message blocks to ensure proper status and content handling. * chore: remove log * feat(ActionUtils): update message block instruction during processing - Added dispatch to update the message block instruction with the current block ID when processing messages. - Enhanced state management for message updates to ensure accurate tracking of block instructions during streaming. * feat(ActionUtils): enhance message processing with text block content tracking - Introduced a new variable to store the content of the text block during message processing. - Updated the logic to dispatch the current text block content upon completion of message chunks, improving state management and accuracy in message updates. * feat(ActionUtils): refine message processing error handling and status updates - Enhanced the logic to update the message status based on error conditions, ensuring accurate representation of message states. - Improved handling of text block content during processing, allowing for better state management and completion tracking. - Streamlined the dispatch of updates for message blocks, particularly in error scenarios, to maintain consistency in message processing. * feat(messageThunk): export throttled block update functions for improved message processing - Changed the visibility of `throttledBlockUpdate` and `cancelThrottledBlockUpdate` functions to export, allowing their use in other modules. - Updated `processMessages` in ActionUtils to utilize the newly exported functions for handling message updates, enhancing the efficiency of block updates during message processing. * fix(ActionUtils): correct text block content handling in message processing - Changed the declaration of `textBlockContent` to a constant to prevent unintended modifications. - Updated the logic in `processMessages` to use the text block ID for throttled updates instead of the content, ensuring accurate message updates during processing. * feat(HomeWindow): improve message processing with throttled updates - Integrated `throttledBlockUpdate` and `cancelThrottledBlockUpdate` for managing thinking and text block updates, enhancing performance during message streaming. - Updated logic to handle chunk types more effectively, ensuring accurate content updates and status management for message blocks. - Streamlined the dispatch of message updates upon completion and error handling, improving overall state management. --- src/renderer/src/store/thunk/messageThunk.ts | 4 +- .../src/windows/mini/home/HomeWindow.tsx | 89 +++++++++------ .../action/components/ActionUtils.ts | 102 +++++++++++++----- 3 files changed, 134 insertions(+), 61 deletions(-) diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index da4267d3be..8f380d523a 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -148,7 +148,7 @@ const getBlockThrottler = (id: string) => { /** * 更新单个消息块。 */ -const throttledBlockUpdate = (id: string, blockUpdate: any) => { +export const throttledBlockUpdate = (id: string, blockUpdate: any) => { const throttler = getBlockThrottler(id) throttler(blockUpdate) } @@ -156,7 +156,7 @@ const throttledBlockUpdate = (id: string, blockUpdate: any) => { /** * 取消单个块的节流更新,移除节流器和 RAF。 */ -const cancelThrottledBlockUpdate = (id: string) => { +export const cancelThrottledBlockUpdate = (id: string) => { const rafId = blockUpdateRafs.get(id) if (rafId) { cancelAnimationFrame(rafId) diff --git a/src/renderer/src/windows/mini/home/HomeWindow.tsx b/src/renderer/src/windows/mini/home/HomeWindow.tsx index 24c4fbcf20..325b74b215 100644 --- a/src/renderer/src/windows/mini/home/HomeWindow.tsx +++ b/src/renderer/src/windows/mini/home/HomeWindow.tsx @@ -9,6 +9,7 @@ import { getAssistantMessage, getUserMessage } from '@renderer/services/Messages import store, { useAppSelector } from '@renderer/store' import { updateOneBlock, upsertManyBlocks, upsertOneBlock } from '@renderer/store/messageBlock' import { newMessagesActions, selectMessagesForTopic } from '@renderer/store/newMessage' +import { cancelThrottledBlockUpdate, throttledBlockUpdate } from '@renderer/store/thunk/messageThunk' import { ThemeMode, Topic } from '@renderer/types' import { Chunk, ChunkType } from '@renderer/types/chunk' import { AssistantMessageStatus, MessageBlockStatus } from '@renderer/types/newMessage' @@ -243,9 +244,7 @@ const HomeWindow: FC = () => { .filter((m) => m && !m.status?.includes('ing')) let blockId: string | null = null - let blockContent: string = '' let thinkingBlockId: string | null = null - let thinkingBlockContent: string = '' setIsLoading(true) setIsOutputted(false) @@ -259,14 +258,16 @@ const HomeWindow: FC = () => { assistant: { ...currentAssistant, settings: { streamOutput: true } }, onChunkReceived: (chunk: Chunk) => { switch (chunk.type) { - case ChunkType.THINKING_DELTA: + case ChunkType.THINKING_START: { - thinkingBlockContent += chunk.text setIsOutputted(true) - if (!thinkingBlockId) { - const block = createThinkingBlock(assistantMessage.id, chunk.text, { - status: MessageBlockStatus.STREAMING, - thinking_millsec: chunk.thinking_millsec + if (thinkingBlockId) { + store.dispatch( + updateOneBlock({ id: thinkingBlockId, changes: { status: MessageBlockStatus.STREAMING } }) + ) + } else { + const block = createThinkingBlock(assistantMessage.id, '', { + status: MessageBlockStatus.STREAMING }) thinkingBlockId = block.id store.dispatch( @@ -277,19 +278,24 @@ const HomeWindow: FC = () => { }) ) store.dispatch(upsertOneBlock(block)) - } else { - store.dispatch( - updateOneBlock({ - id: thinkingBlockId, - changes: { content: thinkingBlockContent, thinking_millsec: chunk.thinking_millsec } - }) - ) + } + } + break + case ChunkType.THINKING_DELTA: + { + setIsOutputted(true) + if (thinkingBlockId) { + throttledBlockUpdate(thinkingBlockId, { + content: chunk.text, + thinking_millsec: chunk.thinking_millsec + }) } } break case ChunkType.THINKING_COMPLETE: { if (thinkingBlockId) { + cancelThrottledBlockUpdate(thinkingBlockId) store.dispatch( updateOneBlock({ id: thinkingBlockId, @@ -299,12 +305,13 @@ const HomeWindow: FC = () => { } } break - case ChunkType.TEXT_DELTA: + case ChunkType.TEXT_START: { - blockContent += chunk.text setIsOutputted(true) - if (!blockId) { - const block = createMainTextBlock(assistantMessage.id, chunk.text, { + if (blockId) { + store.dispatch(updateOneBlock({ id: blockId, changes: { status: MessageBlockStatus.STREAMING } })) + } else { + const block = createMainTextBlock(assistantMessage.id, '', { status: MessageBlockStatus.STREAMING }) blockId = block.id @@ -316,23 +323,29 @@ const HomeWindow: FC = () => { }) ) store.dispatch(upsertOneBlock(block)) - } else { - store.dispatch(updateOneBlock({ id: blockId, changes: { content: blockContent } })) + } + } + break + case ChunkType.TEXT_DELTA: + { + setIsOutputted(true) + if (blockId) { + throttledBlockUpdate(blockId, { content: chunk.text }) } } break case ChunkType.TEXT_COMPLETE: { - blockId && - store.dispatch(updateOneBlock({ id: blockId, changes: { status: MessageBlockStatus.SUCCESS } })) - store.dispatch( - newMessagesActions.updateMessage({ - topicId, - messageId: assistantMessage.id, - updates: { status: AssistantMessageStatus.SUCCESS } - }) - ) + if (blockId) { + cancelThrottledBlockUpdate(blockId) + store.dispatch( + updateOneBlock({ + id: blockId, + changes: { content: chunk.text, status: MessageBlockStatus.SUCCESS } + }) + ) + } } break case ChunkType.ERROR: { @@ -348,6 +361,15 @@ const HomeWindow: FC = () => { } }) ) + store.dispatch( + newMessagesActions.updateMessage({ + topicId, + messageId: assistantMessage.id, + updates: { + status: isAborted ? AssistantMessageStatus.PAUSED : AssistantMessageStatus.SUCCESS + } + }) + ) } if (!isAborted) { throw new Error(chunk.error.message) @@ -358,6 +380,13 @@ const HomeWindow: FC = () => { setIsLoading(false) setIsOutputted(true) currentAskId.current = '' + store.dispatch( + newMessagesActions.updateMessage({ + topicId, + messageId: assistantMessage.id, + updates: { status: AssistantMessageStatus.SUCCESS } + }) + ) break } } diff --git a/src/renderer/src/windows/selection/action/components/ActionUtils.ts b/src/renderer/src/windows/selection/action/components/ActionUtils.ts index d6b77cec47..91daa4eac8 100644 --- a/src/renderer/src/windows/selection/action/components/ActionUtils.ts +++ b/src/renderer/src/windows/selection/action/components/ActionUtils.ts @@ -3,6 +3,7 @@ import { getAssistantMessage, getUserMessage } from '@renderer/services/Messages import store from '@renderer/store' import { updateOneBlock, upsertManyBlocks, upsertOneBlock } from '@renderer/store/messageBlock' import { newMessagesActions } from '@renderer/store/newMessage' +import { cancelThrottledBlockUpdate, throttledBlockUpdate } from '@renderer/store/thunk/messageThunk' import { Assistant, Topic } from '@renderer/types' import { Chunk, ChunkType } from '@renderer/types/chunk' import { AssistantMessageStatus, MessageBlockStatus } from '@renderer/types/newMessage' @@ -33,10 +34,8 @@ export const processMessages = async ( store.dispatch(upsertManyBlocks(userBlocks)) let textBlockId: string | null = null - let textBlockContent: string = '' - let thinkingBlockId: string | null = null - let thinkingBlockContent: string = '' + const textBlockContent: string = '' const assistantMessage = getAssistantMessage({ assistant, @@ -54,13 +53,15 @@ export const processMessages = async ( assistant: { ...assistant, settings: { streamOutput: true } }, onChunkReceived: (chunk: Chunk) => { switch (chunk.type) { - case ChunkType.THINKING_DELTA: + case ChunkType.THINKING_START: { - thinkingBlockContent += chunk.text - if (!thinkingBlockId) { - const block = createThinkingBlock(assistantMessage.id, chunk.text, { - status: MessageBlockStatus.STREAMING, - thinking_millsec: chunk.thinking_millsec + if (thinkingBlockId) { + store.dispatch( + updateOneBlock({ id: thinkingBlockId, changes: { status: MessageBlockStatus.STREAMING } }) + ) + } else { + const block = createThinkingBlock(assistantMessage.id, '', { + status: MessageBlockStatus.STREAMING }) thinkingBlockId = block.id store.dispatch( @@ -71,13 +72,16 @@ export const processMessages = async ( }) ) store.dispatch(upsertOneBlock(block)) - } else { - store.dispatch( - updateOneBlock({ - id: thinkingBlockId, - changes: { content: thinkingBlockContent, thinking_millsec: chunk.thinking_millsec } - }) - ) + } + } + break + case ChunkType.THINKING_DELTA: + { + if (thinkingBlockId) { + throttledBlockUpdate(thinkingBlockId, { + content: chunk.text, + thinking_millsec: chunk.thinking_millsec + }) } onStream() } @@ -85,20 +89,27 @@ export const processMessages = async ( case ChunkType.THINKING_COMPLETE: { if (thinkingBlockId) { + cancelThrottledBlockUpdate(thinkingBlockId) store.dispatch( updateOneBlock({ id: thinkingBlockId, - changes: { status: MessageBlockStatus.SUCCESS, thinking_millsec: chunk.thinking_millsec } + changes: { + content: chunk.text, + status: MessageBlockStatus.SUCCESS, + thinking_millsec: chunk.thinking_millsec + } }) ) + thinkingBlockId = null } } break - case ChunkType.TEXT_DELTA: + case ChunkType.TEXT_START: { - textBlockContent += chunk.text - if (!textBlockId) { - const block = createMainTextBlock(assistantMessage.id, chunk.text, { + if (textBlockId) { + store.dispatch(updateOneBlock({ id: textBlockId, changes: { status: MessageBlockStatus.STREAMING } })) + } else { + const block = createMainTextBlock(assistantMessage.id, '', { status: MessageBlockStatus.STREAMING }) textBlockId = block.id @@ -110,22 +121,34 @@ export const processMessages = async ( }) ) store.dispatch(upsertOneBlock(block)) - } else { - store.dispatch(updateOneBlock({ id: textBlockId, changes: { content: textBlockContent } })) } - + } + break + case ChunkType.TEXT_DELTA: + { + if (textBlockId) { + throttledBlockUpdate(textBlockId, { content: chunk.text }) + } onStream() } break case ChunkType.TEXT_COMPLETE: { - textBlockId && + if (textBlockId) { + cancelThrottledBlockUpdate(textBlockId) store.dispatch( updateOneBlock({ id: textBlockId, - changes: { status: MessageBlockStatus.SUCCESS } + changes: { content: chunk.text, status: MessageBlockStatus.SUCCESS } }) ) + onFinish(chunk.text) + textBlockId = null + } + } + break + case ChunkType.BLOCK_COMPLETE: + { store.dispatch( newMessagesActions.updateMessage({ topicId: topic.id, @@ -133,12 +156,33 @@ export const processMessages = async ( updates: { status: AssistantMessageStatus.SUCCESS } }) ) - textBlockContent = chunk.text + onFinish(textBlockContent) } break - case ChunkType.BLOCK_COMPLETE: case ChunkType.ERROR: - onFinish(textBlockContent) + { + const blockId = textBlockId || thinkingBlockId + if (blockId) { + store.dispatch( + updateOneBlock({ + id: blockId, + changes: { + status: isAbortError(chunk.error) ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR + } + }) + ) + } + store.dispatch( + newMessagesActions.updateMessage({ + topicId: topic.id, + messageId: assistantMessage.id, + updates: { + status: isAbortError(chunk.error) ? AssistantMessageStatus.PAUSED : AssistantMessageStatus.SUCCESS + } + }) + ) + onFinish(textBlockContent) + } break } }