Merge branch 'refactor/agent-assistant-unified' into feat/agents-new

This commit is contained in:
suyao 2025-09-22 23:14:46 +08:00
commit 1b8bb568b1
No known key found for this signature in database
17 changed files with 2167 additions and 166 deletions

View File

@ -6,3 +6,6 @@ CSLOGGER_MAIN_LEVEL=info
CSLOGGER_RENDERER_LEVEL=info CSLOGGER_RENDERER_LEVEL=info
#CSLOGGER_MAIN_SHOW_MODULES= #CSLOGGER_MAIN_SHOW_MODULES=
#CSLOGGER_RENDERER_SHOW_MODULES= #CSLOGGER_RENDERER_SHOW_MODULES=
# Feature Flags (must be prefixed with VITE_ to be accessible in renderer)
# VITE_USE_UNIFIED_DB_SERVICE=true # Enable unified DB service for chat/agent sessions

249
STREAMING_STATE_SOLUTION.md Normal file
View File

@ -0,0 +1,249 @@
# Agent Session 流式状态保持方案
## 问题描述
Agent会话中发送消息后,如果在响应过程中切换会话,会出现以下问题:
1. 消息内容不再丢失(已修复)✅
2. 但是pending/processing状态丢失了 ❌
3. loading状态丢失了 ❌
4. 导致无法显示"暂停"按钮,无法中止正在进行的响应
## 问题分析
### 现状
```javascript
// AgentSessionInputbar.tsx
const streamingAskIds = useMemo(() => {
// 检查消息的 status === 'processing' || 'pending'
// 切换会话后这些状态丢失了
}, [topicMessages])
const canAbort = loading && streamingAskIds.length > 0
// loading 状态也丢失了
```
### 根本原因
1. **消息保存时机问题**
- 用户消息立即保存状态为success
- 助手消息创建时是pending状态
- 但保存到后端时可能已经是最终状态
2. **状态管理问题**
- loading状态只在Redux中不持久化
- 切换会话时Redux被清空
- 重新加载时无法知道是否有正在进行的响应
## 解决方案
### 方案一:全局流式状态管理器(推荐)✅
创建一个全局的流式状态管理器,独立于Redux,跨会话保持状态。
```typescript
// src/renderer/src/services/StreamingStateManager.ts
class StreamingStateManager {
// 记录正在进行的流式响应
private streamingSessions = new Map<string, {
topicId: string
askId: string
assistantMessageId: string
startTime: number
agentSession?: {
agentId: string
sessionId: string
}
}>()
startStreaming(topicId: string, askId: string, assistantMessageId: string, agentSession?: any) {
this.streamingSessions.set(topicId, {
topicId,
askId,
assistantMessageId,
startTime: Date.now(),
agentSession
})
}
stopStreaming(topicId: string) {
this.streamingSessions.delete(topicId)
}
isStreaming(topicId: string): boolean {
return this.streamingSessions.has(topicId)
}
getStreamingInfo(topicId: string) {
return this.streamingSessions.get(topicId)
}
// 获取所有正在流式的会话
getAllStreaming() {
return Array.from(this.streamingSessions.values())
}
// 清理超时的流式状态(防止内存泄漏)
cleanupStale(maxAge = 5 * 60 * 1000) { // 5分钟
const now = Date.now()
for (const [topicId, info] of this.streamingSessions) {
if (now - info.startTime > maxAge) {
this.streamingSessions.delete(topicId)
}
}
}
}
export const streamingStateManager = new StreamingStateManager()
```
**集成点**
1. **开始流式时**
```typescript
// messageThunk.ts - fetchAndProcessAgentResponseImpl
streamingStateManager.startStreaming(
topicId,
userMessageId,
assistantMessage.id,
agentSession
)
```
2. **结束流式时**
```typescript
// callbacks.ts - onComplete
streamingStateManager.stopStreaming(topicId)
```
3. **UI使用**
```typescript
// AgentSessionInputbar.tsx
const isStreaming = streamingStateManager.isStreaming(sessionTopicId)
const streamingInfo = streamingStateManager.getStreamingInfo(sessionTopicId)
const canAbort = isStreaming && streamingInfo?.askId
```
### 方案二:增强消息持久化(备选)
修改消息保存逻辑,保留流式状态:
```typescript
// AgentMessageDataSource.ts
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]) {
// 保存时保留 pending/processing 状态
const messageToSave = {
...message,
// 如果是助手消息且状态是pending保持这个状态
status: message.status === 'pending' ? 'pending' : message.status
}
// ... 保存逻辑
}
// 加载时恢复状态
async fetchMessages(topicId: string) {
const { messages, blocks } = // ... 从后端加载
// 检查是否有未完成的消息
for (const msg of messages) {
if (msg.status === 'pending' || msg.status === 'processing') {
// 恢复loading状态
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// 可能需要重新启动流式处理或标记为失败
}
}
}
```
### 方案三:Session级别状态存储(简单但有限)
在localStorage或sessionStorage中保存流式状态
```typescript
// 保存流式状态
const saveStreamingState = (topicId: string, state: any) => {
const states = JSON.parse(localStorage.getItem('streamingStates') || '{}')
states[topicId] = {
...state,
timestamp: Date.now()
}
localStorage.setItem('streamingStates', JSON.stringify(states))
}
// 恢复流式状态
const getStreamingState = (topicId: string) => {
const states = JSON.parse(localStorage.getItem('streamingStates') || '{}')
const state = states[topicId]
// 检查是否过期比如超过5分钟
if (state && Date.now() - state.timestamp < 5 * 60 * 1000) {
return state
}
// 清理过期状态
delete states[topicId]
localStorage.setItem('streamingStates', JSON.stringify(states))
return null
}
```
## 推荐实施步骤
### 步骤1:实现StreamingStateManager
1. 创建全局状态管理器
2. 在开始/结束流式时更新状态
3. 添加定期清理机制
### 步骤2:更新messageThunk.ts
1. 在`fetchAndProcessAgentResponseImpl`开始时注册流式状态
2. 在完成/错误/中止时清除状态
3. 确保所有退出路径都清理状态
### 步骤3:更新UI组件
1. 修改`AgentSessionInputbar.tsx`使用StreamingStateManager
2. 不再依赖消息的status字段判断流式状态
3. 使用全局状态判断是否显示暂停按钮
### 步骤4:处理边界情况
1. 页面刷新时的状态恢复
2. 网络中断的处理
3. 超时自动清理
## 测试验证
### 测试场景
1. **正常流式**
- 发送消息
- 观察流式响应
- 验证暂停按钮显示
2. **切换会话**
- 发送消息开始流式
- 立即切换到其他会话
- 切回来验证暂停按钮仍然显示
- 可以正确暂停
3. **刷新页面**
- 流式过程中刷新
- 验证状态是否合理处理(显示失败或继续)
4. **超时清理**
- 模拟长时间流式
- 验证超时后状态被清理
## 优势对比
| 方案 | 优点 | 缺点 |
|------|------|------|
| 全局状态管理器 | • 简单可靠<br>• 跨会话工作<br>• 易于调试 | • 需要额外内存<br>• 页面刷新丢失 |
| 增强持久化 | • 数据一致性好<br>• 页面刷新可恢复 | • 实现复杂<br>• 需要后端配合 |
| Session存储 | • 实现简单<br>• 可跨页面刷新 | • 容量限制<br>• 需要清理逻辑 |
## 建议
推荐使用**方案一:全局流式状态管理器**,因为:
1. 实现简单,不需要修改后端
2. 可以快速解决当前问题
3. 易于扩展和维护
4. 对现有代码改动最小
如果需要页面刷新后也能恢复状态可以结合方案三将关键信息保存到localStorage。

View File

@ -91,6 +91,7 @@ export enum IpcChannel {
// agent messages // agent messages
AgentMessage_PersistExchange = 'agent-message:persist-exchange', AgentMessage_PersistExchange = 'agent-message:persist-exchange',
AgentMessage_GetHistory = 'agent-message:get-history',
//copilot //copilot
Copilot_GetAuthMessage = 'copilot:get-auth-message', Copilot_GetAuthMessage = 'copilot:get-auth-message',

View File

@ -209,6 +209,15 @@ export function registerIpc(mainWindow: BrowserWindow, app: Electron.App) {
} }
}) })
// Returns the full persisted message history for one agent session.
// Errors are logged in the main process and re-thrown so the renderer's
// invoke() promise rejects and callers can handle the failure themselves.
ipcMain.handle(IpcChannel.AgentMessage_GetHistory, async (_event, { sessionId }: { sessionId: string }) => {
  try {
    return await agentMessageRepository.getSessionHistory(sessionId)
  } catch (error) {
    logger.error('Failed to get agent session history', error as Error)
    throw error
  }
})
//only for mac //only for mac
if (isMac) { if (isMac) {
ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => { ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => {

View File

@ -7,6 +7,7 @@ import type {
AgentPersistedMessage, AgentPersistedMessage,
AgentSessionMessageEntity AgentSessionMessageEntity
} from '@types' } from '@types'
import { asc, eq } from 'drizzle-orm'
import { BaseService } from '../BaseService' import { BaseService } from '../BaseService'
import type { InsertSessionMessageRow } from './schema' import type { InsertSessionMessageRow } from './schema'
@ -176,6 +177,34 @@ class AgentMessageRepository extends BaseService {
return result return result
} }
/**
 * Load all persisted messages for a session, ordered oldest-first by created_at.
 *
 * Each row is deserialized and only entries with a non-null `content` payload
 * are returned; rows that fail to deserialize are silently skipped.
 *
 * @param sessionId - id of the agent session whose history to load
 * @returns the persisted messages in chronological order
 * @throws re-throws any database error after logging it
 */
async getSessionHistory(sessionId: string): Promise<AgentPersistedMessage[]> {
  await AgentMessageRepository.initialize()
  this.ensureInitialized()
  try {
    const rows = await this.database
      .select()
      .from(sessionMessagesTable)
      .where(eq(sessionMessagesTable.session_id, sessionId))
      .orderBy(asc(sessionMessagesTable.created_at))
    // Unwrap the stored payloads; malformed rows are dropped rather than thrown.
    const messages: AgentPersistedMessage[] = []
    for (const row of rows) {
      const deserialized = this.deserialize(row)
      if (deserialized?.content) {
        messages.push(deserialized.content as AgentPersistedMessage)
      }
    }
    logger.info(`Loaded ${messages.length} messages for session ${sessionId}`)
    return messages
  } catch (error) {
    logger.error('Failed to load session history', error as Error)
    throw error
  }
}
} }
export const agentMessageRepository = AgentMessageRepository.getInstance() export const agentMessageRepository = AgentMessageRepository.getInstance()

View File

