From 8efb738753fd480e7dd70019b2916e1b134b7fd8 Mon Sep 17 00:00:00 2001 From: fullex <0xfullex@gmail.com> Date: Fri, 9 Jan 2026 13:24:17 +0800 Subject: [PATCH] refactor(StreamingService): implement immutable updates for session and message handling - Updated the StreamingService to create new immutable instances of sessions and messages when adding blocks or applying updates. This change enhances cache notification mechanisms by ensuring that references are updated correctly. - Improved the handling of block additions and message updates to maintain immutability, which aids in better state management and performance. --- .../messageStreaming/StreamingService.ts | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/src/renderer/src/services/messageStreaming/StreamingService.ts b/src/renderer/src/services/messageStreaming/StreamingService.ts index 8707b18a0c..674e5fbace 100644 --- a/src/renderer/src/services/messageStreaming/StreamingService.ts +++ b/src/renderer/src/services/messageStreaming/StreamingService.ts @@ -282,18 +282,23 @@ class StreamingService { // Register block mapping this.blockToMessageMap.set(block.id, messageId) - // Add to session - session.blocks[block.id] = block - - // Update message block references - if (!session.message.blocks.includes(block.id)) { - session.message.blocks = [...session.message.blocks, block.id] + // Create new message with updated blocks (immutable update for cache notification) + const newMessage: Message = { + ...session.message, + blocks: session.message.blocks.includes(block.id) ? session.message.blocks : [...session.message.blocks, block.id] } - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + // Create new session with updated blocks and message (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + blocks: { ...session.blocks, [block.id]: block }, + message: newMessage + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) cacheService.set(getBlockKey(block.id), block, SESSION_TTL) - cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) + cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type }) } @@ -329,10 +334,15 @@ class StreamingService { // Merge changes - use type assertion since we're updating the same block type const updatedBlock = { ...existingBlock, ...changes } as MessageBlock - session.blocks[blockId] = updatedBlock - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + // Create new session with updated block (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + blocks: { ...session.blocks, [blockId]: updatedBlock } + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL) } @@ -362,12 +372,18 @@ class StreamingService { return } - // Merge updates - session.message = { ...session.message, ...updates } + // Create new message with updates (immutable update for cache notification) + const newMessage = { ...session.message, ...updates } - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) - cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) + // Create new session with updated message (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + message: newMessage + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) + cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) } /**