fix: enhance logging and update async handling in StreamProcessingService and messageThunk

- Enabled logging in `createStreamProcessor` for better debugging.
- Added logging for updated messages in `updateExistingMessageAndBlocksInDB` and `saveUpdatesToDB`.
- Updated `onTextComplete` and `onLLMWebSearchComplete` to handle asynchronous operations correctly.
- Commented out unused `saveUpdatedBlockToDB` calls to prevent unnecessary database updates.
This commit is contained in:
MyPrototypeWhat 2025-05-09 19:47:24 +08:00
parent c402a1d21f
commit 6d9107558e
2 changed files with 10 additions and 8 deletions

View File

@ -40,7 +40,7 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
// The returned function processes a single chunk or a final signal // The returned function processes a single chunk or a final signal
return (chunk: Chunk) => { return (chunk: Chunk) => {
try { try {
// console.log(`[${new Date().toLocaleString()}] createStreamProcessor ${chunk.type}`, chunk) console.log(`[${new Date().toLocaleString()}] createStreamProcessor ${chunk.type}`, chunk)
// 1. Handle the manual final signal first // 1. Handle the manual final signal first
if (chunk?.type === ChunkType.BLOCK_COMPLETE) { if (chunk?.type === ChunkType.BLOCK_COMPLETE) {
callbacks.onComplete?.(AssistantMessageStatus.SUCCESS, chunk?.response) callbacks.onComplete?.(AssistantMessageStatus.SUCCESS, chunk?.response)

View File

@ -91,6 +91,7 @@ const updateExistingMessageAndBlocksInDB = async (
const newMessages = [...topic.messages] const newMessages = [...topic.messages]
// Apply the updates passed in updatedMessage // Apply the updates passed in updatedMessage
Object.assign(newMessages[messageIndex], updatedMessage) Object.assign(newMessages[messageIndex], updatedMessage)
console.log('updateExistingMessageAndBlocksInDB', updatedMessage)
await db.topics.update(updatedMessage.topicId, { messages: newMessages }) await db.topics.update(updatedMessage.topicId, { messages: newMessages })
} else { } else {
console.error(`[updateExistingMsg] Message ${updatedMessage.id} not found in topic ${updatedMessage.topicId}`) console.error(`[updateExistingMsg] Message ${updatedMessage.id} not found in topic ${updatedMessage.topicId}`)
@ -154,6 +155,7 @@ const saveUpdatesToDB = async (
messageUpdates: Partial<Message>, // 需要更新的消息字段 messageUpdates: Partial<Message>, // 需要更新的消息字段
blocksToUpdate: MessageBlock[] // 需要更新/创建的块 blocksToUpdate: MessageBlock[] // 需要更新/创建的块
) => { ) => {
console.log('messageUpdates', messageUpdates)
try { try {
const messageDataToSave: Partial<Message> & Pick<Message, 'id' | 'topicId'> = { const messageDataToSave: Partial<Message> & Pick<Message, 'id' | 'topicId'> = {
id: messageId, id: messageId,
@ -351,7 +353,7 @@ const fetchAndProcessAssistantResponseImpl = async (
} }
} }
}, },
onTextComplete: (finalText) => { onTextComplete: async (finalText) => {
if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) { if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) {
const changes = { const changes = {
content: finalText, content: finalText,
@ -368,8 +370,8 @@ const fetchAndProcessAssistantResponseImpl = async (
{ response: { source: WebSearchSource.OPENROUTER, results: extractedUrls } }, { response: { source: WebSearchSource.OPENROUTER, results: extractedUrls } },
{ status: MessageBlockStatus.SUCCESS } { status: MessageBlockStatus.SUCCESS }
) )
handleBlockTransition(citationBlock, MessageBlockType.CITATION) await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState) // saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
} }
} }
} else { } else {
@ -469,7 +471,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING }) const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
citationBlockId = citationBlock.id citationBlockId = citationBlock.id
handleBlockTransition(citationBlock, MessageBlockType.CITATION) handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState) // saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
}, },
onExternalToolComplete: (externalToolResult: ExternalToolResult) => { onExternalToolComplete: (externalToolResult: ExternalToolResult) => {
console.warn('onExternalToolComplete received.', externalToolResult) console.warn('onExternalToolComplete received.', externalToolResult)
@ -489,9 +491,9 @@ const fetchAndProcessAssistantResponseImpl = async (
const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING }) const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
citationBlockId = citationBlock.id citationBlockId = citationBlock.id
handleBlockTransition(citationBlock, MessageBlockType.CITATION) handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState) // saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
}, },
onLLMWebSearchComplete(llmWebSearchResult) { onLLMWebSearchComplete: async (llmWebSearchResult) => {
if (citationBlockId) { if (citationBlockId) {
const changes: Partial<CitationMessageBlock> = { const changes: Partial<CitationMessageBlock> = {
response: llmWebSearchResult, response: llmWebSearchResult,
@ -624,7 +626,7 @@ const fetchAndProcessAssistantResponseImpl = async (
updates: messageUpdates updates: messageUpdates
}) })
) )
console.log('onComplete: saveUpdatesToDB', messageUpdates)
saveUpdatesToDB(assistantMsgId, topicId, messageUpdates, []) saveUpdatesToDB(assistantMsgId, topicId, messageUpdates, [])
EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { id: assistantMsgId, topicId, status }) EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { id: assistantMsgId, topicId, status })