diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 4ad208c9e1..175e63c0f4 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -38,6 +38,7 @@ import { getTopicQueue, waitForTopicQueue } from '@renderer/utils/queue' import { isOnHomePage } from '@renderer/utils/window' import { t } from 'i18next' import { throttle } from 'lodash' +import { LRUCache } from 'lru-cache' import type { AppDispatch, RootState } from '../index' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' @@ -114,22 +115,88 @@ const updateExistingMessageAndBlocksInDB = async ( } } -// 更新单个块的逻辑,用于更新消息中的单个块 -const throttledBlockUpdate = throttle(async (id, blockUpdate) => { - // const state = store.getState() - // const block = state.messageBlocks.entities[id] - // throttle是异步函数,可能会在complete事件触发后才执行 - // if ( - // blockUpdate.status === MessageBlockStatus.STREAMING && - // (block?.status === MessageBlockStatus.SUCCESS || block?.status === MessageBlockStatus.ERROR) - // ) - // return +/** + * 消息块节流器。 + * 每个消息块有独立节流器,并发更新时不会互相影响 + */ +const blockUpdateThrottlers = new LRUCache>({ + max: 100, + ttl: 1000 * 60 * 5, + updateAgeOnGet: true +}) - store.dispatch(updateOneBlock({ id, changes: blockUpdate })) - await db.message_blocks.update(id, blockUpdate) -}, 150) +/** + * 消息块 RAF 缓存。 + * 用于管理 RAF 请求创建和取消。 + */ +const blockUpdateRafs = new LRUCache({ + max: 100, + ttl: 1000 * 60 * 5, + updateAgeOnGet: true +}) -const cancelThrottledBlockUpdate = throttledBlockUpdate.cancel +/** + * 获取或创建消息块专用的节流函数。 + */ +const getBlockThrottler = (id: string) => { + if (!blockUpdateThrottlers.has(id)) { + const throttler = throttle(async (blockUpdate: any) => { + const existingRAF = blockUpdateRafs.get(id) + if (existingRAF) { + cancelAnimationFrame(existingRAF) + } + + const rafId = requestAnimationFrame(() => { + store.dispatch(updateOneBlock({ id, changes: blockUpdate })) + blockUpdateRafs.delete(id) + }) + + blockUpdateRafs.set(id, rafId) + await db.message_blocks.update(id, blockUpdate) + }, 150) + + blockUpdateThrottlers.set(id, throttler) + } + + return blockUpdateThrottlers.get(id)! +} + +/** + * 更新单个消息块。 + */ +const throttledBlockUpdate = (id: string, blockUpdate: any) => { + const throttler = getBlockThrottler(id) + throttler(blockUpdate) +} + +/** + * 取消单个块的节流更新,移除节流器和 RAF。 + */ +const cancelThrottledBlockUpdate = (id: string) => { + const rafId = blockUpdateRafs.get(id) + if (rafId) { + cancelAnimationFrame(rafId) + blockUpdateRafs.delete(id) + } + + const throttler = blockUpdateThrottlers.get(id) + if (throttler) { + throttler.cancel() + blockUpdateThrottlers.delete(id) + } +} + +/** + * 批量清理多个消息块。 + */ +export const cleanupMultipleBlocks = (dispatch: AppDispatch, blockIds: string[]) => { + blockIds.forEach((id) => { + cancelThrottledBlockUpdate(id) + }) + if (blockIds.length > 0) { + dispatch(removeManyBlocks(blockIds)) + } +} // // 修改: 节流更新单个块的内容/状态到数据库 (仅用于 Text/Thinking Chunks) // export const throttledBlockDbUpdate = throttle( @@ -357,12 +424,12 @@ const fetchAndProcessAssistantResponseImpl = async ( } }, onTextComplete: async (finalText) => { - cancelThrottledBlockUpdate() if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) { const changes = { content: finalText, status: MessageBlockStatus.SUCCESS } + cancelThrottledBlockUpdate(lastBlockId) dispatch(updateOneBlock({ id: lastBlockId, changes })) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) @@ -415,8 +482,6 @@ const fetchAndProcessAssistantResponseImpl = async ( } }, onThinkingComplete: (finalText, final_thinking_millsec) => { - cancelThrottledBlockUpdate() - if (lastBlockType === MessageBlockType.THINKING && lastBlockId) { const changes = { type: MessageBlockType.THINKING, @@ -424,6 +489,7 @@ const fetchAndProcessAssistantResponseImpl = async ( status: MessageBlockStatus.SUCCESS, thinking_millsec: final_thinking_millsec } + cancelThrottledBlockUpdate(lastBlockId) dispatch(updateOneBlock({ id: lastBlockId, changes })) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) } else { @@ -458,7 +524,6 @@ const fetchAndProcessAssistantResponseImpl = async ( } }, onToolCallComplete: (toolResponse: MCPToolResponse) => { - cancelThrottledBlockUpdate() const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id) if (toolResponse.status === 'done' || toolResponse.status === 'error') { if (!existingBlockId) { @@ -476,6 +541,7 @@ const fetchAndProcessAssistantResponseImpl = async ( if (finalStatus === MessageBlockStatus.ERROR) { changes.error = { message: `Tool execution failed/error`, details: toolResponse.response } } + cancelThrottledBlockUpdate(existingBlockId) dispatch(updateOneBlock({ id: existingBlockId, changes })) saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState) } else { @@ -577,7 +643,6 @@ const fetchAndProcessAssistantResponseImpl = async ( } }, onError: async (error) => { - cancelThrottledBlockUpdate() console.dir(error, { depth: null }) const isErrorTypeAbort = isAbortError(error) let pauseErrorLanguagePlaceholder = '' @@ -610,6 +675,7 @@ const fetchAndProcessAssistantResponseImpl = async ( const changes: Partial = { status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR } + cancelThrottledBlockUpdate(lastBlockId) dispatch(updateOneBlock({ id: lastBlockId, changes })) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) } @@ -631,8 +697,6 @@ const fetchAndProcessAssistantResponseImpl = async ( }) }, onComplete: async (status: AssistantMessageStatus, response?: Response) => { - cancelThrottledBlockUpdate() - const finalStateOnComplete = getState() const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId] @@ -647,6 +711,7 @@ const fetchAndProcessAssistantResponseImpl = async ( const changes: Partial = { status: MessageBlockStatus.SUCCESS } + cancelThrottledBlockUpdate(lastBlockId) dispatch(updateOneBlock({ id: lastBlockId, changes })) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) } @@ -819,7 +884,7 @@ export const deleteSingleMessageThunk = try { dispatch(newMessagesActions.removeMessage({ topicId, messageId })) - dispatch(removeManyBlocks(blockIdsToDelete)) + cleanupMultipleBlocks(dispatch, blockIdsToDelete) await db.message_blocks.bulkDelete(blockIdsToDelete) const topic = await db.topics.get(topicId) if (topic) { @@ -862,7 +927,7 @@ export const deleteMessageGroupThunk = try { dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) - dispatch(removeManyBlocks(blockIdsToDelete)) + cleanupMultipleBlocks(dispatch, blockIdsToDelete) await db.message_blocks.bulkDelete(blockIdsToDelete) const topic = await db.topics.get(topicId) if (topic) { @@ -892,9 +957,7 @@ export const clearTopicMessagesThunk = const blockIdsToDelete = Array.from(blockIdsToDeleteSet) dispatch(newMessagesActions.clearTopicMessages(topicId)) - if (blockIdsToDelete.length > 0) { - dispatch(removeManyBlocks(blockIdsToDelete)) - } + cleanupMultipleBlocks(dispatch, blockIdsToDelete) await db.topics.update(topicId, { messages: [] }) if (blockIdsToDelete.length > 0) { @@ -968,9 +1031,7 @@ export const resendMessageThunk = } messagesToUpdateInRedux.forEach((update) => dispatch(newMessagesActions.updateMessage(update))) - if (allBlockIdsToDelete.length > 0) { - dispatch(removeManyBlocks(allBlockIdsToDelete)) - } + cleanupMultipleBlocks(dispatch, allBlockIdsToDelete) try { if (allBlockIdsToDelete.length > 0) { @@ -1069,9 +1130,7 @@ export const regenerateAssistantResponseThunk = ) // 6. Remove old blocks from Redux - if (blockIdsToDelete.length > 0) { - dispatch(removeManyBlocks(blockIdsToDelete)) - } + cleanupMultipleBlocks(dispatch, blockIdsToDelete) // 7. Update DB: Save the reset message state within the topic and delete old blocks // Fetch the current state *after* Redux updates to get the latest message list @@ -1516,9 +1575,7 @@ export const removeBlocksThunk = // 1. Update Redux state dispatch(newMessagesActions.updateMessage({ topicId, messageId, updates: { blocks: updatedBlockIds } })) - if (blockIdsToRemove.length > 0) { - dispatch(removeManyBlocks(blockIdsToRemove)) - } + cleanupMultipleBlocks(dispatch, blockIdsToRemove) const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)