refactor(StreamingService): implement immutable updates for session and message handling

- Updated the StreamingService to create new immutable instances of sessions and messages when adding blocks or applying updates. This change enhances cache notification mechanisms by ensuring that references are updated correctly.
- Improved the handling of block additions and message updates to maintain immutability, which aids in better state management and performance.
This commit is contained in:
fullex 2026-01-09 13:24:17 +08:00
parent c1e0de1a9a
commit 8efb738753

View File

@ -282,18 +282,23 @@ class StreamingService {
// Register block mapping
this.blockToMessageMap.set(block.id, messageId)
// Add to session
session.blocks[block.id] = block
// Update message block references
if (!session.message.blocks.includes(block.id)) {
session.message.blocks = [...session.message.blocks, block.id]
// Create new message with updated blocks (immutable update for cache notification)
const newMessage: Message = {
...session.message,
blocks: session.message.blocks.includes(block.id) ? session.message.blocks : [...session.message.blocks, block.id]
}
// Update caches
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
// Create new session with updated blocks and message (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
blocks: { ...session.blocks, [block.id]: block },
message: newMessage
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getBlockKey(block.id), block, SESSION_TTL)
cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL)
cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL)
logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type })
}
@ -329,10 +334,15 @@ class StreamingService {
// Merge changes - use type assertion since we're updating the same block type
const updatedBlock = { ...existingBlock, ...changes } as MessageBlock
session.blocks[blockId] = updatedBlock
// Update caches
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
// Create new session with updated block (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
blocks: { ...session.blocks, [blockId]: updatedBlock }
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL)
}
@ -362,12 +372,18 @@ class StreamingService {
return
}
// Merge updates
session.message = { ...session.message, ...updates }
// Create new message with updates (immutable update for cache notification)
const newMessage = { ...session.message, ...updates }
// Update caches
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL)
// Create new session with updated message (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
message: newMessage
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL)
}
/**