refactor(ActionUtils): streamline message processing logic (#8226)

* refactor(ActionUtils): streamline message processing logic

- Removed unnecessary content accumulation for thinking and text blocks.
- Updated handling of message chunks to directly use incoming text for updates.
- Improved state management for thinking and text blocks during streaming.
- Enhanced the logic for creating and updating message blocks to ensure proper status and content handling.

* chore: remove log

* feat(ActionUtils): update message block instruction during processing

- Added dispatch to update the message block instruction with the current block ID when processing messages.
- Enhanced state management for message updates to ensure accurate tracking of block instructions during streaming.

* feat(ActionUtils): enhance message processing with text block content tracking

- Introduced a new variable to store the content of the text block during message processing.
- Updated the logic to dispatch the current text block content upon completion of message chunks, improving state management and accuracy in message updates.

* feat(ActionUtils): refine message processing error handling and status updates

- Enhanced the logic to update the message status based on error conditions, ensuring accurate representation of message states.
- Improved handling of text block content during processing, allowing for better state management and completion tracking.
- Streamlined the dispatch of updates for message blocks, particularly in error scenarios, to maintain consistency in message processing.

* feat(messageThunk): export throttled block update functions for improved message processing

- Changed the visibility of `throttledBlockUpdate` and `cancelThrottledBlockUpdate` functions to export, allowing their use in other modules.
- Updated `processMessages` in ActionUtils to utilize the newly exported functions for handling message updates, enhancing the efficiency of block updates during message processing.

* fix(ActionUtils): correct text block content handling in message processing

- Changed the declaration of `textBlockContent` to a constant to prevent unintended modifications.
- Updated the logic in `processMessages` to use the text block ID for throttled updates instead of the content, ensuring accurate message updates during processing.

* feat(HomeWindow): improve message processing with throttled updates

- Integrated `throttledBlockUpdate` and `cancelThrottledBlockUpdate` for managing thinking and text block updates, enhancing performance during message streaming.
- Updated logic to handle chunk types more effectively, ensuring accurate content updates and status management for message blocks.
- Streamlined the dispatch of message updates upon completion and error handling, improving overall state management.
This commit is contained in:
SuYao 2025-07-17 16:09:43 +08:00 committed by GitHub
parent 30b080efbd
commit 6560369b98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 134 additions and 61 deletions

View File

@ -148,7 +148,7 @@ const getBlockThrottler = (id: string) => {
/**
*
*/
const throttledBlockUpdate = (id: string, blockUpdate: any) => {
export const throttledBlockUpdate = (id: string, blockUpdate: any) => {
const throttler = getBlockThrottler(id)
throttler(blockUpdate)
}
@ -156,7 +156,7 @@ const throttledBlockUpdate = (id: string, blockUpdate: any) => {
/**
* RAF
*/
const cancelThrottledBlockUpdate = (id: string) => {
export const cancelThrottledBlockUpdate = (id: string) => {
const rafId = blockUpdateRafs.get(id)
if (rafId) {
cancelAnimationFrame(rafId)

View File

@ -9,6 +9,7 @@ import { getAssistantMessage, getUserMessage } from '@renderer/services/Messages
import store, { useAppSelector } from '@renderer/store'
import { updateOneBlock, upsertManyBlocks, upsertOneBlock } from '@renderer/store/messageBlock'
import { newMessagesActions, selectMessagesForTopic } from '@renderer/store/newMessage'
import { cancelThrottledBlockUpdate, throttledBlockUpdate } from '@renderer/store/thunk/messageThunk'
import { ThemeMode, Topic } from '@renderer/types'
import { Chunk, ChunkType } from '@renderer/types/chunk'
import { AssistantMessageStatus, MessageBlockStatus } from '@renderer/types/newMessage'
@ -243,9 +244,7 @@ const HomeWindow: FC = () => {
.filter((m) => m && !m.status?.includes('ing'))
let blockId: string | null = null
let blockContent: string = ''
let thinkingBlockId: string | null = null
let thinkingBlockContent: string = ''
setIsLoading(true)
setIsOutputted(false)
@ -259,14 +258,16 @@ const HomeWindow: FC = () => {
assistant: { ...currentAssistant, settings: { streamOutput: true } },
onChunkReceived: (chunk: Chunk) => {
switch (chunk.type) {
case ChunkType.THINKING_DELTA:
case ChunkType.THINKING_START:
{
thinkingBlockContent += chunk.text
setIsOutputted(true)
if (!thinkingBlockId) {
const block = createThinkingBlock(assistantMessage.id, chunk.text, {
status: MessageBlockStatus.STREAMING,
thinking_millsec: chunk.thinking_millsec
if (thinkingBlockId) {
store.dispatch(
updateOneBlock({ id: thinkingBlockId, changes: { status: MessageBlockStatus.STREAMING } })
)
} else {
const block = createThinkingBlock(assistantMessage.id, '', {
status: MessageBlockStatus.STREAMING
})
thinkingBlockId = block.id
store.dispatch(
@ -277,19 +278,24 @@ const HomeWindow: FC = () => {
})
)
store.dispatch(upsertOneBlock(block))
} else {
store.dispatch(
updateOneBlock({
id: thinkingBlockId,
changes: { content: thinkingBlockContent, thinking_millsec: chunk.thinking_millsec }
})
)
}
}
break
case ChunkType.THINKING_DELTA:
{
setIsOutputted(true)
if (thinkingBlockId) {
throttledBlockUpdate(thinkingBlockId, {
content: chunk.text,
thinking_millsec: chunk.thinking_millsec
})
}
}
break
case ChunkType.THINKING_COMPLETE:
{
if (thinkingBlockId) {
cancelThrottledBlockUpdate(thinkingBlockId)
store.dispatch(
updateOneBlock({
id: thinkingBlockId,
@ -299,12 +305,13 @@ const HomeWindow: FC = () => {
}
}
break
case ChunkType.TEXT_DELTA:
case ChunkType.TEXT_START:
{
blockContent += chunk.text
setIsOutputted(true)
if (!blockId) {
const block = createMainTextBlock(assistantMessage.id, chunk.text, {
if (blockId) {
store.dispatch(updateOneBlock({ id: blockId, changes: { status: MessageBlockStatus.STREAMING } }))
} else {
const block = createMainTextBlock(assistantMessage.id, '', {
status: MessageBlockStatus.STREAMING
})
blockId = block.id
@ -316,23 +323,29 @@ const HomeWindow: FC = () => {
})
)
store.dispatch(upsertOneBlock(block))
} else {
store.dispatch(updateOneBlock({ id: blockId, changes: { content: blockContent } }))
}
}
break
case ChunkType.TEXT_DELTA:
{
setIsOutputted(true)
if (blockId) {
throttledBlockUpdate(blockId, { content: chunk.text })
}
}
break
case ChunkType.TEXT_COMPLETE:
{
blockId &&
store.dispatch(updateOneBlock({ id: blockId, changes: { status: MessageBlockStatus.SUCCESS } }))
store.dispatch(
newMessagesActions.updateMessage({
topicId,
messageId: assistantMessage.id,
updates: { status: AssistantMessageStatus.SUCCESS }
})
)
if (blockId) {
cancelThrottledBlockUpdate(blockId)
store.dispatch(
updateOneBlock({
id: blockId,
changes: { content: chunk.text, status: MessageBlockStatus.SUCCESS }
})
)
}
}
break
case ChunkType.ERROR: {
@ -348,6 +361,15 @@ const HomeWindow: FC = () => {
}
})
)
store.dispatch(
newMessagesActions.updateMessage({
topicId,
messageId: assistantMessage.id,
updates: {
status: isAborted ? AssistantMessageStatus.PAUSED : AssistantMessageStatus.SUCCESS
}
})
)
}
if (!isAborted) {
throw new Error(chunk.error.message)
@ -358,6 +380,13 @@ const HomeWindow: FC = () => {
setIsLoading(false)
setIsOutputted(true)
currentAskId.current = ''
store.dispatch(
newMessagesActions.updateMessage({
topicId,
messageId: assistantMessage.id,
updates: { status: AssistantMessageStatus.SUCCESS }
})
)
break
}
}

View File

@ -3,6 +3,7 @@ import { getAssistantMessage, getUserMessage } from '@renderer/services/Messages
import store from '@renderer/store'
import { updateOneBlock, upsertManyBlocks, upsertOneBlock } from '@renderer/store/messageBlock'
import { newMessagesActions } from '@renderer/store/newMessage'
import { cancelThrottledBlockUpdate, throttledBlockUpdate } from '@renderer/store/thunk/messageThunk'
import { Assistant, Topic } from '@renderer/types'
import { Chunk, ChunkType } from '@renderer/types/chunk'
import { AssistantMessageStatus, MessageBlockStatus } from '@renderer/types/newMessage'
@ -33,10 +34,8 @@ export const processMessages = async (
store.dispatch(upsertManyBlocks(userBlocks))
let textBlockId: string | null = null
let textBlockContent: string = ''
let thinkingBlockId: string | null = null
let thinkingBlockContent: string = ''
const textBlockContent: string = ''
const assistantMessage = getAssistantMessage({
assistant,
@ -54,13 +53,15 @@ export const processMessages = async (
assistant: { ...assistant, settings: { streamOutput: true } },
onChunkReceived: (chunk: Chunk) => {
switch (chunk.type) {
case ChunkType.THINKING_DELTA:
case ChunkType.THINKING_START:
{
thinkingBlockContent += chunk.text
if (!thinkingBlockId) {
const block = createThinkingBlock(assistantMessage.id, chunk.text, {
status: MessageBlockStatus.STREAMING,
thinking_millsec: chunk.thinking_millsec
if (thinkingBlockId) {
store.dispatch(
updateOneBlock({ id: thinkingBlockId, changes: { status: MessageBlockStatus.STREAMING } })
)
} else {
const block = createThinkingBlock(assistantMessage.id, '', {
status: MessageBlockStatus.STREAMING
})
thinkingBlockId = block.id
store.dispatch(
@ -71,13 +72,16 @@ export const processMessages = async (
})
)
store.dispatch(upsertOneBlock(block))
} else {
store.dispatch(
updateOneBlock({
id: thinkingBlockId,
changes: { content: thinkingBlockContent, thinking_millsec: chunk.thinking_millsec }
})
)
}
}
break
case ChunkType.THINKING_DELTA:
{
if (thinkingBlockId) {
throttledBlockUpdate(thinkingBlockId, {
content: chunk.text,
thinking_millsec: chunk.thinking_millsec
})
}
onStream()
}
@ -85,20 +89,27 @@ export const processMessages = async (
case ChunkType.THINKING_COMPLETE:
{
if (thinkingBlockId) {
cancelThrottledBlockUpdate(thinkingBlockId)
store.dispatch(
updateOneBlock({
id: thinkingBlockId,
changes: { status: MessageBlockStatus.SUCCESS, thinking_millsec: chunk.thinking_millsec }
changes: {
content: chunk.text,
status: MessageBlockStatus.SUCCESS,
thinking_millsec: chunk.thinking_millsec
}
})
)
thinkingBlockId = null
}
}
break
case ChunkType.TEXT_DELTA:
case ChunkType.TEXT_START:
{
textBlockContent += chunk.text
if (!textBlockId) {
const block = createMainTextBlock(assistantMessage.id, chunk.text, {
if (textBlockId) {
store.dispatch(updateOneBlock({ id: textBlockId, changes: { status: MessageBlockStatus.STREAMING } }))
} else {
const block = createMainTextBlock(assistantMessage.id, '', {
status: MessageBlockStatus.STREAMING
})
textBlockId = block.id
@ -110,22 +121,34 @@ export const processMessages = async (
})
)
store.dispatch(upsertOneBlock(block))
} else {
store.dispatch(updateOneBlock({ id: textBlockId, changes: { content: textBlockContent } }))
}
}
break
case ChunkType.TEXT_DELTA:
{
if (textBlockId) {
throttledBlockUpdate(textBlockId, { content: chunk.text })
}
onStream()
}
break
case ChunkType.TEXT_COMPLETE:
{
textBlockId &&
if (textBlockId) {
cancelThrottledBlockUpdate(textBlockId)
store.dispatch(
updateOneBlock({
id: textBlockId,
changes: { status: MessageBlockStatus.SUCCESS }
changes: { content: chunk.text, status: MessageBlockStatus.SUCCESS }
})
)
onFinish(chunk.text)
textBlockId = null
}
}
break
case ChunkType.BLOCK_COMPLETE:
{
store.dispatch(
newMessagesActions.updateMessage({
topicId: topic.id,
@ -133,12 +156,33 @@ export const processMessages = async (
updates: { status: AssistantMessageStatus.SUCCESS }
})
)
textBlockContent = chunk.text
onFinish(textBlockContent)
}
break
case ChunkType.BLOCK_COMPLETE:
case ChunkType.ERROR:
onFinish(textBlockContent)
{
const blockId = textBlockId || thinkingBlockId
if (blockId) {
store.dispatch(
updateOneBlock({
id: blockId,
changes: {
status: isAbortError(chunk.error) ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
}
})
)
}
store.dispatch(
newMessagesActions.updateMessage({
topicId: topic.id,
messageId: assistantMessage.id,
updates: {
status: isAbortError(chunk.error) ? AssistantMessageStatus.PAUSED : AssistantMessageStatus.SUCCESS
}
})
)
onFinish(textBlockContent)
}
break
}
}