From 61dddad22f40002ea3a76cb9ae57b79cf36827d3 Mon Sep 17 00:00:00 2001 From: fullex <0xfullex@gmail.com> Date: Sun, 4 Jan 2026 10:47:42 +0800 Subject: [PATCH] refactor: streamline block management and update StreamingService integration - Consolidated block addition logic in BlockManager to enhance clarity and maintainability, ensuring that new blocks also update message references internally. - Updated StreamingService to improve documentation and clarify the relationship between parentId and askId for backward compatibility. - Removed redundant block reference handling, simplifying the message structure and improving performance during streaming sessions. - Enhanced comments and documentation throughout to provide clearer guidance on the updated functionality and its implications for message processing. --- .../services/messageStreaming/BlockManager.ts | 8 +-- .../messageStreaming/StreamingService.ts | 71 +++++++++---------- src/renderer/src/store/thunk/messageThunk.ts | 4 +- 3 files changed, 38 insertions(+), 45 deletions(-) diff --git a/src/renderer/src/services/messageStreaming/BlockManager.ts b/src/renderer/src/services/messageStreaming/BlockManager.ts index 31362d2215..8061574e1d 100644 --- a/src/renderer/src/services/messageStreaming/BlockManager.ts +++ b/src/renderer/src/services/messageStreaming/BlockManager.ts @@ -130,8 +130,7 @@ export class BlockManager { * * This method: * 1. Updates active block tracking state - * 2. Adds new block to StreamingService - * 3. Updates message block references + * 2. Adds new block to StreamingService (which also updates message.blocks references) * * NOTE: DB saves are removed - persistence happens during finalize() */ @@ -140,12 +139,9 @@ export class BlockManager { this._lastBlockType = newBlockType this._activeBlockInfo = { id: newBlock.id, type: newBlockType } // Set new active block info - // Add new block to StreamingService (replaces dispatch(upsertOneBlock)) + // Add new block to StreamingService (also updates message.blocks references internally) streamingService.addBlock(this.deps.assistantMsgId, newBlock) - // Update block reference in message (replaces dispatch(upsertBlockReference)) - streamingService.addBlockReference(this.deps.assistantMsgId, newBlock.id) - // TEMPORARY: The blockInstruction field was used for UI coordination. // TODO: Evaluate if this is still needed with StreamingService approach // For now, we update it in the message diff --git a/src/renderer/src/services/messageStreaming/StreamingService.ts b/src/renderer/src/services/messageStreaming/StreamingService.ts index bafcb2dbb1..9dd4bb3835 100644 --- a/src/renderer/src/services/messageStreaming/StreamingService.ts +++ b/src/renderer/src/services/messageStreaming/StreamingService.ts @@ -55,7 +55,9 @@ interface StreamingSession { // Tree structure information (v2 new fields) parentId: string // Parent message ID (user message) - siblingsGroupId: number // Multi-model group ID (0=normal, >0=multi-model response) + // siblingsGroupId: 0 = single model response, >0 = multi-model response group + // Messages with the same parentId and siblingsGroupId (>0) are displayed together for comparison + siblingsGroupId: number // Context for usage estimation (messages up to and including user message) contextMessages?: Message[] @@ -66,15 +68,18 @@ interface StreamingSession { /** * Options for starting a streaming session + * + * NOTE: Internal naming uses v2 convention (parentId). + * The renderer Message format uses 'askId' for backward compatibility, + * which is set from parentId during message creation. */ interface StartSessionOptions { parentId: string - siblingsGroupId?: number // Defaults to 0 + siblingsGroupId?: number // Defaults to 0 (single model), >0 for multi-model response groups role: 'assistant' model?: Message['model'] modelId?: string assistantId: string - askId?: string traceId?: string agentSessionId?: string // Context messages for usage estimation (messages up to and including user message) @@ -126,13 +131,13 @@ class StreamingService { model, modelId, assistantId, - askId, traceId, agentSessionId, contextMessages } = options // Initialize message structure + // NOTE: askId is set from parentId for renderer format compatibility (v1 uses askId, v2 uses parentId) const message: Message = { id: messageId, topicId, @@ -143,7 +148,7 @@ class StreamingService { blocks: [], model, modelId, - askId, + askId: parentId, // Map v2 parentId to v1 renderer format askId traceId, agentSessionId } @@ -181,9 +186,16 @@ class StreamingService { * * This method: * 1. Converts streaming data to the appropriate format - * 2. Routes to Data API (normal topics) or dbService (agent topics) + * 2. Routes to the appropriate data source based on topic type * 3. Cleans up all related cache keys * + * ## Persistence Paths + * + * - **Normal topics** → Data API (target architecture for v2) + * - **Agent sessions** → dbService (TEMPORARY: This is a transitional approach. + * Agent message storage will be migrated to Data API in a later phase. + * Once migration is complete, all paths will use Data API uniformly.) + * * @param messageId - Session message ID * @param status - Final message status */ @@ -201,13 +213,12 @@ class StreamingService { try { const updatePayload = this.convertToUpdatePayload(session, status) - // TRADEOFF: Using dbService for agent messages instead of Data API - // because agent message storage refactoring is planned for later phase. - // TODO: Unify to Data API when agent message migration is complete. + // Route to appropriate data source based on topic type + // TEMPORARY: Agent sessions use dbService until migration to Data API is complete if (isAgentSessionTopicId(session.topicId)) { await dbService.updateMessageAndBlocks(session.topicId, updatePayload.messageUpdates, updatePayload.blocks) } else { - // Normal topic → Use Data API for persistence + // Normal topic → Use Data API for persistence (v2 target architecture) const dataApiPayload = this.convertToDataApiFormat(session, status) await dataApiService.patch(`/messages/${session.messageId}`, { body: dataApiPayload }) } @@ -376,30 +387,6 @@ class StreamingService { cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL) } - /** - * Add a block reference to the message - * (Replaces dispatch(newMessagesActions.upsertBlockReference)) - * - * Note: In the streaming context, we just need to track the block ID in message.blocks - * The block reference details are maintained in the block itself - * - * @param messageId - Message ID - * @param blockId - Block ID to reference - */ - addBlockReference(messageId: string, blockId: string): void { - const session = this.getSession(messageId) - if (!session) { - logger.warn(`addBlockReference called for non-existent session: ${messageId}`) - return - } - - if (!session.message.blocks.includes(blockId)) { - session.message.blocks = [...session.message.blocks, blockId] - cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL) - cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL) - } - } - /** * Get a message from the streaming session * @@ -458,14 +445,20 @@ class StreamingService { /** * Generate the next siblingsGroupId for a topic. * - * Used for multi-model responses where multiple assistant messages + * ## siblingsGroupId Semantics + * + * - **0** = Single-model response (one assistant message per user message) + * - **>0** = Multi-model response group (multiple assistant messages sharing + * the same parentId belong to the same sibling group for parallel comparison) + * + * This method is used for multi-model responses where multiple assistant messages * share the same parentId and siblingsGroupId (>0). * - * The counter is stored in CacheService and auto-increments. + * The counter is stored in CacheService and auto-increments per topic. * Single-model responses should use siblingsGroupId=0 (not generated here). * * @param topicId - Topic ID - * @returns Next siblingsGroupId (always > 0) + * @returns Next siblingsGroupId (always > 0 for multi-model groups) */ //FIXME [v2] 现在获取 siblingsGroupId 的方式是不正确,后续再做修改调整 generateNextGroupId(topicId: string): number { @@ -564,6 +557,9 @@ class StreamingService { * * For newly created pending messages, blocks are empty. * + * NOTE: Field mapping for backward compatibility: + * - shared.parentId (v2 Data API) → askId (v1 renderer format) + * * @param shared - Message from Data API response * @param assistantId - Assistant ID to include * @param model - Optional Model object to include @@ -578,6 +574,7 @@ class StreamingService { status: shared.status as AssistantMessageStatus, blocks: [], // For new pending messages, blocks are empty createdAt: shared.createdAt, + // v2 Data API uses 'parentId'; renderer format uses 'askId' for backward compatibility askId: shared.parentId ?? undefined, modelId: shared.modelId ?? undefined, traceId: shared.traceId ?? undefined, diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index cbc7d8b82f..58af404677 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -567,6 +567,7 @@ const fetchAndProcessAgentResponseImpl = async ( dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) // Initialize streaming session in StreamingService + // NOTE: parentId is used internally; askId in renderer format is derived from parentId streamingService.startSession(topicId, assistantMessage.id, { parentId: userMessageId, siblingsGroupId: 0, @@ -574,7 +575,6 @@ const fetchAndProcessAgentResponseImpl = async ( model: assistant.model, modelId: assistant.model?.id, assistantId: assistant.id, - askId: userMessageId, traceId: assistantMessage.traceId, agentSessionId: agentSession.agentSessionId }) @@ -840,6 +840,7 @@ const fetchAndProcessAssistantResponseImpl = async ( } // Initialize streaming session in StreamingService (includes context for usage estimation) + // NOTE: parentId is used internally; askId in renderer format is derived from parentId streamingService.startSession(topicId, assistantMsgId, { parentId: userMessageId!, siblingsGroupId, @@ -847,7 +848,6 @@ const fetchAndProcessAssistantResponseImpl = async ( model: assistant.model, modelId: assistant.model?.id, assistantId: assistant.id, - askId: userMessageId, traceId: assistantMessage.traceId, contextMessages: messagesForContext })