perf: improve responsiveness on streaming formulas (#6659)

* perf: improve performance on streaming formulas

* refactor: create throttlers for blocks

* refactor: use LRU cache for better memory management
one · 2025-06-06 03:07:59 +08:00 · committed by GitHub
parent a212c68e56
commit c4c1e2fca1


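The gist of the change: the previous code used a single module-level lodash throttler with one global `cancel`, so concurrently streaming blocks shared one timer and one pending payload. The new code keeps one throttler per block id, stored in an `lru-cache` so entries for idle blocks expire instead of accumulating. A minimal standalone sketch of that keyed-throttler pattern (the helper name is hypothetical; it assumes the same `lodash` and `lru-cache` dependencies used in the diff):

```ts
import { throttle } from 'lodash'
import { LRUCache } from 'lru-cache'

// Hypothetical helper: one throttler per key, held in an LRU cache so
// entries for keys that stop updating are evicted instead of leaking.
function createKeyedThrottle<T>(fn: (key: string, value: T) => void, wait = 150) {
  const throttlers = new LRUCache<string, ReturnType<typeof throttle>>({
    max: 100,
    ttl: 1000 * 60 * 5,
    updateAgeOnGet: true
  })

  return {
    call(key: string, value: T) {
      let throttler = throttlers.get(key)
      if (!throttler) {
        throttler = throttle((v: T) => fn(key, v), wait)
        throttlers.set(key, throttler)
      }
      throttler(value)
    },
    // Cancelling affects only this key, unlike the old module-level cancel
    cancel(key: string) {
      throttlers.get(key)?.cancel()
      throttlers.delete(key)
    }
  }
}
```
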
@@ -38,6 +38,7 @@ import { getTopicQueue, waitForTopicQueue } from '@renderer/utils/queue'
import { isOnHomePage } from '@renderer/utils/window'
import { t } from 'i18next'
import { throttle } from 'lodash'
import { LRUCache } from 'lru-cache'
import type { AppDispatch, RootState } from '../index'
import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock'
@@ -114,22 +115,88 @@ const updateExistingMessageAndBlocksInDB = async (
}
}
// Logic for updating a single block; used to update one block within a message
const throttledBlockUpdate = throttle(async (id, blockUpdate) => {
  // const state = store.getState()
  // const block = state.messageBlocks.entities[id]
  // throttle is async, so this may run only after the complete event has already fired
  // if (
  //   blockUpdate.status === MessageBlockStatus.STREAMING &&
  //   (block?.status === MessageBlockStatus.SUCCESS || block?.status === MessageBlockStatus.ERROR)
  // )
  //   return
  store.dispatch(updateOneBlock({ id, changes: blockUpdate }))
  await db.message_blocks.update(id, blockUpdate)
}, 150)
const cancelThrottledBlockUpdate = throttledBlockUpdate.cancel
/**
 * Throttlers for block updates, one per block id.
 * Held in an LRU cache so idle entries are evicted and memory stays bounded.
 */
const blockUpdateThrottlers = new LRUCache<string, ReturnType<typeof throttle>>({
  max: 100,
  ttl: 1000 * 60 * 5,
  updateAgeOnGet: true
})
/**
 * Pending requestAnimationFrame (RAF) ids, one per block id.
 * Used to cancel a stale RAF before scheduling the next one.
 */
const blockUpdateRafs = new LRUCache<string, number>({
  max: 100,
  ttl: 1000 * 60 * 5,
  updateAgeOnGet: true
})
/**
 * Get or lazily create the throttler for a given block id.
 */
const getBlockThrottler = (id: string) => {
  if (!blockUpdateThrottlers.has(id)) {
    const throttler = throttle(async (blockUpdate: any) => {
      const existingRAF = blockUpdateRafs.get(id)
      if (existingRAF) {
        cancelAnimationFrame(existingRAF)
      }
      const rafId = requestAnimationFrame(() => {
        store.dispatch(updateOneBlock({ id, changes: blockUpdate }))
        blockUpdateRafs.delete(id)
      })
      blockUpdateRafs.set(id, rafId)
      await db.message_blocks.update(id, blockUpdate)
    }, 150)
    blockUpdateThrottlers.set(id, throttler)
  }
  return blockUpdateThrottlers.get(id)!
}
/**
 * Throttled update for a single block, routed through its per-block throttler.
 */
const throttledBlockUpdate = (id: string, blockUpdate: any) => {
  const throttler = getBlockThrottler(id)
  throttler(blockUpdate)
}
/**
 * Cancel any pending throttled update and RAF for a block, then drop its throttler.
 */
const cancelThrottledBlockUpdate = (id: string) => {
  const rafId = blockUpdateRafs.get(id)
  if (rafId) {
    cancelAnimationFrame(rafId)
    blockUpdateRafs.delete(id)
  }
  const throttler = blockUpdateThrottlers.get(id)
  if (throttler) {
    throttler.cancel()
    blockUpdateThrottlers.delete(id)
  }
}
/**
 * Clean up the throttlers and RAFs for multiple blocks and remove the blocks from the store.
 */
export const cleanupMultipleBlocks = (dispatch: AppDispatch, blockIds: string[]) => {
  blockIds.forEach((id) => {
    cancelThrottledBlockUpdate(id)
  })
  if (blockIds.length > 0) {
    dispatch(removeManyBlocks(blockIds))
  }
}
// // Change: throttled update of a single block's content/status to the DB (Text/Thinking chunks only)
// export const throttledBlockDbUpdate = throttle(
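
In `getBlockThrottler` above, each block's throttler fires at most once per 150 ms; inside it the Dexie write runs directly, while the Redux dispatch is deferred to the next animation frame, cancelling any frame still pending for that block so only the latest payload reaches the UI. A small sketch of just that RAF-coalescing step (hypothetical names, assumes a browser environment):

```ts
// Hypothetical sketch of the RAF coalescing used inside each throttler:
// only the most recent update for a key is applied on the next frame.
const pendingFrames = new Map<string, number>()

function scheduleFrameUpdate(key: string, apply: () => void) {
  const pending = pendingFrames.get(key)
  if (pending !== undefined) {
    cancelAnimationFrame(pending) // drop the stale frame scheduled for this key
  }
  const rafId = requestAnimationFrame(() => {
    pendingFrames.delete(key)
    apply()
  })
  pendingFrames.set(key, rafId)
}
```
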
@@ -357,12 +424,12 @@ const fetchAndProcessAssistantResponseImpl = async (
}
},
onTextComplete: async (finalText) => {
cancelThrottledBlockUpdate()
if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) {
const changes = {
content: finalText,
status: MessageBlockStatus.SUCCESS
}
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
@@ -415,8 +482,6 @@ const fetchAndProcessAssistantResponseImpl = async (
}
},
onThinkingComplete: (finalText, final_thinking_millsec) => {
cancelThrottledBlockUpdate()
if (lastBlockType === MessageBlockType.THINKING && lastBlockId) {
const changes = {
type: MessageBlockType.THINKING,
@@ -424,6 +489,7 @@ const fetchAndProcessAssistantResponseImpl = async (
status: MessageBlockStatus.SUCCESS,
thinking_millsec: final_thinking_millsec
}
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
} else {
@@ -458,7 +524,6 @@ const fetchAndProcessAssistantResponseImpl = async (
}
},
onToolCallComplete: (toolResponse: MCPToolResponse) => {
cancelThrottledBlockUpdate()
const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id)
if (toolResponse.status === 'done' || toolResponse.status === 'error') {
if (!existingBlockId) {
@@ -476,6 +541,7 @@ const fetchAndProcessAssistantResponseImpl = async (
if (finalStatus === MessageBlockStatus.ERROR) {
changes.error = { message: `Tool execution failed/error`, details: toolResponse.response }
}
cancelThrottledBlockUpdate(existingBlockId)
dispatch(updateOneBlock({ id: existingBlockId, changes }))
saveUpdatedBlockToDB(existingBlockId, assistantMsgId, topicId, getState)
} else {
@@ -577,7 +643,6 @@ const fetchAndProcessAssistantResponseImpl = async (
}
},
onError: async (error) => {
cancelThrottledBlockUpdate()
console.dir(error, { depth: null })
const isErrorTypeAbort = isAbortError(error)
let pauseErrorLanguagePlaceholder = ''
@@ -610,6 +675,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = {
status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
}
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
}
@@ -631,8 +697,6 @@ const fetchAndProcessAssistantResponseImpl = async (
})
},
onComplete: async (status: AssistantMessageStatus, response?: Response) => {
cancelThrottledBlockUpdate()
const finalStateOnComplete = getState()
const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId]
@@ -647,6 +711,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const changes: Partial<MessageBlock> = {
status: MessageBlockStatus.SUCCESS
}
cancelThrottledBlockUpdate(lastBlockId)
dispatch(updateOneBlock({ id: lastBlockId, changes }))
saveUpdatedBlockToDB(lastBlockId, assistantMsgId, topicId, getState)
}
@@ -819,7 +884,7 @@ export const deleteSingleMessageThunk =
try {
dispatch(newMessagesActions.removeMessage({ topicId, messageId }))
dispatch(removeManyBlocks(blockIdsToDelete))
cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
@@ -862,7 +927,7 @@ export const deleteMessageGroupThunk =
try {
dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId }))
dispatch(removeManyBlocks(blockIdsToDelete))
cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
@@ -892,9 +957,7 @@ export const clearTopicMessagesThunk =
const blockIdsToDelete = Array.from(blockIdsToDeleteSet)
dispatch(newMessagesActions.clearTopicMessages(topicId))
if (blockIdsToDelete.length > 0) {
dispatch(removeManyBlocks(blockIdsToDelete))
}
cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.topics.update(topicId, { messages: [] })
if (blockIdsToDelete.length > 0) {
@@ -968,9 +1031,7 @@ export const resendMessageThunk =
}
messagesToUpdateInRedux.forEach((update) => dispatch(newMessagesActions.updateMessage(update)))
if (allBlockIdsToDelete.length > 0) {
dispatch(removeManyBlocks(allBlockIdsToDelete))
}
cleanupMultipleBlocks(dispatch, allBlockIdsToDelete)
try {
if (allBlockIdsToDelete.length > 0) {
@@ -1069,9 +1130,7 @@ export const regenerateAssistantResponseThunk =
)
// 6. Remove old blocks from Redux
if (blockIdsToDelete.length > 0) {
dispatch(removeManyBlocks(blockIdsToDelete))
}
cleanupMultipleBlocks(dispatch, blockIdsToDelete)
// 7. Update DB: Save the reset message state within the topic and delete old blocks
// Fetch the current state *after* Redux updates to get the latest message list
@@ -1516,9 +1575,7 @@ export const removeBlocksThunk =
// 1. Update Redux state
dispatch(newMessagesActions.updateMessage({ topicId, messageId, updates: { blocks: updatedBlockIds } }))
if (blockIdsToRemove.length > 0) {
dispatch(removeManyBlocks(blockIdsToRemove))
}
cleanupMultipleBlocks(dispatch, blockIdsToRemove)
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)
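
The deletion and regeneration thunks above now go through `cleanupMultipleBlocks` instead of dispatching `removeManyBlocks` directly, so pending throttled updates and RAFs are cancelled before the blocks disappear from Redux and Dexie. A hedged sketch of the flow those call sites converge on (the wrapper name is hypothetical):

```ts
// Sketch of the deletion flow at the call sites above: cancel pending
// throttled/RAF work and remove blocks from Redux, then delete them from Dexie.
async function removeBlocksCompletely(dispatch: AppDispatch, blockIds: string[]) {
  cleanupMultipleBlocks(dispatch, blockIds) // cancels throttlers/RAFs, dispatches removeManyBlocks
  if (blockIds.length > 0) {
    await db.message_blocks.bulkDelete(blockIds)
  }
}
```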