From 15f216b05068791694b0ad541c1524b364707698 Mon Sep 17 00:00:00 2001 From: suyao Date: Mon, 22 Sep 2025 21:46:40 +0800 Subject: [PATCH] Implement agent session message persistence and streaming state management - Add comprehensive solution documentation for status persistence and streaming state - Implement message update functionality in AgentMessageDataSource for agent sessions - Remove redundant persistAgentExchange logic to eliminate duplicate saves - Streamline message persistence flow to use appendMessage and updateMessageAndBlocks consistently --- BACKEND_STATUS_PERSISTENCE_SOLUTION.md | 247 +++++++++++++++++ STREAMING_STATE_SOLUTION.md | 249 ++++++++++++++++++ .../src/services/db/AgentMessageDataSource.ts | 82 +++++- src/renderer/src/store/thunk/messageThunk.ts | 89 +------ 4 files changed, 585 insertions(+), 82 deletions(-) create mode 100644 BACKEND_STATUS_PERSISTENCE_SOLUTION.md create mode 100644 STREAMING_STATE_SOLUTION.md diff --git a/BACKEND_STATUS_PERSISTENCE_SOLUTION.md b/BACKEND_STATUS_PERSISTENCE_SOLUTION.md new file mode 100644 index 0000000000..6c761c8593 --- /dev/null +++ b/BACKEND_STATUS_PERSISTENCE_SOLUTION.md @@ -0,0 +1,247 @@ +# Agent Session 消息状态持久化方案 + +## 问题分析 + +### 当前流程 +1. **发送消息时**: + - 创建助手消息,状态为 `PENDING` + - 通过 `appendMessage` 立即保存到后端(包含pending状态) + +2. **切换会话后重新加载**: + - 从后端加载消息 + - 但状态可能丢失或被覆盖 + +### 根本问题 +后端可能没有正确保存或返回消息的 `status` 字段。 + +## 解决方案:确保状态正确持久化 + +### 方案A:修改 AgentMessageDataSource(前端方案) + +```typescript +// src/renderer/src/services/db/AgentMessageDataSource.ts + +// 1. 保存消息时确保状态被保存 +async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]): Promise { + const sessionId = extractSessionId(topicId) + + const payload: AgentPersistedMessage = { + message: { + ...message, + // 明确保存状态 + status: message.status || AssistantMessageStatus.PENDING + }, + blocks + } + + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(message.role === 'user' + ? { user: { payload } } + : { assistant: { payload } } + ) + }) +} + +// 2. 加载消息时恢复流式状态 +async fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }> { + const sessionId = extractSessionId(topicId) + const historicalMessages = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + let hasStreamingMessage = false + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + const message = persistedMsg.message + + // 检查是否有未完成的消息 + if (message.status === 'pending' || message.status === 'processing') { + hasStreamingMessage = true + + // 如果消息创建时间超过5分钟,标记为错误 + const messageAge = Date.now() - new Date(message.createdAt).getTime() + if (messageAge > 5 * 60 * 1000) { + message.status = 'error' + } + } + + messages.push(message) + if (persistedMsg.blocks) { + blocks.push(...persistedMsg.blocks) + } + } + } + + // 如果有流式消息,恢复loading状态 + if (hasStreamingMessage) { + // 这里需要dispatch action,可能需要通过回调或其他方式 + store.dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + } + + return { messages, blocks } +} +``` + +### 方案B:后端修改(更彻底的方案) + +需要确保后端: + +1. **sessionMessageRepository.ts** 正确保存消息状态 +```typescript +// src/main/services/agents/database/sessionMessageRepository.ts + +async persistExchange(params: PersistExchangeParams): Promise { + // 保存时确保状态字段被正确存储 + if (params.user) { + await this.saveMessage({ + ...params.user.payload.message, + status: params.user.payload.message.status // 确保状态被保存 + }) + } + + if (params.assistant) { + await this.saveMessage({ + ...params.assistant.payload.message, + status: params.assistant.payload.message.status // 确保状态被保存 + }) + } +} + +async getHistory(sessionId: string): Promise { + // 返回时确保状态字段被包含 + const messages = await this.db.getMessages(sessionId) + return messages.map(msg => ({ + message: { + ...msg, + status: msg.status // 确保状态被返回 + }, + blocks: msg.blocks + })) +} +``` + +2. **添加会话级别的流式状态** +```typescript +interface AgentSession { + id: string + // ... 其他字段 + streamingMessageId?: string // 当前正在流式的消息ID + streamingStartTime?: number // 流式开始时间 +} + +// 开始流式时更新 +async startStreaming(sessionId: string, messageId: string) { + await this.updateSession(sessionId, { + streamingMessageId: messageId, + streamingStartTime: Date.now() + }) +} + +// 结束流式时清除 +async stopStreaming(sessionId: string) { + await this.updateSession(sessionId, { + streamingMessageId: null, + streamingStartTime: null + }) +} +``` + +### 方案C:混合方案(推荐) + +1. **前端立即保存状态**(已实现) +2. **后端确保状态持久化** +3. **加载时智能恢复状态** + +```typescript +// AgentMessageDataSource.ts +async fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }> { + const sessionId = extractSessionId(topicId) + const historicalMessages = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + const message = { ...persistedMsg.message } + + // 智能恢复状态 + if (message.status === 'pending' || message.status === 'processing') { + // 检查消息年龄 + const age = Date.now() - new Date(message.createdAt).getTime() + + if (age > 5 * 60 * 1000) { + // 超过5分钟,标记为错误 + message.status = 'error' + } else if (age > 30 * 1000 && message.blocks?.length > 0) { + // 超过30秒且有内容,可能已完成 + message.status = 'success' + } + // 否则保持原状态,让UI显示暂停按钮 + } + + messages.push(message) + if (persistedMsg.blocks) { + blocks.push(...persistedMsg.blocks) + } + } + } + + return { messages, blocks } +} +``` + +## 实施步骤 + +### 步骤1:验证后端是否保存状态 +1. 在 `appendMessage` 中添加日志,确认状态被发送 +2. 检查后端数据库,确认状态被保存 +3. 在 `fetchMessages` 中添加日志,确认状态被返回 + +### 步骤2:修复状态持久化 +1. 如果后端没有保存状态,修改后端代码 +2. 如果后端保存了但没返回,修改返回逻辑 + +### 步骤3:添加状态恢复逻辑 +1. 在 `fetchMessages` 中智能恢复状态 +2. 对于未完成的消息,根据时间判断是否需要标记为错误 + +### 步骤4:恢复loading状态 +1. 如果有pending/processing消息,设置loading为true +2. 让UI正确显示暂停按钮 + +## 测试验证 + +1. **正常流程** + - 发送消息 + - 观察pending状态 + - 响应完成后状态变为success + +2. **切换会话** + - 发送消息开始响应 + - 立即切换会话 + - 切回来,pending状态应该保持 + - 暂停按钮应该显示 + +3. **页面刷新** + - 响应过程中刷新 + - 重新加载后状态应该合理(pending或error) + +4. **超时处理** + - 模拟长时间pending + - 验证超时后自动标记为error + +## 优势 +- 符合现有架构,数据统一持久化 +- 状态与消息一起保存,数据一致性好 +- 页面刷新也能恢复 +- 不需要额外的状态管理器 \ No newline at end of file diff --git a/STREAMING_STATE_SOLUTION.md b/STREAMING_STATE_SOLUTION.md new file mode 100644 index 0000000000..f8aa87dcfe --- /dev/null +++ b/STREAMING_STATE_SOLUTION.md @@ -0,0 +1,249 @@ +# Agent Session 流式状态保持方案 + +## 问题描述 +Agent会话中发送消息后,如果在响应过程中切换会话: +1. 消息内容不丢失了(已修复)✅ +2. 但是pending/processing状态丢失了 ❌ +3. loading状态丢失了 ❌ +4. 导致无法显示"暂停"按钮,无法中止正在进行的响应 + +## 问题分析 + +### 现状 +```javascript +// AgentSessionInputbar.tsx +const streamingAskIds = useMemo(() => { + // 检查消息的 status === 'processing' || 'pending' + // 切换会话后这些状态丢失了 +}, [topicMessages]) + +const canAbort = loading && streamingAskIds.length > 0 +// loading 状态也丢失了 +``` + +### 根本原因 +1. **消息保存时机问题**: + - 用户消息立即保存(状态为success) + - 助手消息创建时是pending状态 + - 但保存到后端时可能已经是最终状态 + +2. **状态管理问题**: + - loading状态只在Redux中,不持久化 + - 切换会话时Redux被清空 + - 重新加载时无法知道是否有正在进行的响应 + +## 解决方案 + +### 方案一:全局流式状态管理器(推荐)✅ + +创建一个全局的流式状态管理器,独立于Redux,跨会话保持状态。 + +```typescript +// src/renderer/src/services/StreamingStateManager.ts +class StreamingStateManager { + // 记录正在进行的流式响应 + private streamingSessions = new Map() + + startStreaming(topicId: string, askId: string, assistantMessageId: string, agentSession?: any) { + this.streamingSessions.set(topicId, { + topicId, + askId, + assistantMessageId, + startTime: Date.now(), + agentSession + }) + } + + stopStreaming(topicId: string) { + this.streamingSessions.delete(topicId) + } + + isStreaming(topicId: string): boolean { + return this.streamingSessions.has(topicId) + } + + getStreamingInfo(topicId: string) { + return this.streamingSessions.get(topicId) + } + + // 获取所有正在流式的会话 + getAllStreaming() { + return Array.from(this.streamingSessions.values()) + } + + // 清理超时的流式状态(防止内存泄漏) + cleanupStale(maxAge = 5 * 60 * 1000) { // 5分钟 + const now = Date.now() + for (const [topicId, info] of this.streamingSessions) { + if (now - info.startTime > maxAge) { + this.streamingSessions.delete(topicId) + } + } + } +} + +export const streamingStateManager = new StreamingStateManager() +``` + +**集成点**: + +1. **开始流式时**: +```typescript +// messageThunk.ts - fetchAndProcessAgentResponseImpl +streamingStateManager.startStreaming( + topicId, + userMessageId, + assistantMessage.id, + agentSession +) +``` + +2. **结束流式时**: +```typescript +// callbacks.ts - onComplete +streamingStateManager.stopStreaming(topicId) +``` + +3. **UI使用**: +```typescript +// AgentSessionInputbar.tsx +const isStreaming = streamingStateManager.isStreaming(sessionTopicId) +const streamingInfo = streamingStateManager.getStreamingInfo(sessionTopicId) + +const canAbort = isStreaming && streamingInfo?.askId +``` + +### 方案二:增强消息持久化(备选) + +修改消息保存逻辑,保留流式状态: + +```typescript +// AgentMessageDataSource.ts +async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]) { + // 保存时保留 pending/processing 状态 + const messageToSave = { + ...message, + // 如果是助手消息且状态是pending,保持这个状态 + status: message.status === 'pending' ? 'pending' : message.status + } + + // ... 保存逻辑 +} + +// 加载时恢复状态 +async fetchMessages(topicId: string) { + const { messages, blocks } = // ... 从后端加载 + + // 检查是否有未完成的消息 + for (const msg of messages) { + if (msg.status === 'pending' || msg.status === 'processing') { + // 恢复loading状态 + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // 可能需要重新启动流式处理或标记为失败 + } + } +} +``` + +### 方案三:Session级别状态存储(简单但有限) + +在localStorage或sessionStorage中保存流式状态: + +```typescript +// 保存流式状态 +const saveStreamingState = (topicId: string, state: any) => { + const states = JSON.parse(localStorage.getItem('streamingStates') || '{}') + states[topicId] = { + ...state, + timestamp: Date.now() + } + localStorage.setItem('streamingStates', JSON.stringify(states)) +} + +// 恢复流式状态 +const getStreamingState = (topicId: string) => { + const states = JSON.parse(localStorage.getItem('streamingStates') || '{}') + const state = states[topicId] + + // 检查是否过期(比如超过5分钟) + if (state && Date.now() - state.timestamp < 5 * 60 * 1000) { + return state + } + + // 清理过期状态 + delete states[topicId] + localStorage.setItem('streamingStates', JSON.stringify(states)) + return null +} +``` + +## 推荐实施步骤 + +### 步骤1:实现StreamingStateManager +1. 创建全局状态管理器 +2. 在开始/结束流式时更新状态 +3. 添加定期清理机制 + +### 步骤2:更新messageThunk.ts +1. 在`fetchAndProcessAgentResponseImpl`开始时注册流式状态 +2. 在完成/错误/中止时清除状态 +3. 确保所有退出路径都清理状态 + +### 步骤3:更新UI组件 +1. 修改`AgentSessionInputbar.tsx`使用StreamingStateManager +2. 不再依赖消息的status字段判断流式状态 +3. 使用全局状态判断是否显示暂停按钮 + +### 步骤4:处理边界情况 +1. 页面刷新时的状态恢复 +2. 网络中断的处理 +3. 超时自动清理 + +## 测试验证 + +### 测试场景 +1. **正常流式**: + - 发送消息 + - 观察流式响应 + - 验证暂停按钮显示 + +2. **切换会话**: + - 发送消息开始流式 + - 立即切换到其他会话 + - 切回来验证暂停按钮仍然显示 + - 可以正确暂停 + +3. **刷新页面**: + - 流式过程中刷新 + - 验证状态是否合理处理(显示失败或继续) + +4. **超时清理**: + - 模拟长时间流式 + - 验证超时后状态被清理 + +## 优势对比 + +| 方案 | 优点 | 缺点 | +|------|------|------| +| 全局状态管理器 | • 简单可靠
• 跨会话工作
• 易于调试 | • 需要额外内存
• 页面刷新丢失 | +| 增强持久化 | • 数据一致性好
• 页面刷新可恢复 | • 实现复杂
• 需要后端配合 | +| Session存储 | • 实现简单
• 可跨页面刷新 | • 容量限制
• 需要清理逻辑 | + +## 建议 +推荐使用**方案一:全局流式状态管理器**,因为: +1. 实现简单,不需要修改后端 +2. 可以快速解决当前问题 +3. 易于扩展和维护 +4. 对现有代码改动最小 + +如果需要页面刷新后也能恢复状态,可以结合方案三,将关键信息保存到localStorage。 \ No newline at end of file diff --git a/src/renderer/src/services/db/AgentMessageDataSource.ts b/src/renderer/src/services/db/AgentMessageDataSource.ts index 12f8ea4b72..6c02be955d 100644 --- a/src/renderer/src/services/db/AgentMessageDataSource.ts +++ b/src/renderer/src/services/db/AgentMessageDataSource.ts @@ -138,12 +138,41 @@ export class AgentMessageDataSource implements MessageDataSource { } async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { - // Agent session messages are immutable once persisted - logger.warn(`updateMessage called for agent session ${topicId}, operation not supported`) + const sessionId = extractSessionId(topicId) + if (!sessionId) { + throw new Error(`Invalid agent session topicId: ${topicId}`) + } - // In a full implementation, you might want to: - // 1. Update in Redux only for UI consistency - // 2. Or implement a backend endpoint for message updates + try { + // Fetch current message from backend to merge updates + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageId) + if (!existingMessage?.message) { + logger.warn(`Message ${messageId} not found in agent session ${sessionId}`) + return + } + + // Merge updates with existing message + const updatedMessage = { ...existingMessage.message, ...updates } + + // Save updated message back to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(updatedMessage.role === 'user' + ? { user: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } } + : { assistant: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } }) + }) + + logger.info(`Updated message ${messageId} in agent session ${sessionId}`) + } catch (error) { + logger.error(`Failed to update message ${messageId} in agent session ${topicId}:`, error as Error) + throw error + } } async updateMessageAndBlocks( @@ -151,8 +180,47 @@ export class AgentMessageDataSource implements MessageDataSource { messageUpdates: Partial & Pick, blocksToUpdate: MessageBlock[] ): Promise { - // Agent session messages and blocks are immutable once persisted - logger.warn(`updateMessageAndBlocks called for agent session ${topicId}, operation not supported`) + const sessionId = extractSessionId(topicId) + if (!sessionId) { + throw new Error(`Invalid agent session topicId: ${topicId}`) + } + + try { + // Fetch current message from backend if we need to merge + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) + let finalMessage: Message + + if (existingMessage?.message) { + // Merge updates with existing message + finalMessage = { ...existingMessage.message, ...messageUpdates } + } else { + // New message, ensure we have required fields + if (!messageUpdates.topicId || !messageUpdates.role) { + logger.warn(`Incomplete message data for ${messageUpdates.id}`) + return + } + finalMessage = messageUpdates as Message + } + + // Save updated message and blocks to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(finalMessage.role === 'user' + ? { user: { payload: { message: finalMessage, blocks: blocksToUpdate } } } + : { assistant: { payload: { message: finalMessage, blocks: blocksToUpdate } } }) + }) + + logger.info(`Updated message and blocks for ${messageUpdates.id} in agent session ${sessionId}`) + } catch (error) { + logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error) + throw error + } } async deleteMessage(topicId: string, messageId: string): Promise { diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 76df28625d..f51dda46d1 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -11,7 +11,6 @@ import { createStreamProcessor, type StreamProcessorCallbacks } from '@renderer/ import store from '@renderer/store' import { updateTopicUpdatedAt } from '@renderer/store/assistants' import { type ApiServerConfig, type Assistant, type FileMetadata, type Model, type Topic } from '@renderer/types' -import type { AgentPersistedMessage } from '@renderer/types/agent' import { ChunkType } from '@renderer/types/chunk' import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from '@renderer/types/newMessage' import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage' @@ -504,13 +503,17 @@ const fetchAndProcessAgentResponseImpl = async ( text: Promise.resolve('') }) - await persistAgentExchange({ - getState, - agentSession, - userMessageId, - assistantMessageId: assistantMessage.id, - latestAgentSessionId - }) + // No longer need persistAgentExchange here since: + // 1. User message is already saved via appendMessage when created + // 2. Assistant message is saved via appendMessage when created + // 3. Updates during streaming are saved via updateMessageAndBlocks + // This eliminates the duplicate save issue + + // Only persist the agentSessionId update if it changed + if (latestAgentSessionId) { + logger.info(`Agent session ID updated to: ${latestAgentSessionId}`) + // In the future, you might want to update some session metadata here + } } catch (error: any) { logger.error('Error in fetchAndProcessAgentResponseImpl:', error) try { @@ -523,73 +526,9 @@ const fetchAndProcessAgentResponseImpl = async ( } } -interface PersistAgentExchangeParams { - getState: () => RootState - agentSession: AgentSessionContext - userMessageId: string - assistantMessageId: string - latestAgentSessionId: string -} - -const persistAgentExchange = async ({ - getState, - agentSession, - userMessageId, - assistantMessageId, - latestAgentSessionId -}: PersistAgentExchangeParams) => { - if (!window.electron?.ipcRenderer) { - return - } - - try { - const state = getState() - const userMessage = state.messages.entities[userMessageId] - const assistantMessage = state.messages.entities[assistantMessageId] - - if (!userMessage || !assistantMessage) { - logger.warn('persistAgentExchange: missing user or assistant message entity') - return - } - - const userPersistedPayload = createPersistedMessagePayload(userMessage, state) - const assistantPersistedPayload = createPersistedMessagePayload(assistantMessage, state) - - await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { - sessionId: agentSession.sessionId, - agentSessionId: latestAgentSessionId || '', - user: userPersistedPayload ? { payload: userPersistedPayload } : undefined, - assistant: assistantPersistedPayload ? { payload: assistantPersistedPayload } : undefined - }) - } catch (error) { - logger.warn('Failed to persist agent exchange', error as Error) - } -} - -const createPersistedMessagePayload = ( - message: Message | undefined, - state: RootState -): AgentPersistedMessage | undefined => { - if (!message) { - return undefined - } - - try { - const clonedMessage = JSON.parse(JSON.stringify(message)) as Message - const blockEntities = (message.blocks || []) - .map((blockId) => state.messageBlocks.entities[blockId]) - .filter((block): block is MessageBlock => Boolean(block)) - .map((block) => JSON.parse(JSON.stringify(block)) as MessageBlock) - - return { - message: clonedMessage, - blocks: blockEntities - } - } catch (error) { - logger.warn('Failed to build persisted payload for message', error as Error) - return undefined - } -} +// Removed persistAgentExchange and createPersistedMessagePayload functions +// These are no longer needed since messages are saved immediately via appendMessage +// and updated during streaming via updateMessageAndBlocks // --- Helper Function for Multi-Model Dispatch --- // 多模型创建和发送请求的逻辑,用于用户消息多模型发送和重发