refactor(StreamingService): rename session to task and update related logic

- Refactored the StreamingService to replace all instances of "session" with "task" to better reflect the functionality and improve clarity.
- Updated related types, methods, and cache handling to align with the new terminology, ensuring consistent usage throughout the codebase.
- Enhanced comments and documentation to reflect the changes in terminology and functionality.
This commit is contained in:
fullex 2026-01-09 16:15:01 +08:00
parent 8efb738753
commit e80c6b06ba
6 changed files with 142 additions and 142 deletions

View File

@ -149,11 +149,11 @@ export type UseCacheSchema = {
// TODO [v2]: Replace `any` with proper types after newMessage.ts types are
// migrated to packages/shared/data/types/message.ts
// Current types:
// - StreamingSession: defined locally in StreamingService.ts
// - StreamingTask: defined locally in StreamingService.ts
// - Message: src/renderer/src/types/newMessage.ts (renderer format, not shared/Message)
// - MessageBlock: src/renderer/src/types/newMessage.ts
'message.streaming.session.${messageId}': any // StreamingSession
'message.streaming.topic_sessions.${topicId}': string[]
'message.streaming.task.${messageId}': any // StreamingTask
'message.streaming.topic_tasks.${topicId}': string[]
'message.streaming.content.${messageId}': any // Message (renderer format)
'message.streaming.block.${blockId}': any // MessageBlock
'message.streaming.siblings_counter.${topicId}': number
@ -202,8 +202,8 @@ export const DefaultUseCache: UseCacheSchema = {
'entity.cache.${type}_${id}': { loaded: false, data: null },
// Message Streaming Cache
'message.streaming.session.${messageId}': null,
'message.streaming.topic_sessions.${topicId}': [],
'message.streaming.task.${messageId}': null,
'message.streaming.topic_tasks.${topicId}': [],
'message.streaming.content.${messageId}': null,
'message.streaming.block.${blockId}': null,
'message.streaming.siblings_counter.${topicId}': 0

View File

@ -6,14 +6,14 @@
* and persists final data to the database via Data API or dbService.
*
* Key Design Decisions:
* - Uses messageId as primary key for sessions (supports multi-model concurrent streaming)
* - Uses messageId as primary key for tasks (supports multi-model concurrent streaming)
* - Streaming data is stored in memory only (not Redux, not Dexie during streaming)
* - On finalize, data is converted to new format and persisted via appropriate data source
* - Throttling is handled externally by messageThunk.ts (preserves existing throttle logic)
*
* Cache Key Strategy (uses schema-defined template keys from cacheSchemas.ts):
* - Session key: `message.streaming.session.${messageId}` - Internal session lifecycle management
* - Topic sessions index: `message.streaming.topic_sessions.${topicId}` - Track active sessions per topic
* - Task key: `message.streaming.task.${messageId}` - Internal task lifecycle management
* - Topic tasks index: `message.streaming.topic_tasks.${topicId}` - Track active tasks per topic
* - Message key: `message.streaming.content.${messageId}` - UI subscription for message-level changes
* - Block key: `message.streaming.block.${blockId}` - UI subscription for block content updates
* - Siblings counter: `message.streaming.siblings_counter.${topicId}` - Multi-model response group counter
@ -34,19 +34,19 @@ import { dbService } from '../db'
const logger = loggerService.withContext('StreamingService')
// Cache key generators (matches template keys in cacheSchemas.ts)
const getSessionKey = (messageId: string) => `message.streaming.session.${messageId}` as const
const getTopicSessionsKey = (topicId: string) => `message.streaming.topic_sessions.${topicId}` as const
const getTaskKey = (messageId: string) => `message.streaming.task.${messageId}` as const
const getTopicTasksKey = (topicId: string) => `message.streaming.topic_tasks.${topicId}` as const
const getMessageKey = (messageId: string) => `message.streaming.content.${messageId}` as const
const getBlockKey = (blockId: string) => `message.streaming.block.${blockId}` as const
const getSiblingsGroupCounterKey = (topicId: string) => `message.streaming.siblings_counter.${topicId}` as const
// Session TTL for auto-cleanup (prevents memory leaks from crashed processes)
const SESSION_TTL = 5 * 60 * 1000 // 5 minutes
// Task TTL for auto-cleanup (prevents memory leaks from crashed processes)
const TASK_TTL = 5 * 60 * 1000 // 5 minutes
/**
* Streaming session data structure (stored in memory)
* Streaming task data structure (stored in memory)
*/
interface StreamingSession {
interface StreamingTask {
topicId: string
messageId: string
@ -68,13 +68,13 @@ interface StreamingSession {
}
/**
* Options for starting a streaming session
* Options for starting a streaming task
*
* NOTE: Internal naming uses v2 convention (parentId).
* The renderer Message format uses 'askId' for backward compatibility,
* which is set from parentId during message creation.
*/
interface StartSessionOptions {
interface StartTaskOptions {
parentId: string
siblingsGroupId?: number // Defaults to 0 (single model), >0 for multi-model response groups
role: 'assistant'
@ -103,7 +103,7 @@ interface CreateAssistantMessageOptions {
* StreamingService - Manages streaming message state during generation
*
* Responsibilities:
* - Session lifecycle management (start, update, finalize, clear)
* - Task lifecycle management (start, update, finalize, end)
* - Block operations (add, update, get)
* - Message operations (update, get)
* - Cache-based state management with automatic TTL cleanup
@ -112,19 +112,19 @@ class StreamingService {
// Internal mapping: blockId -> messageId (for efficient block updates)
private blockToMessageMap = new Map<string, string>()
// ============ Session Lifecycle ============
// ============ Task Lifecycle ============
/**
* Start a streaming session for a message
* Start a streaming task for a message
*
* IMPORTANT: The message must be created via Data API POST before calling this.
* This method initializes the in-memory streaming state.
*
* @param topicId - Topic ID (used for topic sessions index)
* @param topicId - Topic ID (used for topic tasks index)
* @param messageId - Message ID returned from Data API POST
* @param options - Session options including parentId and siblingsGroupId
* @param options - Task options including parentId and siblingsGroupId
*/
startSession(topicId: string, messageId: string, options: StartSessionOptions): void {
startTask(topicId: string, messageId: string, options: StartTaskOptions): void {
const {
parentId,
siblingsGroupId = 0,
@ -154,8 +154,8 @@ class StreamingService {
agentSessionId
}
// Create session
const session: StreamingSession = {
// Create task
const task: StreamingTask = {
topicId,
messageId,
message,
@ -166,24 +166,24 @@ class StreamingService {
startedAt: Date.now()
}
// Store session with TTL
cacheService.set(getSessionKey(messageId), session, SESSION_TTL)
// Store task with TTL
cacheService.set(getTaskKey(messageId), task, TASK_TTL)
// Store message data for UI subscription
cacheService.set(getMessageKey(messageId), message, SESSION_TTL)
cacheService.set(getMessageKey(messageId), message, TASK_TTL)
// Add to topic sessions index
const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || []
if (!topicSessions.includes(messageId)) {
topicSessions.push(messageId)
cacheService.set(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL)
// Add to topic tasks index
const topicTasks = cacheService.get(getTopicTasksKey(topicId)) || []
if (!topicTasks.includes(messageId)) {
topicTasks.push(messageId)
cacheService.set(getTopicTasksKey(topicId), topicTasks, TASK_TTL)
}
logger.debug('Started streaming session', { topicId, messageId, parentId, siblingsGroupId })
logger.debug('Started streaming task', { topicId, messageId, parentId, siblingsGroupId })
}
/**
* Finalize a streaming session by persisting data to database
* Finalize a streaming task by persisting data to database
*
* This method:
* 1. Converts streaming data to the appropriate format
@ -197,50 +197,50 @@ class StreamingService {
* Agent message storage will be migrated to Data API in a later phase.
* Once migration is complete, all paths will use Data API uniformly.)
*
* @param messageId - Session message ID
* @param messageId - Task message ID
* @param status - Final message status
*/
async finalize(messageId: string, status: AssistantMessageStatus): Promise<void> {
const session = this.getSession(messageId)
if (!session) {
logger.warn(`finalize called for non-existent session: ${messageId}`)
const task = this.getTask(messageId)
if (!task) {
logger.warn(`finalize called for non-existent task: ${messageId}`)
return
}
try {
// Route to appropriate data source based on topic type
// TEMPORARY: Agent sessions use dbService until migration to Data API is complete
if (isAgentSessionTopicId(session.topicId)) {
const updatePayload = this.convertToUpdatePayload(session, status)
await dbService.updateMessageAndBlocks(session.topicId, updatePayload.messageUpdates, updatePayload.blocks)
if (isAgentSessionTopicId(task.topicId)) {
const updatePayload = this.convertToUpdatePayload(task, status)
await dbService.updateMessageAndBlocks(task.topicId, updatePayload.messageUpdates, updatePayload.blocks)
} else {
// Normal topic → Use Data API for persistence (has built-in retry)
const dataApiPayload = this.convertToDataApiFormat(session, status)
await dataApiService.patch(`/messages/${session.messageId}`, { body: dataApiPayload })
const dataApiPayload = this.convertToDataApiFormat(task, status)
await dataApiService.patch(`/messages/${task.messageId}`, { body: dataApiPayload })
}
this.clearSession(messageId)
logger.debug('Finalized streaming session', { messageId, status })
this.endTask(messageId)
logger.debug('Finalized streaming task', { messageId, status })
} catch (error) {
logger.error('finalize failed:', error as Error)
// Don't clear session on error - TTL will auto-clean to prevent memory leak
// Don't end task on error - TTL will auto-clean to prevent memory leak
throw error
}
}
/**
* Clear a streaming session and all related cache keys
* End a streaming task and clear all related cache keys
*
* @param messageId - Session message ID
* @param messageId - Task message ID
*/
clearSession(messageId: string): void {
const session = this.getSession(messageId)
if (!session) {
endTask(messageId: string): void {
const task = this.getTask(messageId)
if (!task) {
return
}
// Remove block mappings
Object.keys(session.blocks).forEach((blockId) => {
Object.keys(task.blocks).forEach((blockId) => {
this.blockToMessageMap.delete(blockId)
cacheService.delete(getBlockKey(blockId))
})
@ -248,34 +248,34 @@ class StreamingService {
// Remove message cache
cacheService.delete(getMessageKey(messageId))
// Remove from topic sessions index
const topicSessions = cacheService.get(getTopicSessionsKey(session.topicId)) || []
const updatedTopicSessions = topicSessions.filter((id) => id !== messageId)
if (updatedTopicSessions.length > 0) {
cacheService.set(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL)
// Remove from topic tasks index
const topicTasks = cacheService.get(getTopicTasksKey(task.topicId)) || []
const updatedTopicTasks = topicTasks.filter((id: string) => id !== messageId)
if (updatedTopicTasks.length > 0) {
cacheService.set(getTopicTasksKey(task.topicId), updatedTopicTasks, TASK_TTL)
} else {
cacheService.delete(getTopicSessionsKey(session.topicId))
cacheService.delete(getTopicTasksKey(task.topicId))
}
// Remove session
cacheService.delete(getSessionKey(messageId))
// Remove task
cacheService.delete(getTaskKey(messageId))
logger.debug('Cleared streaming session', { messageId, topicId: session.topicId })
logger.debug('Ended streaming task', { messageId, topicId: task.topicId })
}
// ============ Block Operations ============
/**
* Add a new block to a streaming session
* Add a new block to a streaming task
* (Replaces dispatch(upsertOneBlock))
*
* @param messageId - Parent message ID
* @param block - Block to add
*/
addBlock(messageId: string, block: MessageBlock): void {
const session = this.getSession(messageId)
if (!session) {
logger.warn(`addBlock called for non-existent session: ${messageId}`)
const task = this.getTask(messageId)
if (!task) {
logger.warn(`addBlock called for non-existent task: ${messageId}`)
return
}
@ -284,27 +284,27 @@ class StreamingService {
// Create new message with updated blocks (immutable update for cache notification)
const newMessage: Message = {
...session.message,
blocks: session.message.blocks.includes(block.id) ? session.message.blocks : [...session.message.blocks, block.id]
...task.message,
blocks: task.message.blocks.includes(block.id) ? task.message.blocks : [...task.message.blocks, block.id]
}
// Create new session with updated blocks and message (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
blocks: { ...session.blocks, [block.id]: block },
// Create new task with updated blocks and message (immutable update for cache notification)
const newTask: StreamingTask = {
...task,
blocks: { ...task.blocks, [block.id]: block },
message: newMessage
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getBlockKey(block.id), block, SESSION_TTL)
cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL)
cacheService.set(getTaskKey(messageId), newTask, TASK_TTL)
cacheService.set(getBlockKey(block.id), block, TASK_TTL)
cacheService.set(getMessageKey(messageId), newMessage, TASK_TTL)
logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type })
logger.debug('Added block to task', { messageId, blockId: block.id, blockType: block.type })
}
/**
* Update a block in a streaming session
* Update a block in a streaming task
* (Replaces dispatch(updateOneBlock))
*
* NOTE: This method does NOT include throttling. Throttling is controlled
@ -320,34 +320,34 @@ class StreamingService {
return
}
const session = this.getSession(messageId)
if (!session) {
logger.warn(`updateBlock: Session not found for message ${messageId}`)
const task = this.getTask(messageId)
if (!task) {
logger.warn(`updateBlock: Task not found for message ${messageId}`)
return
}
const existingBlock = session.blocks[blockId]
const existingBlock = task.blocks[blockId]
if (!existingBlock) {
logger.warn(`updateBlock: Block ${blockId} not found in session`)
logger.warn(`updateBlock: Block ${blockId} not found in task`)
return
}
// Merge changes - use type assertion since we're updating the same block type
const updatedBlock = { ...existingBlock, ...changes } as MessageBlock
// Create new session with updated block (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
blocks: { ...session.blocks, [blockId]: updatedBlock }
// Create new task with updated block (immutable update for cache notification)
const newTask: StreamingTask = {
...task,
blocks: { ...task.blocks, [blockId]: updatedBlock }
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL)
cacheService.set(getTaskKey(messageId), newTask, TASK_TTL)
cacheService.set(getBlockKey(blockId), updatedBlock, TASK_TTL)
}
/**
* Get a block from the streaming session
* Get a block from the streaming task
*
* @param blockId - Block ID
* @returns Block or null if not found
@ -359,35 +359,35 @@ class StreamingService {
// ============ Message Operations ============
/**
* Update message properties in the streaming session
* Update message properties in the streaming task
* (Replaces dispatch(newMessagesActions.updateMessage))
*
* @param messageId - Message ID
* @param updates - Partial message updates
*/
updateMessage(messageId: string, updates: Partial<Message>): void {
const session = this.getSession(messageId)
if (!session) {
logger.warn(`updateMessage called for non-existent session: ${messageId}`)
const task = this.getTask(messageId)
if (!task) {
logger.warn(`updateMessage called for non-existent task: ${messageId}`)
return
}
// Create new message with updates (immutable update for cache notification)
const newMessage = { ...session.message, ...updates }
const newMessage = { ...task.message, ...updates }
// Create new session with updated message (immutable update for cache notification)
const newSession: StreamingSession = {
...session,
// Create new task with updated message (immutable update for cache notification)
const newTask: StreamingTask = {
...task,
message: newMessage
}
// Update caches with new references to trigger notifications
cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL)
cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL)
cacheService.set(getTaskKey(messageId), newTask, TASK_TTL)
cacheService.set(getMessageKey(messageId), newMessage, TASK_TTL)
}
/**
* Get a message from the streaming session
* Get a message from the streaming task
*
* @param messageId - Message ID
* @returns Message or null if not found
@ -399,14 +399,14 @@ class StreamingService {
// ============ Query Methods ============
/**
* Check if a topic has any active streaming sessions
* Check if a topic has any active streaming tasks
*
* @param topicId - Topic ID
* @returns True if streaming is active
*/
isStreaming(topicId: string): boolean {
const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || []
return topicSessions.length > 0
const topicTasks = cacheService.get(getTopicTasksKey(topicId)) || []
return topicTasks.length > 0
}
/**
@ -416,17 +416,17 @@ class StreamingService {
* @returns True if message is streaming
*/
isMessageStreaming(messageId: string): boolean {
return cacheService.has(getSessionKey(messageId))
return cacheService.has(getTaskKey(messageId))
}
/**
* Get the streaming session for a message
* Get the streaming task for a message
*
* @param messageId - Message ID
* @returns Session or null if not found
* @returns Task or null if not found
*/
getSession(messageId: string): StreamingSession | null {
return cacheService.get(getSessionKey(messageId)) || null
getTask(messageId: string): StreamingTask | null {
return cacheService.get(getTaskKey(messageId)) || null
}
/**
@ -436,7 +436,7 @@ class StreamingService {
* @returns Array of message IDs
*/
getActiveMessageIds(topicId: string): string[] {
return cacheService.get(getTopicSessionsKey(topicId)) || []
return cacheService.get(getTopicTasksKey(topicId)) || []
}
// ============ siblingsGroupId Generation ============
@ -464,7 +464,7 @@ class StreamingService {
const counterKey = getSiblingsGroupCounterKey(topicId)
const currentCounter = cacheService.get(counterKey) || 0
const nextGroupId = currentCounter + 1
// Store with no TTL (persistent within session, cleared on app restart)
// Store with no TTL (persistent within task lifecycle, cleared on app restart)
cacheService.set(counterKey, nextGroupId)
logger.debug('Generated siblingsGroupId', { topicId, siblingsGroupId: nextGroupId })
return nextGroupId
@ -594,20 +594,20 @@ class StreamingService {
}
/**
* Convert session data to database update payload
* Convert task data to database update payload
*
* @param session - Streaming session
* @param task - Streaming task
* @param status - Final message status
* @returns Update payload for database
*/
private convertToUpdatePayload(
session: StreamingSession,
task: StreamingTask,
status: AssistantMessageStatus
): {
messageUpdates: Partial<Message> & Pick<Message, 'id'>
blocks: MessageBlock[]
} {
const blocks = Object.values(session.blocks)
const blocks = Object.values(task.blocks)
// Ensure all blocks have final status
// Use type assertion since we're only updating the status field
@ -623,13 +623,13 @@ class StreamingService {
})
const messageUpdates: Partial<Message> & Pick<Message, 'id'> = {
id: session.messageId,
id: task.messageId,
status,
blocks: session.message.blocks,
blocks: task.message.blocks,
updatedAt: new Date().toISOString(),
// Include usage and metrics if available
...(session.message.usage && { usage: session.message.usage }),
...(session.message.metrics && { metrics: session.message.metrics })
...(task.message.usage && { usage: task.message.usage }),
...(task.message.metrics && { metrics: task.message.metrics })
}
return {
@ -639,17 +639,17 @@ class StreamingService {
}
/**
* Convert session data to Data API UpdateMessageDto format
* Convert task data to Data API UpdateMessageDto format
*
* Converts from renderer format (MessageBlock with id/status) to
* shared format (MessageDataBlock without id/status) for Data API persistence.
*
* @param session - Streaming session
* @param task - Streaming task
* @param status - Final message status
* @returns UpdateMessageDto for Data API PATCH request
*/
private convertToDataApiFormat(session: StreamingSession, status: AssistantMessageStatus): UpdateMessageDto {
const blocks = Object.values(session.blocks)
private convertToDataApiFormat(task: StreamingTask, status: AssistantMessageStatus): UpdateMessageDto {
const blocks = Object.values(task.blocks)
// Convert MessageBlock[] to MessageDataBlock[]
// Remove id, status, messageId fields as they are renderer-specific, not part of MessageDataBlock
@ -664,13 +664,13 @@ class StreamingService {
// Build MessageStats from usage and metrics
// Note: Renderer uses 'time_first_token_millsec' while shared uses 'timeFirstTokenMs'
const stats: MessageStats | undefined =
session.message.usage || session.message.metrics
task.message.usage || task.message.metrics
? {
promptTokens: session.message.usage?.prompt_tokens,
completionTokens: session.message.usage?.completion_tokens,
totalTokens: session.message.usage?.total_tokens,
timeFirstTokenMs: session.message.metrics?.time_first_token_millsec,
timeCompletionMs: session.message.metrics?.time_completion_millsec
promptTokens: task.message.usage?.prompt_tokens,
completionTokens: task.message.usage?.completion_tokens,
totalTokens: task.message.usage?.total_tokens,
timeFirstTokenMs: task.message.metrics?.time_first_token_millsec,
timeCompletionMs: task.message.metrics?.time_completion_millsec
}
: undefined
@ -687,4 +687,4 @@ export const streamingService = new StreamingService()
// Also export class for testing
export { StreamingService }
export type { StartSessionOptions, StreamingSession }
export type { StartTaskOptions, StreamingTask }

View File

@ -235,15 +235,15 @@ export const createBaseCallbacks = (deps: BaseCallbacksDependencies) => {
response?.usage?.prompt_tokens === 0 ||
response?.usage?.completion_tokens === 0)
) {
// Use context from session for usage estimation
const session = streamingService.getSession(assistantMsgId)
if (session?.contextMessages && session.contextMessages.length > 0) {
// Use context from task for usage estimation
const task = streamingService.getTask(assistantMsgId)
if (task?.contextMessages && task.contextMessages.length > 0) {
// Include the final assistant message in context for accurate estimation
const finalContextWithAssistant = [...session.contextMessages, finalAssistantMsg]
const finalContextWithAssistant = [...task.contextMessages, finalAssistantMsg]
const usage = await estimateMessagesUsage({ assistant, messages: finalContextWithAssistant })
response.usage = usage
} else {
logger.debug('Skipping usage estimation - contextMessages not available in session')
logger.debug('Skipping usage estimation - contextMessages not available in task')
}
}
}

View File

@ -1,5 +1,5 @@
export { BlockManager } from './BlockManager'
export type { createCallbacks as CreateCallbacksFunction } from './callbacks'
export { createCallbacks } from './callbacks'
export type { StartSessionOptions, StreamingSession } from './StreamingService'
export type { StartTaskOptions, StreamingTask } from './StreamingService'
export { StreamingService, streamingService } from './StreamingService'

View File

@ -18,7 +18,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
* Create mock callbacks for testing.
*
* NOTE: Updated to use simplified dependencies after StreamingService refactoring.
* Now we need to initialize StreamingService session before creating callbacks.
* Now we need to initialize StreamingService task before creating callbacks.
*/
const createMockCallbacks = (
mockAssistantMsgId: string,
@ -26,8 +26,8 @@ const createMockCallbacks = (
mockAssistant: Assistant
// dispatch and getState are no longer needed after StreamingService refactoring
) => {
// Initialize streaming session for tests
streamingService.startSession(mockTopicId, mockAssistantMsgId, {
// Initialize streaming task for tests
streamingService.startTask(mockTopicId, mockAssistantMsgId, {
parentId: 'test-user-msg-id',
role: 'assistant',
assistantId: mockAssistant.id,

View File

@ -566,9 +566,9 @@ const fetchAndProcessAgentResponseImpl = async (
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// Initialize streaming session in StreamingService
// Initialize streaming task in StreamingService
// NOTE: parentId is used internally; askId in renderer format is derived from parentId
streamingService.startSession(topicId, assistantMessage.id, {
streamingService.startTask(topicId, assistantMessage.id, {
parentId: userMessageId,
siblingsGroupId: 0,
role: 'assistant',
@ -809,7 +809,7 @@ const fetchAndProcessAssistantResponseImpl = async (
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// Build context messages first (needed for startSession)
// Build context messages first (needed for startTask)
const allMessagesForTopic = selectMessagesForTopic(getState(), topicId)
let messagesForContext: Message[] = []
@ -839,9 +839,9 @@ const fetchAndProcessAssistantResponseImpl = async (
}
}
// Initialize streaming session in StreamingService (includes context for usage estimation)
// Initialize streaming task in StreamingService (includes context for usage estimation)
// NOTE: parentId is used internally; askId in renderer format is derived from parentId
streamingService.startSession(topicId, assistantMsgId, {
streamingService.startTask(topicId, assistantMsgId, {
parentId: userMessageId!,
siblingsGroupId,
role: 'assistant',