diff --git a/src/renderer/src/services/messageStreaming/StreamingService.ts b/src/renderer/src/services/messageStreaming/StreamingService.ts index 8707b18a0c..674e5fbace 100644 --- a/src/renderer/src/services/messageStreaming/StreamingService.ts +++ b/src/renderer/src/services/messageStreaming/StreamingService.ts @@ -282,18 +282,23 @@ class StreamingService { // Register block mapping this.blockToMessageMap.set(block.id, messageId) - // Add to session - session.blocks[block.id] = block - - // Update message block references - if (!session.message.blocks.includes(block.id)) { - session.message.blocks = [...session.message.blocks, block.id] + // Create new message with updated blocks (immutable update for cache notification) + const newMessage: Message = { + ...session.message, + blocks: session.message.blocks.includes(block.id) ? session.message.blocks : [...session.message.blocks, block.id] } - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + // Create new session with updated blocks and message (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + blocks: { ...session.blocks, [block.id]: block }, + message: newMessage + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) cacheService.set(getBlockKey(block.id), block, SESSION_TTL) - cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) + cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type }) } @@ -329,10 +334,15 @@ class StreamingService { // Merge changes - use type assertion since we're updating the same block type const updatedBlock = { ...existingBlock, ...changes } as MessageBlock - session.blocks[blockId] = updatedBlock - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + // Create new session with updated block (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + blocks: { ...session.blocks, [blockId]: updatedBlock } + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL) } @@ -362,12 +372,18 @@ class StreamingService { return } - // Merge updates - session.message = { ...session.message, ...updates } + // Create new message with updates (immutable update for cache notification) + const newMessage = { ...session.message, ...updates } - // Update caches - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) - cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) + // Create new session with updated message (immutable update for cache notification) + const newSession: StreamingSession = { + ...session, + message: newMessage + } + + // Update caches with new references to trigger notifications + cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) + cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) } /**