From e80c6b06ba223fb1a87fe1687fb801262e945729 Mon Sep 17 00:00:00 2001 From: fullex <0xfullex@gmail.com> Date: Fri, 9 Jan 2026 16:15:01 +0800 Subject: [PATCH] refactor(StreamingService): rename session to task and update related logic - Refactored the StreamingService to replace all instances of "session" with "task" to better reflect the functionality and improve clarity. - Updated related types, methods, and cache handling to align with the new terminology, ensuring consistent usage throughout the codebase. - Enhanced comments and documentation to reflect the changes in terminology and functionality. --- packages/shared/data/cache/cacheSchemas.ts | 10 +- .../messageStreaming/StreamingService.ts | 246 +++++++++--------- .../callbacks/baseCallbacks.ts | 10 +- .../src/services/messageStreaming/index.ts | 2 +- .../streamCallback.integration.test.ts | 6 +- src/renderer/src/store/thunk/messageThunk.ts | 10 +- 6 files changed, 142 insertions(+), 142 deletions(-) diff --git a/packages/shared/data/cache/cacheSchemas.ts b/packages/shared/data/cache/cacheSchemas.ts index 5c411a172c..45b616da13 100644 --- a/packages/shared/data/cache/cacheSchemas.ts +++ b/packages/shared/data/cache/cacheSchemas.ts @@ -149,11 +149,11 @@ export type UseCacheSchema = { // TODO [v2]: Replace `any` with proper types after newMessage.ts types are // migrated to packages/shared/data/types/message.ts // Current types: - // - StreamingSession: defined locally in StreamingService.ts + // - StreamingTask: defined locally in StreamingService.ts // - Message: src/renderer/src/types/newMessage.ts (renderer format, not shared/Message) // - MessageBlock: src/renderer/src/types/newMessage.ts - 'message.streaming.session.${messageId}': any // StreamingSession - 'message.streaming.topic_sessions.${topicId}': string[] + 'message.streaming.task.${messageId}': any // StreamingTask + 'message.streaming.topic_tasks.${topicId}': string[] 'message.streaming.content.${messageId}': any // Message (renderer format) 'message.streaming.block.${blockId}': any // MessageBlock 'message.streaming.siblings_counter.${topicId}': number @@ -202,8 +202,8 @@ export const DefaultUseCache: UseCacheSchema = { 'entity.cache.${type}_${id}': { loaded: false, data: null }, // Message Streaming Cache - 'message.streaming.session.${messageId}': null, - 'message.streaming.topic_sessions.${topicId}': [], + 'message.streaming.task.${messageId}': null, + 'message.streaming.topic_tasks.${topicId}': [], 'message.streaming.content.${messageId}': null, 'message.streaming.block.${blockId}': null, 'message.streaming.siblings_counter.${topicId}': 0 diff --git a/src/renderer/src/services/messageStreaming/StreamingService.ts b/src/renderer/src/services/messageStreaming/StreamingService.ts index 674e5fbace..ce76359799 100644 --- a/src/renderer/src/services/messageStreaming/StreamingService.ts +++ b/src/renderer/src/services/messageStreaming/StreamingService.ts @@ -6,14 +6,14 @@ * and persists final data to the database via Data API or dbService. * * Key Design Decisions: - * - Uses messageId as primary key for sessions (supports multi-model concurrent streaming) + * - Uses messageId as primary key for tasks (supports multi-model concurrent streaming) * - Streaming data is stored in memory only (not Redux, not Dexie during streaming) * - On finalize, data is converted to new format and persisted via appropriate data source * - Throttling is handled externally by messageThunk.ts (preserves existing throttle logic) * * Cache Key Strategy (uses schema-defined template keys from cacheSchemas.ts): - * - Session key: `message.streaming.session.${messageId}` - Internal session lifecycle management - * - Topic sessions index: `message.streaming.topic_sessions.${topicId}` - Track active sessions per topic + * - Task key: `message.streaming.task.${messageId}` - Internal task lifecycle management + * - Topic tasks index: `message.streaming.topic_tasks.${topicId}` - Track active tasks per topic * - Message key: `message.streaming.content.${messageId}` - UI subscription for message-level changes * - Block key: `message.streaming.block.${blockId}` - UI subscription for block content updates * - Siblings counter: `message.streaming.siblings_counter.${topicId}` - Multi-model response group counter @@ -34,19 +34,19 @@ import { dbService } from '../db' const logger = loggerService.withContext('StreamingService') // Cache key generators (matches template keys in cacheSchemas.ts) -const getSessionKey = (messageId: string) => `message.streaming.session.${messageId}` as const -const getTopicSessionsKey = (topicId: string) => `message.streaming.topic_sessions.${topicId}` as const +const getTaskKey = (messageId: string) => `message.streaming.task.${messageId}` as const +const getTopicTasksKey = (topicId: string) => `message.streaming.topic_tasks.${topicId}` as const const getMessageKey = (messageId: string) => `message.streaming.content.${messageId}` as const const getBlockKey = (blockId: string) => `message.streaming.block.${blockId}` as const const getSiblingsGroupCounterKey = (topicId: string) => `message.streaming.siblings_counter.${topicId}` as const -// Session TTL for auto-cleanup (prevents memory leaks from crashed processes) -const SESSION_TTL = 5 * 60 * 1000 // 5 minutes +// Task TTL for auto-cleanup (prevents memory leaks from crashed processes) +const TASK_TTL = 5 * 60 * 1000 // 5 minutes /** - * Streaming session data structure (stored in memory) + * Streaming task data structure (stored in memory) */ -interface StreamingSession { +interface StreamingTask { topicId: string messageId: string @@ -68,13 +68,13 @@ interface StreamingSession { } /** - * Options for starting a streaming session + * Options for starting a streaming task * * NOTE: Internal naming uses v2 convention (parentId). * The renderer Message format uses 'askId' for backward compatibility, * which is set from parentId during message creation. */ -interface StartSessionOptions { +interface StartTaskOptions { parentId: string siblingsGroupId?: number // Defaults to 0 (single model), >0 for multi-model response groups role: 'assistant' @@ -103,7 +103,7 @@ interface CreateAssistantMessageOptions { * StreamingService - Manages streaming message state during generation * * Responsibilities: - * - Session lifecycle management (start, update, finalize, clear) + * - Task lifecycle management (start, update, finalize, end) * - Block operations (add, update, get) * - Message operations (update, get) * - Cache-based state management with automatic TTL cleanup @@ -112,19 +112,19 @@ class StreamingService { // Internal mapping: blockId -> messageId (for efficient block updates) private blockToMessageMap = new Map() - // ============ Session Lifecycle ============ + // ============ Task Lifecycle ============ /** - * Start a streaming session for a message + * Start a streaming task for a message * * IMPORTANT: The message must be created via Data API POST before calling this. * This method initializes the in-memory streaming state. * - * @param topicId - Topic ID (used for topic sessions index) + * @param topicId - Topic ID (used for topic tasks index) * @param messageId - Message ID returned from Data API POST - * @param options - Session options including parentId and siblingsGroupId + * @param options - Task options including parentId and siblingsGroupId */ - startSession(topicId: string, messageId: string, options: StartSessionOptions): void { + startTask(topicId: string, messageId: string, options: StartTaskOptions): void { const { parentId, siblingsGroupId = 0, @@ -154,8 +154,8 @@ class StreamingService { agentSessionId } - // Create session - const session: StreamingSession = { + // Create task + const task: StreamingTask = { topicId, messageId, message, @@ -166,24 +166,24 @@ class StreamingService { startedAt: Date.now() } - // Store session with TTL - cacheService.set(getSessionKey(messageId), session, SESSION_TTL) + // Store task with TTL + cacheService.set(getTaskKey(messageId), task, TASK_TTL) // Store message data for UI subscription - cacheService.set(getMessageKey(messageId), message, SESSION_TTL) + cacheService.set(getMessageKey(messageId), message, TASK_TTL) - // Add to topic sessions index - const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || [] - if (!topicSessions.includes(messageId)) { - topicSessions.push(messageId) - cacheService.set(getTopicSessionsKey(topicId), topicSessions, SESSION_TTL) + // Add to topic tasks index + const topicTasks = cacheService.get(getTopicTasksKey(topicId)) || [] + if (!topicTasks.includes(messageId)) { + topicTasks.push(messageId) + cacheService.set(getTopicTasksKey(topicId), topicTasks, TASK_TTL) } - logger.debug('Started streaming session', { topicId, messageId, parentId, siblingsGroupId }) + logger.debug('Started streaming task', { topicId, messageId, parentId, siblingsGroupId }) } /** - * Finalize a streaming session by persisting data to database + * Finalize a streaming task by persisting data to database * * This method: * 1. Converts streaming data to the appropriate format @@ -197,50 +197,50 @@ class StreamingService { * Agent message storage will be migrated to Data API in a later phase. * Once migration is complete, all paths will use Data API uniformly.) * - * @param messageId - Session message ID + * @param messageId - Task message ID * @param status - Final message status */ async finalize(messageId: string, status: AssistantMessageStatus): Promise { - const session = this.getSession(messageId) - if (!session) { - logger.warn(`finalize called for non-existent session: ${messageId}`) + const task = this.getTask(messageId) + if (!task) { + logger.warn(`finalize called for non-existent task: ${messageId}`) return } try { // Route to appropriate data source based on topic type // TEMPORARY: Agent sessions use dbService until migration to Data API is complete - if (isAgentSessionTopicId(session.topicId)) { - const updatePayload = this.convertToUpdatePayload(session, status) - await dbService.updateMessageAndBlocks(session.topicId, updatePayload.messageUpdates, updatePayload.blocks) + if (isAgentSessionTopicId(task.topicId)) { + const updatePayload = this.convertToUpdatePayload(task, status) + await dbService.updateMessageAndBlocks(task.topicId, updatePayload.messageUpdates, updatePayload.blocks) } else { // Normal topic → Use Data API for persistence (has built-in retry) - const dataApiPayload = this.convertToDataApiFormat(session, status) - await dataApiService.patch(`/messages/${session.messageId}`, { body: dataApiPayload }) + const dataApiPayload = this.convertToDataApiFormat(task, status) + await dataApiService.patch(`/messages/${task.messageId}`, { body: dataApiPayload }) } - this.clearSession(messageId) - logger.debug('Finalized streaming session', { messageId, status }) + this.endTask(messageId) + logger.debug('Finalized streaming task', { messageId, status }) } catch (error) { logger.error('finalize failed:', error as Error) - // Don't clear session on error - TTL will auto-clean to prevent memory leak + // Don't end task on error - TTL will auto-clean to prevent memory leak throw error } } /** - * Clear a streaming session and all related cache keys + * End a streaming task and clear all related cache keys * - * @param messageId - Session message ID + * @param messageId - Task message ID */ - clearSession(messageId: string): void { - const session = this.getSession(messageId) - if (!session) { + endTask(messageId: string): void { + const task = this.getTask(messageId) + if (!task) { return } // Remove block mappings - Object.keys(session.blocks).forEach((blockId) => { + Object.keys(task.blocks).forEach((blockId) => { this.blockToMessageMap.delete(blockId) cacheService.delete(getBlockKey(blockId)) }) @@ -248,34 +248,34 @@ class StreamingService { // Remove message cache cacheService.delete(getMessageKey(messageId)) - // Remove from topic sessions index - const topicSessions = cacheService.get(getTopicSessionsKey(session.topicId)) || [] - const updatedTopicSessions = topicSessions.filter((id) => id !== messageId) - if (updatedTopicSessions.length > 0) { - cacheService.set(getTopicSessionsKey(session.topicId), updatedTopicSessions, SESSION_TTL) + // Remove from topic tasks index + const topicTasks = cacheService.get(getTopicTasksKey(task.topicId)) || [] + const updatedTopicTasks = topicTasks.filter((id: string) => id !== messageId) + if (updatedTopicTasks.length > 0) { + cacheService.set(getTopicTasksKey(task.topicId), updatedTopicTasks, TASK_TTL) } else { - cacheService.delete(getTopicSessionsKey(session.topicId)) + cacheService.delete(getTopicTasksKey(task.topicId)) } - // Remove session - cacheService.delete(getSessionKey(messageId)) + // Remove task + cacheService.delete(getTaskKey(messageId)) - logger.debug('Cleared streaming session', { messageId, topicId: session.topicId }) + logger.debug('Ended streaming task', { messageId, topicId: task.topicId }) } // ============ Block Operations ============ /** - * Add a new block to a streaming session + * Add a new block to a streaming task * (Replaces dispatch(upsertOneBlock)) * * @param messageId - Parent message ID * @param block - Block to add */ addBlock(messageId: string, block: MessageBlock): void { - const session = this.getSession(messageId) - if (!session) { - logger.warn(`addBlock called for non-existent session: ${messageId}`) + const task = this.getTask(messageId) + if (!task) { + logger.warn(`addBlock called for non-existent task: ${messageId}`) return } @@ -284,27 +284,27 @@ class StreamingService { // Create new message with updated blocks (immutable update for cache notification) const newMessage: Message = { - ...session.message, - blocks: session.message.blocks.includes(block.id) ? session.message.blocks : [...session.message.blocks, block.id] + ...task.message, + blocks: task.message.blocks.includes(block.id) ? task.message.blocks : [...task.message.blocks, block.id] } - // Create new session with updated blocks and message (immutable update for cache notification) - const newSession: StreamingSession = { - ...session, - blocks: { ...session.blocks, [block.id]: block }, + // Create new task with updated blocks and message (immutable update for cache notification) + const newTask: StreamingTask = { + ...task, + blocks: { ...task.blocks, [block.id]: block }, message: newMessage } // Update caches with new references to trigger notifications - cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) - cacheService.set(getBlockKey(block.id), block, SESSION_TTL) - cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) + cacheService.set(getTaskKey(messageId), newTask, TASK_TTL) + cacheService.set(getBlockKey(block.id), block, TASK_TTL) + cacheService.set(getMessageKey(messageId), newMessage, TASK_TTL) - logger.debug('Added block to session', { messageId, blockId: block.id, blockType: block.type }) + logger.debug('Added block to task', { messageId, blockId: block.id, blockType: block.type }) } /** - * Update a block in a streaming session + * Update a block in a streaming task * (Replaces dispatch(updateOneBlock)) * * NOTE: This method does NOT include throttling. Throttling is controlled @@ -320,34 +320,34 @@ class StreamingService { return } - const session = this.getSession(messageId) - if (!session) { - logger.warn(`updateBlock: Session not found for message ${messageId}`) + const task = this.getTask(messageId) + if (!task) { + logger.warn(`updateBlock: Task not found for message ${messageId}`) return } - const existingBlock = session.blocks[blockId] + const existingBlock = task.blocks[blockId] if (!existingBlock) { - logger.warn(`updateBlock: Block ${blockId} not found in session`) + logger.warn(`updateBlock: Block ${blockId} not found in task`) return } // Merge changes - use type assertion since we're updating the same block type const updatedBlock = { ...existingBlock, ...changes } as MessageBlock - // Create new session with updated block (immutable update for cache notification) - const newSession: StreamingSession = { - ...session, - blocks: { ...session.blocks, [blockId]: updatedBlock } + // Create new task with updated block (immutable update for cache notification) + const newTask: StreamingTask = { + ...task, + blocks: { ...task.blocks, [blockId]: updatedBlock } } // Update caches with new references to trigger notifications - cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) - cacheService.set(getBlockKey(blockId), updatedBlock, SESSION_TTL) + cacheService.set(getTaskKey(messageId), newTask, TASK_TTL) + cacheService.set(getBlockKey(blockId), updatedBlock, TASK_TTL) } /** - * Get a block from the streaming session + * Get a block from the streaming task * * @param blockId - Block ID * @returns Block or null if not found @@ -359,35 +359,35 @@ class StreamingService { // ============ Message Operations ============ /** - * Update message properties in the streaming session + * Update message properties in the streaming task * (Replaces dispatch(newMessagesActions.updateMessage)) * * @param messageId - Message ID * @param updates - Partial message updates */ updateMessage(messageId: string, updates: Partial): void { - const session = this.getSession(messageId) - if (!session) { - logger.warn(`updateMessage called for non-existent session: ${messageId}`) + const task = this.getTask(messageId) + if (!task) { + logger.warn(`updateMessage called for non-existent task: ${messageId}`) return } // Create new message with updates (immutable update for cache notification) - const newMessage = { ...session.message, ...updates } + const newMessage = { ...task.message, ...updates } - // Create new session with updated message (immutable update for cache notification) - const newSession: StreamingSession = { - ...session, + // Create new task with updated message (immutable update for cache notification) + const newTask: StreamingTask = { + ...task, message: newMessage } // Update caches with new references to trigger notifications - cacheService.set(getSessionKey(messageId), newSession, SESSION_TTL) - cacheService.set(getMessageKey(messageId), newMessage, SESSION_TTL) + cacheService.set(getTaskKey(messageId), newTask, TASK_TTL) + cacheService.set(getMessageKey(messageId), newMessage, TASK_TTL) } /** - * Get a message from the streaming session + * Get a message from the streaming task * * @param messageId - Message ID * @returns Message or null if not found @@ -399,14 +399,14 @@ class StreamingService { // ============ Query Methods ============ /** - * Check if a topic has any active streaming sessions + * Check if a topic has any active streaming tasks * * @param topicId - Topic ID * @returns True if streaming is active */ isStreaming(topicId: string): boolean { - const topicSessions = cacheService.get(getTopicSessionsKey(topicId)) || [] - return topicSessions.length > 0 + const topicTasks = cacheService.get(getTopicTasksKey(topicId)) || [] + return topicTasks.length > 0 } /** @@ -416,17 +416,17 @@ class StreamingService { * @returns True if message is streaming */ isMessageStreaming(messageId: string): boolean { - return cacheService.has(getSessionKey(messageId)) + return cacheService.has(getTaskKey(messageId)) } /** - * Get the streaming session for a message + * Get the streaming task for a message * * @param messageId - Message ID - * @returns Session or null if not found + * @returns Task or null if not found */ - getSession(messageId: string): StreamingSession | null { - return cacheService.get(getSessionKey(messageId)) || null + getTask(messageId: string): StreamingTask | null { + return cacheService.get(getTaskKey(messageId)) || null } /** @@ -436,7 +436,7 @@ class StreamingService { * @returns Array of message IDs */ getActiveMessageIds(topicId: string): string[] { - return cacheService.get(getTopicSessionsKey(topicId)) || [] + return cacheService.get(getTopicTasksKey(topicId)) || [] } // ============ siblingsGroupId Generation ============ @@ -464,7 +464,7 @@ class StreamingService { const counterKey = getSiblingsGroupCounterKey(topicId) const currentCounter = cacheService.get(counterKey) || 0 const nextGroupId = currentCounter + 1 - // Store with no TTL (persistent within session, cleared on app restart) + // Store with no TTL (persistent within task lifecycle, cleared on app restart) cacheService.set(counterKey, nextGroupId) logger.debug('Generated siblingsGroupId', { topicId, siblingsGroupId: nextGroupId }) return nextGroupId @@ -594,20 +594,20 @@ class StreamingService { } /** - * Convert session data to database update payload + * Convert task data to database update payload * - * @param session - Streaming session + * @param task - Streaming task * @param status - Final message status * @returns Update payload for database */ private convertToUpdatePayload( - session: StreamingSession, + task: StreamingTask, status: AssistantMessageStatus ): { messageUpdates: Partial & Pick blocks: MessageBlock[] } { - const blocks = Object.values(session.blocks) + const blocks = Object.values(task.blocks) // Ensure all blocks have final status // Use type assertion since we're only updating the status field @@ -623,13 +623,13 @@ class StreamingService { }) const messageUpdates: Partial & Pick = { - id: session.messageId, + id: task.messageId, status, - blocks: session.message.blocks, + blocks: task.message.blocks, updatedAt: new Date().toISOString(), // Include usage and metrics if available - ...(session.message.usage && { usage: session.message.usage }), - ...(session.message.metrics && { metrics: session.message.metrics }) + ...(task.message.usage && { usage: task.message.usage }), + ...(task.message.metrics && { metrics: task.message.metrics }) } return { @@ -639,17 +639,17 @@ class StreamingService { } /** - * Convert session data to Data API UpdateMessageDto format + * Convert task data to Data API UpdateMessageDto format * * Converts from renderer format (MessageBlock with id/status) to * shared format (MessageDataBlock without id/status) for Data API persistence. * - * @param session - Streaming session + * @param task - Streaming task * @param status - Final message status * @returns UpdateMessageDto for Data API PATCH request */ - private convertToDataApiFormat(session: StreamingSession, status: AssistantMessageStatus): UpdateMessageDto { - const blocks = Object.values(session.blocks) + private convertToDataApiFormat(task: StreamingTask, status: AssistantMessageStatus): UpdateMessageDto { + const blocks = Object.values(task.blocks) // Convert MessageBlock[] to MessageDataBlock[] // Remove id, status, messageId fields as they are renderer-specific, not part of MessageDataBlock @@ -664,13 +664,13 @@ class StreamingService { // Build MessageStats from usage and metrics // Note: Renderer uses 'time_first_token_millsec' while shared uses 'timeFirstTokenMs' const stats: MessageStats | undefined = - session.message.usage || session.message.metrics + task.message.usage || task.message.metrics ? { - promptTokens: session.message.usage?.prompt_tokens, - completionTokens: session.message.usage?.completion_tokens, - totalTokens: session.message.usage?.total_tokens, - timeFirstTokenMs: session.message.metrics?.time_first_token_millsec, - timeCompletionMs: session.message.metrics?.time_completion_millsec + promptTokens: task.message.usage?.prompt_tokens, + completionTokens: task.message.usage?.completion_tokens, + totalTokens: task.message.usage?.total_tokens, + timeFirstTokenMs: task.message.metrics?.time_first_token_millsec, + timeCompletionMs: task.message.metrics?.time_completion_millsec } : undefined @@ -687,4 +687,4 @@ export const streamingService = new StreamingService() // Also export class for testing export { StreamingService } -export type { StartSessionOptions, StreamingSession } +export type { StartTaskOptions, StreamingTask } diff --git a/src/renderer/src/services/messageStreaming/callbacks/baseCallbacks.ts b/src/renderer/src/services/messageStreaming/callbacks/baseCallbacks.ts index e8193041db..ddda03f626 100644 --- a/src/renderer/src/services/messageStreaming/callbacks/baseCallbacks.ts +++ b/src/renderer/src/services/messageStreaming/callbacks/baseCallbacks.ts @@ -235,15 +235,15 @@ export const createBaseCallbacks = (deps: BaseCallbacksDependencies) => { response?.usage?.prompt_tokens === 0 || response?.usage?.completion_tokens === 0) ) { - // Use context from session for usage estimation - const session = streamingService.getSession(assistantMsgId) - if (session?.contextMessages && session.contextMessages.length > 0) { + // Use context from task for usage estimation + const task = streamingService.getTask(assistantMsgId) + if (task?.contextMessages && task.contextMessages.length > 0) { // Include the final assistant message in context for accurate estimation - const finalContextWithAssistant = [...session.contextMessages, finalAssistantMsg] + const finalContextWithAssistant = [...task.contextMessages, finalAssistantMsg] const usage = await estimateMessagesUsage({ assistant, messages: finalContextWithAssistant }) response.usage = usage } else { - logger.debug('Skipping usage estimation - contextMessages not available in session') + logger.debug('Skipping usage estimation - contextMessages not available in task') } } } diff --git a/src/renderer/src/services/messageStreaming/index.ts b/src/renderer/src/services/messageStreaming/index.ts index 13b45bdd2e..5716201080 100644 --- a/src/renderer/src/services/messageStreaming/index.ts +++ b/src/renderer/src/services/messageStreaming/index.ts @@ -1,5 +1,5 @@ export { BlockManager } from './BlockManager' export type { createCallbacks as CreateCallbacksFunction } from './callbacks' export { createCallbacks } from './callbacks' -export type { StartSessionOptions, StreamingSession } from './StreamingService' +export type { StartTaskOptions, StreamingTask } from './StreamingService' export { StreamingService, streamingService } from './StreamingService' diff --git a/src/renderer/src/store/thunk/__tests__/streamCallback.integration.test.ts b/src/renderer/src/store/thunk/__tests__/streamCallback.integration.test.ts index 9cba03512b..30d796587d 100644 --- a/src/renderer/src/store/thunk/__tests__/streamCallback.integration.test.ts +++ b/src/renderer/src/store/thunk/__tests__/streamCallback.integration.test.ts @@ -18,7 +18,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' * Create mock callbacks for testing. * * NOTE: Updated to use simplified dependencies after StreamingService refactoring. - * Now we need to initialize StreamingService session before creating callbacks. + * Now we need to initialize StreamingService task before creating callbacks. */ const createMockCallbacks = ( mockAssistantMsgId: string, @@ -26,8 +26,8 @@ const createMockCallbacks = ( mockAssistant: Assistant // dispatch and getState are no longer needed after StreamingService refactoring ) => { - // Initialize streaming session for tests - streamingService.startSession(mockTopicId, mockAssistantMsgId, { + // Initialize streaming task for tests + streamingService.startTask(mockTopicId, mockAssistantMsgId, { parentId: 'test-user-msg-id', role: 'assistant', assistantId: mockAssistant.id, diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 00397b7941..c71cfb1df2 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -566,9 +566,9 @@ const fetchAndProcessAgentResponseImpl = async ( try { dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) - // Initialize streaming session in StreamingService + // Initialize streaming task in StreamingService // NOTE: parentId is used internally; askId in renderer format is derived from parentId - streamingService.startSession(topicId, assistantMessage.id, { + streamingService.startTask(topicId, assistantMessage.id, { parentId: userMessageId, siblingsGroupId: 0, role: 'assistant', @@ -809,7 +809,7 @@ const fetchAndProcessAssistantResponseImpl = async ( try { dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) - // Build context messages first (needed for startSession) + // Build context messages first (needed for startTask) const allMessagesForTopic = selectMessagesForTopic(getState(), topicId) let messagesForContext: Message[] = [] @@ -839,9 +839,9 @@ const fetchAndProcessAssistantResponseImpl = async ( } } - // Initialize streaming session in StreamingService (includes context for usage estimation) + // Initialize streaming task in StreamingService (includes context for usage estimation) // NOTE: parentId is used internally; askId in renderer format is derived from parentId - streamingService.startSession(topicId, assistantMsgId, { + streamingService.startTask(topicId, assistantMsgId, { parentId: userMessageId!, siblingsGroupId, role: 'assistant',