Optimize agent message streaming with throttled persistence

- Prevent unnecessary message reloads by checking existing messages before loading session messages
- Implement LRU cache and throttled persistence for streaming agent messages to reduce backend load
- Add streaming state detection and proper cleanup for complete messages to improve performance
This commit is contained in:
suyao 2025-09-22 22:29:03 +08:00
parent 8645fe4ab1
commit e5aa58722c
No known key found for this signature in database
4 changed files with 238 additions and 45 deletions

View File

@ -27,13 +27,18 @@ const AgentSessionMessages: React.FC<Props> = ({ agentId, sessionId }) => {
const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId])
const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId))
// Load messages when session changes
// Load messages when session changes or when messages are empty
useEffect(() => {
if (sessionId) {
logger.info('Loading messages for agent session', { sessionId })
dispatch(loadTopicMessagesThunk(sessionTopicId, true)) // Force reload to get latest from backend
// Only load if we don't have messages yet
// This prevents overwriting messages that were just added
const hasMessages = messages && messages.length > 0
if (!hasMessages) {
logger.info('Loading messages for agent session', { sessionId })
dispatch(loadTopicMessagesThunk(sessionTopicId, false)) // Don't force reload if we have messages in Redux
}
}
}, [dispatch, sessionId, sessionTopicId])
}, [dispatch, sessionId, sessionTopicId, messages?.length])
const displayMessages = useMemo(() => {
if (!messages || messages.length === 0) return []

View File

@ -2,17 +2,104 @@ import { loggerService } from '@logger'
import type { AgentPersistedMessage } from '@renderer/types/agent'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { IpcChannel } from '@shared/IpcChannel'
import { throttle } from 'lodash'
import { LRUCache } from 'lru-cache'
import type { MessageDataSource } from './types'
import { extractSessionId } from './types'
const logger = loggerService.withContext('AgentMessageDataSource')
/**
* Streaming message cache to track messages being streamed
* Key: messageId, Value: { message, blocks, isComplete }
*/
const streamingMessageCache = new LRUCache<
string,
{
message: Message
blocks: MessageBlock[]
isComplete: boolean
sessionId: string
}
>({
max: 100,
ttl: 1000 * 60 * 5 // 5 minutes
})
/**
* Throttled persisters for each message to batch updates during streaming
*/
const messagePersistThrottlers = new LRUCache<string, ReturnType<typeof throttle>>({
max: 100,
ttl: 1000 * 60 * 5
})
/**
* IPC-based implementation of MessageDataSource
* Handles agent session messages through backend communication
*/
export class AgentMessageDataSource implements MessageDataSource {
// ============ Helper Methods ============
/**
* Get or create a throttled persister for a message
*/
private getMessagePersister(messageId: string): ReturnType<typeof throttle> {
if (!messagePersistThrottlers.has(messageId)) {
const persister = throttle(async () => {
const cached = streamingMessageCache.get(messageId)
if (!cached) return
const { message, blocks, sessionId, isComplete } = cached
try {
// Persist to backend
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
sessionId,
agentSessionId: '',
...(message.role === 'user'
? { user: { payload: { message, blocks } } }
: { assistant: { payload: { message, blocks } } })
})
logger.debug(`Persisted ${isComplete ? 'complete' : 'streaming'} message ${messageId} to backend`)
// Clean up if complete
if (isComplete) {
streamingMessageCache.delete(messageId)
messagePersistThrottlers.delete(messageId)
}
} catch (error) {
logger.error(`Failed to persist message ${messageId}:`, error as Error)
}
}, 500) // Throttle to 500ms for agent messages (less frequent than chat)
messagePersistThrottlers.set(messageId, persister)
}
return messagePersistThrottlers.get(messageId)!
}
/**
* Check if a message is in streaming state based on status
*/
private isMessageStreaming(message: Partial<Message>): boolean {
return message.status?.includes('ing') ?? false
}
/**
* Clean up resources for a message
*/
private cleanupMessage(messageId: string): void {
streamingMessageCache.delete(messageId)
const throttler = messagePersistThrottlers.get(messageId)
if (throttler) {
throttler.cancel()
messagePersistThrottlers.delete(messageId)
}
}
// ============ Read Operations ============
async fetchMessages(topicId: string): Promise<{
@ -60,32 +147,50 @@ export class AgentMessageDataSource implements MessageDataSource {
// ============ Write Operations ============
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], _insertIndex?: number): Promise<void> {
// For agent sessions, we need to save messages immediately
// Don't wait for persistExchange which happens after response completion
const sessionId = extractSessionId(topicId)
if (!sessionId) {
throw new Error(`Invalid agent session topicId: ${topicId}`)
}
try {
// Create a persisted message payload
const isStreaming = this.isMessageStreaming(message)
// Always persist immediately for visibility in UI
const payload: AgentPersistedMessage = {
message,
blocks
}
// Save single message immediately to backend
// Use persistExchange with only one side of the conversation
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
sessionId,
agentSessionId: '', // Will be set later if needed
agentSessionId: '',
...(message.role === 'user' ? { user: { payload } } : { assistant: { payload } })
})
logger.info(`Saved ${message.role} message for agent session ${sessionId}`, {
messageId: message.id,
blockCount: blocks.length
blockCount: blocks.length,
status: message.status,
isStreaming
})
// If streaming, also set up cache for throttled updates
if (isStreaming && message.role === 'assistant') {
streamingMessageCache.set(message.id, {
message,
blocks,
isComplete: false,
sessionId
})
// Set up throttled persister for future updates
this.getMessagePersister(message.id)
logger.debug(`Set up streaming cache for message ${message.id}`)
} else {
// Clean up any streaming cache for non-streaming messages
this.cleanupMessage(message.id)
}
} catch (error) {
logger.error(`Failed to save message for agent session ${topicId}:`, error as Error)
throw error
@ -141,44 +246,125 @@ export class AgentMessageDataSource implements MessageDataSource {
}
try {
// Fetch current message from backend if we need to merge
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_GetHistory,
{ sessionId }
)
const isStreaming = this.isMessageStreaming(messageUpdates)
const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
let finalMessage: Message
// Check if we have cached data for this message
const cached = streamingMessageCache.get(messageUpdates.id)
if (existingMessage?.message) {
// Merge updates with existing message
finalMessage = { ...existingMessage.message, ...messageUpdates }
} else {
// New message, ensure we have required fields
if (!messageUpdates.topicId || !messageUpdates.role) {
logger.warn(`Incomplete message data for ${messageUpdates.id}`)
return
if (isStreaming) {
// During streaming, update cache and trigger throttled persist
let currentMessage: Message
let currentBlocks: MessageBlock[]
if (cached) {
// Update existing cached message
currentMessage = { ...cached.message, ...messageUpdates }
// Merge blocks - use new blocks if provided, otherwise keep cached
currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks
} else {
// First streaming update - fetch from backend or create new
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_GetHistory,
{ sessionId }
)
const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
if (existingMessage?.message) {
currentMessage = { ...existingMessage.message, ...messageUpdates }
currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || []
} else {
// New message
if (!messageUpdates.topicId || !messageUpdates.role) {
logger.warn(`Incomplete message data for streaming message ${messageUpdates.id}`)
return
}
currentMessage = messageUpdates as Message
currentBlocks = blocksToUpdate
}
}
finalMessage = messageUpdates as Message
// Update cache
streamingMessageCache.set(messageUpdates.id, {
message: currentMessage,
blocks: currentBlocks,
isComplete: false,
sessionId
})
// Trigger throttled persist
const persister = this.getMessagePersister(messageUpdates.id)
persister()
logger.debug(`Updated streaming cache for message ${messageUpdates.id}`, {
status: messageUpdates.status,
blockCount: currentBlocks.length
})
} else {
// Not streaming - persist immediately
let finalMessage: Message
let finalBlocks: MessageBlock[]
if (cached) {
// Use cached data as base
finalMessage = { ...cached.message, ...messageUpdates }
finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks
} else {
// Fetch from backend if no cache
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_GetHistory,
{ sessionId }
)
const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
if (existingMessage?.message) {
finalMessage = { ...existingMessage.message, ...messageUpdates }
finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || []
} else {
if (!messageUpdates.topicId || !messageUpdates.role) {
logger.warn(`Incomplete message data for ${messageUpdates.id}`)
return
}
finalMessage = messageUpdates as Message
finalBlocks = blocksToUpdate
}
}
// Mark as complete in cache if it was streaming
if (cached) {
streamingMessageCache.set(messageUpdates.id, {
message: finalMessage,
blocks: finalBlocks,
isComplete: true,
sessionId
})
}
// Persist to backend
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
sessionId,
agentSessionId: '',
...(finalMessage.role === 'user'
? { user: { payload: { message: finalMessage, blocks: finalBlocks } } }
: { assistant: { payload: { message: finalMessage, blocks: finalBlocks } } })
})
logger.info(`Persisted complete message ${messageUpdates.id} for agent session ${sessionId}`, {
status: finalMessage.status,
blockCount: finalBlocks.length
})
// Clean up
this.cleanupMessage(messageUpdates.id)
}
// Save updated message and blocks to backend
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
sessionId,
agentSessionId: '',
...(finalMessage.role === 'user'
? { user: { payload: { message: finalMessage, blocks: blocksToUpdate } } }
: { assistant: { payload: { message: finalMessage, blocks: blocksToUpdate } } })
})
logger.info(`Updated message and blocks for ${messageUpdates.id} in agent session ${sessionId}`)
} catch (error) {
logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error)
throw error
}
}
async deleteMessage(topicId: string, messageId: string): Promise<void> {
async deleteMessage(topicId: string, _messageId: string): Promise<void> {
// Agent session messages cannot be deleted individually
logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`)
@ -187,7 +373,7 @@ export class AgentMessageDataSource implements MessageDataSource {
// 2. Or just hide from UI without actual deletion
}
async deleteMessagesByAskId(topicId: string, askId: string): Promise<void> {
async deleteMessagesByAskId(topicId: string, _askId: string): Promise<void> {
// Agent session messages cannot be deleted
logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`)
}

View File

@ -204,7 +204,7 @@ const createAgentMessageStream = async (
export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => {
// Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return saveMessageAndBlocksToDBV2(message.topicId, message, blocks)
return saveMessageAndBlocksToDBV2(message.topicId, message, blocks, messageIndex)
}
// Original implementation

View File

@ -188,15 +188,17 @@ export const clearMessagesFromDBV2 = async (topicId: string): Promise<void> => {
export const saveMessageAndBlocksToDBV2 = async (
topicId: string,
message: Message,
blocks: MessageBlock[]
blocks: MessageBlock[],
messageIndex: number = -1
): Promise<void> => {
try {
// Direct call without conditional logic
await dbService.appendMessage(topicId, message, blocks)
// Direct call without conditional logic, now with messageIndex
await dbService.appendMessage(topicId, message, blocks, messageIndex)
logger.info('Saved message and blocks via DbService', {
topicId,
messageId: message.id,
blockCount: blocks.length
blockCount: blocks.length,
messageIndex
})
} catch (error) {
logger.error('Failed to save message and blocks:', { topicId, messageId: message.id, error })