mirror of
https://github.com/CherryHQ/cherry-studio.git
synced 2026-01-06 13:19:33 +08:00
Implement agent session message persistence and streaming state management
- Add comprehensive solution documentation for status persistence and streaming state - Implement message update functionality in AgentMessageDataSource for agent sessions - Remove redundant persistAgentExchange logic to eliminate duplicate saves - Streamline message persistence flow to use appendMessage and updateMessageAndBlocks consistently
This commit is contained in:
parent
b4df5bbb13
commit
15f216b050
247
BACKEND_STATUS_PERSISTENCE_SOLUTION.md
Normal file
247
BACKEND_STATUS_PERSISTENCE_SOLUTION.md
Normal file
@ -0,0 +1,247 @@
|
||||
# Agent Session 消息状态持久化方案
|
||||
|
||||
## 问题分析
|
||||
|
||||
### 当前流程
|
||||
1. **发送消息时**:
|
||||
- 创建助手消息,状态为 `PENDING`
|
||||
- 通过 `appendMessage` 立即保存到后端(包含pending状态)
|
||||
|
||||
2. **切换会话后重新加载**:
|
||||
- 从后端加载消息
|
||||
- 但状态可能丢失或被覆盖
|
||||
|
||||
### 根本问题
|
||||
后端可能没有正确保存或返回消息的 `status` 字段。
|
||||
|
||||
## 解决方案:确保状态正确持久化
|
||||
|
||||
### 方案A:修改 AgentMessageDataSource(前端方案)
|
||||
|
||||
```typescript
|
||||
// src/renderer/src/services/db/AgentMessageDataSource.ts
|
||||
|
||||
// 1. 保存消息时确保状态被保存
|
||||
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]): Promise<void> {
|
||||
const sessionId = extractSessionId(topicId)
|
||||
|
||||
const payload: AgentPersistedMessage = {
|
||||
message: {
|
||||
...message,
|
||||
// 明确保存状态
|
||||
status: message.status || AssistantMessageStatus.PENDING
|
||||
},
|
||||
blocks
|
||||
}
|
||||
|
||||
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
|
||||
sessionId,
|
||||
agentSessionId: '',
|
||||
...(message.role === 'user'
|
||||
? { user: { payload } }
|
||||
: { assistant: { payload } }
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
// 2. 加载消息时恢复流式状态
|
||||
async fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }> {
|
||||
const sessionId = extractSessionId(topicId)
|
||||
const historicalMessages = await window.electron.ipcRenderer.invoke(
|
||||
IpcChannel.AgentMessage_GetHistory,
|
||||
{ sessionId }
|
||||
)
|
||||
|
||||
const messages: Message[] = []
|
||||
const blocks: MessageBlock[] = []
|
||||
let hasStreamingMessage = false
|
||||
|
||||
for (const persistedMsg of historicalMessages) {
|
||||
if (persistedMsg?.message) {
|
||||
const message = persistedMsg.message
|
||||
|
||||
// 检查是否有未完成的消息
|
||||
if (message.status === 'pending' || message.status === 'processing') {
|
||||
hasStreamingMessage = true
|
||||
|
||||
// 如果消息创建时间超过5分钟,标记为错误
|
||||
const messageAge = Date.now() - new Date(message.createdAt).getTime()
|
||||
if (messageAge > 5 * 60 * 1000) {
|
||||
message.status = 'error'
|
||||
}
|
||||
}
|
||||
|
||||
messages.push(message)
|
||||
if (persistedMsg.blocks) {
|
||||
blocks.push(...persistedMsg.blocks)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果有流式消息,恢复loading状态
|
||||
if (hasStreamingMessage) {
|
||||
// 这里需要dispatch action,可能需要通过回调或其他方式
|
||||
store.dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
|
||||
}
|
||||
|
||||
return { messages, blocks }
|
||||
}
|
||||
```
|
||||
|
||||
### 方案B:后端修改(更彻底的方案)
|
||||
|
||||
需要确保后端:
|
||||
|
||||
1. **sessionMessageRepository.ts** 正确保存消息状态
|
||||
```typescript
|
||||
// src/main/services/agents/database/sessionMessageRepository.ts
|
||||
|
||||
async persistExchange(params: PersistExchangeParams): Promise<void> {
|
||||
// 保存时确保状态字段被正确存储
|
||||
if (params.user) {
|
||||
await this.saveMessage({
|
||||
...params.user.payload.message,
|
||||
status: params.user.payload.message.status // 确保状态被保存
|
||||
})
|
||||
}
|
||||
|
||||
if (params.assistant) {
|
||||
await this.saveMessage({
|
||||
...params.assistant.payload.message,
|
||||
status: params.assistant.payload.message.status // 确保状态被保存
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async getHistory(sessionId: string): Promise<AgentPersistedMessage[]> {
|
||||
// 返回时确保状态字段被包含
|
||||
const messages = await this.db.getMessages(sessionId)
|
||||
return messages.map(msg => ({
|
||||
message: {
|
||||
...msg,
|
||||
status: msg.status // 确保状态被返回
|
||||
},
|
||||
blocks: msg.blocks
|
||||
}))
|
||||
}
|
||||
```
|
||||
|
||||
2. **添加会话级别的流式状态**
|
||||
```typescript
|
||||
interface AgentSession {
|
||||
id: string
|
||||
// ... 其他字段
|
||||
streamingMessageId?: string // 当前正在流式的消息ID
|
||||
streamingStartTime?: number // 流式开始时间
|
||||
}
|
||||
|
||||
// 开始流式时更新
|
||||
async startStreaming(sessionId: string, messageId: string) {
|
||||
await this.updateSession(sessionId, {
|
||||
streamingMessageId: messageId,
|
||||
streamingStartTime: Date.now()
|
||||
})
|
||||
}
|
||||
|
||||
// 结束流式时清除
|
||||
async stopStreaming(sessionId: string) {
|
||||
await this.updateSession(sessionId, {
|
||||
streamingMessageId: null,
|
||||
streamingStartTime: null
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
### 方案C:混合方案(推荐)
|
||||
|
||||
1. **前端立即保存状态**(已实现)
|
||||
2. **后端确保状态持久化**
|
||||
3. **加载时智能恢复状态**
|
||||
|
||||
```typescript
|
||||
// AgentMessageDataSource.ts
|
||||
async fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }> {
|
||||
const sessionId = extractSessionId(topicId)
|
||||
const historicalMessages = await window.electron.ipcRenderer.invoke(
|
||||
IpcChannel.AgentMessage_GetHistory,
|
||||
{ sessionId }
|
||||
)
|
||||
|
||||
const messages: Message[] = []
|
||||
const blocks: MessageBlock[] = []
|
||||
|
||||
for (const persistedMsg of historicalMessages) {
|
||||
if (persistedMsg?.message) {
|
||||
const message = { ...persistedMsg.message }
|
||||
|
||||
// 智能恢复状态
|
||||
if (message.status === 'pending' || message.status === 'processing') {
|
||||
// 检查消息年龄
|
||||
const age = Date.now() - new Date(message.createdAt).getTime()
|
||||
|
||||
if (age > 5 * 60 * 1000) {
|
||||
// 超过5分钟,标记为错误
|
||||
message.status = 'error'
|
||||
} else if (age > 30 * 1000 && message.blocks?.length > 0) {
|
||||
// 超过30秒且有内容,可能已完成
|
||||
message.status = 'success'
|
||||
}
|
||||
// 否则保持原状态,让UI显示暂停按钮
|
||||
}
|
||||
|
||||
messages.push(message)
|
||||
if (persistedMsg.blocks) {
|
||||
blocks.push(...persistedMsg.blocks)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { messages, blocks }
|
||||
}
|
||||
```
|
||||
|
||||
## 实施步骤
|
||||
|
||||
### 步骤1:验证后端是否保存状态
|
||||
1. 在 `appendMessage` 中添加日志,确认状态被发送
|
||||
2. 检查后端数据库,确认状态被保存
|
||||
3. 在 `fetchMessages` 中添加日志,确认状态被返回
|
||||
|
||||
### 步骤2:修复状态持久化
|
||||
1. 如果后端没有保存状态,修改后端代码
|
||||
2. 如果后端保存了但没返回,修改返回逻辑
|
||||
|
||||
### 步骤3:添加状态恢复逻辑
|
||||
1. 在 `fetchMessages` 中智能恢复状态
|
||||
2. 对于未完成的消息,根据时间判断是否需要标记为错误
|
||||
|
||||
### 步骤4:恢复loading状态
|
||||
1. 如果有pending/processing消息,设置loading为true
|
||||
2. 让UI正确显示暂停按钮
|
||||
|
||||
## 测试验证
|
||||
|
||||
1. **正常流程**
|
||||
- 发送消息
|
||||
- 观察pending状态
|
||||
- 响应完成后状态变为success
|
||||
|
||||
2. **切换会话**
|
||||
- 发送消息开始响应
|
||||
- 立即切换会话
|
||||
- 切回来,pending状态应该保持
|
||||
- 暂停按钮应该显示
|
||||
|
||||
3. **页面刷新**
|
||||
- 响应过程中刷新
|
||||
- 重新加载后状态应该合理(pending或error)
|
||||
|
||||
4. **超时处理**
|
||||
- 模拟长时间pending
|
||||
- 验证超时后自动标记为error
|
||||
|
||||
## 优势
|
||||
- 符合现有架构,数据统一持久化
|
||||
- 状态与消息一起保存,数据一致性好
|
||||
- 页面刷新也能恢复
|
||||
- 不需要额外的状态管理器
|
||||
249
STREAMING_STATE_SOLUTION.md
Normal file
249
STREAMING_STATE_SOLUTION.md
Normal file
@ -0,0 +1,249 @@
|
||||
# Agent Session 流式状态保持方案
|
||||
|
||||
## 问题描述
|
||||
Agent会话中发送消息后,如果在响应过程中切换会话:
|
||||
1. 消息内容不丢失了(已修复)✅
|
||||
2. 但是pending/processing状态丢失了 ❌
|
||||
3. loading状态丢失了 ❌
|
||||
4. 导致无法显示"暂停"按钮,无法中止正在进行的响应
|
||||
|
||||
## 问题分析
|
||||
|
||||
### 现状
|
||||
```javascript
|
||||
// AgentSessionInputbar.tsx
|
||||
const streamingAskIds = useMemo(() => {
|
||||
// 检查消息的 status === 'processing' || 'pending'
|
||||
// 切换会话后这些状态丢失了
|
||||
}, [topicMessages])
|
||||
|
||||
const canAbort = loading && streamingAskIds.length > 0
|
||||
// loading 状态也丢失了
|
||||
```
|
||||
|
||||
### 根本原因
|
||||
1. **消息保存时机问题**:
|
||||
- 用户消息立即保存(状态为success)
|
||||
- 助手消息创建时是pending状态
|
||||
- 但保存到后端时可能已经是最终状态
|
||||
|
||||
2. **状态管理问题**:
|
||||
- loading状态只在Redux中,不持久化
|
||||
- 切换会话时Redux被清空
|
||||
- 重新加载时无法知道是否有正在进行的响应
|
||||
|
||||
## 解决方案
|
||||
|
||||
### 方案一:全局流式状态管理器(推荐)✅
|
||||
|
||||
创建一个全局的流式状态管理器,独立于Redux,跨会话保持状态。
|
||||
|
||||
```typescript
|
||||
// src/renderer/src/services/StreamingStateManager.ts
|
||||
class StreamingStateManager {
|
||||
// 记录正在进行的流式响应
|
||||
private streamingSessions = new Map<string, {
|
||||
topicId: string
|
||||
askId: string
|
||||
assistantMessageId: string
|
||||
startTime: number
|
||||
agentSession?: {
|
||||
agentId: string
|
||||
sessionId: string
|
||||
}
|
||||
}>()
|
||||
|
||||
startStreaming(topicId: string, askId: string, assistantMessageId: string, agentSession?: any) {
|
||||
this.streamingSessions.set(topicId, {
|
||||
topicId,
|
||||
askId,
|
||||
assistantMessageId,
|
||||
startTime: Date.now(),
|
||||
agentSession
|
||||
})
|
||||
}
|
||||
|
||||
stopStreaming(topicId: string) {
|
||||
this.streamingSessions.delete(topicId)
|
||||
}
|
||||
|
||||
isStreaming(topicId: string): boolean {
|
||||
return this.streamingSessions.has(topicId)
|
||||
}
|
||||
|
||||
getStreamingInfo(topicId: string) {
|
||||
return this.streamingSessions.get(topicId)
|
||||
}
|
||||
|
||||
// 获取所有正在流式的会话
|
||||
getAllStreaming() {
|
||||
return Array.from(this.streamingSessions.values())
|
||||
}
|
||||
|
||||
// 清理超时的流式状态(防止内存泄漏)
|
||||
cleanupStale(maxAge = 5 * 60 * 1000) { // 5分钟
|
||||
const now = Date.now()
|
||||
for (const [topicId, info] of this.streamingSessions) {
|
||||
if (now - info.startTime > maxAge) {
|
||||
this.streamingSessions.delete(topicId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const streamingStateManager = new StreamingStateManager()
|
||||
```
|
||||
|
||||
**集成点**:
|
||||
|
||||
1. **开始流式时**:
|
||||
```typescript
|
||||
// messageThunk.ts - fetchAndProcessAgentResponseImpl
|
||||
streamingStateManager.startStreaming(
|
||||
topicId,
|
||||
userMessageId,
|
||||
assistantMessage.id,
|
||||
agentSession
|
||||
)
|
||||
```
|
||||
|
||||
2. **结束流式时**:
|
||||
```typescript
|
||||
// callbacks.ts - onComplete
|
||||
streamingStateManager.stopStreaming(topicId)
|
||||
```
|
||||
|
||||
3. **UI使用**:
|
||||
```typescript
|
||||
// AgentSessionInputbar.tsx
|
||||
const isStreaming = streamingStateManager.isStreaming(sessionTopicId)
|
||||
const streamingInfo = streamingStateManager.getStreamingInfo(sessionTopicId)
|
||||
|
||||
const canAbort = isStreaming && streamingInfo?.askId
|
||||
```
|
||||
|
||||
### 方案二:增强消息持久化(备选)
|
||||
|
||||
修改消息保存逻辑,保留流式状态:
|
||||
|
||||
```typescript
|
||||
// AgentMessageDataSource.ts
|
||||
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]) {
|
||||
// 保存时保留 pending/processing 状态
|
||||
const messageToSave = {
|
||||
...message,
|
||||
// 如果是助手消息且状态是pending,保持这个状态
|
||||
status: message.status === 'pending' ? 'pending' : message.status
|
||||
}
|
||||
|
||||
// ... 保存逻辑
|
||||
}
|
||||
|
||||
// 加载时恢复状态
|
||||
async fetchMessages(topicId: string) {
|
||||
const { messages, blocks } = // ... 从后端加载
|
||||
|
||||
// 检查是否有未完成的消息
|
||||
for (const msg of messages) {
|
||||
if (msg.status === 'pending' || msg.status === 'processing') {
|
||||
// 恢复loading状态
|
||||
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
|
||||
|
||||
// 可能需要重新启动流式处理或标记为失败
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 方案三:Session级别状态存储(简单但有限)
|
||||
|
||||
在localStorage或sessionStorage中保存流式状态:
|
||||
|
||||
```typescript
|
||||
// 保存流式状态
|
||||
const saveStreamingState = (topicId: string, state: any) => {
|
||||
const states = JSON.parse(localStorage.getItem('streamingStates') || '{}')
|
||||
states[topicId] = {
|
||||
...state,
|
||||
timestamp: Date.now()
|
||||
}
|
||||
localStorage.setItem('streamingStates', JSON.stringify(states))
|
||||
}
|
||||
|
||||
// 恢复流式状态
|
||||
const getStreamingState = (topicId: string) => {
|
||||
const states = JSON.parse(localStorage.getItem('streamingStates') || '{}')
|
||||
const state = states[topicId]
|
||||
|
||||
// 检查是否过期(比如超过5分钟)
|
||||
if (state && Date.now() - state.timestamp < 5 * 60 * 1000) {
|
||||
return state
|
||||
}
|
||||
|
||||
// 清理过期状态
|
||||
delete states[topicId]
|
||||
localStorage.setItem('streamingStates', JSON.stringify(states))
|
||||
return null
|
||||
}
|
||||
```
|
||||
|
||||
## 推荐实施步骤
|
||||
|
||||
### 步骤1:实现StreamingStateManager
|
||||
1. 创建全局状态管理器
|
||||
2. 在开始/结束流式时更新状态
|
||||
3. 添加定期清理机制
|
||||
|
||||
### 步骤2:更新messageThunk.ts
|
||||
1. 在`fetchAndProcessAgentResponseImpl`开始时注册流式状态
|
||||
2. 在完成/错误/中止时清除状态
|
||||
3. 确保所有退出路径都清理状态
|
||||
|
||||
### 步骤3:更新UI组件
|
||||
1. 修改`AgentSessionInputbar.tsx`使用StreamingStateManager
|
||||
2. 不再依赖消息的status字段判断流式状态
|
||||
3. 使用全局状态判断是否显示暂停按钮
|
||||
|
||||
### 步骤4:处理边界情况
|
||||
1. 页面刷新时的状态恢复
|
||||
2. 网络中断的处理
|
||||
3. 超时自动清理
|
||||
|
||||
## 测试验证
|
||||
|
||||
### 测试场景
|
||||
1. **正常流式**:
|
||||
- 发送消息
|
||||
- 观察流式响应
|
||||
- 验证暂停按钮显示
|
||||
|
||||
2. **切换会话**:
|
||||
- 发送消息开始流式
|
||||
- 立即切换到其他会话
|
||||
- 切回来验证暂停按钮仍然显示
|
||||
- 可以正确暂停
|
||||
|
||||
3. **刷新页面**:
|
||||
- 流式过程中刷新
|
||||
- 验证状态是否合理处理(显示失败或继续)
|
||||
|
||||
4. **超时清理**:
|
||||
- 模拟长时间流式
|
||||
- 验证超时后状态被清理
|
||||
|
||||
## 优势对比
|
||||
|
||||
| 方案 | 优点 | 缺点 |
|
||||
|------|------|------|
|
||||
| 全局状态管理器 | • 简单可靠<br>• 跨会话工作<br>• 易于调试 | • 需要额外内存<br>• 页面刷新丢失 |
|
||||
| 增强持久化 | • 数据一致性好<br>• 页面刷新可恢复 | • 实现复杂<br>• 需要后端配合 |
|
||||
| Session存储 | • 实现简单<br>• 可跨页面刷新 | • 容量限制<br>• 需要清理逻辑 |
|
||||
|
||||
## 建议
|
||||
推荐使用**方案一:全局流式状态管理器**,因为:
|
||||
1. 实现简单,不需要修改后端
|
||||
2. 可以快速解决当前问题
|
||||
3. 易于扩展和维护
|
||||
4. 对现有代码改动最小
|
||||
|
||||
如果需要页面刷新后也能恢复状态,可以结合方案三,将关键信息保存到localStorage。
|
||||
@ -138,12 +138,41 @@ export class AgentMessageDataSource implements MessageDataSource {
|
||||
}
|
||||
|
||||
async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
|
||||
// Agent session messages are immutable once persisted
|
||||
logger.warn(`updateMessage called for agent session ${topicId}, operation not supported`)
|
||||
const sessionId = extractSessionId(topicId)
|
||||
if (!sessionId) {
|
||||
throw new Error(`Invalid agent session topicId: ${topicId}`)
|
||||
}
|
||||
|
||||
// In a full implementation, you might want to:
|
||||
// 1. Update in Redux only for UI consistency
|
||||
// 2. Or implement a backend endpoint for message updates
|
||||
try {
|
||||
// Fetch current message from backend to merge updates
|
||||
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
|
||||
IpcChannel.AgentMessage_GetHistory,
|
||||
{ sessionId }
|
||||
)
|
||||
|
||||
const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageId)
|
||||
if (!existingMessage?.message) {
|
||||
logger.warn(`Message ${messageId} not found in agent session ${sessionId}`)
|
||||
return
|
||||
}
|
||||
|
||||
// Merge updates with existing message
|
||||
const updatedMessage = { ...existingMessage.message, ...updates }
|
||||
|
||||
// Save updated message back to backend
|
||||
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
|
||||
sessionId,
|
||||
agentSessionId: '',
|
||||
...(updatedMessage.role === 'user'
|
||||
? { user: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } }
|
||||
: { assistant: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } })
|
||||
})
|
||||
|
||||
logger.info(`Updated message ${messageId} in agent session ${sessionId}`)
|
||||
} catch (error) {
|
||||
logger.error(`Failed to update message ${messageId} in agent session ${topicId}:`, error as Error)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async updateMessageAndBlocks(
|
||||
@ -151,8 +180,47 @@ export class AgentMessageDataSource implements MessageDataSource {
|
||||
messageUpdates: Partial<Message> & Pick<Message, 'id'>,
|
||||
blocksToUpdate: MessageBlock[]
|
||||
): Promise<void> {
|
||||
// Agent session messages and blocks are immutable once persisted
|
||||
logger.warn(`updateMessageAndBlocks called for agent session ${topicId}, operation not supported`)
|
||||
const sessionId = extractSessionId(topicId)
|
||||
if (!sessionId) {
|
||||
throw new Error(`Invalid agent session topicId: ${topicId}`)
|
||||
}
|
||||
|
||||
try {
|
||||
// Fetch current message from backend if we need to merge
|
||||
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
|
||||
IpcChannel.AgentMessage_GetHistory,
|
||||
{ sessionId }
|
||||
)
|
||||
|
||||
const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
|
||||
let finalMessage: Message
|
||||
|
||||
if (existingMessage?.message) {
|
||||
// Merge updates with existing message
|
||||
finalMessage = { ...existingMessage.message, ...messageUpdates }
|
||||
} else {
|
||||
// New message, ensure we have required fields
|
||||
if (!messageUpdates.topicId || !messageUpdates.role) {
|
||||
logger.warn(`Incomplete message data for ${messageUpdates.id}`)
|
||||
return
|
||||
}
|
||||
finalMessage = messageUpdates as Message
|
||||
}
|
||||
|
||||
// Save updated message and blocks to backend
|
||||
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
|
||||
sessionId,
|
||||
agentSessionId: '',
|
||||
...(finalMessage.role === 'user'
|
||||
? { user: { payload: { message: finalMessage, blocks: blocksToUpdate } } }
|
||||
: { assistant: { payload: { message: finalMessage, blocks: blocksToUpdate } } })
|
||||
})
|
||||
|
||||
logger.info(`Updated message and blocks for ${messageUpdates.id} in agent session ${sessionId}`)
|
||||
} catch (error) {
|
||||
logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async deleteMessage(topicId: string, messageId: string): Promise<void> {
|
||||
|
||||
@ -11,7 +11,6 @@ import { createStreamProcessor, type StreamProcessorCallbacks } from '@renderer/
|
||||
import store from '@renderer/store'
|
||||
import { updateTopicUpdatedAt } from '@renderer/store/assistants'
|
||||
import { type ApiServerConfig, type Assistant, type FileMetadata, type Model, type Topic } from '@renderer/types'
|
||||
import type { AgentPersistedMessage } from '@renderer/types/agent'
|
||||
import { ChunkType } from '@renderer/types/chunk'
|
||||
import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from '@renderer/types/newMessage'
|
||||
import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage'
|
||||
@ -504,13 +503,17 @@ const fetchAndProcessAgentResponseImpl = async (
|
||||
text: Promise.resolve('')
|
||||
})
|
||||
|
||||
await persistAgentExchange({
|
||||
getState,
|
||||
agentSession,
|
||||
userMessageId,
|
||||
assistantMessageId: assistantMessage.id,
|
||||
latestAgentSessionId
|
||||
})
|
||||
// No longer need persistAgentExchange here since:
|
||||
// 1. User message is already saved via appendMessage when created
|
||||
// 2. Assistant message is saved via appendMessage when created
|
||||
// 3. Updates during streaming are saved via updateMessageAndBlocks
|
||||
// This eliminates the duplicate save issue
|
||||
|
||||
// Only persist the agentSessionId update if it changed
|
||||
if (latestAgentSessionId) {
|
||||
logger.info(`Agent session ID updated to: ${latestAgentSessionId}`)
|
||||
// In the future, you might want to update some session metadata here
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error('Error in fetchAndProcessAgentResponseImpl:', error)
|
||||
try {
|
||||
@ -523,73 +526,9 @@ const fetchAndProcessAgentResponseImpl = async (
|
||||
}
|
||||
}
|
||||
|
||||
interface PersistAgentExchangeParams {
|
||||
getState: () => RootState
|
||||
agentSession: AgentSessionContext
|
||||
userMessageId: string
|
||||
assistantMessageId: string
|
||||
latestAgentSessionId: string
|
||||
}
|
||||
|
||||
const persistAgentExchange = async ({
|
||||
getState,
|
||||
agentSession,
|
||||
userMessageId,
|
||||
assistantMessageId,
|
||||
latestAgentSessionId
|
||||
}: PersistAgentExchangeParams) => {
|
||||
if (!window.electron?.ipcRenderer) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const state = getState()
|
||||
const userMessage = state.messages.entities[userMessageId]
|
||||
const assistantMessage = state.messages.entities[assistantMessageId]
|
||||
|
||||
if (!userMessage || !assistantMessage) {
|
||||
logger.warn('persistAgentExchange: missing user or assistant message entity')
|
||||
return
|
||||
}
|
||||
|
||||
const userPersistedPayload = createPersistedMessagePayload(userMessage, state)
|
||||
const assistantPersistedPayload = createPersistedMessagePayload(assistantMessage, state)
|
||||
|
||||
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
|
||||
sessionId: agentSession.sessionId,
|
||||
agentSessionId: latestAgentSessionId || '',
|
||||
user: userPersistedPayload ? { payload: userPersistedPayload } : undefined,
|
||||
assistant: assistantPersistedPayload ? { payload: assistantPersistedPayload } : undefined
|
||||
})
|
||||
} catch (error) {
|
||||
logger.warn('Failed to persist agent exchange', error as Error)
|
||||
}
|
||||
}
|
||||
|
||||
const createPersistedMessagePayload = (
|
||||
message: Message | undefined,
|
||||
state: RootState
|
||||
): AgentPersistedMessage | undefined => {
|
||||
if (!message) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
try {
|
||||
const clonedMessage = JSON.parse(JSON.stringify(message)) as Message
|
||||
const blockEntities = (message.blocks || [])
|
||||
.map((blockId) => state.messageBlocks.entities[blockId])
|
||||
.filter((block): block is MessageBlock => Boolean(block))
|
||||
.map((block) => JSON.parse(JSON.stringify(block)) as MessageBlock)
|
||||
|
||||
return {
|
||||
message: clonedMessage,
|
||||
blocks: blockEntities
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Failed to build persisted payload for message', error as Error)
|
||||
return undefined
|
||||
}
|
||||
}
|
||||
// Removed persistAgentExchange and createPersistedMessagePayload functions
|
||||
// These are no longer needed since messages are saved immediately via appendMessage
|
||||
// and updated during streaming via updateMessageAndBlocks
|
||||
|
||||
// --- Helper Function for Multi-Model Dispatch ---
|
||||
// 多模型创建和发送请求的逻辑,用于用户消息多模型发送和重发
|
||||
|
||||
Loading…
Reference in New Issue
Block a user