perf: improve responsiveness on streaming formulas (#6659)

* perf: improve performance on streaming formulas

* refactor: create throttlers for blocks

* refactor: use LRU cache for better memory management
This commit is contained in:
one 2025-06-06 03:07:59 +08:00 committed by GitHub
parent d80513d011
commit b402cdf7ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -38,6 +38,7 @@ import { getTopicQueue, waitForTopicQueue } from '@renderer/utils/queue'
import { isOnHomePage } from '@renderer/utils/window' import { isOnHomePage } from '@renderer/utils/window'
import { t } from 'i18next' import { t } from 'i18next'
import { throttle } from 'lodash' import { throttle } from 'lodash'
import { LRUCache } from 'lru-cache'
import type { AppDispatch, RootState } from '../index' import type { AppDispatch, RootState } from '../index'
import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock'
@ -114,22 +115,88 @@ const updateExistingMessageAndBlocksInDB = async (
} }
} }
// 更新单个块的逻辑,用于更新消息中的单个块 /**
const throttledBlockUpdate = throttle(async (id, blockUpdate) => { *
// const state = store.getState() *
// const block = state.messageBlocks.entities[id] */
// throttle是异步函数,可能会在complete事件触发后才执行 const blockUpdateThrottlers = new LRUCache<string, ReturnType<typeof throttle>>({
// if ( max: 100,
// blockUpdate.status === MessageBlockStatus.STREAMING && ttl: 1000 * 60 * 5,
// (block?.status === MessageBlockStatus.SUCCESS || block?.status === MessageBlockStatus.ERROR) updateAgeOnGet: true
// ) })
// return
store.dispatch(updateOneBlock({ id, changes: blockUpdate })) /**
await db.message_blocks.update(id, blockUpdate) * RAF
}, 150) * RAF
*/
const blockUpdateRafs = new LRUCache<string, number>({
max: 100,
ttl: 1000 * 60 * 5,
updateAgeOnGet: true
})
const cancelThrottledBlockUpdate = throttledBlockUpdate.cancel /**
*
*/
const getBlockThrottler = (id: string) => {
if (!blockUpdateThrottlers.has(id)) {
const throttler = throttle(async (blockUpdate: any) => {
const existingRAF = blockUpdateRafs.get(id)
if (existingRAF) {
cancelAnimationFrame(existingRAF)
}
const rafId = requestAnimationFrame(() => {
store.dispatch(updateOneBlock({ id, changes: blockUpdate }))
blockUpdateRafs.delete(id)
})
blockUpdateRafs.set(id, rafId)
await db.message_blocks.update(id, blockUpdate)
}, 150)
blockUpdateThrottlers.set(id, throttler)
}
return blockUpdateThrottlers.get(id)!
}
/**
*
*/
const throttledBlockUpdate = (id: string, blockUpdate: any) => {
const throttler = getBlockThrottler(id)
throttler(blockUpdate)
}
/**
* RAF
*/
const cancelThrottledBlockUpdate = (id: string) => {
const rafId = blockUpdateRafs.get(id)
if (rafId) {
cancelAnimationFrame(rafId)
blockUpdateRafs.delete(id)
}
const throttler = blockUpdateThrottlers.get(id)
if (throttler) {
throttler.cancel()
blockUpdateThrottlers.delete(id)
}
}
/**
*
*/
export const cleanupMultipleBlocks = (dispatch: AppDispatch, blockIds: string[]) => {
blockIds.forEach((id) => {
cancelThrottledBlockUpdate(id)
})
if (blockIds.length > 0) {
dispatch(removeManyBlocks(blockIds))
}
}
// // 修改: 节流更新单个块的内容/状态到数据库 (仅用于 Text/Thinking Chunks) // // 修改: 节流更新单个块的内容/状态到数据库 (仅用于 Text/Thinking Chunks)
// export const throttledBlockDbUpdate = throttle( // export const throttledBlockDbUpdate = throttle(
@ -357,12 +424,12 @@ const fetchAndProcessAssistantResponseImpl = async (
} }
}, },
onTextComplete: async (finalText) => { onTextComplete: async (finalText) => {
cancelThrottledBlockUpdate()
if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) { if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) {
const changes = { const changes = {
content: finalText, content: finalText,
status: MessageBlockStatus.SUCCESS status: MessageBlockStatus.SUCCESS
} }
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes })) dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
@ -415,8 +482,6 @@ const fetchAndProcessAssistantResponseImpl = async (
} }
}, },
onThinkingComplete: (finalText, final_thinking_millsec) => { onThinkingComplete: (finalText, final_thinking_millsec) => {
cancelThrottledBlockUpdate()
if (lastBlockType === MessageBlockType.THINKING && lastBlockId) { if (lastBlockType === MessageBlockType.THINKING && lastBlockId) {
const changes = { const changes = {
type: MessageBlockType.THINKING, type: MessageBlockType.THINKING,
@ -424,6 +489,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.SUCCESS, status: MessageBlockStatus.SUCCESS,
thinking_millsec: final_thinking_millsec thinking_millsec: final_thinking_millsec
} }
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes })) dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
} else { } else {
@ -458,7 +524,6 @@ const fetchAndProcessAssistantResponseImpl = async (
} }
}, },
onToolCallComplete: (toolResponse: MCPToolResponse) => { onToolCallComplete: (toolResponse: MCPToolResponse) => {
cancelThrottledBlockUpdate()
const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id) const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id)
if (toolResponse.status === 'done' || toolResponse.status === 'error') { if (toolResponse.status === 'done' || toolResponse.status === 'error') {
if (!existingBlockId) { if (!existingBlockId) {
@ -476,6 +541,7 @@ const fetchAndProcessAssistantResponseImpl = async (
if (finalStatus === MessageBlockStatus.ERROR) { if (finalStatus === MessageBlockStatus.ERROR) {
changes.error = { message: `Tool execution failed/error`, details: toolResponse.response } changes.error = { message: `Tool execution failed/error`, details: toolResponse.response }
} }
cancelThrottledBlockUpdate(existingBlockId)
dispatch(updateOneBlock({ id: existingBlockId, changes })) dispatch(updateOneBlock({ id: existingBlockId, changes }))
saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState) saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState)
} else { } else {
@ -577,7 +643,6 @@ const fetchAndProcessAssistantResponseImpl = async (
} }
}, },
onError: async (error) => { onError: async (error) => {
cancelThrottledBlockUpdate()
console.dir(error, { depth: null }) console.dir(error, { depth: null })
const isErrorTypeAbort = isAbortError(error) const isErrorTypeAbort = isAbortError(error)
let pauseErrorLanguagePlaceholder = '' let pauseErrorLanguagePlaceholder = ''
@ -610,6 +675,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = { const changes: Partial<MessageBlock> = {
status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
} }
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes })) dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
} }
@ -631,8 +697,6 @@ const fetchAndProcessAssistantResponseImpl = async (
}) })
}, },
onComplete: async (status: AssistantMessageStatus, response?: Response) => { onComplete: async (status: AssistantMessageStatus, response?: Response) => {
cancelThrottledBlockUpdate()
const finalStateOnComplete = getState() const finalStateOnComplete = getState()
const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId] const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId]
@ -647,6 +711,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = { const changes: Partial<MessageBlock> = {
status: MessageBlockStatus.SUCCESS status: MessageBlockStatus.SUCCESS
} }
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes })) dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState) saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
} }
@ -819,7 +884,7 @@ export const deleteSingleMessageThunk =
try { try {
dispatch(newMessagesActions.removeMessage({ topicId, messageId })) dispatch(newMessagesActions.removeMessage({ topicId, messageId }))
dispatch(removeManyBlocks(blockIdsToDelete)) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete) await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) const topic = await db.topics.get(topicId)
if (topic) { if (topic) {
@ -862,7 +927,7 @@ export const deleteMessageGroupThunk =
try { try {
dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId }))
dispatch(removeManyBlocks(blockIdsToDelete)) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete) await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) const topic = await db.topics.get(topicId)
if (topic) { if (topic) {
@ -892,9 +957,7 @@ export const clearTopicMessagesThunk =
const blockIdsToDelete = Array.from(blockIdsToDeleteSet) const blockIdsToDelete = Array.from(blockIdsToDeleteSet)
dispatch(newMessagesActions.clearTopicMessages(topicId)) dispatch(newMessagesActions.clearTopicMessages(topicId))
if (blockIdsToDelete.length > 0) { cleanupMultipleBlocks(dispatch, blockIdsToDelete)
dispatch(removeManyBlocks(blockIdsToDelete))
}
await db.topics.update(topicId, { messages: [] }) await db.topics.update(topicId, { messages: [] })
if (blockIdsToDelete.length > 0) { if (blockIdsToDelete.length > 0) {
@ -968,9 +1031,7 @@ export const resendMessageThunk =
} }
messagesToUpdateInRedux.forEach((update) => dispatch(newMessagesActions.updateMessage(update))) messagesToUpdateInRedux.forEach((update) => dispatch(newMessagesActions.updateMessage(update)))
if (allBlockIdsToDelete.length > 0) { cleanupMultipleBlocks(dispatch, allBlockIdsToDelete)
dispatch(removeManyBlocks(allBlockIdsToDelete))
}
try { try {
if (allBlockIdsToDelete.length > 0) { if (allBlockIdsToDelete.length > 0) {
@ -1069,9 +1130,7 @@ export const regenerateAssistantResponseThunk =
) )
// 6. Remove old blocks from Redux // 6. Remove old blocks from Redux
if (blockIdsToDelete.length > 0) { cleanupMultipleBlocks(dispatch, blockIdsToDelete)
dispatch(removeManyBlocks(blockIdsToDelete))
}
// 7. Update DB: Save the reset message state within the topic and delete old blocks // 7. Update DB: Save the reset message state within the topic and delete old blocks
// Fetch the current state *after* Redux updates to get the latest message list // Fetch the current state *after* Redux updates to get the latest message list
@ -1516,9 +1575,7 @@ export const removeBlocksThunk =
// 1. Update Redux state // 1. Update Redux state
dispatch(newMessagesActions.updateMessage({ topicId, messageId, updates: { blocks: updatedBlockIds } })) dispatch(newMessagesActions.updateMessage({ topicId, messageId, updates: { blocks: updatedBlockIds } }))
if (blockIdsToRemove.length > 0) { cleanupMultipleBlocks(dispatch, blockIdsToRemove)
dispatch(removeManyBlocks(blockIdsToRemove))
}
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)