feat(ApiService): enhance chat completion handling with chunk reception

- Added onChunkReceived call to fetchChatCompletion to improve response handling.
- Removed redundant onChunkReceived call from the completion parameters section in fetchChatCompletion.
This commit is contained in:
kangfenmao 2025-08-07 14:54:22 +08:00
parent d44fa1775c
commit ee5e420419
2 changed files with 2 additions and 544 deletions

View File

@ -437,6 +437,8 @@ export async function fetchChatCompletion({
}) {
logger.debug('fetchChatCompletion', messages, assistant)
onChunkReceived({ type: ChunkType.LLM_RESPONSE_CREATED })
if (assistant.prompt && containsSupportedVariables(assistant.prompt)) {
assistant.prompt = await replacePromptVariables(assistant.prompt, assistant.model?.name)
}
@ -485,7 +487,6 @@ export async function fetchChatCompletion({
isGenerateImageModel(model) && (isSupportedDisableGenerationModel(model) ? assistant.enableGenerateImage : true)
// --- Call AI Completions ---
onChunkReceived({ type: ChunkType.LLM_RESPONSE_CREATED })
const completionsParams: CompletionsParams = {
callType: 'chat',

View File

@ -301,91 +301,6 @@ const fetchAndProcessAssistantResponseImpl = async (
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// let accumulatedContent = ''
// let accumulatedThinking = ''
// let lastBlockId: string | null = null
// let lastBlockType: MessageBlockType | null = null
// let currentActiveBlockType: MessageBlockType | null = null
// // 专注于块内部的生命周期处理
// let initialPlaceholderBlockId: string | null = null
// let citationBlockId: string | null = null
// let mainTextBlockId: string | null = null
// let thinkingBlockId: string | null = null
// let imageBlockId: string | null = null
// let toolBlockId: string | null = null
// const toolCallIdToBlockIdMap = new Map<string, string>()
// const notificationService = NotificationService.getInstance()
/**
* 使
* - 使
* -
* @param blockId ID
* @param changes
* @param blockType
* @param isComplete redux中
*/
// const smartBlockUpdate = (
// blockId: string,
// changes: Partial<MessageBlock>,
// blockType: MessageBlockType,
// isComplete: boolean = false
// ) => {
// const isBlockTypeChanged = currentActiveBlockType !== null && currentActiveBlockType !== blockType
// if (isBlockTypeChanged || isComplete) {
// // 如果块类型改变则取消上一个块的节流更新并保存块更新到redux中尽管有可能被上一个块本身的oncomplete事件的取消节流已经取消了
// if (isBlockTypeChanged && lastBlockId) {
// cancelThrottledBlockUpdate(lastBlockId)
// }
// // 如果当前块完成则取消当前块的节流更新并保存块更新到redux中避免streaming状态覆盖掉完成状态
// if (isComplete) {
// cancelThrottledBlockUpdate(blockId)
// }
// dispatch(updateOneBlock({ id: blockId, changes }))
// saveUpdatedBlockToDB(blockId, assistantMsgId, topicId, getState)
// } else {
// throttledBlockUpdate(blockId, changes)
// }
// // 更新当前活跃块类型
// currentActiveBlockType = blockType
// }
// const handleBlockTransition = async (newBlock: MessageBlock, newBlockType: MessageBlockType) => {
// lastBlockId = newBlock.id
// lastBlockType = newBlockType
// if (newBlockType !== MessageBlockType.MAIN_TEXT) {
// accumulatedContent = ''
// }
// if (newBlockType !== MessageBlockType.THINKING) {
// accumulatedThinking = ''
// }
// dispatch(
// newMessagesActions.updateMessage({
// topicId,
// messageId: assistantMsgId,
// updates: { blockInstruction: { id: newBlock.id } }
// })
// )
// dispatch(upsertOneBlock(newBlock))
// dispatch(
// newMessagesActions.upsertBlockReference({
// messageId: assistantMsgId,
// blockId: newBlock.id,
// status: newBlock.status
// })
// )
// const currentState = getState()
// const updatedMessage = currentState.messages.entities[assistantMsgId]
// if (updatedMessage) {
// await saveUpdatesToDB(assistantMsgId, topicId, { blocks: updatedMessage.blocks }, [newBlock])
// } else {
// console.error(`[handleBlockTransition] Failed to get updated message ${assistantMsgId} from state for DB save.`)
// }
// }
// 创建 BlockManager 实例
const blockManager = new BlockManager({
dispatch,
@ -419,464 +334,6 @@ const fetchAndProcessAssistantResponseImpl = async (
messagesForContext = contextSlice.filter((m) => m && !m.status?.includes('ing'))
}
// callbacks = {
// onLLMResponseCreated: async () => {
// const baseBlock = createBaseMessageBlock(assistantMsgId, MessageBlockType.UNKNOWN, {
// status: MessageBlockStatus.PROCESSING
// })
// initialPlaceholderBlockId = baseBlock.id
// await handleBlockTransition(baseBlock as PlaceholderMessageBlock, MessageBlockType.UNKNOWN)
// },
// onTextStart: async () => {
// if (initialPlaceholderBlockId) {
// lastBlockType = MessageBlockType.MAIN_TEXT
// const changes = {
// type: MessageBlockType.MAIN_TEXT,
// content: accumulatedContent,
// status: MessageBlockStatus.STREAMING
// }
// smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.MAIN_TEXT, true)
// mainTextBlockId = initialPlaceholderBlockId
// initialPlaceholderBlockId = null
// } else if (!mainTextBlockId) {
// const newBlock = createMainTextBlock(assistantMsgId, accumulatedContent, {
// status: MessageBlockStatus.STREAMING
// })
// mainTextBlockId = newBlock.id
// await handleBlockTransition(newBlock, MessageBlockType.MAIN_TEXT)
// }
// },
// onTextChunk: async (text) => {
// const citationBlockSource = citationBlockId
// ? (getState().messageBlocks.entities[citationBlockId] as CitationMessageBlock).response?.source
// : WebSearchSource.WEBSEARCH
// accumulatedContent += text
// if (mainTextBlockId) {
// const blockChanges: Partial<MessageBlock> = {
// content: accumulatedContent,
// status: MessageBlockStatus.STREAMING,
// citationReferences: citationBlockId ? [{ citationBlockId, citationBlockSource }] : []
// }
// smartBlockUpdate(mainTextBlockId, blockChanges, MessageBlockType.MAIN_TEXT)
// }
// },
// onTextComplete: async (finalText) => {
// if (mainTextBlockId) {
// const changes = {
// content: finalText,
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(mainTextBlockId, changes, MessageBlockType.MAIN_TEXT, true)
// mainTextBlockId = null
// } else {
// console.warn(
// `[onTextComplete] Received text.complete but last block was not MAIN_TEXT (was ${lastBlockType}) or lastBlockId is null.`
// )
// }
// },
// onThinkingStart: async () => {
// if (initialPlaceholderBlockId) {
// lastBlockType = MessageBlockType.THINKING
// const changes = {
// type: MessageBlockType.THINKING,
// content: accumulatedThinking,
// status: MessageBlockStatus.STREAMING,
// thinking_millsec: 0
// }
// thinkingBlockId = initialPlaceholderBlockId
// initialPlaceholderBlockId = null
// smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING, true)
// } else if (!thinkingBlockId) {
// const newBlock = createThinkingBlock(assistantMsgId, accumulatedThinking, {
// status: MessageBlockStatus.STREAMING,
// thinking_millsec: 0
// })
// thinkingBlockId = newBlock.id
// await handleBlockTransition(newBlock, MessageBlockType.THINKING)
// }
// },
// onThinkingChunk: async (text, thinking_millsec) => {
// accumulatedThinking += text
// if (thinkingBlockId) {
// const blockChanges: Partial<MessageBlock> = {
// content: accumulatedThinking,
// status: MessageBlockStatus.STREAMING,
// thinking_millsec: thinking_millsec
// }
// smartBlockUpdate(thinkingBlockId, blockChanges, MessageBlockType.THINKING)
// }
// },
// onThinkingComplete: (finalText, final_thinking_millsec) => {
// if (thinkingBlockId) {
// const changes = {
// type: MessageBlockType.THINKING,
// content: finalText,
// status: MessageBlockStatus.SUCCESS,
// thinking_millsec: final_thinking_millsec
// }
// smartBlockUpdate(thinkingBlockId, changes, MessageBlockType.THINKING, true)
// } else {
// console.warn(
// `[onThinkingComplete] Received thinking.complete but last block was not THINKING (was ${lastBlockType}) or lastBlockId is null.`
// )
// }
// thinkingBlockId = null
// },
// onToolCallPending: (toolResponse: MCPToolResponse) => {
// if (initialPlaceholderBlockId) {
// lastBlockType = MessageBlockType.TOOL
// const changes = {
// type: MessageBlockType.TOOL,
// status: MessageBlockStatus.PENDING,
// toolName: toolResponse.tool.name,
// metadata: { rawMcpToolResponse: toolResponse }
// }
// toolBlockId = initialPlaceholderBlockId
// initialPlaceholderBlockId = null
// smartBlockUpdate(toolBlockId, changes, MessageBlockType.TOOL)
// toolCallIdToBlockIdMap.set(toolResponse.id, toolBlockId)
// } else if (toolResponse.status === 'pending') {
// const toolBlock = createToolBlock(assistantMsgId, toolResponse.id, {
// toolName: toolResponse.tool.name,
// status: MessageBlockStatus.PENDING,
// metadata: { rawMcpToolResponse: toolResponse }
// })
// toolBlockId = toolBlock.id
// handleBlockTransition(toolBlock, MessageBlockType.TOOL)
// toolCallIdToBlockIdMap.set(toolResponse.id, toolBlock.id)
// } else {
// console.warn(
// `[onToolCallPending] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}`
// )
// }
// },
// onToolCallInProgress: (toolResponse: MCPToolResponse) => {
// // 根据 toolResponse.id 查找对应的块ID
// const targetBlockId = toolCallIdToBlockIdMap.get(toolResponse.id)
// if (targetBlockId && toolResponse.status === 'invoking') {
// const changes = {
// status: MessageBlockStatus.PROCESSING,
// metadata: { rawMcpToolResponse: toolResponse }
// }
// smartBlockUpdate(targetBlockId, changes, MessageBlockType.TOOL)
// } else if (!targetBlockId) {
// console.warn(
// `[onToolCallInProgress] No block ID found for tool ID: ${toolResponse.id}. Available mappings:`,
// Array.from(toolCallIdToBlockIdMap.entries())
// )
// } else {
// console.warn(
// `[onToolCallInProgress] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}`
// )
// }
// },
// onToolCallComplete: (toolResponse: MCPToolResponse) => {
// const existingBlockId = toolCallIdToBlockIdMap.get(toolResponse.id)
// toolCallIdToBlockIdMap.delete(toolResponse.id)
// if (toolResponse.status === 'done' || toolResponse.status === 'error' || toolResponse.status === 'cancelled') {
// if (!existingBlockId) {
// console.error(
// `[onToolCallComplete] No existing block found for completed/error tool call ID: ${toolResponse.id}. Cannot update.`
// )
// return
// }
// const finalStatus =
// toolResponse.status === 'done' || toolResponse.status === 'cancelled'
// ? MessageBlockStatus.SUCCESS
// : MessageBlockStatus.ERROR
// const changes: Partial<ToolMessageBlock> = {
// content: toolResponse.response,
// status: finalStatus,
// metadata: { rawMcpToolResponse: toolResponse }
// }
// if (finalStatus === MessageBlockStatus.ERROR) {
// changes.error = { message: `Tool execution failed/error`, details: toolResponse.response }
// }
// smartBlockUpdate(existingBlockId, changes, MessageBlockType.TOOL, true)
// } else {
// console.warn(
// `[onToolCallComplete] Received unhandled tool status: ${toolResponse.status} for ID: ${toolResponse.id}`
// )
// }
// toolBlockId = null
// },
// onExternalToolInProgress: async () => {
// const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
// citationBlockId = citationBlock.id
// await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
// // saveUpdatedBlockToDB(citationBlock.id, assistantMsgId, topicId, getState)
// },
// onExternalToolComplete: (externalToolResult: ExternalToolResult) => {
// if (citationBlockId) {
// const changes: Partial<CitationMessageBlock> = {
// response: externalToolResult.webSearch,
// knowledge: externalToolResult.knowledge,
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(citationBlockId, changes, MessageBlockType.CITATION, true)
// } else {
// console.error('[onExternalToolComplete] citationBlockId is null. Cannot update.')
// }
// },
// onLLMWebSearchInProgress: async () => {
// if (initialPlaceholderBlockId) {
// lastBlockType = MessageBlockType.CITATION
// citationBlockId = initialPlaceholderBlockId
// const changes = {
// type: MessageBlockType.CITATION,
// status: MessageBlockStatus.PROCESSING
// }
// lastBlockType = MessageBlockType.CITATION
// smartBlockUpdate(initialPlaceholderBlockId, changes, MessageBlockType.CITATION)
// initialPlaceholderBlockId = null
// } else {
// const citationBlock = createCitationBlock(assistantMsgId, {}, { status: MessageBlockStatus.PROCESSING })
// citationBlockId = citationBlock.id
// await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
// }
// },
// onLLMWebSearchComplete: async (llmWebSearchResult) => {
// const blockId = citationBlockId || initialPlaceholderBlockId
// if (blockId) {
// const changes: Partial<CitationMessageBlock> = {
// type: MessageBlockType.CITATION,
// response: llmWebSearchResult,
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(blockId, changes, MessageBlockType.CITATION)
// const state = getState()
// const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
// if (existingMainTextBlocks.length > 0) {
// const existingMainTextBlock = existingMainTextBlocks[0]
// const currentRefs = existingMainTextBlock.citationReferences || []
// const mainTextChanges = {
// citationReferences: [...currentRefs, { blockId, citationBlockSource: llmWebSearchResult.source }]
// }
// smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT, true)
// }
// if (initialPlaceholderBlockId) {
// citationBlockId = initialPlaceholderBlockId
// initialPlaceholderBlockId = null
// }
// } else {
// const citationBlock = createCitationBlock(
// assistantMsgId,
// {
// response: llmWebSearchResult
// },
// {
// status: MessageBlockStatus.SUCCESS
// }
// )
// citationBlockId = citationBlock.id
// const state = getState()
// const existingMainTextBlocks = findMainTextBlocks(state.messages.entities[assistantMsgId])
// if (existingMainTextBlocks.length > 0) {
// const existingMainTextBlock = existingMainTextBlocks[0]
// const currentRefs = existingMainTextBlock.citationReferences || []
// const mainTextChanges = {
// citationReferences: [...currentRefs, { citationBlockId, citationBlockSource: llmWebSearchResult.source }]
// }
// smartBlockUpdate(existingMainTextBlock.id, mainTextChanges, MessageBlockType.MAIN_TEXT, true)
// }
// await handleBlockTransition(citationBlock, MessageBlockType.CITATION)
// }
// },
// onImageCreated: async () => {
// if (initialPlaceholderBlockId) {
// lastBlockType = MessageBlockType.IMAGE
// const initialChanges: Partial<MessageBlock> = {
// type: MessageBlockType.IMAGE,
// status: MessageBlockStatus.PENDING
// }
// lastBlockType = MessageBlockType.IMAGE
// imageBlockId = initialPlaceholderBlockId
// initialPlaceholderBlockId = null
// smartBlockUpdate(imageBlockId, initialChanges, MessageBlockType.IMAGE)
// } else if (!imageBlockId) {
// const imageBlock = createImageBlock(assistantMsgId, {
// status: MessageBlockStatus.PENDING
// })
// imageBlockId = imageBlock.id
// await handleBlockTransition(imageBlock, MessageBlockType.IMAGE)
// }
// },
// onImageDelta: (imageData) => {
// const imageUrl = imageData.images?.[0] || 'placeholder_image_url'
// if (imageBlockId) {
// const changes: Partial<ImageMessageBlock> = {
// url: imageUrl,
// metadata: { generateImageResponse: imageData },
// status: MessageBlockStatus.STREAMING
// }
// smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE, true)
// }
// },
// onImageGenerated: (imageData) => {
// if (imageBlockId) {
// if (!imageData) {
// const changes: Partial<ImageMessageBlock> = {
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE)
// } else {
// const imageUrl = imageData.images?.[0] || 'placeholder_image_url'
// const changes: Partial<ImageMessageBlock> = {
// url: imageUrl,
// metadata: { generateImageResponse: imageData },
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(imageBlockId, changes, MessageBlockType.IMAGE, true)
// }
// } else {
// console.error('[onImageGenerated] Last block was not an Image block or ID is missing.')
// }
// imageBlockId = null
// },
// onError: async (error) => {
// console.dir(error, { depth: null })
// const isErrorTypeAbort = isAbortError(error)
// let pauseErrorLanguagePlaceholder = ''
// if (isErrorTypeAbort) {
// pauseErrorLanguagePlaceholder = 'pause_placeholder'
// }
// const serializableError = {
// name: error.name,
// message: pauseErrorLanguagePlaceholder || error.message || formatErrorMessage(error),
// originalMessage: error.message,
// stack: error.stack,
// status: error.status || error.code,
// requestId: error.request_id
// }
// if (!isOnHomePage()) {
// await notificationService.send({
// id: uuid(),
// type: 'error',
// title: t('notification.assistant'),
// message: serializableError.message,
// silent: false,
// timestamp: Date.now(),
// source: 'assistant'
// })
// }
// const possibleBlockId =
// mainTextBlockId ||
// thinkingBlockId ||
// toolBlockId ||
// imageBlockId ||
// citationBlockId ||
// initialPlaceholderBlockId ||
// lastBlockId
// if (possibleBlockId) {
// // 更改上一个block的状态为ERROR
// const changes: Partial<MessageBlock> = {
// status: isErrorTypeAbort ? MessageBlockStatus.PAUSED : MessageBlockStatus.ERROR
// }
// smartBlockUpdate(possibleBlockId, changes, lastBlockType!, true)
// }
// const errorBlock = createErrorBlock(assistantMsgId, serializableError, { status: MessageBlockStatus.SUCCESS })
// await handleBlockTransition(errorBlock, MessageBlockType.ERROR)
// const messageErrorUpdate = {
// status: isErrorTypeAbort ? AssistantMessageStatus.SUCCESS : AssistantMessageStatus.ERROR
// }
// dispatch(newMessagesActions.updateMessage({ topicId, messageId: assistantMsgId, updates: messageErrorUpdate }))
// saveUpdatesToDB(assistantMsgId, topicId, messageErrorUpdate, [])
// EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, {
// id: assistantMsgId,
// topicId,
// status: isErrorTypeAbort ? 'pause' : 'error',
// error: error.message
// })
// },
// onComplete: async (status: AssistantMessageStatus, response?: Response) => {
// const finalStateOnComplete = getState()
// const finalAssistantMsg = finalStateOnComplete.messages.entities[assistantMsgId]
// if (status === 'success' && finalAssistantMsg) {
// const userMsgId = finalAssistantMsg.askId
// const orderedMsgs = selectMessagesForTopic(finalStateOnComplete, topicId)
// const userMsgIndex = orderedMsgs.findIndex((m) => m.id === userMsgId)
// const contextForUsage = userMsgIndex !== -1 ? orderedMsgs.slice(0, userMsgIndex + 1) : []
// const finalContextWithAssistant = [...contextForUsage, finalAssistantMsg]
// const possibleBlockId =
// mainTextBlockId ||
// thinkingBlockId ||
// toolBlockId ||
// imageBlockId ||
// citationBlockId ||
// initialPlaceholderBlockId ||
// lastBlockId
// if (possibleBlockId) {
// const changes: Partial<MessageBlock> = {
// status: MessageBlockStatus.SUCCESS
// }
// smartBlockUpdate(possibleBlockId, changes, lastBlockType!, true)
// }
// const endTime = Date.now()
// const duration = endTime - startTime
// const content = getMainTextContent(finalAssistantMsg)
// if (!isOnHomePage() && duration > 60 * 1000) {
// await notificationService.send({
// id: uuid(),
// type: 'success',
// title: t('notification.assistant'),
// message: content.length > 50 ? content.slice(0, 47) + '...' : content,
// silent: false,
// timestamp: Date.now(),
// source: 'assistant'
// })
// }
// // 更新topic的name
// autoRenameTopic(assistant, topicId)
// if (
// response &&
// (response.usage?.total_tokens === 0 ||
// response?.usage?.prompt_tokens === 0 ||
// response?.usage?.completion_tokens === 0)
// ) {
// const usage = await estimateMessagesUsage({ assistant, messages: finalContextWithAssistant })
// response.usage = usage
// }
// // dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
// }
// if (response && response.metrics) {
// if (response.metrics.completion_tokens === 0 && response.usage?.completion_tokens) {
// response = {
// ...response,
// metrics: {
// ...response.metrics,
// completion_tokens: response.usage.completion_tokens
// }
// }
// }
// }
// const messageUpdates: Partial<Message> = { status, metrics: response?.metrics, usage: response?.usage }
// dispatch(
// newMessagesActions.updateMessage({
// topicId,
// messageId: assistantMsgId,
// updates: messageUpdates
// })
// )
// saveUpdatesToDB(assistantMsgId, topicId, messageUpdates, [])
// EventEmitter.emit(EVENT_NAMES.MESSAGE_COMPLETE, { id: assistantMsgId, topicId, status })
// }
// }
callbacks = createCallbacks({
blockManager,
dispatch,