@ -0,0 +1,81 @@
/**
* Feature flags for controlling gradual rollout of new features
* These flags can be toggled to enable/disable features without code changes
*/
// NOTE: every flag declared here must also be given a default value in the
// exported `featureFlags` object below.
interface FeatureFlags {
  /**
   * Enable unified database service for both regular chats and agent sessions
   * When enabled, uses the new DbService facade pattern
   * When disabled, uses the original implementation with conditional checks
   */
  USE_UNIFIED_DB_SERVICE: boolean
}
/**
 * Default feature flag values
 * Set to false initially for safe rollout
 *
 * NOTE(review): this is an exported *mutable* object — all importers share the
 * same instance, and initializeFeatureFlags/setFeatureFlag mutate it in place.
 */
export const featureFlags: FeatureFlags = {
  USE_UNIFIED_DB_SERVICE: false
}
/**
 * Override feature flags from environment or local storage.
 * Priority order (highest to lowest):
 * 1. localStorage (runtime overrides)
 * 2. Environment variables (build-time config)
 * 3. Default values
 */
export function initializeFeatureFlags(): void {
  // First, check environment variables (build-time configuration).
  // In Vite, env vars must be prefixed with VITE_ to be exposed to the client.
  // Usage: VITE_USE_UNIFIED_DB_SERVICE=true yarn dev
  if (import.meta.env?.VITE_USE_UNIFIED_DB_SERVICE === 'true') {
    featureFlags.USE_UNIFIED_DB_SERVICE = true
    console.log('[FeatureFlags] USE_UNIFIED_DB_SERVICE enabled via environment variable')
  }
  // Then apply localStorage overrides (higher priority). This allows toggling
  // features at runtime without rebuilding.
  try {
    const localOverrides = localStorage.getItem('featureFlags')
    if (localOverrides) {
      const overrides: unknown = JSON.parse(localOverrides)
      if (overrides && typeof overrides === 'object') {
        for (const [key, value] of Object.entries(overrides)) {
          // Only accept known flags with boolean values, so a corrupted or
          // hand-edited localStorage entry cannot poison the flag object
          // with non-boolean data.
          if (key in featureFlags && typeof value === 'boolean') {
            featureFlags[key as keyof FeatureFlags] = value
            console.log(`[FeatureFlags] ${key} set to ${value} via localStorage`)
          }
        }
      }
    }
  } catch (e) {
    console.warn('[FeatureFlags] Failed to parse feature flags from localStorage:', e)
  }
  console.log('[FeatureFlags] Current flags:', featureFlags)
}
/**
 * Update a feature flag value at runtime.
 * Useful for A/B testing or gradual rollout.
 *
 * The in-memory flag is always updated; persisting the override to
 * localStorage is best-effort.
 */
export function setFeatureFlag(flag: keyof FeatureFlags, value: boolean): void {
  featureFlags[flag] = value
  // Persist to localStorage for consistency across app restarts.
  // Guarded because storage may be unavailable or hold corrupted JSON —
  // this mirrors the defensive handling in initializeFeatureFlags, which
  // previously was missing here and let the setter throw.
  try {
    const currentFlags = localStorage.getItem('featureFlags')
    const flags = currentFlags ? JSON.parse(currentFlags) : {}
    flags[flag] = value
    localStorage.setItem('featureFlags', JSON.stringify(flags))
  } catch (e) {
    console.warn('[FeatureFlags] Failed to persist feature flag override:', e)
  }
}
/**
 * Read the current value of a feature flag.
 */
export function getFeatureFlag(flag: keyof FeatureFlags): boolean {
  const { [flag]: value } = featureFlags
  return value
}

// Resolve overrides eagerly, as a side effect of importing this module.
initializeFeatureFlags()

View File

