refactor: streamline block management and update StreamingService integration

- Consolidated block-addition logic in BlockManager: StreamingService.addBlock now also updates message.blocks references internally, so the separate addBlockReference call is removed.
- Updated StreamingService documentation to clarify how the v2 parentId maps to the v1 renderer askId for backward compatibility; askId is now derived from parentId instead of being passed in as an option.
- Removed the redundant block-reference handling path, simplifying message-structure updates during streaming sessions.
- Expanded comments and documentation throughout to clarify the updated behavior and its implications for message processing.
fullex 2026-01-04 10:47:42 +08:00
parent 37eac8c7fd
commit 61dddad22f
3 changed files with 38 additions and 45 deletions

View File

@@ -130,8 +130,7 @@ export class BlockManager {
*
* This method:
* 1. Updates active block tracking state
* 2. Adds new block to StreamingService
* 3. Updates message block references
* 2. Adds new block to StreamingService (which also updates message.blocks references)
*
* NOTE: DB saves are removed - persistence happens during finalize()
*/
@@ -140,12 +139,9 @@ export class BlockManager {
this._lastBlockType = newBlockType
this._activeBlockInfo = { id: newBlock.id, type: newBlockType } // Set new active block info
// Add new block to StreamingService (replaces dispatch(upsertOneBlock))
// Add new block to StreamingService (also updates message.blocks references internally)
streamingService.addBlock(this.deps.assistantMsgId, newBlock)
// Update block reference in message (replaces dispatch(upsertBlockReference))
streamingService.addBlockReference(this.deps.assistantMsgId, newBlock.id)
// TEMPORARY: The blockInstruction field was used for UI coordination.
// TODO: Evaluate if this is still needed with StreamingService approach
// For now, we update it in the message
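For context, here is a minimal sketch of what the consolidated addBlock path might look like after this change, assuming addBlock now performs the message.blocks bookkeeping that addBlockReference used to handle. The internals of addBlock are not shown in this diff, so the types and logic below are simplified stand-ins, not the actual implementation:

```ts
// Simplified stand-ins for illustration; the real types live elsewhere in the codebase.
interface MessageBlockLike {
  id: string
}

interface MessageLike {
  id: string
  blocks: string[]
}

interface StreamingSessionLike {
  message: MessageLike
  blocks: Map<string, MessageBlockLike>
}

// Assumed consolidated behavior: addBlock stores the block AND tracks its ID in
// message.blocks, so callers no longer need a separate addBlockReference call.
function addBlock(session: StreamingSessionLike, block: MessageBlockLike): void {
  session.blocks.set(block.id, block)
  if (!session.message.blocks.includes(block.id)) {
    // Previously handled by addBlockReference; now folded into addBlock.
    session.message.blocks = [...session.message.blocks, block.id]
  }
}
```

The caller-side effect is visible in the hunk above: the streamingService.addBlock call remains, while the addBlockReference call is dropped.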

View File

@@ -55,7 +55,9 @@ interface StreamingSession {
// Tree structure information (v2 new fields)
parentId: string // Parent message ID (user message)
siblingsGroupId: number // Multi-model group ID (0=normal, >0=multi-model response)
// siblingsGroupId: 0 = single model response, >0 = multi-model response group
// Messages with the same parentId and siblingsGroupId (>0) are displayed together for comparison
siblingsGroupId: number
// Context for usage estimation (messages up to and including user message)
contextMessages?: Message[]
@@ -66,15 +68,18 @@ interface StreamingSession {
/**
* Options for starting a streaming session
*
* NOTE: Internal naming uses v2 convention (parentId).
* The renderer Message format uses 'askId' for backward compatibility,
* which is set from parentId during message creation.
*/
interface StartSessionOptions {
parentId: string
siblingsGroupId?: number // Defaults to 0
siblingsGroupId?: number // Defaults to 0 (single model), >0 for multi-model response groups
role: 'assistant'
model?: Message['model']
modelId?: string
assistantId: string
askId?: string
traceId?: string
agentSessionId?: string
// Context messages for usage estimation (messages up to and including user message)
@@ -126,13 +131,13 @@ class StreamingService {
model,
modelId,
assistantId,
askId,
traceId,
agentSessionId,
contextMessages
} = options
// Initialize message structure
// NOTE: askId is set from parentId for renderer format compatibility (v1 uses askId, v2 uses parentId)
const message: Message = {
id: messageId,
topicId,
@@ -143,7 +148,7 @@ class StreamingService {
blocks: [],
model,
modelId,
askId,
askId: parentId, // Map v2 parentId to v1 renderer format askId
traceId,
agentSessionId
}
@@ -181,9 +186,16 @@ class StreamingService {
*
* This method:
* 1. Converts streaming data to the appropriate format
* 2. Routes to Data API (normal topics) or dbService (agent topics)
* 2. Routes to the appropriate data source based on topic type
* 3. Cleans up all related cache keys
*
* ## Persistence Paths
*
* - **Normal topics** → Data API (target architecture for v2)
* - **Agent sessions** → dbService (TEMPORARY: This is a transitional approach.
* Agent message storage will be migrated to Data API in a later phase.
* Once migration is complete, all paths will use Data API uniformly.)
*
* @param messageId - Session message ID
* @param status - Final message status
*/
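The routing described above is implemented in the next hunk. For orientation, a minimal sketch of the decision, assuming isAgentSessionTopicId is a simple predicate on the topic ID; its real implementation is not shown in this diff, so the prefix check and helper names below are illustrative stand-ins:

```ts
// Assumption: agent-session topic IDs are distinguishable by their shape.
// The real isAgentSessionTopicId may use a different check; this is a stand-in.
function isAgentSessionTopicIdSketch(topicId: string): boolean {
  return topicId.startsWith('agent-session:')
}

// Finalize-time routing, mirroring the implementation hunk below:
// agent sessions -> dbService (temporary), normal topics -> Data API.
async function persistFinalized(
  topicId: string,
  persistViaDbService: () => Promise<void>,
  persistViaDataApi: () => Promise<void>
): Promise<void> {
  if (isAgentSessionTopicIdSketch(topicId)) {
    await persistViaDbService()
  } else {
    await persistViaDataApi()
  }
}
```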
@@ -201,13 +213,12 @@ class StreamingService {
try {
const updatePayload = this.convertToUpdatePayload(session, status)
// TRADEOFF: Using dbService for agent messages instead of Data API
// because agent message storage refactoring is planned for later phase.
// TODO: Unify to Data API when agent message migration is complete.
// Route to appropriate data source based on topic type
// TEMPORARY: Agent sessions use dbService until migration to Data API is complete
if (isAgentSessionTopicId(session.topicId)) {
await dbService.updateMessageAndBlocks(session.topicId, updatePayload.messageUpdates, updatePayload.blocks)
} else {
// Normal topic → Use Data API for persistence
// Normal topic → Use Data API for persistence (v2 target architecture)
const dataApiPayload = this.convertToDataApiFormat(session, status)
await dataApiService.patch(`/messages/${session.messageId}`, { body: dataApiPayload })
}
@@ -376,30 +387,6 @@ class StreamingService {
cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL)
}
/**
* Add a block reference to the message
* (Replaces dispatch(newMessagesActions.upsertBlockReference))
*
* Note: In the streaming context, we just need to track the block ID in message.blocks
* The block reference details are maintained in the block itself
*
* @param messageId - Message ID
* @param blockId - Block ID to reference
*/
addBlockReference(messageId: string, blockId: string): void {
const session = this.getSession(messageId)
if (!session) {
logger.warn(`addBlockReference called for non-existent session: ${messageId}`)
return
}
if (!session.message.blocks.includes(blockId)) {
session.message.blocks = [...session.message.blocks, blockId]
cacheService.setCasual(getSessionKey(messageId), session, SESSION_TTL)
cacheService.setCasual(getMessageKey(messageId), session.message, SESSION_TTL)
}
}
/**
* Get a message from the streaming session
*
@@ -458,14 +445,20 @@ class StreamingService {
/**
* Generate the next siblingsGroupId for a topic.
*
* Used for multi-model responses where multiple assistant messages
* ## siblingsGroupId Semantics
*
* - **0** = Single-model response (one assistant message per user message)
* - **>0** = Multi-model response group (multiple assistant messages sharing
* the same parentId belong to the same sibling group for parallel comparison)
*
* This method is used for multi-model responses where multiple assistant messages
* share the same parentId and siblingsGroupId (>0).
*
* The counter is stored in CacheService and auto-increments.
* The counter is stored in CacheService and auto-increments per topic.
* Single-model responses should use siblingsGroupId=0 (not generated here).
*
* @param topicId - Topic ID
* @returns Next siblingsGroupId (always > 0)
* @returns Next siblingsGroupId (always > 0 for multi-model groups)
*/
// FIXME [v2] The current way of obtaining siblingsGroupId is incorrect; this will be adjusted in a follow-up.
generateNextGroupId(topicId: string): number {
@@ -564,6 +557,9 @@ class StreamingService {
*
* For newly created pending messages, blocks are empty.
*
* NOTE: Field mapping for backward compatibility:
* - shared.parentId (v2 Data API) → askId (v1 renderer format)
*
* @param shared - Message from Data API response
* @param assistantId - Assistant ID to include
* @param model - Optional Model object to include
@@ -578,6 +574,7 @@ class StreamingService {
status: shared.status as AssistantMessageStatus,
blocks: [], // For new pending messages, blocks are empty
createdAt: shared.createdAt,
// v2 Data API uses 'parentId'; renderer format uses 'askId' for backward compatibility
askId: shared.parentId ?? undefined,
modelId: shared.modelId ?? undefined,
traceId: shared.traceId ?? undefined,

View File

@@ -567,6 +567,7 @@ const fetchAndProcessAgentResponseImpl = async (
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// Initialize streaming session in StreamingService
// NOTE: parentId is used internally; askId in renderer format is derived from parentId
streamingService.startSession(topicId, assistantMessage.id, {
parentId: userMessageId,
siblingsGroupId: 0,
@@ -574,7 +575,6 @@ const fetchAndProcessAgentResponseImpl = async (
model: assistant.model,
modelId: assistant.model?.id,
assistantId: assistant.id,
askId: userMessageId,
traceId: assistantMessage.traceId,
agentSessionId: agentSession.agentSessionId
})
@@ -840,6 +840,7 @@ const fetchAndProcessAssistantResponseImpl = async (
}
// Initialize streaming session in StreamingService (includes context for usage estimation)
// NOTE: parentId is used internally; askId in renderer format is derived from parentId
streamingService.startSession(topicId, assistantMsgId, {
parentId: userMessageId!,
siblingsGroupId,
@@ -847,7 +848,6 @@ const fetchAndProcessAssistantResponseImpl = async (
model: assistant.model,
modelId: assistant.model?.id,
assistantId: assistant.id,
askId: userMessageId,
traceId: assistantMessage.traceId,
contextMessages: messagesForContext
})