fix: enhance logging and update async handling in StreamProcessingService and messageThunk

- Enabled logging in `createStreamProcessor` for better debugging.
- Added logging for updated messages in `updateExistingMessageAndBlocksInDB` and `saveUpdatesToDB`.
- Updated `onTextComplete` and `onLLMWebSearchComplete` to handle asynchronous operations correctly.
- Commented out unused `saveUpdatedBlockToDB` calls to prevent unnecessary database updates.
This commit is contained in:
MyPrototypeWhat 2025-05-09 19:47:24 +08:00
parent c402a1d21f
commit 6d9107558e
2 changed files with 10 additions and 8 deletions

View File

@ -40,7 +40,7 @@ export function createStreamProcessor(callbacks: StreamProcessorCallbacks = {})
// The returned function processes a single chunk or a final signal
return (chunk: Chunk) => {
try {
// console.log(`[${new Date().toLocaleString()}] createStreamProcessor ${chunk.type}`, chunk)
console.log(`[${new Date().toLocaleString()}] createStreamProcessor ${chunk.type}`, chunk)
// 1. Handle the manual final signal first
if (chunk?.type === ChunkType.BLOCK_COMPLETE) {
callbacks.onComplete?.(AssistantMessageStatus.SUCCESS, chunk?.response)

View File

@ -91,6 +91,7 @@ const updateExistingMessageAndBlocksInDB = async (
const newMessages = [...topic.messages]
// Apply the updates passed in updatedMessage
Object.assign(newMessages[messageIndex], updatedMessage)
console.log('updateExistingMessageAndBlocksInDB', updatedMessage)
await db.topics.update(updatedMessage.topicId, { messages: newMessages })
} else {
console.error(`[updateExistingMsg] Message ${updatedMessage.id} not found in topic ${updatedMessage.topicId}`)
@ -154,6 +155,7 @@ const saveUpdatesToDB = async (
messageUpdates: Partial<Message>, // 需要更新的消息字段
blocksToUpdate: MessageBlock[] // 需要更新/创建的块
) => {
console.log('messageUpdates', messageUpdates)
try {
const messageDataToSave: Partial<Message> & Pick<Message, 'id' | 'topicId'> = {
id: messageId,
@ -351,7 +353,7 @@ const fetchAndProcessAssistantResponseImpl = async (
}
}
},
onTextComplete: (finalText) => {
onTextComplete: async (finalText) => {
if (lastBlockType === MessageBlockType.MAIN_TEXT && lastBlockId) {
const changes = {
content: finalText,
@ -368,8 +370,8 @@ const fetchAndProcessAssistantResponseImpl = async (
{ response: { source: WebSearchSource.OPENROUTER, results: extractedUrls } },
{ status: MessageBlockStatus.SUCCESS }
)
handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
// saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
}
}
} else {
@ -469,7 +471,7 @@ const fetchAndProcessAssistantResponseImpl = async (
const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
citationBlockId = citationBlock.id
handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
// saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
},
onExternalToolComplete: (externalToolResult: ExternalToolResult) => {
console.warn('onExternalToolComplete received.', externalToolResult)
@ -489,9 +491,9 @@ const fetchAndProcessAssistantResponseImpl = async (
const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
citationBlockId = citationBlock.id
handleBlockTransition(citationBlock, MessageBlockType.CITATION)
saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
// saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
},
onLLMWebSearchComplete(llmWebSearchResult) {
onLLMWebSearchComplete: async (llmWebSearchResult) => {
if (citationBlockId) {
const changes: Partial<CitationMessageBlock> = {
response: llmWebSearchResult,
@ -624,7 +626,7 @@ const fetchAndProcessAssistantResponseImpl = async (
updates: messageUpdates
})
)
console.log('onComplete: saveUpdatesToDB', messageUpdates)
saveUpdatesToDB(assistantMsgId, topicId, messageUpdates, [])
EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { id: assistantMsgId, topicId, status })