@ -1,9 +1,8 @@
import { useAppDispatch } from '@renderer/store' import { useAppDispatch } from '@renderer/store'
import { removeManyBlocks, upsertManyBlocks } from '@renderer/store/messageBlock' import { loadTopicMessagesThunk } from '@renderer/store/thunk/messageThunk'
import { newMessagesActions } from '@renderer/store/newMessage' import { UpdateSessionForm } from '@renderer/types'
import { AgentPersistedMessage, UpdateSessionForm } from '@renderer/types'
import { buildAgentSessionTopicId } from '@renderer/utils/agentSession' import { buildAgentSessionTopicId } from '@renderer/utils/agentSession'
import { useCallback, useEffect, useMemo, useRef } from 'react' import { useCallback, useEffect, useMemo } from 'react'
import { useTranslation } from 'react-i18next' import { useTranslation } from 'react-i18next'
import useSWR from 'swr' import useSWR from 'swr'
@ -15,7 +14,6 @@ export const useSession = (agentId: string, sessionId: string) => {
const key = client.getSessionPaths(agentId).withId(sessionId) const key = client.getSessionPaths(agentId).withId(sessionId)
const dispatch = useAppDispatch() const dispatch = useAppDispatch()
const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId])
const blockIdsRef = useRef<string[]>([])
const fetcher = async () => { const fetcher = async () => {
const data = await client.getSession(agentId, sessionId) const data = await client.getSession(agentId, sessionId)
@ -23,37 +21,15 @@ export const useSession = (agentId: string, sessionId: string) => {
} }
const { data, error, isLoading, mutate } = useSWR(key, fetcher) const { data, error, isLoading, mutate } = useSWR(key, fetcher)
// Use loadTopicMessagesThunk to load messages (with caching mechanism)
// This ensures messages are preserved when switching between sessions/tabs
useEffect(() => { useEffect(() => {
const messages = data?.messages ?? [] if (sessionId) {
if (!messages.length) { // loadTopicMessagesThunk will check if messages already exist in Redux
dispatch(newMessagesActions.messagesReceived({ topicId: sessionTopicId, messages: [] })) // and skip loading if they do (unless forceReload is true)
blockIdsRef.current = [] dispatch(loadTopicMessagesThunk(sessionTopicId))
return
} }
}, [dispatch, sessionId, sessionTopicId])
const persistedEntries = messages
.map((entity) => entity.content as AgentPersistedMessage | undefined)
.filter((entry): entry is AgentPersistedMessage => Boolean(entry))
const allBlocks = persistedEntries.flatMap((entry) => entry.blocks)
if (allBlocks.length > 0) {
dispatch(upsertManyBlocks(allBlocks))
}
blockIdsRef.current = allBlocks.map((block) => block.id)
const messageRecords = persistedEntries.map((entry) => entry.message)
dispatch(newMessagesActions.messagesReceived({ topicId: sessionTopicId, messages: messageRecords }))
}, [data?.messages, dispatch, sessionTopicId])
useEffect(() => {
return () => {
if (blockIdsRef.current.length > 0) {
dispatch(removeManyBlocks(blockIdsRef.current))
}
dispatch(newMessagesActions.clearTopicMessages(sessionTopicId))
}
}, [dispatch, sessionTopicId])
const updateSession = useCallback( const updateSession = useCallback(
async (form: UpdateSessionForm) => { async (form: UpdateSessionForm) => {

View File

@ -1,10 +1,9 @@
import { loggerService } from '@logger' import { loggerService } from '@logger'
import ContextMenu from '@renderer/components/ContextMenu' import ContextMenu from '@renderer/components/ContextMenu'
import { useSession } from '@renderer/hooks/agents/useSession' import { useSession } from '@renderer/hooks/agents/useSession'
import { useTopicMessages } from '@renderer/hooks/useMessageOperations'
import { getGroupedMessages } from '@renderer/services/MessagesService' import { getGroupedMessages } from '@renderer/services/MessagesService'
import { useAppSelector } from '@renderer/store' import { type Topic, TopicType } from '@renderer/types'
import { selectMessagesForTopic } from '@renderer/store/newMessage'
import { Topic } from '@renderer/types'
import { buildAgentSessionTopicId } from '@renderer/utils/agentSession' import { buildAgentSessionTopicId } from '@renderer/utils/agentSession'
import { memo, useMemo } from 'react' import { memo, useMemo } from 'react'
import styled from 'styled-components' import styled from 'styled-components'
@ -23,7 +22,8 @@ type Props = {
const AgentSessionMessages: React.FC<Props> = ({ agentId, sessionId }) => { const AgentSessionMessages: React.FC<Props> = ({ agentId, sessionId }) => {
const { session } = useSession(agentId, sessionId) const { session } = useSession(agentId, sessionId)
const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId])
const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId)) // Use the same hook as Messages.tsx for consistent behavior
const messages = useTopicMessages(sessionTopicId)
const displayMessages = useMemo(() => { const displayMessages = useMemo(() => {
if (!messages || messages.length === 0) return [] if (!messages || messages.length === 0) return []
@ -43,6 +43,7 @@ const AgentSessionMessages: React.FC<Props> = ({ agentId, sessionId }) => {
const derivedTopic = useMemo<Topic>( const derivedTopic = useMemo<Topic>(
() => ({ () => ({
id: sessionTopicId, id: sessionTopicId,
type: TopicType.Session,
assistantId: sessionAssistantId, assistantId: sessionAssistantId,
name: sessionName, name: sessionName,
createdAt: sessionCreatedAt, createdAt: sessionCreatedAt,

View File

@ -0,0 +1,478 @@
import { loggerService } from '@logger'
import type { AgentPersistedMessage } from '@renderer/types/agent'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { IpcChannel } from '@shared/IpcChannel'
import { throttle } from 'lodash'
import { LRUCache } from 'lru-cache'
import type { MessageDataSource } from './types'
import { extractSessionId } from './types'
const logger = loggerService.withContext('AgentMessageDataSource')

/**
 * Streaming message cache to track messages being streamed
 * Key: messageId, Value: { message, blocks, isComplete }
 *
 * Bounded LRU with a 5-minute TTL so entries for abandoned streams are
 * eventually evicted instead of leaking.
 */
const streamingMessageCache = new LRUCache<
  string,
  {
    message: Message
    blocks: MessageBlock[]
    isComplete: boolean
    sessionId: string
  }
>({
  max: 100,
  ttl: 1000 * 60 * 5 // 5 minutes
})

/**
 * Throttled persisters for each message to batch updates during streaming.
 * Same bounds/TTL as streamingMessageCache so the two caches age together.
 */
const messagePersistThrottlers = new LRUCache<string, ReturnType<typeof throttle>>({
  max: 100,
  ttl: 1000 * 60 * 5
})
/**
 * IPC-based implementation of MessageDataSource
 * Handles agent session messages through backend communication
 *
 * Persistence model: reads go through AgentMessage_GetHistory and writes
 * through AgentMessage_PersistExchange. While a message is streaming, its
 * latest snapshot is held in `streamingMessageCache` and flushed to the
 * backend by a per-message throttled persister, so the backend is not hit
 * on every incremental update.
 */
export class AgentMessageDataSource implements MessageDataSource {
  // ============ Helper Methods ============
  /**
   * Get or create a throttled persister for a message.
   * The persister reads the latest snapshot from streamingMessageCache on each
   * run and, once the snapshot is flagged complete, removes both the cache
   * entry and the throttler for that message.
   */
  private getMessagePersister(messageId: string): ReturnType<typeof throttle> {
    if (!messagePersistThrottlers.has(messageId)) {
      const persister = throttle(async () => {
        const cached = streamingMessageCache.get(messageId)
        if (!cached) return
        const { message, blocks, sessionId, isComplete } = cached
        try {
          // Persist to backend
          await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
            sessionId,
            agentSessionId: '',
            ...(message.role === 'user'
              ? { user: { payload: { message, blocks } } }
              : { assistant: { payload: { message, blocks } } })
          })
          logger.debug(`Persisted ${isComplete ? 'complete' : 'streaming'} message ${messageId} to backend`)
          // Clean up if complete
          if (isComplete) {
            streamingMessageCache.delete(messageId)
            messagePersistThrottlers.delete(messageId)
          }
        } catch (error) {
          // Persist failures during streaming are logged, not re-thrown:
          // a later throttled flush (or the final persist) can still succeed.
          logger.error(`Failed to persist message ${messageId}:`, error as Error)
        }
      }, 500) // Throttle to 500ms for agent messages (less frequent than chat)
      messagePersistThrottlers.set(messageId, persister)
    }
    return messagePersistThrottlers.get(messageId)!
  }

  /**
   * Check if a message is in streaming state based on status.
   * NOTE(review): suffix heuristic — matches any status string containing
   * 'ing' (e.g. 'pending', 'processing'); confirm no terminal status value
   * also contains 'ing'.
   */
  private isMessageStreaming(message: Partial<Message>): boolean {
    return message.status?.includes('ing') ?? false
  }

  /**
   * Clean up resources for a message: drop its streaming snapshot and cancel
   * any pending throttled persist so no stale write fires afterwards.
   */
  private cleanupMessage(messageId: string): void {
    streamingMessageCache.delete(messageId)
    const throttler = messagePersistThrottlers.get(messageId)
    if (throttler) {
      throttler.cancel()
      messagePersistThrottlers.delete(messageId)
    }
  }

  // ============ Read Operations ============

  /**
   * Fetch all persisted messages (and their blocks) for the session encoded
   * in `topicId`. Returns empty results when IPC is unavailable or the
   * backend returns nothing usable.
   */
  async fetchMessages(topicId: string): Promise<{
    messages: Message[]
    blocks: MessageBlock[]
  }> {
    try {
      const sessionId = extractSessionId(topicId)
      if (!window.electron?.ipcRenderer) {
        logger.warn('IPC renderer not available')
        return { messages: [], blocks: [] }
      }
      // Fetch from agent backend
      const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
        IpcChannel.AgentMessage_GetHistory,
        { sessionId }
      )
      if (!historicalMessages || !Array.isArray(historicalMessages)) {
        return { messages: [], blocks: [] }
      }
      // Flatten persisted entries into parallel message/block lists.
      const messages: Message[] = []
      const blocks: MessageBlock[] = []
      for (const persistedMsg of historicalMessages) {
        if (persistedMsg?.message) {
          messages.push(persistedMsg.message)
          if (persistedMsg.blocks && persistedMsg.blocks.length > 0) {
            blocks.push(...persistedMsg.blocks)
          }
        }
      }
      logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`)
      return { messages, blocks }
    } catch (error) {
      logger.error(`Failed to fetch messages for agent session ${topicId}:`, error as Error)
      throw error
    }
  }

  // ============ Write Operations ============

  /**
   * Persist a new message immediately; for streaming assistant messages also
   * prime the streaming cache + throttler so subsequent updates are batched.
   * `_insertIndex` is ignored — the backend orders messages itself.
   */
  async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], _insertIndex?: number): Promise<void> {
    const sessionId = extractSessionId(topicId)
    if (!sessionId) {
      throw new Error(`Invalid agent session topicId: ${topicId}`)
    }
    try {
      const isStreaming = this.isMessageStreaming(message)
      // Always persist immediately for visibility in UI
      const payload: AgentPersistedMessage = {
        message,
        blocks
      }
      await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
        sessionId,
        agentSessionId: '',
        ...(message.role === 'user' ? { user: { payload } } : { assistant: { payload } })
      })
      logger.info(`Saved ${message.role} message for agent session ${sessionId}`, {
        messageId: message.id,
        blockCount: blocks.length,
        status: message.status,
        isStreaming
      })
      // If streaming, also set up cache for throttled updates
      if (isStreaming && message.role === 'assistant') {
        streamingMessageCache.set(message.id, {
          message,
          blocks,
          isComplete: false,
          sessionId
        })
        // Set up throttled persister for future updates
        this.getMessagePersister(message.id)
        logger.debug(`Set up streaming cache for message ${message.id}`)
      } else {
        // Clean up any streaming cache for non-streaming messages
        this.cleanupMessage(message.id)
      }
    } catch (error) {
      logger.error(`Failed to save message for agent session ${topicId}:`, error as Error)
      throw error
    }
  }

  /**
   * Read-modify-write update of a single message: fetches the whole session
   * history to find the current row, merges `updates`, and persists the
   * merged message with its existing blocks. No-op (with a warning) if the
   * message is not found. NOTE(review): fetches the full history per call —
   * O(session size); acceptable only if updates are infrequent.
   */
  async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
    const sessionId = extractSessionId(topicId)
    if (!sessionId) {
      throw new Error(`Invalid agent session topicId: ${topicId}`)
    }
    try {
      // Fetch current message from backend to merge updates
      const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
        IpcChannel.AgentMessage_GetHistory,
        { sessionId }
      )
      const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageId)
      if (!existingMessage?.message) {
        logger.warn(`Message ${messageId} not found in agent session ${sessionId}`)
        return
      }
      // Merge updates with existing message
      const updatedMessage = { ...existingMessage.message, ...updates }
      // Save updated message back to backend
      await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
        sessionId,
        agentSessionId: '',
        ...(updatedMessage.role === 'user'
          ? { user: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } }
          : { assistant: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } })
      })
      logger.info(`Updated message ${messageId} in agent session ${sessionId}`)
    } catch (error) {
      logger.error(`Failed to update message ${messageId} in agent session ${topicId}:`, error as Error)
      throw error
    }
  }

  /**
   * Update a message together with its blocks.
   * Two paths: while streaming, the merged snapshot is written to the cache
   * and flushed via the throttled persister; once the message is no longer
   * streaming, the final snapshot is persisted immediately and all streaming
   * resources for the message are torn down.
   */
  async updateMessageAndBlocks(
    topicId: string,
    messageUpdates: Partial<Message> & Pick<Message, 'id'>,
    blocksToUpdate: MessageBlock[]
  ): Promise<void> {
    const sessionId = extractSessionId(topicId)
    if (!sessionId) {
      throw new Error(`Invalid agent session topicId: ${topicId}`)
    }
    try {
      const isStreaming = this.isMessageStreaming(messageUpdates)
      // Check if we have cached data for this message
      const cached = streamingMessageCache.get(messageUpdates.id)
      if (isStreaming) {
        // During streaming, update cache and trigger throttled persist
        let currentMessage: Message
        let currentBlocks: MessageBlock[]
        if (cached) {
          // Update existing cached message
          currentMessage = { ...cached.message, ...messageUpdates }
          // Merge blocks - use new blocks if provided, otherwise keep cached
          currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks
        } else {
          // First streaming update - fetch from backend or create new
          const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
            IpcChannel.AgentMessage_GetHistory,
            { sessionId }
          )
          const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
          if (existingMessage?.message) {
            currentMessage = { ...existingMessage.message, ...messageUpdates }
            currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || []
          } else {
            // New message
            if (!messageUpdates.topicId || !messageUpdates.role) {
              logger.warn(`Incomplete message data for streaming message ${messageUpdates.id}`)
              return
            }
            currentMessage = messageUpdates as Message
            currentBlocks = blocksToUpdate
          }
        }
        // Update cache
        streamingMessageCache.set(messageUpdates.id, {
          message: currentMessage,
          blocks: currentBlocks,
          isComplete: false,
          sessionId
        })
        // Trigger throttled persist
        const persister = this.getMessagePersister(messageUpdates.id)
        persister()
        logger.debug(`Updated streaming cache for message ${messageUpdates.id}`, {
          status: messageUpdates.status,
          blockCount: currentBlocks.length
        })
      } else {
        // Not streaming - persist immediately
        let finalMessage: Message
        let finalBlocks: MessageBlock[]
        if (cached) {
          // Use cached data as base
          finalMessage = { ...cached.message, ...messageUpdates }
          finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks
        } else {
          // Fetch from backend if no cache
          const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
            IpcChannel.AgentMessage_GetHistory,
            { sessionId }
          )
          const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id)
          if (existingMessage?.message) {
            finalMessage = { ...existingMessage.message, ...messageUpdates }
            finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || []
          } else {
            if (!messageUpdates.topicId || !messageUpdates.role) {
              logger.warn(`Incomplete message data for ${messageUpdates.id}`)
              return
            }
            finalMessage = messageUpdates as Message
            finalBlocks = blocksToUpdate
          }
        }
        // Mark as complete in cache if it was streaming
        if (cached) {
          streamingMessageCache.set(messageUpdates.id, {
            message: finalMessage,
            blocks: finalBlocks,
            isComplete: true,
            sessionId
          })
        }
        // Persist to backend
        await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
          sessionId,
          agentSessionId: '',
          ...(finalMessage.role === 'user'
            ? { user: { payload: { message: finalMessage, blocks: finalBlocks } } }
            : { assistant: { payload: { message: finalMessage, blocks: finalBlocks } } })
        })
        logger.info(`Persisted complete message ${messageUpdates.id} for agent session ${sessionId}`, {
          status: finalMessage.status,
          blockCount: finalBlocks.length
        })
        // Clean up
        this.cleanupMessage(messageUpdates.id)
      }
    } catch (error) {
      logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error)
      throw error
    }
  }

  /** Not supported for agent sessions — logs a warning and does nothing. */
  async deleteMessage(topicId: string, _messageId: string): Promise<void> {
    // Agent session messages cannot be deleted individually
    logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`)
    // In a full implementation, you might want to:
    // 1. Implement soft delete in backend
    // 2. Or just hide from UI without actual deletion
  }

  /** Not supported for agent sessions — logs a warning and does nothing. */
  async deleteMessages(topicId: string, _messageIds: string[]): Promise<void> {
    // Agent session messages cannot be deleted in batch
    logger.warn(`deleteMessages called for agent session ${topicId}, operation not supported`)
    // In a full implementation, you might want to:
    // 1. Implement batch soft delete in backend
    // 2. Update local state accordingly
  }

  /** Not supported for agent sessions — logs a warning and does nothing. */
  async deleteMessagesByAskId(topicId: string, _askId: string): Promise<void> {
    // Agent session messages cannot be deleted
    logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`)
  }

  // ============ Block Operations ============

  /** Not supported individually — blocks flow through persistExchange. */
  async updateBlocks(_blocks: MessageBlock[]): Promise<void> {
    // Blocks are updated through persistExchange for agent sessions
    logger.warn('updateBlocks called for agent session, operation not supported individually')
  }

  /** Not supported for agent sessions — logs a warning and does nothing. */
  async deleteBlocks(_blockIds: string[]): Promise<void> {
    // Blocks cannot be deleted individually for agent sessions
    logger.warn('deleteBlocks called for agent session, operation not supported')
  }

  // ============ Batch Operations ============

  /** Currently a logged no-op; a backend clear endpoint does not exist yet. */
  async clearMessages(topicId: string): Promise<void> {
    const sessionId = extractSessionId(topicId)
    if (!window.electron?.ipcRenderer) {
      logger.warn('IPC renderer not available for clear messages')
      return
    }
    // In a full implementation, you would call a backend endpoint to clear session
    // For now, we'll just log the attempt
    logger.info(`Clear messages requested for agent session ${sessionId}`)
    // You might want to implement:
    // await window.electron.ipcRenderer.invoke(
    //   IpcChannel.AgentMessage_ClearSession,
    //   { sessionId }
    // )
  }

  /**
   * Existence check is purely local: true iff a session id can be extracted
   * from the topic id and IPC is available — the backend is not queried.
   */
  async topicExists(topicId: string): Promise<boolean> {
    try {
      const sessionId = extractSessionId(topicId)
      if (!window.electron?.ipcRenderer) {
        return false
      }
      return sessionId != null
    } catch (error) {
      return false
    }
  }

  /** No-op: agent sessions are created externally, not by the chat interface. */
  async ensureTopic(topicId: string): Promise<void> {
    // Agent sessions are created externally, not by the chat interface
    // This is a no-op for agent sessions
    const sessionId = extractSessionId(topicId)
    logger.info(`ensureTopic called for agent session ${sessionId}, no action needed`)
  }

  /**
   * Return the session's messages in raw-topic shape ({ id, messages }).
   * Unlike fetchMessages, failures here resolve to undefined instead of
   * throwing.
   */
  async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
    try {
      // For agent sessions, fetch messages from backend and return in raw topic format
      const { messages } = await this.fetchMessages(topicId)
      return {
        id: topicId,
        messages
      }
    } catch (error) {
      logger.error(`Failed to get raw topic for agent session ${topicId}:`, error as Error)
      return undefined
    }
  }

  // ============ Additional Methods for Interface Compatibility ============

  /** Not supported — agent session blocks are immutable once persisted. */
  async updateSingleBlock(blockId: string, _updates: Partial<MessageBlock>): Promise<void> {
    // Agent session blocks are immutable once persisted
    logger.warn(`updateSingleBlock called for agent session block ${blockId}, operation not supported`)
  }

  /** Not supported individually — blocks are added through persistExchange. */
  async bulkAddBlocks(_blocks: MessageBlock[]): Promise<void> {
    // Agent session blocks are added through persistExchange
    logger.warn(`bulkAddBlocks called for agent session, operation not supported individually`)
  }

  /** Not supported — file reference counts are not managed locally. */
  async updateFileCount(fileId: string, _delta: number, _deleteIfZero?: boolean): Promise<void> {
    // Agent sessions don't manage file reference counts locally
    logger.warn(`updateFileCount called for agent session file ${fileId}, operation not supported`)
  }

  /** Not supported — file reference counts are not managed locally. */
  async updateFileCounts(_files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise<void> {
    // Agent sessions don't manage file reference counts locally
    logger.warn(`updateFileCounts called for agent session, operation not supported`)
  }
}

View File

@ -0,0 +1,196 @@
import { loggerService } from '@logger'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { AgentMessageDataSource } from './AgentMessageDataSource'
import { DexieMessageDataSource } from './DexieMessageDataSource'
import type { MessageDataSource } from './types'
import { isAgentSessionTopicId } from './types'
const logger = loggerService.withContext('DbService')
/**
* Facade service that routes data operations to the appropriate data source
* based on the topic ID type (regular chat or agent session)
*/
class DbService implements MessageDataSource {
  private static instance: DbService
  private dexieSource: DexieMessageDataSource
  private agentSource: AgentMessageDataSource

  private constructor() {
    this.dexieSource = new DexieMessageDataSource()
    this.agentSource = new AgentMessageDataSource()
  }

  /**
   * Get singleton instance
   */
  static getInstance(): DbService {
    if (!DbService.instance) {
      DbService.instance = new DbService()
    }
    return DbService.instance
  }

  /**
   * Determine which data source to use based on topic ID.
   * Topics prefixed with "agent-session:" are routed to the IPC-backed
   * agent source; everything else goes to local Dexie/IndexedDB storage.
   */
  private getDataSource(topicId: string): MessageDataSource {
    if (isAgentSessionTopicId(topicId)) {
      logger.silly(`Using AgentMessageDataSource for topic ${topicId}`)
      return this.agentSource
    }
    // Future: Could add more data source types here
    // e.g., if (isCloudTopicId(topicId)) return this.cloudSource
    logger.silly(`Using DexieMessageDataSource for topic ${topicId}`)
    return this.dexieSource
  }

  // ============ Read Operations ============

  /** Fetch all messages and blocks for a topic from whichever source owns it. */
  async fetchMessages(
    topicId: string,
    forceReload?: boolean
  ): Promise<{
    messages: Message[]
    blocks: MessageBlock[]
  }> {
    const source = this.getDataSource(topicId)
    return source.fetchMessages(topicId, forceReload)
  }

  // ============ Write Operations ============

  /** Append a single message (with its blocks) to the owning source. */
  async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.appendMessage(topicId, message, blocks, insertIndex)
  }

  /** Apply a partial update to an existing message. */
  async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.updateMessage(topicId, messageId, updates)
  }

  /** Update a message and replace/update its blocks in one call. */
  async updateMessageAndBlocks(
    topicId: string,
    messageUpdates: Partial<Message> & Pick<Message, 'id'>,
    blocksToUpdate: MessageBlock[]
  ): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.updateMessageAndBlocks(topicId, messageUpdates, blocksToUpdate)
  }

  /** Delete a single message and its blocks. */
  async deleteMessage(topicId: string, messageId: string): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.deleteMessage(topicId, messageId)
  }

  /** Delete multiple messages and their blocks. */
  async deleteMessages(topicId: string, messageIds: string[]): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.deleteMessages(topicId, messageIds)
  }

  // ============ Block Operations ============

  async updateBlocks(blocks: MessageBlock[]): Promise<void> {
    // For block operations, we need to infer the source from the first block's message
    // This is a limitation of the current design where blocks don't have topicId
    // In practice, blocks are usually updated in context of a topic operation
    // Default to Dexie for now since agent blocks are updated through persistExchange
    return this.dexieSource.updateBlocks(blocks)
  }

  async deleteBlocks(blockIds: string[]): Promise<void> {
    // Similar limitation as updateBlocks
    // Default to Dexie since agent blocks can't be deleted individually
    return this.dexieSource.deleteBlocks(blockIds)
  }

  // ============ Batch Operations ============

  /** Remove every message (and its blocks) from a topic. */
  async clearMessages(topicId: string): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.clearMessages(topicId)
  }

  async topicExists(topicId: string): Promise<boolean> {
    const source = this.getDataSource(topicId)
    return source.topicExists(topicId)
  }

  async ensureTopic(topicId: string): Promise<void> {
    const source = this.getDataSource(topicId)
    return source.ensureTopic(topicId)
  }

  // ============ Optional Methods (with fallback) ============

  async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
    const source = this.getDataSource(topicId)
    return source.getRawTopic(topicId)
  }

  async updateSingleBlock(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
    // For single block operations, default to Dexie since agent blocks are immutable
    if (this.dexieSource.updateSingleBlock) {
      return this.dexieSource.updateSingleBlock(blockId, updates)
    }
    // There is no safe fallback here: updateBlocks() is backed by Dexie's
    // bulkPut, which REPLACES whole records. Writing `{ ...updates, id }`
    // would silently drop every field not present in `updates` and corrupt
    // the stored block. Fail loudly instead.
    throw new Error(`updateSingleBlock is not supported by the underlying data source (block ${blockId})`)
  }

  async bulkAddBlocks(blocks: MessageBlock[]): Promise<void> {
    // For bulk add operations, default to Dexie since agent blocks use persistExchange
    if (this.dexieSource.bulkAddBlocks) {
      return this.dexieSource.bulkAddBlocks(blocks)
    }
    // Fallback to updateBlocks — safe here because `blocks` are complete
    // records, so a put-style replace loses nothing
    return this.dexieSource.updateBlocks(blocks)
  }

  async updateFileCount(fileId: string, delta: number, deleteIfZero: boolean = false): Promise<void> {
    // File operations only apply to Dexie source
    if (this.dexieSource.updateFileCount) {
      return this.dexieSource.updateFileCount(fileId, delta, deleteIfZero)
    }
    // No-op if not supported
    logger.warn(`updateFileCount not supported for file ${fileId}`)
  }

  async updateFileCounts(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise<void> {
    // File operations only apply to Dexie source
    if (this.dexieSource.updateFileCounts) {
      return this.dexieSource.updateFileCounts(files)
    }
    // No-op if not supported
    logger.warn(`updateFileCounts not supported`)
  }

  // ============ Utility Methods ============

  /**
   * Check if a topic is an agent session
   */
  isAgentSession(topicId: string): boolean {
    return isAgentSessionTopicId(topicId)
  }

  /**
   * Get the data source type for a topic
   */
  getSourceType(topicId: string): 'dexie' | 'agent' | 'unknown' {
    if (isAgentSessionTopicId(topicId)) {
      return 'agent'
    }
    // Add more checks for other source types as needed
    return 'dexie'
  }
}
// Export singleton instance — the one shared entry point for all message data access
export const dbService = DbService.getInstance()
// Also export class for testing purposes (e.g. constructing isolated instances)
export { DbService }

View File

@ -0,0 +1,406 @@
import { loggerService } from '@logger'
import db from '@renderer/databases'
import FileManager from '@renderer/services/FileManager'
import store from '@renderer/store'
import { updateTopicUpdatedAt } from '@renderer/store/assistants'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { isEmpty } from 'lodash'
import type { MessageDataSource } from './types'
const logger = loggerService.withContext('DexieMessageDataSource')
/**
* Dexie-based implementation of MessageDataSource
* Handles local IndexedDB storage for regular chat messages
*/
export class DexieMessageDataSource implements MessageDataSource {
  // ============ Read Operations ============

  /**
   * Fetch all messages and their content blocks for a topic.
   * Side effect: creates the topic row on first access so later writes can
   * assume it exists. Blocks are looked up by messageId across all messages.
   */
  async fetchMessages(topicId: string): Promise<{
    messages: Message[]
    blocks: MessageBlock[]
  }> {
    try {
      const topic = await db.topics.get(topicId)
      if (!topic) {
        await db.topics.add({ id: topicId, messages: [] })
      }
      // NOTE: `topic` is the pre-creation snapshot; a freshly created topic
      // falls through to the empty-messages early return below
      const messages = topic?.messages || []
      if (messages.length === 0) {
        return { messages: [], blocks: [] }
      }
      const messageIds = messages.map((m) => m.id)
      const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray()
      // Ensure block IDs are strings for consistency
      const messagesWithBlockIds = messages.map((m) => ({
        ...m,
        blocks: m.blocks?.map(String) || []
      }))
      return { messages: messagesWithBlockIds, blocks: blocks || [] }
    } catch (error) {
      logger.error(`Failed to fetch messages for topic ${topicId}:`, error as Error)
      throw error
    }
  }

  /** Get raw topic data (just id and messages) straight from Dexie, or undefined. */
  async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
    try {
      return await db.topics.get(topicId)
    } catch (error) {
      logger.error(`Failed to get raw topic ${topicId}:`, error as Error)
      throw error
    }
  }

  // ============ Write Operations ============

  /**
   * Append a message and its blocks atomically (single rw transaction).
   * If a message with the same id already exists it is replaced in place;
   * otherwise it is inserted at `insertIndex` (when in range) or appended.
   */
  async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void> {
    try {
      await db.transaction('rw', db.topics, db.message_blocks, async () => {
        // Save blocks first
        if (blocks.length > 0) {
          await db.message_blocks.bulkPut(blocks)
        }
        // Get or create topic
        let topic = await db.topics.get(topicId)
        if (!topic) {
          await db.topics.add({ id: topicId, messages: [] })
          topic = await db.topics.get(topicId)
        }
        if (!topic) {
          throw new Error(`Failed to create topic ${topicId}`)
        }
        const updatedMessages = [...(topic.messages || [])]
        // Check if message already exists
        const existingIndex = updatedMessages.findIndex((m) => m.id === message.id)
        if (existingIndex !== -1) {
          updatedMessages[existingIndex] = message
        } else {
          // Insert at specific index or append
          if (insertIndex !== undefined && insertIndex >= 0 && insertIndex <= updatedMessages.length) {
            updatedMessages.splice(insertIndex, 0, message)
          } else {
            updatedMessages.push(message)
          }
        }
        await db.topics.update(topicId, { messages: updatedMessages })
      })
      // Keep the topic's "updated at" timestamp in sync in Redux
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to append message to topic ${topicId}:`, error as Error)
      throw error
    }
  }

  /**
   * Merge partial updates into an existing message inside the topic record.
   * Silently no-ops when the topic or message is not found.
   */
  async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
    try {
      await db.transaction('rw', db.topics, async () => {
        await db.topics
          .where('id')
          .equals(topicId)
          .modify((topic) => {
            if (!topic || !topic.messages) return
            const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
            if (messageIndex !== -1) {
              Object.assign(topic.messages[messageIndex], updates)
            }
          })
      })
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to update message ${messageId} in topic ${topicId}:`, error as Error)
      throw error
    }
  }

  /**
   * Update a message and put its blocks in one transaction.
   * Blocks are written via bulkPut (full replace); message fields other than
   * `id`/`topicId` are copied onto the stored message individually.
   */
  async updateMessageAndBlocks(
    topicId: string,
    messageUpdates: Partial<Message> & Pick<Message, 'id'>,
    blocksToUpdate: MessageBlock[]
  ): Promise<void> {
    try {
      await db.transaction('rw', db.topics, db.message_blocks, async () => {
        // Update blocks
        if (blocksToUpdate.length > 0) {
          await db.message_blocks.bulkPut(blocksToUpdate)
        }
        // Update message if there are actual changes beyond id and topicId
        const keysToUpdate = Object.keys(messageUpdates).filter((key) => key !== 'id' && key !== 'topicId')
        if (keysToUpdate.length > 0) {
          await db.topics
            .where('id')
            .equals(topicId)
            .modify((topic) => {
              if (!topic || !topic.messages) return
              const messageIndex = topic.messages.findIndex((m) => m.id === messageUpdates.id)
              if (messageIndex !== -1) {
                keysToUpdate.forEach((key) => {
                  ;(topic.messages[messageIndex] as any)[key] = (messageUpdates as any)[key]
                })
              }
            })
        }
      })
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to update message and blocks for ${messageUpdates.id}:`, error as Error)
      throw error
    }
  }

  /**
   * Delete one message: removes its blocks, deletes any files those blocks
   * reference, then splices the message out of the topic. All in one transaction.
   */
  async deleteMessage(topicId: string, messageId: string): Promise<void> {
    try {
      await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
        const topic = await db.topics.get(topicId)
        if (!topic) return
        const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
        if (messageIndex === -1) return
        const message = topic.messages[messageIndex]
        const blockIds = message.blocks || []
        // Delete blocks and handle files
        if (blockIds.length > 0) {
          const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
          const files = blocks
            .filter((block) => block.type === 'file' || block.type === 'image')
            .map((block: any) => block.file)
            .filter((file) => file !== undefined)
          // Clean up files
          if (!isEmpty(files)) {
            await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
          }
          await db.message_blocks.bulkDelete(blockIds)
        }
        // Remove message from topic
        topic.messages.splice(messageIndex, 1)
        await db.topics.update(topicId, { messages: topic.messages })
      })
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to delete message ${messageId} from topic ${topicId}:`, error as Error)
      throw error
    }
  }

  /**
   * Delete several messages at once. Collects every block id up front so the
   * blocks (and their referenced files) can be removed in a single batch.
   */
  async deleteMessages(topicId: string, messageIds: string[]): Promise<void> {
    try {
      await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
        const topic = await db.topics.get(topicId)
        if (!topic) return
        // Collect all block IDs from messages to be deleted
        const allBlockIds: string[] = []
        const messagesToDelete: Message[] = []
        for (const messageId of messageIds) {
          const message = topic.messages.find((m) => m.id === messageId)
          if (message) {
            messagesToDelete.push(message)
            if (message.blocks && message.blocks.length > 0) {
              allBlockIds.push(...message.blocks)
            }
          }
        }
        // Delete blocks and handle files
        if (allBlockIds.length > 0) {
          const blocks = await db.message_blocks.where('id').anyOf(allBlockIds).toArray()
          const files = blocks
            .filter((block) => block.type === 'file' || block.type === 'image')
            .map((block: any) => block.file)
            .filter((file) => file !== undefined)
          // Clean up files
          if (!isEmpty(files)) {
            await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
          }
          await db.message_blocks.bulkDelete(allBlockIds)
        }
        // Remove messages from topic
        const remainingMessages = topic.messages.filter((m) => !messageIds.includes(m.id))
        await db.topics.update(topicId, { messages: remainingMessages })
      })
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to delete messages from topic ${topicId}:`, error as Error)
      throw error
    }
  }

  // ============ Block Operations ============

  /** Put (insert-or-replace) a batch of complete block records. */
  async updateBlocks(blocks: MessageBlock[]): Promise<void> {
    try {
      if (blocks.length === 0) return
      await db.message_blocks.bulkPut(blocks)
    } catch (error) {
      logger.error('Failed to update blocks:', error as Error)
      throw error
    }
  }

  /** Merge partial updates into one block (Dexie `update` merges, not replaces). */
  async updateSingleBlock(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
    try {
      await db.message_blocks.update(blockId, updates)
    } catch (error) {
      logger.error(`Failed to update block ${blockId}:`, error as Error)
      throw error
    }
  }

  /** Insert new blocks (bulkAdd fails on existing ids, unlike bulkPut). */
  async bulkAddBlocks(blocks: MessageBlock[]): Promise<void> {
    try {
      if (blocks.length === 0) return
      await db.message_blocks.bulkAdd(blocks)
    } catch (error) {
      logger.error('Failed to bulk add blocks:', error as Error)
      throw error
    }
  }

  /** Delete blocks by id, first removing any files they reference. */
  async deleteBlocks(blockIds: string[]): Promise<void> {
    try {
      if (blockIds.length === 0) return
      // Get blocks to find associated files
      const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
      const files = blocks
        .filter((block) => block.type === 'file' || block.type === 'image')
        .map((block: any) => block.file)
        .filter((file) => file !== undefined)
      // Clean up files
      if (!isEmpty(files)) {
        await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
      }
      await db.message_blocks.bulkDelete(blockIds)
    } catch (error) {
      logger.error('Failed to delete blocks:', error as Error)
      throw error
    }
  }

  // ============ Batch Operations ============

  /**
   * Clear a topic: delete every block (plus referenced files) and reset the
   * topic's message list to empty. The topic record itself is kept.
   */
  async clearMessages(topicId: string): Promise<void> {
    try {
      await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
        const topic = await db.topics.get(topicId)
        if (!topic) return
        // Get all block IDs
        const blockIds = topic.messages.flatMap((m) => m.blocks || [])
        // Delete blocks and handle files
        if (blockIds.length > 0) {
          const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
          const files = blocks
            .filter((block) => block.type === 'file' || block.type === 'image')
            .map((block: any) => block.file)
            .filter((file) => file !== undefined)
          if (!isEmpty(files)) {
            await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
          }
          await db.message_blocks.bulkDelete(blockIds)
        }
        // Clear messages
        await db.topics.update(topicId, { messages: [] })
      })
      store.dispatch(updateTopicUpdatedAt({ topicId }))
    } catch (error) {
      logger.error(`Failed to clear messages for topic ${topicId}:`, error as Error)
      throw error
    }
  }

  /** True if the topic row exists; returns false (not throw) on lookup failure. */
  async topicExists(topicId: string): Promise<boolean> {
    try {
      const topic = await db.topics.get(topicId)
      return !!topic
    } catch (error) {
      logger.error(`Failed to check if topic ${topicId} exists:`, error as Error)
      return false
    }
  }

  /** Create the topic row with an empty message list if it does not exist yet. */
  async ensureTopic(topicId: string): Promise<void> {
    try {
      const exists = await this.topicExists(topicId)
      if (!exists) {
        await db.topics.add({ id: topicId, messages: [] })
      }
    } catch (error) {
      logger.error(`Failed to ensure topic ${topicId} exists:`, error as Error)
      throw error
    }
  }

  // ============ File Operations ============

  /**
   * Adjust a file's reference count by `delta`.
   * When the count drops to zero or below and `deleteIfZero` is set, the file
   * is removed from disk (via FileManager) and from the files table.
   */
  async updateFileCount(fileId: string, delta: number, deleteIfZero: boolean = false): Promise<void> {
    try {
      await db.transaction('rw', db.files, async () => {
        const file = await db.files.get(fileId)
        if (!file) {
          logger.warn(`File ${fileId} not found for count update`)
          return
        }
        const newCount = (file.count || 0) + delta
        if (newCount <= 0 && deleteIfZero) {
          // Delete the file when count reaches 0 or below
          await FileManager.deleteFile(fileId, false)
          await db.files.delete(fileId)
          logger.info(`Deleted file ${fileId} as reference count reached ${newCount}`)
        } else {
          // Update the count (clamped at 0 so it never goes negative)
          await db.files.update(fileId, { count: Math.max(0, newCount) })
          logger.debug(`Updated file ${fileId} count to ${Math.max(0, newCount)}`)
        }
      })
    } catch (error) {
      logger.error(`Failed to update file count for ${fileId}:`, error as Error)
      throw error
    }
  }

  /**
   * Apply several reference-count adjustments sequentially.
   * Sequential (not Promise.all) so each update runs in its own transaction.
   */
  async updateFileCounts(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise<void> {
    try {
      for (const file of files) {
        await this.updateFileCount(file.id, file.delta, file.deleteIfZero || false)
      }
    } catch (error) {
      logger.error('Failed to update file counts:', error as Error)
      throw error
    }
  }
}

View File

@ -0,0 +1,89 @@
# Unified Data Access Layer
This module provides a unified interface for accessing message data from different sources:
- **DexieMessageDataSource**: Local IndexedDB storage for regular chat messages
- **AgentMessageDataSource**: Backend IPC storage for agent session messages
## Architecture
```
dbService (Facade)
├── Determines data source based on topicId
├── Routes to DexieMessageDataSource (regular chats)
└── Routes to AgentMessageDataSource (agent sessions)
```
## Usage
```typescript
import { dbService } from '@renderer/services/db'
// Fetch messages (automatically routes to correct source)
const { messages, blocks } = await dbService.fetchMessages(topicId)
// Update a message and its blocks (e.g. while streaming an assistant reply)
await dbService.updateMessageAndBlocks(topicId, { id: assistantMsg.id, status: 'success' }, assistantBlocks)
// Append a single message
await dbService.appendMessage(topicId, message, blocks)
// Check if topic exists
const exists = await dbService.topicExists(topicId)
```
## Topic ID Convention
- Regular chat topics: Any string ID (e.g., "uuid-1234-5678")
- Agent session topics: Prefixed with "agent-session:" (e.g., "agent-session:session-123")
## Key Features
1. **Transparent Routing**: The facade automatically routes to the appropriate data source
2. **Consistent API**: Same methods work for both regular chats and agent sessions
3. **Type Safety**: Full TypeScript support with proper interfaces
4. **Error Handling**: Comprehensive error logging and propagation
5. **Extensibility**: Easy to add new data sources (e.g., cloud storage)
## Implementation Status
### DexieMessageDataSource ✅
- Full CRUD operations for messages and blocks
- Transaction support
- File cleanup on deletion
- Redux state updates
### AgentMessageDataSource ✅
- Fetch messages from backend
- Persist message exchanges
- Limited update/delete operations (by design)
- IPC communication with backend
## Migration Guide
### Before (Direct DB access):
```typescript
// In thunks
if (isAgentSessionTopicId(topicId)) {
// Special handling for agent sessions
const messages = await window.electron.ipcRenderer.invoke(...)
} else {
// Regular DB access
const topic = await db.topics.get(topicId)
}
```
### After (Unified access):
```typescript
// In thunks
const { messages, blocks } = await dbService.fetchMessages(topicId)
// No need to check topic type!
```
## Next Steps
Phase 2: Update Redux thunks to use dbService
Phase 3: Update components to use unified hooks
Phase 4: Remove AgentSessionMessages component

View File

@ -0,0 +1,19 @@
/**
 * Unified data access layer for messages
 * Provides a consistent API for accessing messages from different sources
 * (Dexie/IndexedDB for regular chats, IPC/Backend for agent sessions)
 */

// Export main service
export { DbService, dbService } from './DbService'

// Export types
export type { MessageDataSource, MessageExchange } from './types'
export { buildAgentSessionTopicId, extractSessionId, isAgentSessionTopicId } from './types'

// Export implementations (for testing or direct access if needed)
export { AgentMessageDataSource } from './AgentMessageDataSource'
export { DexieMessageDataSource } from './DexieMessageDataSource'

View File

@ -0,0 +1,143 @@
import type { Message, MessageBlock } from '@renderer/types/newMessage'
/**
 * Message exchange data structure for persisting user-assistant conversations.
 * Either side may be absent (e.g. when persisting only an assistant reply).
 */
export interface MessageExchange {
  // The user's message together with the content blocks it references
  user?: {
    message: Message
    blocks: MessageBlock[]
  }
  // The assistant's reply together with its content blocks
  assistant?: {
    message: Message
    blocks: MessageBlock[]
  }
  // For agent sessions: the backend session this exchange belongs to
  agentSessionId?: string
}
/**
 * Unified interface for message data operations.
 * Implementations can be backed by Dexie, IPC, or other storage mechanisms.
 * Methods marked optional (`?`) may be unsupported by some backends; callers
 * should feature-detect before invoking them.
 */
export interface MessageDataSource {
  // ============ Read Operations ============
  /**
   * Fetch all messages and blocks for a topic
   * @param forceReload - hint to bypass any caching the implementation may do
   */
  fetchMessages(
    topicId: string,
    forceReload?: boolean
  ): Promise<{
    messages: Message[]
    blocks: MessageBlock[]
  }>
  /**
   * Get raw topic data (just id and messages)
   */
  getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined>
  // ============ Write Operations ============
  /**
   * Append a single message with its blocks
   * @param insertIndex - optional position in the topic's message list; appends when omitted
   */
  appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void>
  /**
   * Update an existing message with a partial set of fields
   */
  updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void>
  /**
   * Update existing message and its blocks in a single operation
   */
  updateMessageAndBlocks(
    topicId: string,
    messageUpdates: Partial<Message> & Pick<Message, 'id'>,
    blocksToUpdate: MessageBlock[]
  ): Promise<void>
  /**
   * Delete a single message and its blocks
   */
  deleteMessage(topicId: string, messageId: string): Promise<void>
  /**
   * Delete multiple messages and their blocks
   */
  deleteMessages(topicId: string, messageIds: string[]): Promise<void>
  // ============ Block Operations ============
  /**
   * Update multiple blocks (complete records, put-style)
   */
  updateBlocks(blocks: MessageBlock[]): Promise<void>
  /**
   * Update single block with a partial set of fields (optional)
   */
  updateSingleBlock?(blockId: string, updates: Partial<MessageBlock>): Promise<void>
  /**
   * Bulk add blocks (for cloning operations; optional)
   */
  bulkAddBlocks?(blocks: MessageBlock[]): Promise<void>
  /**
   * Delete multiple blocks
   */
  deleteBlocks(blockIds: string[]): Promise<void>
  // ============ Batch Operations ============
  /**
   * Clear all messages in a topic
   */
  clearMessages(topicId: string): Promise<void>
  /**
   * Check if topic exists
   */
  topicExists(topicId: string): Promise<boolean>
  /**
   * Create or ensure topic exists
   */
  ensureTopic(topicId: string): Promise<void>
  // ============ File Operations (Optional) ============
  /**
   * Update file reference count
   * @param fileId - The file ID to update
   * @param delta - The change in reference count (positive or negative)
   * @param deleteIfZero - Whether to delete the file when count reaches 0
   */
  updateFileCount?(fileId: string, delta: number, deleteIfZero?: boolean): Promise<void>
  /**
   * Update multiple file reference counts
   */
  updateFileCounts?(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise<void>
}
/**
 * Type guard to check if a topic ID belongs to an agent session.
 * Agent session topics carry the literal "agent-session:" prefix.
 */
export function isAgentSessionTopicId(topicId: string): boolean {
  const prefix = 'agent-session:'
  return topicId.slice(0, prefix.length) === prefix
}
/**
 * Extract the session ID from an agent session topic ID.
 *
 * Strips the "agent-session:" prefix only when the topic ID actually starts
 * with it. (The previous `String.replace` removed the first occurrence of the
 * marker anywhere in the string, so e.g. "x-agent-session:abc" became
 * "x-abc" instead of being left untouched.)
 *
 * @param topicId - Topic ID, typically produced by buildAgentSessionTopicId
 * @returns The bare session ID, or the input unchanged if it has no prefix
 */
export function extractSessionId(topicId: string): string {
  const prefix = 'agent-session:'
  return topicId.startsWith(prefix) ? topicId.slice(prefix.length) : topicId
}
/**
 * Build an agent session topic ID from a bare session ID by prepending the
 * "agent-session:" marker recognized by isAgentSessionTopicId.
 */
export function buildAgentSessionTopicId(sessionId: string): string {
  const prefix = 'agent-session:'
  return prefix + sessionId
}

View File

@ -1,5 +1,6 @@
import { loggerService } from '@logger' import { loggerService } from '@logger'
import { AiSdkToChunkAdapter } from '@renderer/aiCore/chunk/AiSdkToChunkAdapter' import { AiSdkToChunkAdapter } from '@renderer/aiCore/chunk/AiSdkToChunkAdapter'
import { featureFlags } from '@renderer/config/featureFlags'
import db from '@renderer/databases' import db from '@renderer/databases'
import FileManager from '@renderer/services/FileManager' import FileManager from '@renderer/services/FileManager'
import { BlockManager } from '@renderer/services/messageStreaming/BlockManager' import { BlockManager } from '@renderer/services/messageStreaming/BlockManager'
@ -10,13 +11,12 @@ import { createStreamProcessor, type StreamProcessorCallbacks } from '@renderer/
import store from '@renderer/store' import store from '@renderer/store'
import { updateTopicUpdatedAt } from '@renderer/store/assistants' import { updateTopicUpdatedAt } from '@renderer/store/assistants'
import { type ApiServerConfig, type Assistant, type FileMetadata, type Model, type Topic } from '@renderer/types' import { type ApiServerConfig, type Assistant, type FileMetadata, type Model, type Topic } from '@renderer/types'
import type { AgentPersistedMessage } from '@renderer/types/agent'
import { ChunkType } from '@renderer/types/chunk' import { ChunkType } from '@renderer/types/chunk'
import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from '@renderer/types/newMessage' import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from '@renderer/types/newMessage'
import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage' import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage'
import { uuid } from '@renderer/utils' import { uuid } from '@renderer/utils'
import { addAbortController } from '@renderer/utils/abortController' import { addAbortController } from '@renderer/utils/abortController'
import { isAgentSessionTopicId } from '@renderer/utils/agentSession' import { buildAgentSessionTopicId, isAgentSessionTopicId } from '@renderer/utils/agentSession'
import { import {
createAssistantMessage, createAssistantMessage,
createTranslationBlock, createTranslationBlock,
@ -34,6 +34,18 @@ import { LRUCache } from 'lru-cache'
import type { AppDispatch, RootState } from '../index' import type { AppDispatch, RootState } from '../index'
import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock'
import { newMessagesActions, selectMessagesForTopic } from '../newMessage' import { newMessagesActions, selectMessagesForTopic } from '../newMessage'
import {
bulkAddBlocksV2,
clearMessagesFromDBV2,
deleteMessageFromDBV2,
deleteMessagesFromDBV2,
loadTopicMessagesThunkV2,
saveMessageAndBlocksToDBV2,
updateBlocksV2,
updateFileCountV2,
updateMessageV2,
updateSingleBlockV2
} from './messageThunk.v2'
const logger = loggerService.withContext('MessageThunk') const logger = loggerService.withContext('MessageThunk')
@ -190,12 +202,23 @@ const createAgentMessageStream = async (
} }
// TODO: 后续可以将db操作移到Listener Middleware中 // TODO: 后续可以将db操作移到Listener Middleware中
export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => { export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => {
// Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return saveMessageAndBlocksToDBV2(message.topicId, message, blocks, messageIndex)
}
// Original implementation
try { try {
if (isAgentSessionTopicId(message.topicId)) { if (isAgentSessionTopicId(message.topicId)) {
return return
} }
if (blocks.length > 0) { if (blocks.length > 0) {
await db.message_blocks.bulkPut(blocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateBlocksV2(blocks)
} else {
await db.message_blocks.bulkPut(blocks)
}
} }
const topic = await db.topics.get(message.topicId) const topic = await db.topics.get(message.topicId)
if (topic) { if (topic) {
@ -232,7 +255,12 @@ const updateExistingMessageAndBlocksInDB = async (
await db.transaction('rw', db.topics, db.message_blocks, async () => { await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Always update blocks if provided // Always update blocks if provided
if (updatedBlocks.length > 0) { if (updatedBlocks.length > 0) {
await db.message_blocks.bulkPut(updatedBlocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateBlocksV2(updatedBlocks)
} else {
await db.message_blocks.bulkPut(updatedBlocks)
}
} }
// Check if there are message properties to update beyond id and topicId // Check if there are message properties to update beyond id and topicId
@ -301,7 +329,12 @@ const getBlockThrottler = (id: string) => {
}) })
blockUpdateRafs.set(id, rafId) blockUpdateRafs.set(id, rafId)
await db.message_blocks.update(id, blockUpdate) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateSingleBlockV2(id, blockUpdate)
} else {
await db.message_blocks.update(id, blockUpdate)
}
}, 150) }, 150)
blockUpdateThrottlers.set(id, throttler) blockUpdateThrottlers.set(id, throttler)
@ -470,13 +503,17 @@ const fetchAndProcessAgentResponseImpl = async (
text: Promise.resolve('') text: Promise.resolve('')
}) })
await persistAgentExchange({ // No longer need persistAgentExchange here since:
getState, // 1. User message is already saved via appendMessage when created
agentSession, // 2. Assistant message is saved via appendMessage when created
userMessageId, // 3. Updates during streaming are saved via updateMessageAndBlocks
assistantMessageId: assistantMessage.id, // This eliminates the duplicate save issue
latestAgentSessionId
}) // Only persist the agentSessionId update if it changed
if (latestAgentSessionId) {
logger.info(`Agent session ID updated to: ${latestAgentSessionId}`)
// In the future, you might want to update some session metadata here
}
} catch (error: any) { } catch (error: any) {
logger.error('Error in fetchAndProcessAgentResponseImpl:', error) logger.error('Error in fetchAndProcessAgentResponseImpl:', error)
try { try {
@ -489,73 +526,9 @@ const fetchAndProcessAgentResponseImpl = async (
} }
} }
interface PersistAgentExchangeParams { // Removed persistAgentExchange and createPersistedMessagePayload functions
getState: () => RootState // These are no longer needed since messages are saved immediately via appendMessage
agentSession: AgentSessionContext // and updated during streaming via updateMessageAndBlocks
userMessageId: string
assistantMessageId: string
latestAgentSessionId: string
}
const persistAgentExchange = async ({
getState,
agentSession,
userMessageId,
assistantMessageId,
latestAgentSessionId
}: PersistAgentExchangeParams) => {
if (!window.electron?.ipcRenderer) {
return
}
try {
const state = getState()
const userMessage = state.messages.entities[userMessageId]
const assistantMessage = state.messages.entities[assistantMessageId]
if (!userMessage || !assistantMessage) {
logger.warn('persistAgentExchange: missing user or assistant message entity')
return
}
const userPersistedPayload = createPersistedMessagePayload(userMessage, state)
const assistantPersistedPayload = createPersistedMessagePayload(assistantMessage, state)
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, {
sessionId: agentSession.sessionId,
agentSessionId: latestAgentSessionId || '',
user: userPersistedPayload ? { payload: userPersistedPayload } : undefined,
assistant: assistantPersistedPayload ? { payload: assistantPersistedPayload } : undefined
})
} catch (error) {
logger.warn('Failed to persist agent exchange', error as Error)
}
}
const createPersistedMessagePayload = (
message: Message | undefined,
state: RootState
): AgentPersistedMessage | undefined => {
if (!message) {
return undefined
}
try {
const clonedMessage = JSON.parse(JSON.stringify(message)) as Message
const blockEntities = (message.blocks || [])
.map((blockId) => state.messageBlocks.entities[blockId])
.filter((block): block is MessageBlock => Boolean(block))
.map((block) => JSON.parse(JSON.stringify(block)) as MessageBlock)
return {
message: clonedMessage,
blocks: blockEntities
}
} catch (error) {
logger.warn('Failed to build persisted payload for message', error as Error)
return undefined
}
}
// --- Helper Function for Multi-Model Dispatch --- // --- Helper Function for Multi-Model Dispatch ---
// 多模型创建和发送请求的逻辑,用于用户消息多模型发送和重发 // 多模型创建和发送请求的逻辑,用于用户消息多模型发送和重发
@ -782,6 +755,52 @@ export const sendMessage =
} }
} }
/**
 * Fetches the persisted message history for an agent session from the main
 * process (via IPC) and mirrors it into the Redux store under the session's
 * synthetic topic id. On a missing/invalid response or an error, the topic is
 * populated with an empty message list.
 */
