From d50149dccb3706dfc27b4e11a39cb04f362ebcf4 Mon Sep 17 00:00:00 2001 From: fullex <0xfullex@gmail.com> Date: Mon, 5 Jan 2026 13:33:41 +0800 Subject: [PATCH] refactor(cache): migrate StreamingService to schema-defined cache keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 5 template keys in cacheSchemas.ts for streaming service: - message.streaming.session.${messageId} - message.streaming.topic_sessions.${topicId} - message.streaming.content.${messageId} - message.streaming.block.${blockId} - message.streaming.siblings_counter.${topicId} - Replace xxxCasual methods with type-safe get/set/has/delete - Update key naming to follow dot-separated convention - Use `any` types temporarily (TODO for v2 type migration) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- packages/shared/data/cache/cacheSchemas.ts | 24 +++++- .../messageStreaming/StreamingService.ts | 73 ++++++++++--------- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/packages/shared/data/cache/cacheSchemas.ts b/packages/shared/data/cache/cacheSchemas.ts index 892009ea0a..dc182a9c7e 100644 --- a/packages/shared/data/cache/cacheSchemas.ts +++ b/packages/shared/data/cache/cacheSchemas.ts @@ -142,6 +142,21 @@ export type UseCacheSchema = { // Template key examples (for testing and demonstration) 'scroll.position.${topicId}': number 'entity.cache.${type}_${id}': { loaded: boolean; data: unknown } + + // ============================================================================ + // Message Streaming Cache (Temporary) + // ============================================================================ + // TODO [v2]: Replace `any` with proper types after newMessage.ts types are + // migrated to packages/shared/data/types/message.ts + // Current types: + // - StreamingSession: defined locally in StreamingService.ts + // - Message: src/renderer/src/types/newMessage.ts (renderer format, not shared/Message) + // - MessageBlock: src/renderer/src/types/newMessage.ts + 'message.streaming.session.${messageId}': any // StreamingSession + 'message.streaming.topic_sessions.${topicId}': string[] + 'message.streaming.content.${messageId}': any // Message (renderer format) + 'message.streaming.block.${blockId}': any // MessageBlock + 'message.streaming.siblings_counter.${topicId}': number } export const DefaultUseCache: UseCacheSchema = { @@ -184,7 +199,14 @@ export const DefaultUseCache: UseCacheSchema = { // Template key examples (for testing and demonstration) 'scroll.position.${topicId}': 0, - 'entity.cache.${type}_${id}': { loaded: false, data: null } + 'entity.cache.${type}_${id}': { loaded: false, data: null }, + + // Message Streaming Cache + 'message.streaming.session.${messageId}': null, + 'message.streaming.topic_sessions.${topicId}': [], + 'message.streaming.content.${messageId}': null, + 'message.streaming.block.${blockId}': null, + 'message.streaming.siblings_counter.${topicId}': 0 } /** diff --git a/src/renderer/src/services/messageStreaming/StreamingService.ts b/src/renderer/src/services/messageStreaming/StreamingService.ts index 1dc31fee35..8707b18a0c 100644 --- a/src/renderer/src/services/messageStreaming/StreamingService.ts +++ b/src/renderer/src/services/messageStreaming/StreamingService.ts @@ -11,11 +11,12 @@ * - On finalize, data is converted to new format and persisted via appropriate data source * - Throttling is handled externally by messageThunk.ts (preserves existing throttle logic) * - * Cache Key Strategy: - * - Session key: `streaming:session:${messageId}` - Internal session lifecycle management - * - Topic sessions index: `streaming:topic:${topicId}:sessions` - Track active sessions per topic - * - Message key: `streaming:message:${messageId}` - UI subscription for message-level changes - * - Block key: `streaming:block:${blockId}` - UI subscription for block content updates + * Cache Key Strategy (uses schema-defined template keys from cacheSchemas.ts): + * - Session key: `message.streaming.session.${messageId}` - Internal session lifecycle management + * - Topic sessions index: `message.streaming.topic_sessions.${topicId}` - Track active sessions per topic + * - Message key: `message.streaming.content.${messageId}` - UI subscription for message-level changes + * - Block key: `message.streaming.block.${blockId}` - UI subscription for block content updates + * - Siblings counter: `message.streaming.siblings_counter.${topicId}` - Multi-model response group counter */ import { cacheService } from '@data/CacheService' @@ -32,12 +33,12 @@ import { dbService } from '../db' const logger = loggerService.withContext('StreamingService') -// Cache key generators -const getSessionKey = (messageId: string) => `streaming:session:${messageId}` -const getTopicSessionsKey = (topicId: string) => `streaming:topic:${topicId}:sessions` -const getMessageKey = (messageId: string) => `streaming:message:${messageId}` -const getBlockKey = (blockId: string) => `streaming:block:${blockId}` -const getSiblingsGroupCounterKey = (topicId: string) => `streaming:topic:${topicId}:siblings-counter` +// Cache key generators (matches template keys in cacheSchemas.ts) +const getSessionKey = (messageId: string) => `message.streaming.session.${messageId}` as const +const getTopicSessionsKey = (topicId: string) => `message.streaming.topic_sessions.${topicId}` as const +const getMessageKey = (messageId: string) => `message.streaming.content.${messageId}` as const +const getBlockKey = (blockId: string) => `message.streaming.block.${blockId}` as const +const getSiblingsGroupCounterKey = (topicId: string) => `message.streaming.siblings_counter.${topicId}` as const // Session TTL for auto-cleanup (prevents memory leaks from crashed processes) const SESSION_TTL = 5 * 60 * 1000 // 5 minutes @@ -166,16 +167,16 @@ class StreamingService { } // Store session with TTL - cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL) + cacheService.set(getSessionKey(messageId), session, SESSION_TTL) // Store message data for UI subscription - cacheService.setCasual(getMessageKey(messageId), message, SESSION_TTL) + cacheService.set(getMessageKey(messageId), message, SESSION_TTL) // Add to topic sessions index - const topicSessions = cacheService.getCasual(getTopicSessionsKey(topicId)) || [] + const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || [] if (!topicSessions.includes(messageId)) { topicSessions.push(messageId) - cacheService.setCasual(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL) + cacheService.set(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL) } logger.debug('Started streaming session', { topicId, messageId, parentId, siblingsGroupId }) @@ -241,23 +242,23 @@ class StreamingService { // Remove block mappings Object.keys(session.blocks).forEach((blockId) => { this.blockToMessageMap.delete(blockId) - cacheService.deleteCasual(getBlockKey(blockId)) + cacheService.delete(getBlockKey(blockId)) }) // Remove message cache - cacheService.deleteCasual(getMessageKey(messageId)) + cacheService.delete(getMessageKey(messageId)) // Remove from topic sessions index - const topicSessions = cacheService.getCasual(getTopicSessionsKey(session.topicId)) || [] + const topicSessions = cacheService.get(getTopicSessionsKey(session.topicId)) || [] const updatedTopicSessions = topicSessions.filter((id) => id !== messageId) if (updatedTopicSessions.length > 0) { - cacheService.setCasual(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL) + cacheService.set(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL) } else { - cacheService.deleteCasual(getTopicSessionsKey(session.topicId)) + cacheService.delete(getTopicSessionsKey(session.topicId)) } // Remove session - cacheService.deleteCasual(getSessionKey(messageId)) + cacheService.delete(getSessionKey(messageId)) logger.debug('Cleared streaming session', { messageId, topicId: session.topicId }) } @@ -290,9 +291,9 @@ class StreamingService { } // Update caches - cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL) - cacheService.setCasual(getBlockKey(block.id), block, SESSION_TTL) - cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL) + cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + cacheService.set(getBlockKey(block.id), block, SESSION_TTL) + cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type }) } @@ -331,8 +332,8 @@ class StreamingService { session.blocks[blockId] = updatedBlock // Update caches - cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL) - cacheService.setCasual(getBlockKey(blockId), updatedBlock, SESSION_TTL) + cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL) } /** @@ -342,7 +343,7 @@ class StreamingService { * @returns Block or null if not found */ getBlock(blockId: string): MessageBlock | null { - return cacheService.getCasual(getBlockKey(blockId)) || null + return cacheService.get(getBlockKey(blockId)) || null } // ============ Message Operations ============ @@ -365,8 +366,8 @@ class StreamingService { session.message = { ...session.message, ...updates } // Update caches - cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL) - cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL) + cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL) } /** @@ -376,7 +377,7 @@ class StreamingService { * @returns Message or null if not found */ getMessage(messageId: string): Message | null { - return cacheService.getCasual(getMessageKey(messageId)) || null + return cacheService.get(getMessageKey(messageId)) || null } // ============ Query Methods ============ @@ -388,7 +389,7 @@ class StreamingService { * @returns True if streaming is active */ isStreaming(topicId: string): boolean { - const topicSessions = cacheService.getCasual(getTopicSessionsKey(topicId)) || [] + const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || [] return topicSessions.length > 0 } @@ -399,7 +400,7 @@ class StreamingService { * @returns True if message is streaming */ isMessageStreaming(messageId: string): boolean { - return cacheService.hasCasual(getSessionKey(messageId)) + return cacheService.has(getSessionKey(messageId)) } /** @@ -409,7 +410,7 @@ class StreamingService { * @returns Session or null if not found */ getSession(messageId: string): StreamingSession | null { - return cacheService.getCasual(getSessionKey(messageId)) || null + return cacheService.get(getSessionKey(messageId)) || null } /** @@ -419,7 +420,7 @@ class StreamingService { * @returns Array of message IDs */ getActiveMessageIds(topicId: string): string[] { - return cacheService.getCasual(getTopicSessionsKey(topicId)) || [] + return cacheService.get(getTopicSessionsKey(topicId)) || [] } // ============ siblingsGroupId Generation ============ @@ -445,10 +446,10 @@ class StreamingService { //FIXME [v2] 现在获取 siblingsGroupId 的方式是不正确,后续再做修改调整 generateNextGroupId(topicId: string): number { const counterKey = getSiblingsGroupCounterKey(topicId) - const currentCounter = cacheService.getCasual(counterKey) || 0 + const currentCounter = cacheService.get(counterKey) || 0 const nextGroupId = currentCounter + 1 // Store with no TTL (persistent within session, cleared on app restart) - cacheService.setCasual(counterKey, nextGroupId) + cacheService.set(counterKey, nextGroupId) logger.debug('Generated siblingsGroupId', { topicId, siblingsGroupId: nextGroupId }) return nextGroupId }