refactor(cache): migrate StreamingService to schema-defined cache keys

- Add 5 template keys in cacheSchemas.ts for streaming service:
  - message.streaming.session.${messageId}
  - message.streaming.topic_sessions.${topicId}
  - message.streaming.content.${messageId}
  - message.streaming.block.${blockId}
  - message.streaming.siblings_counter.${topicId}
- Replace xxxCasual methods with type-safe get/set/has/delete
- Update key naming to follow dot-separated convention
- Use `any` types temporarily (TODO for v2 type migration)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
fullex 2026-01-05 13:33:41 +08:00
parent 2093452e69
commit d50149dccb
2 changed files with 60 additions and 37 deletions

View File

@ -142,6 +142,21 @@ export type UseCacheSchema = {
// Template key examples (for testing and demonstration)
'scroll.position.${topicId}': number
'entity.cache.${type}_${id}': { loaded: boolean; data: unknown }
// ============================================================================
// Message Streaming Cache (Temporary)
// ============================================================================
// TODO [v2]: Replace `any` with proper types after newMessage.ts types are
// migrated to packages/shared/data/types/message.ts
// Current types:
// - StreamingSession: defined locally in StreamingService.ts
// - Message: src/renderer/src/types/newMessage.ts (renderer format, not shared/Message)
// - MessageBlock: src/renderer/src/types/newMessage.ts
'message.streaming.session.${messageId}': any // StreamingSession
'message.streaming.topic_sessions.${topicId}': string[]
'message.streaming.content.${messageId}': any // Message (renderer format)
'message.streaming.block.${blockId}': any // MessageBlock
'message.streaming.siblings_counter.${topicId}': number
}
export const DefaultUseCache: UseCacheSchema = {
@ -184,7 +199,14 @@ export const DefaultUseCache: UseCacheSchema = {
// Template key examples (for testing and demonstration)
'scroll.position.${topicId}': 0,
'entity.cache.${type}_${id}': { loaded: false, data: null }
'entity.cache.${type}_${id}': { loaded: false, data: null },
// Message Streaming Cache
'message.streaming.session.${messageId}': null,
'message.streaming.topic_sessions.${topicId}': [],
'message.streaming.content.${messageId}': null,
'message.streaming.block.${blockId}': null,
'message.streaming.siblings_counter.${topicId}': 0
}
/**

View File

@ -11,11 +11,12 @@
* - On finalize, data is converted to new format and persisted via appropriate data source
* - Throttling is handled externally by messageThunk.ts (preserves existing throttle logic)
*
* Cache Key Strategy:
* - Session key: `streaming:session:${messageId}` - Internal session lifecycle management
* - Topic sessions index: `streaming:topic:${topicId}:sessions` - Track active sessions per topic
* - Message key: `streaming:message:${messageId}` - UI subscription for message-level changes
* - Block key: `streaming:block:${blockId}` - UI subscription for block content updates
* Cache Key Strategy (uses schema-defined template keys from cacheSchemas.ts):
* - Session key: `message.streaming.session.${messageId}` - Internal session lifecycle management
* - Topic sessions index: `message.streaming.topic_sessions.${topicId}` - Track active sessions per topic
* - Message key: `message.streaming.content.${messageId}` - UI subscription for message-level changes
* - Block key: `message.streaming.block.${blockId}` - UI subscription for block content updates
* - Siblings counter: `message.streaming.siblings_counter.${topicId}` - Multi-model response group counter
*/
import { cacheService } from '@data/CacheService'
@ -32,12 +33,12 @@ import { dbService } from '../db'
const logger = loggerService.withContext('StreamingService')
// Cache key generators
const getSessionKey = (messageId: string) => `streaming:session:${messageId}`
const getTopicSessionsKey = (topicId: string) => `streaming:topic:${topicId}:sessions`
const getMessageKey = (messageId: string) => `streaming:message:${messageId}`
const getBlockKey = (blockId: string) => `streaming:block:${blockId}`
const getSiblingsGroupCounterKey = (topicId: string) => `streaming:topic:${topicId}:siblings-counter`
// Cache key generators (matches template keys in cacheSchemas.ts)
const getSessionKey = (messageId: string) => `message.streaming.session.${messageId}` as const
const getTopicSessionsKey = (topicId: string) => `message.streaming.topic_sessions.${topicId}` as const
const getMessageKey = (messageId: string) => `message.streaming.content.${messageId}` as const
const getBlockKey = (blockId: string) => `message.streaming.block.${blockId}` as const
const getSiblingsGroupCounterKey = (topicId: string) => `message.streaming.siblings_counter.${topicId}` as const
// Session TTL for auto-cleanup (prevents memory leaks from crashed processes)
const SESSION_TTL = 5 * 60 * 1000 // 5 minutes
@ -166,16 +167,16 @@ class StreamingService {
}
// Store session with TTL
cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL)
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
// Store message data for UI subscription
cacheService.setCasual(getMessageKey(messageId), message, SESSION_TTL)
cacheService.set(getMessageKey(messageId), message, SESSION_TTL)
// Add to topic sessions index
const topicSessions = cacheService.getCasual<string[]>(getTopicSessionsKey(topicId)) || []
const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || []
if (!topicSessions.includes(messageId)) {
topicSessions.push(messageId)
cacheService.setCasual(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL)
cacheService.set(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL)
}
logger.debug('Started streaming session', { topicId, messageId, parentId, siblingsGroupId })
@ -241,23 +242,23 @@ class StreamingService {
// Remove block mappings
Object.keys(session.blocks).forEach((blockId) => {
this.blockToMessageMap.delete(blockId)
cacheService.deleteCasual(getBlockKey(blockId))
cacheService.delete(getBlockKey(blockId))
})
// Remove message cache
cacheService.deleteCasual(getMessageKey(messageId))
cacheService.delete(getMessageKey(messageId))
// Remove from topic sessions index
const topicSessions = cacheService.getCasual<string[]>(getTopicSessionsKey(session.topicId)) || []
const topicSessions = cacheService.get(getTopicSessionsKey(session.topicId)) || []
const updatedTopicSessions = topicSessions.filter((id) => id !== messageId)
if (updatedTopicSessions.length > 0) {
cacheService.setCasual(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL)
cacheService.set(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL)
} else {
cacheService.deleteCasual(getTopicSessionsKey(session.topicId))
cacheService.delete(getTopicSessionsKey(session.topicId))
}
// Remove session
cacheService.deleteCasual(getSessionKey(messageId))
cacheService.delete(getSessionKey(messageId))
logger.debug('Cleared streaming session', { messageId, topicId: session.topicId })
}
@ -290,9 +291,9 @@ class StreamingService {
}
// Update caches
cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL)
cacheService.setCasual(getBlockKey(block.id), block, SESSION_TTL)
cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL)
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
cacheService.set(getBlockKey(block.id), block, SESSION_TTL)
cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL)
logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type })
}
@ -331,8 +332,8 @@ class StreamingService {
session.blocks[blockId] = updatedBlock
// Update caches
cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL)
cacheService.setCasual(getBlockKey(blockId), updatedBlock, SESSION_TTL)
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL)
}
/**
@ -342,7 +343,7 @@ class StreamingService {
* @returns Block or null if not found
*/
getBlock(blockId: string): MessageBlock | null {
return cacheService.getCasual<MessageBlock>(getBlockKey(blockId)) || null
return cacheService.get(getBlockKey(blockId)) || null
}
// ============ Message Operations ============
@ -365,8 +366,8 @@ class StreamingService {
session.message = { ...session.message, ...updates }
// Update caches
cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL)
cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL)
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
cacheService.set(getMessageKey(messageId), session.message, SESSION_TTL)
}
/**
@ -376,7 +377,7 @@ class StreamingService {
* @returns Message or null if not found
*/
getMessage(messageId: string): Message | null {
return cacheService.getCasual<Message>(getMessageKey(messageId)) || null
return cacheService.get(getMessageKey(messageId)) || null
}
// ============ Query Methods ============
@ -388,7 +389,7 @@ class StreamingService {
* @returns True if streaming is active
*/
isStreaming(topicId: string): boolean {
const topicSessions = cacheService.getCasual<string[]>(getTopicSessionsKey(topicId)) || []
const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || []
return topicSessions.length > 0
}
@ -399,7 +400,7 @@ class StreamingService {
* @returns True if message is streaming
*/
isMessageStreaming(messageId: string): boolean {
return cacheService.hasCasual(getSessionKey(messageId))
return cacheService.has(getSessionKey(messageId))
}
/**
@ -409,7 +410,7 @@ class StreamingService {
* @returns Session or null if not found
*/
getSession(messageId: string): StreamingSession | null {
return cacheService.getCasual<StreamingSession>(getSessionKey(messageId)) || null
return cacheService.get(getSessionKey(messageId)) || null
}
/**
@ -419,7 +420,7 @@ class StreamingService {
* @returns Array of message IDs
*/
getActiveMessageIds(topicId: string): string[] {
return cacheService.getCasual<string[]>(getTopicSessionsKey(topicId)) || []
return cacheService.get(getTopicSessionsKey(topicId)) || []
}
// ============ siblingsGroupId Generation ============
@ -445,10 +446,10 @@ class StreamingService {
//FIXME [v2] 现在获取 siblingsGroupId 的方式是不正确,后续再做修改调整
generateNextGroupId(topicId: string): number {
const counterKey = getSiblingsGroupCounterKey(topicId)
const currentCounter = cacheService.getCasual<number>(counterKey) || 0
const currentCounter = cacheService.get(counterKey) || 0
const nextGroupId = currentCounter + 1
// Store with no TTL (persistent within session, cleared on app restart)
cacheService.setCasual(counterKey, nextGroupId)
cacheService.set(counterKey, nextGroupId)
logger.debug('Generated siblingsGroupId', { topicId, siblingsGroupId: nextGroupId })
return nextGroupId
}