export const loadAgentSessionMessagesThunk =
  (sessionId: string) => async (dispatch: AppDispatch, getState: () => RootState) => {
    const topicId = buildAgentSessionTopicId(sessionId)
    try {
      dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
      // Agent-session history lives in the agent backend, not the local DB.
      const history = await window.electron?.ipcRenderer.invoke(IpcChannel.AgentMessage_GetHistory, {
        sessionId
      })
      if (!Array.isArray(history)) {
        dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] }))
        return
      }
      const messages: Message[] = []
      const blocks: MessageBlock[] = []
      for (const entry of history) {
        if (!entry?.message) {
          continue
        }
        messages.push(entry.message)
        if (entry.blocks?.length) {
          blocks.push(...entry.blocks)
        }
      }
      // Blocks must be in the store before the messages that reference them.
      if (blocks.length > 0) {
        dispatch(upsertManyBlocks(blocks))
      }
      dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
      logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`)
    } catch (error) {
      logger.error(`Failed to load agent session messages for ${sessionId}:`, error as Error)
      dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] }))
    } finally {
      dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
    }
  }
/** /**
* Loads messages and their blocks for a specific topic from the database * Loads messages and their blocks for a specific topic from the database
* and updates the Redux store. * and updates the Redux store.
@ -789,10 +808,26 @@ export const sendMessage =
export const loadTopicMessagesThunk = export const loadTopicMessagesThunk =
(topicId: string, forceReload: boolean = false) => (topicId: string, forceReload: boolean = false) =>
async (dispatch: AppDispatch, getState: () => RootState) => { async (dispatch: AppDispatch, getState: () => RootState) => {
// Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return loadTopicMessagesThunkV2(topicId, forceReload)(dispatch, getState)
}
// Original implementation
const state = getState() const state = getState()
const topicMessagesExist = !!state.messages.messageIdsByTopic[topicId] const topicMessagesExist = !!state.messages.messageIdsByTopic[topicId]
dispatch(newMessagesActions.setCurrentTopicId(topicId)) dispatch(newMessagesActions.setCurrentTopicId(topicId))
// Check if it's an agent session topic
if (isAgentSessionTopicId(topicId)) {
if (topicMessagesExist && !forceReload) {
return // Keep existing messages in memory
}
// Load from agent backend instead of local DB
const sessionId = topicId.replace('agent-session:', '')
return dispatch(loadAgentSessionMessagesThunk(sessionId))
}
if (topicMessagesExist && !forceReload) { if (topicMessagesExist && !forceReload) {
return return
} }
@ -843,12 +878,19 @@ export const deleteSingleMessageThunk =
try { try {
dispatch(newMessagesActions.removeMessage({ topicId, messageId })) dispatch(newMessagesActions.removeMessage({ topicId, messageId }))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) // Use V2 implementation if feature flag is enabled
if (topic) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) await deleteMessageFromDBV2(topicId, messageId)
await db.topics.update(topicId, { messages: finalMessagesToSave }) } else {
dispatch(updateTopicUpdatedAt({ topicId })) // Original implementation
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)
await db.topics.update(topicId, { messages: finalMessagesToSave })
dispatch(updateTopicUpdatedAt({ topicId }))
}
} }
} catch (error) { } catch (error) {
logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error) logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error)
@ -883,16 +925,24 @@ export const deleteMessageGroupThunk =
} }
const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || []) const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || [])
const messageIdsToDelete = messagesToDelete.map((m) => m.id)
try { try {
dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId }))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) // Use V2 implementation if feature flag is enabled
if (topic) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) await deleteMessagesFromDBV2(topicId, messageIdsToDelete)
await db.topics.update(topicId, { messages: finalMessagesToSave }) } else {
dispatch(updateTopicUpdatedAt({ topicId })) // Original implementation
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)
await db.topics.update(topicId, { messages: finalMessagesToSave })
dispatch(updateTopicUpdatedAt({ topicId }))
}
} }
} catch (error) { } catch (error) {
logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error) logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error)
@ -919,10 +969,16 @@ export const clearTopicMessagesThunk =
dispatch(newMessagesActions.clearTopicMessages(topicId)) dispatch(newMessagesActions.clearTopicMessages(topicId))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.topics.update(topicId, { messages: [] }) // Use V2 implementation if feature flag is enabled
dispatch(updateTopicUpdatedAt({ topicId })) if (featureFlags.USE_UNIFIED_DB_SERVICE) {
if (blockIdsToDelete.length > 0) { await clearMessagesFromDBV2(topicId)
await db.message_blocks.bulkDelete(blockIdsToDelete) } else {
// Original implementation
await db.topics.update(topicId, { messages: [] })
dispatch(updateTopicUpdatedAt({ topicId }))
if (blockIdsToDelete.length > 0) {
await db.message_blocks.bulkDelete(blockIdsToDelete)
}
} }
} catch (error) { } catch (error) {
logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error) logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error)
@ -1245,7 +1301,12 @@ export const updateTranslationBlockThunk =
dispatch(updateOneBlock({ id: blockId, changes })) dispatch(updateOneBlock({ id: blockId, changes }))
// 更新数据库 // 更新数据库
await db.message_blocks.update(blockId, changes) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateSingleBlockV2(blockId, changes)
} else {
await db.message_blocks.update(blockId, changes)
}
// Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`) // Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`)
} catch (error) { } catch (error) {
logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error) logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error)
@ -1458,20 +1519,33 @@ export const cloneMessagesToNewTopicThunk =
// Add the NEW blocks // Add the NEW blocks
if (clonedBlocks.length > 0) { if (clonedBlocks.length > 0) {
await db.message_blocks.bulkAdd(clonedBlocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await bulkAddBlocksV2(clonedBlocks)
} else {
await db.message_blocks.bulkAdd(clonedBlocks)
}
} }
// Update file counts // Update file counts
const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()] const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()]
for (const file of uniqueFiles) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await db.files // Use V2 implementation for file count updates
.where('id') for (const file of uniqueFiles) {
.equals(file.id) await updateFileCountV2(file.id, 1, false)
.modify((f) => { }
if (f) { } else {
// Ensure file exists before modifying // Original implementation
f.count = (f.count || 0) + 1 for (const file of uniqueFiles) {
} await db.files
}) .where('id')
.equals(file.id)
.modify((f) => {
if (f) {
// Ensure file exists before modifying
f.count = (f.count || 0) + 1
}
})
}
} }
}) })
@ -1525,33 +1599,46 @@ export const updateMessageAndBlocksThunk =
} }
// 2. 更新数据库 (在事务中) // 2. 更新数据库 (在事务中)
await db.transaction('rw', db.topics, db.message_blocks, async () => { // Use V2 implementation if feature flag is enabled
// Only update topic.messages if there were actual message changes if (featureFlags.USE_UNIFIED_DB_SERVICE) {
if (messageUpdates && Object.keys(messageUpdates).length > 0) { // Update message properties if provided
const topic = await db.topics.get(topicId) if (messageUpdates && Object.keys(messageUpdates).length > 0 && messageId) {
if (topic && topic.messages) { await updateMessageV2(topicId, messageId, messageUpdates)
const messageIndex = topic.messages.findIndex((m) => m.id === messageId) }
if (messageIndex !== -1) { // Update blocks if provided
Object.assign(topic.messages[messageIndex], messageUpdates) if (blockUpdatesList.length > 0) {
await db.topics.update(topicId, { messages: topic.messages }) await updateBlocksV2(blockUpdatesList)
}
} else {
// Original implementation with transaction
await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Only update topic.messages if there were actual message changes
if (messageUpdates && Object.keys(messageUpdates).length > 0) {
const topic = await db.topics.get(topicId)
if (topic && topic.messages) {
const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
if (messageIndex !== -1) {
Object.assign(topic.messages[messageIndex], messageUpdates)
await db.topics.update(topicId, { messages: topic.messages })
} else {
logger.error(
`[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.`
)
throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`)
}
} else { } else {
logger.error( logger.error(
`[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.`
) )
throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) throw new Error(`Topic ${topicId} not found or empty for message property update.`)
} }
} else {
logger.error(
`[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.`
)
throw new Error(`Topic ${topicId} not found or empty for message property update.`)
} }
}
if (blockUpdatesList.length > 0) { if (blockUpdatesList.length > 0) {
await db.message_blocks.bulkPut(blockUpdatesList) await db.message_blocks.bulkPut(blockUpdatesList)
} }
}) })
}
dispatch(updateTopicUpdatedAt({ topicId })) dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) { } catch (error) {

View File

@ -0,0 +1,228 @@
/**
* V2 implementations of message thunk functions using the unified DbService
* These implementations will be gradually rolled out using feature flags
*/
import { loggerService } from '@logger'
import { dbService } from '@renderer/services/db'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import type { AppDispatch, RootState } from '../index'
import { upsertManyBlocks } from '../messageBlock'
import { newMessagesActions } from '../newMessage'
const logger = loggerService.withContext('MessageThunkV2')
// =================================================================
// Phase 2.1 - Batch 1: Read-only operations (lowest risk)
// =================================================================
/**
 * Load messages for a topic using unified DbService
 * This is the V2 implementation that will replace the original
 *
 * Routing between the Dexie DB and the agent backend happens inside
 * `dbService.fetchMessages`, so this thunk no longer branches on
 * `isAgentSessionTopicId` the way the V1 thunk does.
 *
 * NOTE(review): unlike the V1 thunk, this does not dispatch
 * `setCurrentTopicId` — confirm the caller handles that.
 */
export const loadTopicMessagesThunkV2 =
  (topicId: string, forceReload: boolean = false) =>
  async (dispatch: AppDispatch, getState: () => RootState) => {
    const state = getState()
    // Skip if already cached and not forcing reload
    if (!forceReload && state.messages.messageIdsByTopic[topicId]) {
      logger.info('Messages already cached for topic', { topicId })
      return
    }
    try {
      dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
      // Unified call - no need to check isAgentSessionTopicId
      const { messages, blocks } = await dbService.fetchMessages(topicId)
      logger.info('Loaded messages via DbService', {
        topicId,
        messageCount: messages.length,
        blockCount: blocks.length
      })
      // Update Redux state with fetched data
      // Blocks are upserted before the messages that reference them.
      if (blocks.length > 0) {
        dispatch(upsertManyBlocks(blocks))
      }
      dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
    } catch (error) {
      logger.error(`Failed to load messages for topic ${topicId}:`, error as Error)
      // Could dispatch an error action here if needed
      // NOTE(review): failures are only logged; the UI is not told the load
      // failed — confirm this is intentional.
    } finally {
      dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
      // NOTE(review): the topic is marked fulfilled even when the fetch threw,
      // which prevents a retry on the next non-forced load — confirm intended.
      dispatch(newMessagesActions.setTopicFulfilled({ topicId, fulfilled: true }))
    }
  }
/**
 * Fetch the raw topic record (id + messages array) via the unified DbService.
 *
 * Errors are swallowed: the failure is logged and `undefined` is returned so
 * callers can treat "failed" and "not found" uniformly.
 */
export async function getRawTopicV2(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
  try {
    const topic = await dbService.getRawTopic(topicId)
    logger.info('Retrieved raw topic via DbService', { topicId, found: Boolean(topic) })
    return topic
  } catch (error) {
    logger.error('Failed to get raw topic:', { topicId, error })
    return undefined
  }
}
// =================================================================
// Phase 2.2 - Batch 2: Helper functions
// =================================================================
/**
 * Adjust a file's reference count by `delta`, optionally removing the record
 * when the count drops to zero (`deleteIfZero`).
 *
 * Per the original note, this only applies to the Dexie data source and is a
 * no-op for agent sessions. Re-throws on failure.
 */
export async function updateFileCountV2(fileId: string, delta: number, deleteIfZero: boolean = false): Promise<void> {
  try {
    // deleteIfZero is forwarded so the service can garbage-collect the record.
    await dbService.updateFileCount(fileId, delta, deleteIfZero)
    logger.info('Updated file count', { fileId, delta, deleteIfZero })
  } catch (error) {
    logger.error('Failed to update file count:', { fileId, delta, error })
    throw error
  }
}
// =================================================================
// Phase 2.3 - Batch 3: Delete operations
// =================================================================
/**
 * Remove one message from a topic's persisted history.
 * Thin logging wrapper around `dbService.deleteMessage`; re-throws on failure.
 */
export async function deleteMessageFromDBV2(topicId: string, messageId: string): Promise<void> {
  const ctx = { topicId, messageId }
  try {
    await dbService.deleteMessage(topicId, messageId)
    logger.info('Deleted message via DbService', ctx)
  } catch (error) {
    logger.error('Failed to delete message:', { ...ctx, error })
    throw error
  }
}
/**
 * Remove a batch of messages from a topic's persisted history.
 * Delegates to `dbService.deleteMessages`; re-throws on failure so callers can
 * react to persistence errors.
 */
export async function deleteMessagesFromDBV2(topicId: string, messageIds: string[]): Promise<void> {
  try {
    await dbService.deleteMessages(topicId, messageIds)
    logger.info('Deleted messages via DbService', { topicId, count: messageIds.length })
  } catch (error) {
    // The full id list goes into the error log to aid debugging.
    logger.error('Failed to delete messages:', { topicId, messageIds, error })
    throw error
  }
}
/**
 * Wipe every persisted message from the given topic.
 * Delegates to `dbService.clearMessages`; re-throws on failure.
 */
export async function clearMessagesFromDBV2(topicId: string): Promise<void> {
  try {
    await dbService.clearMessages(topicId)
    logger.info('Cleared all messages via DbService', { topicId })
  } catch (error) {
    logger.error('Failed to clear messages:', { topicId, error })
    throw error
  }
}
// =================================================================
// Phase 2.4 - Batch 4: Complex write operations
// =================================================================
/**
 * Persist a message together with its content blocks through the unified
 * DbService — no `isAgentSessionTopicId` branching needed here.
 *
 * `messageIndex` defaults to -1; presumably that appends at the end — confirm
 * against `dbService.appendMessage`. Re-throws on failure.
 */
export async function saveMessageAndBlocksToDBV2(
  topicId: string,
  message: Message,
  blocks: MessageBlock[],
  messageIndex: number = -1
): Promise<void> {
  const messageId = message.id
  try {
    await dbService.appendMessage(topicId, message, blocks, messageIndex)
    logger.info('Saved message and blocks via DbService', {
      topicId,
      messageId,
      blockCount: blocks.length,
      messageIndex
    })
  } catch (error) {
    logger.error('Failed to save message and blocks:', { topicId, messageId, error })
    throw error
  }
}
// Note: sendMessageV2 would be implemented here but it's more complex
// and would require more of the supporting code from messageThunk.ts
// =================================================================
// Phase 2.5 - Batch 5: Update operations
// =================================================================
/**
 * Apply a partial update to one persisted message.
 * Delegates to `dbService.updateMessage`; re-throws on failure.
 */
export async function updateMessageV2(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
  const ctx = { topicId, messageId }
  try {
    await dbService.updateMessage(topicId, messageId, updates)
    logger.info('Updated message via DbService', ctx)
  } catch (error) {
    logger.error('Failed to update message:', { ...ctx, error })
    throw error
  }
}
/**
 * Apply a partial update to a single message block.
 * Delegates to `dbService.updateSingleBlock`; re-throws on failure.
 */
export async function updateSingleBlockV2(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
  try {
    await dbService.updateSingleBlock(blockId, updates)
    logger.info('Updated single block via DbService', { blockId })
  } catch (error) {
    logger.error('Failed to update single block:', { blockId, error })
    throw error
  }
}
/**
 * Insert a batch of brand-new message blocks.
 * Delegates to `dbService.bulkAddBlocks`; re-throws on failure.
 */
export async function bulkAddBlocksV2(blocks: MessageBlock[]): Promise<void> {
  const count = blocks.length
  try {
    await dbService.bulkAddBlocks(blocks)
    logger.info('Bulk added blocks via DbService', { count })
  } catch (error) {
    logger.error('Failed to bulk add blocks:', { count, error })
    throw error
  }
}
/**
 * Upsert a batch of message blocks (existing blocks are updated, new ones
 * added). Delegates to `dbService.updateBlocks`; re-throws on failure.
 */
export async function updateBlocksV2(blocks: MessageBlock[]): Promise<void> {
  const count = blocks.length
  try {
    await dbService.updateBlocks(blocks)
    logger.info('Updated blocks via DbService', { count })
  } catch (error) {
    logger.error('Failed to update blocks:', { count, error })
    throw error
  }
}

View File

@ -198,8 +198,14 @@ export type Metrics = {
time_thinking_millsec?: number time_thinking_millsec?: number
} }
export enum TopicType {
Chat = 'chat',
Session = 'session'
}
export type Topic = { export type Topic = {
id: string id: string
type: TopicType
assistantId: string assistantId: string
name: string name: string
createdAt: string createdAt: string