diff --git a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx index 85a4afce93..70d5982987 100644 --- a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx +++ b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx @@ -27,13 +27,18 @@ const AgentSessionMessages: React.FC = ({ agentId, sessionId }) => { const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId)) - // Load messages when session changes + // Load messages when session changes or when messages are empty useEffect(() => { if (sessionId) { - logger.info('Loading messages for agent session', { sessionId }) - dispatch(loadTopicMessagesThunk(sessionTopicId, true)) // Force reload to get latest from backend + // Only load if we don't have messages yet + // This prevents overwriting messages that were just added + const hasMessages = messages && messages.length > 0 + if (!hasMessages) { + logger.info('Loading messages for agent session', { sessionId }) + dispatch(loadTopicMessagesThunk(sessionTopicId, false)) // Don't force reload if we have messages in Redux + } } - }, [dispatch, sessionId, sessionTopicId]) + }, [dispatch, sessionId, sessionTopicId, messages?.length]) const displayMessages = useMemo(() => { if (!messages || messages.length === 0) return [] diff --git a/src/renderer/src/services/db/AgentMessageDataSource.ts b/src/renderer/src/services/db/AgentMessageDataSource.ts index 40efb190a2..b4dda0c2d9 100644 --- a/src/renderer/src/services/db/AgentMessageDataSource.ts +++ b/src/renderer/src/services/db/AgentMessageDataSource.ts @@ -2,17 +2,104 @@ import { loggerService } from '@logger' import type { AgentPersistedMessage } from '@renderer/types/agent' import type { Message, MessageBlock } from '@renderer/types/newMessage' import { IpcChannel } from '@shared/IpcChannel' +import { throttle } from 'lodash' +import { LRUCache } from 'lru-cache' import type { MessageDataSource } from './types' import { extractSessionId } from './types' const logger = loggerService.withContext('AgentMessageDataSource') +/** + * Streaming message cache to track messages being streamed + * Key: messageId, Value: { message, blocks, isComplete } + */ +const streamingMessageCache = new LRUCache< + string, + { + message: Message + blocks: MessageBlock[] + isComplete: boolean + sessionId: string + } +>({ + max: 100, + ttl: 1000 * 60 * 5 // 5 minutes +}) + +/** + * Throttled persisters for each message to batch updates during streaming + */ +const messagePersistThrottlers = new LRUCache>({ + max: 100, + ttl: 1000 * 60 * 5 +}) + /** * IPC-based implementation of MessageDataSource * Handles agent session messages through backend communication */ export class AgentMessageDataSource implements MessageDataSource { + // ============ Helper Methods ============ + + /** + * Get or create a throttled persister for a message + */ + private getMessagePersister(messageId: string): ReturnType { + if (!messagePersistThrottlers.has(messageId)) { + const persister = throttle(async () => { + const cached = streamingMessageCache.get(messageId) + if (!cached) return + + const { message, blocks, sessionId, isComplete } = cached + + try { + // Persist to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(message.role === 'user' + ? { user: { payload: { message, blocks } } } + : { assistant: { payload: { message, blocks } } }) + }) + + logger.debug(`Persisted ${isComplete ? 'complete' : 'streaming'} message ${messageId} to backend`) + + // Clean up if complete + if (isComplete) { + streamingMessageCache.delete(messageId) + messagePersistThrottlers.delete(messageId) + } + } catch (error) { + logger.error(`Failed to persist message ${messageId}:`, error as Error) + } + }, 500) // Throttle to 500ms for agent messages (less frequent than chat) + + messagePersistThrottlers.set(messageId, persister) + } + + return messagePersistThrottlers.get(messageId)! + } + + /** + * Check if a message is in streaming state based on status + */ + private isMessageStreaming(message: Partial): boolean { + return message.status?.includes('ing') ?? false + } + + /** + * Clean up resources for a message + */ + private cleanupMessage(messageId: string): void { + streamingMessageCache.delete(messageId) + const throttler = messagePersistThrottlers.get(messageId) + if (throttler) { + throttler.cancel() + messagePersistThrottlers.delete(messageId) + } + } + // ============ Read Operations ============ async fetchMessages(topicId: string): Promise<{ @@ -60,32 +147,50 @@ export class AgentMessageDataSource implements MessageDataSource { // ============ Write Operations ============ async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], _insertIndex?: number): Promise { - // For agent sessions, we need to save messages immediately - // Don't wait for persistExchange which happens after response completion const sessionId = extractSessionId(topicId) if (!sessionId) { throw new Error(`Invalid agent session topicId: ${topicId}`) } try { - // Create a persisted message payload + const isStreaming = this.isMessageStreaming(message) + + // Always persist immediately for visibility in UI const payload: AgentPersistedMessage = { message, blocks } - // Save single message immediately to backend - // Use persistExchange with only one side of the conversation await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { sessionId, - agentSessionId: '', // Will be set later if needed + agentSessionId: '', ...(message.role === 'user' ? { user: { payload } } : { assistant: { payload } }) }) logger.info(`Saved ${message.role} message for agent session ${sessionId}`, { messageId: message.id, - blockCount: blocks.length + blockCount: blocks.length, + status: message.status, + isStreaming }) + + // If streaming, also set up cache for throttled updates + if (isStreaming && message.role === 'assistant') { + streamingMessageCache.set(message.id, { + message, + blocks, + isComplete: false, + sessionId + }) + + // Set up throttled persister for future updates + this.getMessagePersister(message.id) + + logger.debug(`Set up streaming cache for message ${message.id}`) + } else { + // Clean up any streaming cache for non-streaming messages + this.cleanupMessage(message.id) + } } catch (error) { logger.error(`Failed to save message for agent session ${topicId}:`, error as Error) throw error @@ -141,44 +246,125 @@ export class AgentMessageDataSource implements MessageDataSource { } try { - // Fetch current message from backend if we need to merge - const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( - IpcChannel.AgentMessage_GetHistory, - { sessionId } - ) + const isStreaming = this.isMessageStreaming(messageUpdates) - const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) - let finalMessage: Message + // Check if we have cached data for this message + const cached = streamingMessageCache.get(messageUpdates.id) - if (existingMessage?.message) { - // Merge updates with existing message - finalMessage = { ...existingMessage.message, ...messageUpdates } - } else { - // New message, ensure we have required fields - if (!messageUpdates.topicId || !messageUpdates.role) { - logger.warn(`Incomplete message data for ${messageUpdates.id}`) - return + if (isStreaming) { + // During streaming, update cache and trigger throttled persist + let currentMessage: Message + let currentBlocks: MessageBlock[] + + if (cached) { + // Update existing cached message + currentMessage = { ...cached.message, ...messageUpdates } + // Merge blocks - use new blocks if provided, otherwise keep cached + currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks + } else { + // First streaming update - fetch from backend or create new + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) + + if (existingMessage?.message) { + currentMessage = { ...existingMessage.message, ...messageUpdates } + currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || [] + } else { + // New message + if (!messageUpdates.topicId || !messageUpdates.role) { + logger.warn(`Incomplete message data for streaming message ${messageUpdates.id}`) + return + } + currentMessage = messageUpdates as Message + currentBlocks = blocksToUpdate + } } - finalMessage = messageUpdates as Message + + // Update cache + streamingMessageCache.set(messageUpdates.id, { + message: currentMessage, + blocks: currentBlocks, + isComplete: false, + sessionId + }) + + // Trigger throttled persist + const persister = this.getMessagePersister(messageUpdates.id) + persister() + + logger.debug(`Updated streaming cache for message ${messageUpdates.id}`, { + status: messageUpdates.status, + blockCount: currentBlocks.length + }) + } else { + // Not streaming - persist immediately + let finalMessage: Message + let finalBlocks: MessageBlock[] + + if (cached) { + // Use cached data as base + finalMessage = { ...cached.message, ...messageUpdates } + finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks + } else { + // Fetch from backend if no cache + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) + + if (existingMessage?.message) { + finalMessage = { ...existingMessage.message, ...messageUpdates } + finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || [] + } else { + if (!messageUpdates.topicId || !messageUpdates.role) { + logger.warn(`Incomplete message data for ${messageUpdates.id}`) + return + } + finalMessage = messageUpdates as Message + finalBlocks = blocksToUpdate + } + } + + // Mark as complete in cache if it was streaming + if (cached) { + streamingMessageCache.set(messageUpdates.id, { + message: finalMessage, + blocks: finalBlocks, + isComplete: true, + sessionId + }) + } + + // Persist to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(finalMessage.role === 'user' + ? { user: { payload: { message: finalMessage, blocks: finalBlocks } } } + : { assistant: { payload: { message: finalMessage, blocks: finalBlocks } } }) + }) + + logger.info(`Persisted complete message ${messageUpdates.id} for agent session ${sessionId}`, { + status: finalMessage.status, + blockCount: finalBlocks.length + }) + + // Clean up + this.cleanupMessage(messageUpdates.id) } - - // Save updated message and blocks to backend - await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { - sessionId, - agentSessionId: '', - ...(finalMessage.role === 'user' - ? { user: { payload: { message: finalMessage, blocks: blocksToUpdate } } } - : { assistant: { payload: { message: finalMessage, blocks: blocksToUpdate } } }) - }) - - logger.info(`Updated message and blocks for ${messageUpdates.id} in agent session ${sessionId}`) } catch (error) { logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error) throw error } } - async deleteMessage(topicId: string, messageId: string): Promise { + async deleteMessage(topicId: string, _messageId: string): Promise { // Agent session messages cannot be deleted individually logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`) @@ -187,7 +373,7 @@ export class AgentMessageDataSource implements MessageDataSource { // 2. Or just hide from UI without actual deletion } - async deleteMessagesByAskId(topicId: string, askId: string): Promise { + async deleteMessagesByAskId(topicId: string, _askId: string): Promise { // Agent session messages cannot be deleted logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`) } diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index f51dda46d1..c86823999a 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -204,7 +204,7 @@ const createAgentMessageStream = async ( export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => { // Use V2 implementation if feature flag is enabled if (featureFlags.USE_UNIFIED_DB_SERVICE) { - return saveMessageAndBlocksToDBV2(message.topicId, message, blocks) + return saveMessageAndBlocksToDBV2(message.topicId, message, blocks, messageIndex) } // Original implementation diff --git a/src/renderer/src/store/thunk/messageThunk.v2.ts b/src/renderer/src/store/thunk/messageThunk.v2.ts index a42b521393..c0c0e38e04 100644 --- a/src/renderer/src/store/thunk/messageThunk.v2.ts +++ b/src/renderer/src/store/thunk/messageThunk.v2.ts @@ -188,15 +188,17 @@ export const clearMessagesFromDBV2 = async (topicId: string): Promise => { export const saveMessageAndBlocksToDBV2 = async ( topicId: string, message: Message, - blocks: MessageBlock[] + blocks: MessageBlock[], + messageIndex: number = -1 ): Promise => { try { - // Direct call without conditional logic - await dbService.appendMessage(topicId, message, blocks) + // Direct call without conditional logic, now with messageIndex + await dbService.appendMessage(topicId, message, blocks, messageIndex) logger.info('Saved message and blocks via DbService', { topicId, messageId: message.id, - blockCount: blocks.length + blockCount: blocks.length, + messageIndex }) } catch (error) { logger.error('Failed to save message and blocks:', { topicId, messageId: message.id, error })