This commit is contained in:
suyao 2025-09-22 18:32:19 +08:00
parent 634c478e18
commit 1d5761b1fd
No known key found for this signature in database
16 changed files with 2364 additions and 3 deletions

View File

@ -6,3 +6,6 @@ CSLOGGER_MAIN_LEVEL=info
CSLOGGER_RENDERER_LEVEL=info
#CSLOGGER_MAIN_SHOW_MODULES=
#CSLOGGER_RENDERER_SHOW_MODULES=
# Feature Flags (must be prefixed with VITE_ to be accessible in renderer)
# VITE_USE_UNIFIED_DB_SERVICE=true # Enable unified DB service for chat/agent sessions

TODO.md (new file, 518 lines)
View File

@ -0,0 +1,518 @@
# Unified Chat and Agent Session Data Layer Refactoring Plan
## Goal
Create a unified data access layer that eliminates the code duplicated between the AgentSessionMessages and Messages components, so regular chats and agent sessions are handled through a single path.
## Core Design
Use the Facade pattern and Strategy pattern to build a unified data access layer that exposes a consistent API and internally routes to the appropriate data source based on the topicId type.
## Architecture
```
┌─────────────────────────────────────────┐
│              UI Components              │
│   (Messages, Inputbar - fully shared)   │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│            Hooks & Selectors            │
│ (useTopic, useTopicMessages - unified)  │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│              Redux Thunks               │
│ (no isAgentSessionTopicId checks here)  │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│            DbService (Facade)           │
│   routes to a data source by topicId    │
└─────────────────────────────────────────┘
            ┌───────────┴───────────┐
   ┌──────────────┐        ┌──────────────────┐
   │ DexieMessage │        │   AgentMessage   │
   │  DataSource  │        │    DataSource    │
   │              │        │                  │
   │   (Dexie)    │        │  (IPC/Backend)   │
   └──────────────┘        └──────────────────┘
```
## Implementation Plan
### Phase 1: Create the Data Access Layer (`src/renderer/src/services/db/`)
#### 1.1 Define the MessageDataSource Interface
```typescript
// src/renderer/src/services/db/types.ts
interface MessageDataSource {
// Read operations
fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }>
getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] }>
// Write operations
persistExchange(topicId: string, exchange: MessageExchange): Promise<void>
appendMessage(topicId: string, message: Message, blocks: MessageBlock[]): Promise<void>
updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void>
deleteMessage(topicId: string, messageId: string): Promise<void>
// Batch operations
clearMessages(topicId: string): Promise<void>
updateBlocks(blocks: MessageBlock[]): Promise<void>
}
interface MessageExchange {
user?: { message: Message, blocks: MessageBlock[] }
assistant?: { message: Message, blocks: MessageBlock[] }
}
```
#### 1.2 Implement DexieMessageDataSource
```typescript
// src/renderer/src/services/db/DexieMessageDataSource.ts
class DexieMessageDataSource implements MessageDataSource {
async fetchMessages(topicId: string) {
const topic = await db.topics.get(topicId)
const messages = topic?.messages || []
const messageIds = messages.map(m => m.id)
const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray()
return { messages, blocks }
}
async persistExchange(topicId: string, exchange: MessageExchange) {
// Persist to the Dexie database
await db.transaction('rw', db.topics, db.message_blocks, async () => {
// ... existing persistence logic
})
}
// ... other method implementations
}
```
#### 1.3 Implement AgentMessageDataSource
```typescript
// src/renderer/src/services/db/AgentMessageDataSource.ts
class AgentMessageDataSource implements MessageDataSource {
async fetchMessages(topicId: string) {
const sessionId = topicId.replace('agent-session:', '')
const historicalMessages = await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_GetHistory,
{ sessionId }
)
const messages: Message[] = []
const blocks: MessageBlock[] = []
for (const msg of historicalMessages) {
if (msg?.message) {
messages.push(msg.message)
if (msg.blocks) blocks.push(...msg.blocks)
}
}
return { messages, blocks }
}
async persistExchange(topicId: string, exchange: MessageExchange) {
const sessionId = topicId.replace('agent-session:', '')
await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_PersistExchange,
{ sessionId, ...exchange }
)
}
// ... other method implementations
}
```
#### 1.4 Create the DbService Facade
```typescript
// src/renderer/src/services/db/DbService.ts
class DbService {
private dexieSource = new DexieMessageDataSource()
private agentSource = new AgentMessageDataSource()
private getDataSource(topicId: string): MessageDataSource {
if (isAgentSessionTopicId(topicId)) {
return this.agentSource
}
// Future: add checks for other data sources here
return this.dexieSource
}
async fetchMessages(topicId: string) {
return this.getDataSource(topicId).fetchMessages(topicId)
}
async persistExchange(topicId: string, exchange: MessageExchange) {
return this.getDataSource(topicId).persistExchange(topicId, exchange)
}
// ... delegate the remaining methods
}
export const dbService = new DbService()
```
### Phase 2: Refactor Redux Thunks (Detailed Breakdown)
Because messageThunk.ts changes substantially, Phase 2 is split into several batches and implemented incrementally:
#### 2.0 Preparation
- [ ] Add the feature flag `USE_UNIFIED_DB_SERVICE`
- [ ] Create messageThunk.v2.ts as a temporary transition file
- [ ] Prepare a rollback plan
#### 2.1 Batch 1: Read-Only Operation Refactoring (Lowest Risk)
This batch only touches read operations and does not affect data writes, so it carries the lowest risk.
##### Functions to Refactor
```typescript
// loadTopicMessagesThunk
export const loadTopicMessagesThunkV2 = (topicId: string, forceReload: boolean = false) =>
async (dispatch: AppDispatch, getState: () => RootState) => {
const state = getState()
if (!forceReload && state.messages.messageIdsByTopic[topicId]) {
return // already cached
}
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// New: single unified call
const { messages, blocks } = await dbService.fetchMessages(topicId)
if (blocks.length > 0) {
dispatch(upsertManyBlocks(blocks))
}
dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
} catch (error) {
logger.error(`Failed to load messages for topic ${topicId}:`, error)
} finally {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
}
}
// getRawTopic
export const getRawTopicV2 = async (topicId: string) => {
return await dbService.getRawTopic(topicId)
}
```
##### Test Checklist
- [ ] Regular topic message loading
- [ ] Agent session message loading
- [ ] Caching works as expected
- [ ] Error handling
#### 2.2 Batch 2: Helper Function Refactoring
These functions do not touch the database directly but depend on database operations.
##### Functions to Refactor
```typescript
// getTopic
export const getTopicV2 = async (topicId: string): Promise<Topic | undefined> => {
const rawTopic = await dbService.getRawTopic(topicId)
if (!rawTopic) return undefined
return {
id: rawTopic.id,
type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat,
messages: rawTopic.messages,
// ... other fields
}
}
// updateFileCount
export const updateFileCountV2 = async (
fileId: string,
delta: number,
deleteIfZero = false
) => {
// Only meaningful for the Dexie data source
if (dbService.supportsFileCount) {
await dbService.updateFileCount(fileId, delta, deleteIfZero)
}
}
```
##### Test Checklist
- [ ] getTopic returns the correct Topic type
- [ ] updateFileCount only runs on data sources that support it
- [ ] Edge-case tests
#### 2.3 Batch 3: Delete Operation Refactoring
Delete operations are relatively self-contained, so the risk is manageable.
##### Functions to Refactor
```typescript
// deleteMessageFromDB
export const deleteMessageFromDBV2 = async (
topicId: string,
messageId: string
): Promise<void> => {
await dbService.deleteMessage(topicId, messageId)
}
// deleteMessagesFromDB
export const deleteMessagesFromDBV2 = async (
topicId: string,
messageIds: string[]
): Promise<void> => {
await dbService.deleteMessages(topicId, messageIds)
}
// clearMessagesFromDB
export const clearMessagesFromDBV2 = async (topicId: string): Promise<void> => {
await dbService.clearMessages(topicId)
}
```
##### Test Checklist
- [ ] Single message deletion
- [ ] Batch message deletion
- [ ] Clearing all messages
- [ ] File reference counts updated correctly
- [ ] Agent session delete operations (should be a no-op)
#### 2.4 Batch 4: Complex Write Operation Refactoring
This batch contains the most complex write logic and needs extra care.
##### Functions to Refactor
```typescript
// saveMessageAndBlocksToDB
export const saveMessageAndBlocksToDBV2 = async (
topicId: string,
message: Message,
blocks: MessageBlock[]
): Promise<void> => {
// No isAgentSessionTopicId check needed; dbService routes internally
await dbService.appendMessage(topicId, message, blocks)
}
// persistExchange
export const persistExchangeV2 = async (
topicId: string,
exchange: MessageExchange
): Promise<void> => {
await dbService.persistExchange(topicId, exchange)
}
// sendMessage (the most complex function)
export const sendMessageV2 = (userMessage, userMessageBlocks, assistant, topicId, agentSession?) =>
async (dispatch, getState) => {
// Save the user message through the unified interface
await dbService.appendMessage(topicId, userMessage, userMessageBlocks)
dispatch(newMessagesActions.addMessage({ topicId, message: userMessage }))
// ... create the assistant message ...
// Persist the user/assistant exchange through the unified interface
await dbService.persistExchange(topicId, {
user: { message: userMessage, blocks: userMessageBlocks },
assistant: { message: assistantMessage, blocks: [] }
})
}
```
##### Test Checklist
- [ ] Regular message sending flow
- [ ] Agent session message sending flow
- [ ] Message blocks persisted correctly
- [ ] Redux state updated correctly
- [ ] Streaming response handling
- [ ] Error handling and retry
#### 2.5 Batch 5: Update Operation Refactoring
Update operations mainly cover message editing, status updates, and the like.
##### Functions to Refactor
```typescript
// updateMessage
export const updateMessageV2 = async (
topicId: string,
messageId: string,
updates: Partial<Message>
): Promise<void> => {
await dbService.updateMessage(topicId, messageId, updates)
}
// updateSingleBlock
export const updateSingleBlockV2 = async (
blockId: string,
updates: Partial<MessageBlock>
): Promise<void> => {
await dbService.updateSingleBlock(blockId, updates)
}
// bulkAddBlocks
export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise<void> => {
await dbService.bulkAddBlocks(blocks)
}
```
##### Test Checklist
- [ ] Message content updates
- [ ] Message status updates
- [ ] Message block updates
- [ ] Bulk block additions
- [ ] Agent session update operations (should be a no-op)
#### 2.6 Migration Strategy
##### Stage 1: Parallel Run (Week 1)
```typescript
export const loadTopicMessagesThunk = (topicId: string, forceReload: boolean = false) => {
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return loadTopicMessagesThunkV2(topicId, forceReload)
}
return loadTopicMessagesThunkOriginal(topicId, forceReload)
}
```
##### Stage 2: Canary Rollout (Week 2)
- Enable the new implementation for 10% of users (one possible gating sketch below)
- Monitor performance and error rates
- Collect user feedback
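One possible way to gate the 10% canary deterministically, building on `setFeatureFlag` from `featureFlags.ts`. The stable per-install identifier and the `applyCanaryRollout` helper are assumptions, not something this commit provides:
```typescript
import { setFeatureFlag } from '@renderer/config/featureFlags'

const ROLLOUT_PERCENT = 10

// Map a stable per-install id to a bucket in [0, 100)
function hashToPercent(id: string): number {
  let hash = 0
  for (let i = 0; i < id.length; i++) {
    hash = (hash * 31 + id.charCodeAt(i)) >>> 0
  }
  return hash % 100
}

// Installs whose bucket falls below the threshold get the new implementation
export function applyCanaryRollout(installId: string): void {
  setFeatureFlag('USE_UNIFIED_DB_SERVICE', hashToPercent(installId) < ROLLOUT_PERCENT)
}
```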
##### Stage 3: Full Migration (Week 3)
- Enable the new implementation for 100% of users
- Keep the feature flag for one more week of observation
- Keep the rollback plan ready
##### Stage 4: Code Cleanup (Week 4)
- Remove the old implementation
- Remove the feature flag
- Update documentation
#### 2.7 Rollback Plan
If problems arise, roll back as follows:
1. **Immediate rollback** (< 5 minutes)
   - Turn off the feature flag
   - All traffic falls back to the old implementation
2. **Fix and retry**
   - Analyze the root cause
   - Fix the issue and add tests
   - Re-launch after small-scale testing
3. **Full rollback** (if the problem is severe)
   - Restore the pre-change code version
   - Re-evaluate the plan
### Phase 3: Unify the Hooks Layer
#### 3.1 Create a Unified useTopic Hook
```typescript
// src/renderer/src/hooks/useTopic.ts
export const useTopic = (topicIdOrSessionId: string): Topic | undefined => {
const topicId = buildTopicId(topicIdOrSessionId) // handles the id mapping
const [topic, setTopic] = useState<Topic>()
useEffect(() => {
dbService.fetchTopic(topicId).then(setTopic)
}, [topicId])
return topic
}
```
#### 3.2 Unify useTopicMessages
```typescript
// src/renderer/src/hooks/useTopicMessages.ts
export const useTopicMessages = (topicId: string) => {
const messages = useAppSelector(state => selectMessagesForTopic(state, topicId))
const dispatch = useAppDispatch()
useEffect(() => {
dispatch(loadTopicMessagesThunk(topicId))
}, [topicId, dispatch])
return messages // no need to distinguish data sources
}
```
### Phase 4: Reuse UI Components
#### 4.1 Use the Messages Component Directly
- Delete `AgentSessionMessages.tsx`
- Use the `Messages` component directly on the agent session page (see the sketch below)
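A sketch of what the agent session page could look like after this change. It assumes the shared `Messages` component accepts the same `assistant`/`topic` props that `Inputbar` takes in 4.2 below; verify against its actual prop types before relying on it:
```typescript
// Hypothetical page component; file location and prop names are assumptions
const AgentSessionPage: FC<Props> = ({ agentId, sessionId }) => {
  const topicId = buildAgentSessionTopicId(sessionId)
  const assistant = deriveAssistantFromAgent(agentId) // same derivation as in 4.2
  const topic = useTopic(topicId) // unified hook from Phase 3

  if (!topic) return null // useTopic resolves asynchronously

  return (
    <>
      <Messages assistant={assistant} topic={topic} />
      <AgentSessionInputbar agentId={agentId} sessionId={sessionId} />
    </>
  )
}
```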
#### 4.2 Slim Down AgentSessionInputbar
```typescript
// src/renderer/src/pages/home/Inputbar/AgentSessionInputbar.tsx
const AgentSessionInputbar: FC<Props> = ({ agentId, sessionId }) => {
const topicId = buildAgentSessionTopicId(sessionId)
const assistant = deriveAssistantFromAgent(agentId) // derive an assistant from the agent
const topic = useTopic(topicId) // use the unified hook
return <Inputbar assistant={assistant} topic={topic} />
}
```
### Phase 5: Testing and Migration
#### 5.1 Unit Tests
- [ ] DbService routing logic tests (see the sketch below)
- [ ] DexieMessageDataSource CRUD tests
- [ ] AgentMessageDataSource CRUD tests
- [ ] Data format compatibility tests
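A minimal sketch of the DbService routing test, assuming Vitest as the runner (the Dexie- and IPC-backed sources may need to be mocked in the test environment):
```typescript
import { describe, expect, it } from 'vitest'

import { DbService } from '@renderer/services/db'

describe('DbService routing', () => {
  const service = DbService.getInstance()

  it('routes agent session topic ids to the agent data source', () => {
    expect(service.getSourceType('agent-session:session-123')).toBe('agent')
    expect(service.isAgentSession('agent-session:session-123')).toBe(true)
  })

  it('routes all other topic ids to the Dexie data source', () => {
    expect(service.getSourceType('uuid-1234-5678')).toBe('dexie')
    expect(service.isAgentSession('uuid-1234-5678')).toBe(false)
  })
})
```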
#### 5.2 Integration Tests
- [ ] Full regular chat flow
- [ ] Full agent session flow
- [ ] Message editing/deletion
- [ ] Message branching
- [ ] Streaming responses
#### 5.3 Performance Tests
- [ ] Loading large numbers of messages
- [ ] Memory usage
- [ ] Response latency
## Benefits
### Code Reduction
- **Component layer**: ~500 fewer lines (AgentSessionMessages removed)
- **Thunk layer**: ~300 fewer lines (conditional checks removed)
- **Overall**: ~40% less duplicated code
### Architectural Benefits
1. **Single responsibility**: data access logic is fully isolated
2. **Open/closed principle**: adding a data source only requires implementing the interface
3. **Dependency inversion**: high-level modules do not depend on concrete implementations
4. **Interface segregation**: clear API boundaries
### Maintainability Gains
- A single data access interface
- Fewer conditional branches
- Easier unit testing
- Easier debugging and tracing
## Risk Management
### Potential Risks
1. **Data consistency**: the two data sources must keep their data formats aligned
2. **Performance overhead**: the facade layer may add a small cost (< 5 ms)
3. **Caching strategy**: agent data must not be cached in the local database
### Mitigations
1. Add a data format validation layer (sketched below)
2. Keep the facade a lightweight proxy and avoid over-abstraction
3. Make the caching strategy explicit in the DbService layer
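A minimal sketch of what mitigation 1 could look like: a type guard applied to rows returned by either data source before they reach Redux. The exact fields checked are assumptions and should be aligned with the real `Message` type:
```typescript
import type { Message, MessageBlock } from '@renderer/types/newMessage'

interface PersistedEntry {
  message: Message
  blocks?: MessageBlock[]
}

// Reject malformed rows instead of letting them reach the Redux store
export function isValidPersistedEntry(value: unknown): value is PersistedEntry {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as { message?: { id?: unknown }; blocks?: unknown }
  if (!candidate.message || typeof candidate.message.id !== 'string') return false
  if (candidate.blocks !== undefined && !Array.isArray(candidate.blocks)) return false
  return true
}
```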
## Implementation Recommendations
### Incremental Migration
1. **Week 1**: implement the data access layer without touching existing code
2. **Week 2**: migrate thunk functions one by one while staying backward compatible
3. **Week 3**: unify the component layer and test thoroughly
### Rollback Strategy
- Keep the original code branch
- Switch between the old and new implementations via a feature flag
- Roll out gradually in stages
## Summary
By combining the facade pattern with a unified data access interface, this plan fully unifies regular chats and agent sessions, greatly reduces code duplication, and improves the system's maintainability and extensibility.

View File

@ -91,6 +91,7 @@ export enum IpcChannel {
// agent messages
AgentMessage_PersistExchange = 'agent-message:persist-exchange',
AgentMessage_GetHistory = 'agent-message:get-history',
//copilot
Copilot_GetAuthMessage = 'copilot:get-auth-message',

View File

@ -209,6 +209,15 @@ export function registerIpc(mainWindow: BrowserWindow, app: Electron.App) {
}
})
ipcMain.handle(IpcChannel.AgentMessage_GetHistory, async (_event, { sessionId }: { sessionId: string }) => {
try {
return await agentMessageRepository.getSessionHistory(sessionId)
} catch (error) {
logger.error('Failed to get agent session history', error as Error)
throw error
}
})
//only for mac
if (isMac) {
ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => {

View File

@ -7,6 +7,7 @@ import type {
AgentPersistedMessage,
AgentSessionMessageEntity
} from '@types'
import { asc, eq } from 'drizzle-orm'
import { BaseService } from '../BaseService'
import type { InsertSessionMessageRow } from './schema'
@ -176,6 +177,34 @@ class AgentMessageRepository extends BaseService {
return result
}
async getSessionHistory(sessionId: string): Promise<AgentPersistedMessage[]> {
await AgentMessageRepository.initialize()
this.ensureInitialized()
try {
const rows = await this.database
.select()
.from(sessionMessagesTable)
.where(eq(sessionMessagesTable.session_id, sessionId))
.orderBy(asc(sessionMessagesTable.created_at))
const messages: AgentPersistedMessage[] = []
for (const row of rows) {
const deserialized = this.deserialize(row)
if (deserialized?.content) {
messages.push(deserialized.content as AgentPersistedMessage)
}
}
logger.info(`Loaded ${messages.length} messages for session ${sessionId}`)
return messages
} catch (error) {
logger.error('Failed to load session history', error as Error)
throw error
}
}
}
export const agentMessageRepository = AgentMessageRepository.getInstance()

View File

@ -0,0 +1,81 @@
/**
* Feature flags for controlling gradual rollout of new features
* These flags can be toggled to enable/disable features without code changes
*/
interface FeatureFlags {
/**
* Enable unified database service for both regular chats and agent sessions
* When enabled, uses the new DbService facade pattern
* When disabled, uses the original implementation with conditional checks
*/
USE_UNIFIED_DB_SERVICE: boolean
}
/**
* Default feature flag values
* Set to false initially for safe rollout
*/
export const featureFlags: FeatureFlags = {
USE_UNIFIED_DB_SERVICE: false
}
/**
* Override feature flags from environment or local storage
* Priority order (highest to lowest):
* 1. localStorage (runtime overrides)
* 2. Environment variables (build-time config)
* 3. Default values
*/
export function initializeFeatureFlags(): void {
// First, check environment variables (build-time configuration)
// In Vite, env vars must be prefixed with VITE_ to be exposed to the client
// Usage: VITE_USE_UNIFIED_DB_SERVICE=true yarn dev
if (import.meta.env?.VITE_USE_UNIFIED_DB_SERVICE === 'true') {
featureFlags.USE_UNIFIED_DB_SERVICE = true
console.log('[FeatureFlags] USE_UNIFIED_DB_SERVICE enabled via environment variable')
}
// Then check localStorage for runtime overrides (higher priority)
// This allows toggling features without rebuilding
try {
const localOverrides = localStorage.getItem('featureFlags')
if (localOverrides) {
const overrides = JSON.parse(localOverrides)
Object.keys(overrides).forEach((key) => {
if (key in featureFlags) {
featureFlags[key as keyof FeatureFlags] = overrides[key]
console.log(`[FeatureFlags] ${key} set to ${overrides[key]} via localStorage`)
}
})
}
} catch (e) {
console.warn('[FeatureFlags] Failed to parse feature flags from localStorage:', e)
}
console.log('[FeatureFlags] Current flags:', featureFlags)
}
/**
* Update a feature flag value at runtime
* Useful for A/B testing or gradual rollout
*/
export function setFeatureFlag(flag: keyof FeatureFlags, value: boolean): void {
featureFlags[flag] = value
// Persist to localStorage for consistency across app restarts
const currentFlags = localStorage.getItem('featureFlags')
const flags = currentFlags ? JSON.parse(currentFlags) : {}
flags[flag] = value
localStorage.setItem('featureFlags', JSON.stringify(flags))
}
/**
* Get current value of a feature flag
*/
export function getFeatureFlag(flag: keyof FeatureFlags): boolean {
return featureFlags[flag]
}
// Initialize on import
initializeFeatureFlags()

View File

@ -2,11 +2,12 @@ import { loggerService } from '@logger'
import ContextMenu from '@renderer/components/ContextMenu'
import { useSession } from '@renderer/hooks/agents/useSession'
import { getGroupedMessages } from '@renderer/services/MessagesService'
import { useAppSelector } from '@renderer/store'
import { useAppDispatch, useAppSelector } from '@renderer/store'
import { selectMessagesForTopic } from '@renderer/store/newMessage'
import { loadTopicMessagesThunk } from '@renderer/store/thunk/messageThunk'
import { Topic } from '@renderer/types'
import { buildAgentSessionTopicId } from '@renderer/utils/agentSession'
import { memo, useMemo } from 'react'
import { memo, useEffect, useMemo } from 'react'
import styled from 'styled-components'
import MessageGroup from './MessageGroup'
@ -21,10 +22,19 @@ type Props = {
}
const AgentSessionMessages: React.FC<Props> = ({ agentId, sessionId }) => {
const dispatch = useAppDispatch()
const { session } = useSession(agentId, sessionId)
const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId])
const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId))
// Load messages when session changes
useEffect(() => {
if (sessionId) {
logger.info('Loading messages for agent session', { sessionId })
dispatch(loadTopicMessagesThunk(sessionTopicId, true)) // Force reload to get latest from backend
}
}, [dispatch, sessionId, sessionTopicId])
const displayMessages = useMemo(() => {
if (!messages || messages.length === 0) return []
return [...messages].reverse()

View File

@ -0,0 +1,262 @@
import { loggerService } from '@logger'
import type { Topic } from '@renderer/types'
import type { AgentPersistedMessage } from '@renderer/types/agent'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { IpcChannel } from '@shared/IpcChannel'
import type { MessageDataSource, MessageExchange } from './types'
import { extractSessionId } from './types'
const logger = loggerService.withContext('AgentMessageDataSource')
/**
* IPC-based implementation of MessageDataSource
* Handles agent session messages through backend communication
*/
export class AgentMessageDataSource implements MessageDataSource {
// ============ Read Operations ============
async fetchMessages(topicId: string): Promise<{
messages: Message[]
blocks: MessageBlock[]
}> {
try {
const sessionId = extractSessionId(topicId)
if (!window.electron?.ipcRenderer) {
logger.warn('IPC renderer not available')
return { messages: [], blocks: [] }
}
// Fetch from agent backend
const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke(
IpcChannel.AgentMessage_GetHistory,
{ sessionId }
)
if (!historicalMessages || !Array.isArray(historicalMessages)) {
return { messages: [], blocks: [] }
}
const messages: Message[] = []
const blocks: MessageBlock[] = []
for (const persistedMsg of historicalMessages) {
if (persistedMsg?.message) {
messages.push(persistedMsg.message)
if (persistedMsg.blocks && persistedMsg.blocks.length > 0) {
blocks.push(...persistedMsg.blocks)
}
}
}
logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`)
return { messages, blocks }
} catch (error) {
logger.error(`Failed to fetch messages for agent session ${topicId}:`, error as Error)
throw error
}
}
// ============ Write Operations ============
async persistExchange(topicId: string, exchange: MessageExchange): Promise<void> {
try {
const sessionId = extractSessionId(topicId)
if (!window.electron?.ipcRenderer) {
logger.warn('IPC renderer not available for persist exchange')
return
}
const payload: any = {
sessionId,
agentSessionId: exchange.agentSessionId || ''
}
// Prepare user payload
if (exchange.user) {
payload.user = {
payload: {
message: exchange.user.message,
blocks: exchange.user.blocks
}
}
}
// Prepare assistant payload
if (exchange.assistant) {
payload.assistant = {
payload: {
message: exchange.assistant.message,
blocks: exchange.assistant.blocks
}
}
}
await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, payload)
logger.info(`Persisted exchange for agent session ${sessionId}`)
} catch (error) {
logger.error(`Failed to persist exchange for agent session ${topicId}:`, error as Error)
throw error
}
}
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void> {
// For agent sessions, messages are persisted through persistExchange
// This method might be called for user messages before the exchange
// We'll store them temporarily in memory or skip for now
logger.info(`appendMessage called for agent session ${topicId}, deferring to persistExchange`)
// In a full implementation, you might want to:
// 1. Store temporarily in Redux only
// 2. Or call a specific backend endpoint for single messages
}
async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
// Agent session messages are immutable once persisted
logger.warn(`updateMessage called for agent session ${topicId}, operation not supported`)
// In a full implementation, you might want to:
// 1. Update in Redux only for UI consistency
// 2. Or implement a backend endpoint for message updates
}
async updateMessageAndBlocks(
topicId: string,
messageUpdates: Partial<Message> & Pick<Message, 'id'>,
blocksToUpdate: MessageBlock[]
): Promise<void> {
// Agent session messages and blocks are immutable once persisted
logger.warn(`updateMessageAndBlocks called for agent session ${topicId}, operation not supported`)
}
async deleteMessage(topicId: string, messageId: string): Promise<void> {
// Agent session messages cannot be deleted individually
logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`)
// In a full implementation, you might want to:
// 1. Implement soft delete in backend
// 2. Or just hide from UI without actual deletion
}
async deleteMessagesByAskId(topicId: string, askId: string): Promise<void> {
// Agent session messages cannot be deleted
logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`)
}
// ============ Block Operations ============
async updateBlocks(blocks: MessageBlock[]): Promise<void> {
// Blocks are updated through persistExchange for agent sessions
logger.warn('updateBlocks called for agent session, operation not supported individually')
}
async deleteBlocks(blockIds: string[]): Promise<void> {
// Blocks cannot be deleted individually for agent sessions
logger.warn('deleteBlocks called for agent session, operation not supported')
}
// ============ Batch Operations ============
async clearMessages(topicId: string): Promise<void> {
const sessionId = extractSessionId(topicId)
if (!window.electron?.ipcRenderer) {
logger.warn('IPC renderer not available for clear messages')
return
}
// In a full implementation, you would call a backend endpoint to clear session
// For now, we'll just log the attempt
logger.info(`Clear messages requested for agent session ${sessionId}`)
// You might want to implement:
// await window.electron.ipcRenderer.invoke(
// IpcChannel.AgentMessage_ClearSession,
// { sessionId }
// )
}
async topicExists(topicId: string): Promise<boolean> {
try {
const sessionId = extractSessionId(topicId)
if (!window.electron?.ipcRenderer) {
return false
}
// Check if session exists by trying to fetch messages
// In a full implementation, you'd have a dedicated endpoint
const messages = await this.fetchMessages(topicId)
return true // If no error thrown, session exists
} catch (error) {
return false
}
}
async ensureTopic(topicId: string): Promise<void> {
// Agent sessions are created externally, not by the chat interface
// This is a no-op for agent sessions
const sessionId = extractSessionId(topicId)
logger.info(`ensureTopic called for agent session ${sessionId}, no action needed`)
}
async fetchTopic(topicId: string): Promise<Topic | undefined> {
try {
const sessionId = extractSessionId(topicId)
// For agent sessions, we construct a synthetic topic
// In a real implementation, you might fetch session metadata from backend
return {
id: topicId,
name: `Session ${sessionId}`,
assistantId: 'agent',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
messages: [] // Messages are fetched separately
} as Topic
} catch (error) {
logger.error(`Failed to fetch topic for agent session ${topicId}:`, error as Error)
throw error
}
}
async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
try {
// For agent sessions, fetch messages from backend and return in raw topic format
const { messages } = await this.fetchMessages(topicId)
return {
id: topicId,
messages
}
} catch (error) {
logger.error(`Failed to get raw topic for agent session ${topicId}:`, error as Error)
return undefined
}
}
// ============ Additional Methods for Interface Compatibility ============
async updateSingleBlock(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
// Agent session blocks are immutable once persisted
logger.warn(`updateSingleBlock called for agent session block ${blockId}, operation not supported`)
}
async bulkAddBlocks(blocks: MessageBlock[]): Promise<void> {
// Agent session blocks are added through persistExchange
logger.warn(`bulkAddBlocks called for agent session, operation not supported individually`)
}
async updateFileCount(fileId: string, delta: number): Promise<void> {
// Agent sessions don't manage file reference counts locally
logger.warn(`updateFileCount called for agent session file ${fileId}, operation not supported`)
}
async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise<void> {
// Agent sessions don't manage file reference counts locally
logger.warn(`updateFileCounts called for agent session, operation not supported`)
}
}

View File

@ -0,0 +1,213 @@
import { loggerService } from '@logger'
import type { Topic } from '@renderer/types'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { AgentMessageDataSource } from './AgentMessageDataSource'
import { DexieMessageDataSource } from './DexieMessageDataSource'
import type { MessageDataSource, MessageExchange } from './types'
import { isAgentSessionTopicId } from './types'
const logger = loggerService.withContext('DbService')
/**
* Facade service that routes data operations to the appropriate data source
* based on the topic ID type (regular chat or agent session)
*/
class DbService implements MessageDataSource {
private static instance: DbService
private dexieSource: DexieMessageDataSource
private agentSource: AgentMessageDataSource
private constructor() {
this.dexieSource = new DexieMessageDataSource()
this.agentSource = new AgentMessageDataSource()
}
/**
* Get singleton instance
*/
static getInstance(): DbService {
if (!DbService.instance) {
DbService.instance = new DbService()
}
return DbService.instance
}
/**
* Determine which data source to use based on topic ID
*/
private getDataSource(topicId: string): MessageDataSource {
if (isAgentSessionTopicId(topicId)) {
logger.silly(`Using AgentMessageDataSource for topic ${topicId}`)
return this.agentSource
}
// Future: Could add more data source types here
// e.g., if (isCloudTopicId(topicId)) return this.cloudSource
logger.silly(`Using DexieMessageDataSource for topic ${topicId}`)
return this.dexieSource
}
// ============ Read Operations ============
async fetchMessages(
topicId: string,
forceReload?: boolean
): Promise<{
messages: Message[]
blocks: MessageBlock[]
}> {
const source = this.getDataSource(topicId)
return source.fetchMessages(topicId, forceReload)
}
async fetchTopic(topicId: string): Promise<Topic | undefined> {
const source = this.getDataSource(topicId)
return source.fetchTopic(topicId)
}
// ============ Write Operations ============
async persistExchange(topicId: string, exchange: MessageExchange): Promise<void> {
const source = this.getDataSource(topicId)
return source.persistExchange(topicId, exchange)
}
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void> {
const source = this.getDataSource(topicId)
return source.appendMessage(topicId, message, blocks, insertIndex)
}
async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
const source = this.getDataSource(topicId)
return source.updateMessage(topicId, messageId, updates)
}
async updateMessageAndBlocks(
topicId: string,
messageUpdates: Partial<Message> & Pick<Message, 'id'>,
blocksToUpdate: MessageBlock[]
): Promise<void> {
const source = this.getDataSource(topicId)
return source.updateMessageAndBlocks(topicId, messageUpdates, blocksToUpdate)
}
async deleteMessage(topicId: string, messageId: string): Promise<void> {
const source = this.getDataSource(topicId)
return source.deleteMessage(topicId, messageId)
}
async deleteMessagesByAskId(topicId: string, askId: string): Promise<void> {
const source = this.getDataSource(topicId)
return source.deleteMessagesByAskId(topicId, askId)
}
// ============ Block Operations ============
async updateBlocks(blocks: MessageBlock[]): Promise<void> {
// For block operations, we need to infer the source from the first block's message
// This is a limitation of the current design where blocks don't have topicId
// In practice, blocks are usually updated in context of a topic operation
// Default to Dexie for now since agent blocks are updated through persistExchange
return this.dexieSource.updateBlocks(blocks)
}
async deleteBlocks(blockIds: string[]): Promise<void> {
// Similar limitation as updateBlocks
// Default to Dexie since agent blocks can't be deleted individually
return this.dexieSource.deleteBlocks(blockIds)
}
// ============ Batch Operations ============
async clearMessages(topicId: string): Promise<void> {
const source = this.getDataSource(topicId)
return source.clearMessages(topicId)
}
async topicExists(topicId: string): Promise<boolean> {
const source = this.getDataSource(topicId)
return source.topicExists(topicId)
}
async ensureTopic(topicId: string): Promise<void> {
const source = this.getDataSource(topicId)
return source.ensureTopic(topicId)
}
// ============ Optional Methods (with fallback) ============
async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
const source = this.getDataSource(topicId)
if (source.getRawTopic) {
return source.getRawTopic(topicId)
}
// Fallback: fetch using fetchTopic and extract messages
const topic = await source.fetchTopic(topicId)
return topic ? { id: topic.id, messages: topic.messages } : undefined
}
async updateSingleBlock(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
// For single block operations, default to Dexie since agent blocks are immutable
if (this.dexieSource.updateSingleBlock) {
return this.dexieSource.updateSingleBlock(blockId, updates)
}
// Fallback to updateBlocks with single item
return this.dexieSource.updateBlocks([{ ...updates, id: blockId } as MessageBlock])
}
async bulkAddBlocks(blocks: MessageBlock[]): Promise<void> {
// For bulk add operations, default to Dexie since agent blocks use persistExchange
if (this.dexieSource.bulkAddBlocks) {
return this.dexieSource.bulkAddBlocks(blocks)
}
// Fallback to updateBlocks
return this.dexieSource.updateBlocks(blocks)
}
async updateFileCount(fileId: string, delta: number): Promise<void> {
// File operations only apply to Dexie source
if (this.dexieSource.updateFileCount) {
return this.dexieSource.updateFileCount(fileId, delta)
}
// No-op if not supported
logger.warn(`updateFileCount not supported for file ${fileId}`)
}
async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise<void> {
// File operations only apply to Dexie source
if (this.dexieSource.updateFileCounts) {
return this.dexieSource.updateFileCounts(files)
}
// No-op if not supported
logger.warn(`updateFileCounts not supported`)
}
// ============ Utility Methods ============
/**
* Check if a topic is an agent session
*/
isAgentSession(topicId: string): boolean {
return isAgentSessionTopicId(topicId)
}
/**
* Get the data source type for a topic
*/
getSourceType(topicId: string): 'dexie' | 'agent' | 'unknown' {
if (isAgentSessionTopicId(topicId)) {
return 'agent'
}
// Add more checks for other source types as needed
return 'dexie'
}
}
// Export singleton instance
export const dbService = DbService.getInstance()
// Also export class for testing purposes
export { DbService }

View File

@ -0,0 +1,438 @@
import { loggerService } from '@logger'
import db from '@renderer/databases'
import FileManager from '@renderer/services/FileManager'
import store from '@renderer/store'
import { updateTopicUpdatedAt } from '@renderer/store/assistants'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { isEmpty } from 'lodash'
import type { MessageDataSource, MessageExchange } from './types'
const logger = loggerService.withContext('DexieMessageDataSource')
/**
* Dexie-based implementation of MessageDataSource
* Handles local IndexedDB storage for regular chat messages
*/
export class DexieMessageDataSource implements MessageDataSource {
// ============ Read Operations ============
async fetchMessages(topicId: string): Promise<{
messages: Message[]
blocks: MessageBlock[]
}> {
try {
const topic = await db.topics.get(topicId)
const messages = topic?.messages || []
if (messages.length === 0) {
return { messages: [], blocks: [] }
}
const messageIds = messages.map((m) => m.id)
const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray()
// Ensure block IDs are strings for consistency
const messagesWithBlockIds = messages.map((m) => ({
...m,
blocks: m.blocks?.map(String) || []
}))
return { messages: messagesWithBlockIds, blocks: blocks || [] }
} catch (error) {
logger.error(`Failed to fetch messages for topic ${topicId}:`, error as Error)
throw error
}
}
async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> {
try {
return await db.topics.get(topicId)
} catch (error) {
logger.error(`Failed to get raw topic ${topicId}:`, error as Error)
throw error
}
}
// ============ Write Operations ============
async persistExchange(topicId: string, exchange: MessageExchange): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, async () => {
const topic = await db.topics.get(topicId)
if (!topic) {
throw new Error(`Topic ${topicId} not found`)
}
const updatedMessages = [...topic.messages]
const blocksToSave: MessageBlock[] = []
// Handle user message
if (exchange.user) {
const userIndex = updatedMessages.findIndex((m) => m.id === exchange.user!.message.id)
if (userIndex !== -1) {
updatedMessages[userIndex] = exchange.user.message
} else {
updatedMessages.push(exchange.user.message)
}
if (exchange.user.blocks.length > 0) {
blocksToSave.push(...exchange.user.blocks)
}
}
// Handle assistant message
if (exchange.assistant) {
const assistantIndex = updatedMessages.findIndex((m) => m.id === exchange.assistant!.message.id)
if (assistantIndex !== -1) {
updatedMessages[assistantIndex] = exchange.assistant.message
} else {
updatedMessages.push(exchange.assistant.message)
}
if (exchange.assistant.blocks.length > 0) {
blocksToSave.push(...exchange.assistant.blocks)
}
}
// Save blocks
if (blocksToSave.length > 0) {
await db.message_blocks.bulkPut(blocksToSave)
}
// Update topic with new messages
await db.topics.update(topicId, { messages: updatedMessages })
})
// Update Redux state
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to persist exchange for topic ${topicId}:`, error as Error)
throw error
}
}
async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Save blocks first
if (blocks.length > 0) {
await db.message_blocks.bulkPut(blocks)
}
// Get or create topic
let topic = await db.topics.get(topicId)
if (!topic) {
await db.topics.add({ id: topicId, messages: [] })
topic = await db.topics.get(topicId)
}
if (!topic) {
throw new Error(`Failed to create topic ${topicId}`)
}
const updatedMessages = [...(topic.messages || [])]
// Check if message already exists
const existingIndex = updatedMessages.findIndex((m) => m.id === message.id)
if (existingIndex !== -1) {
updatedMessages[existingIndex] = message
} else {
// Insert at specific index or append
if (insertIndex !== undefined && insertIndex >= 0 && insertIndex <= updatedMessages.length) {
updatedMessages.splice(insertIndex, 0, message)
} else {
updatedMessages.push(message)
}
}
await db.topics.update(topicId, { messages: updatedMessages })
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to append message to topic ${topicId}:`, error as Error)
throw error
}
}
async updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void> {
try {
await db.transaction('rw', db.topics, async () => {
await db.topics
.where('id')
.equals(topicId)
.modify((topic) => {
if (!topic || !topic.messages) return
const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
if (messageIndex !== -1) {
Object.assign(topic.messages[messageIndex], updates)
}
})
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to update message ${messageId} in topic ${topicId}:`, error as Error)
throw error
}
}
async updateMessageAndBlocks(
topicId: string,
messageUpdates: Partial<Message> & Pick<Message, 'id'>,
blocksToUpdate: MessageBlock[]
): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Update blocks
if (blocksToUpdate.length > 0) {
await db.message_blocks.bulkPut(blocksToUpdate)
}
// Update message if there are actual changes beyond id and topicId
const keysToUpdate = Object.keys(messageUpdates).filter((key) => key !== 'id' && key !== 'topicId')
if (keysToUpdate.length > 0) {
await db.topics
.where('id')
.equals(topicId)
.modify((topic) => {
if (!topic || !topic.messages) return
const messageIndex = topic.messages.findIndex((m) => m.id === messageUpdates.id)
if (messageIndex !== -1) {
keysToUpdate.forEach((key) => {
;(topic.messages[messageIndex] as any)[key] = (messageUpdates as any)[key]
})
}
})
}
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to update message and blocks for ${messageUpdates.id}:`, error as Error)
throw error
}
}
async deleteMessage(topicId: string, messageId: string): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
const topic = await db.topics.get(topicId)
if (!topic) return
const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
if (messageIndex === -1) return
const message = topic.messages[messageIndex]
const blockIds = message.blocks || []
// Delete blocks and handle files
if (blockIds.length > 0) {
const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
const files = blocks
.filter((block) => block.type === 'file' || block.type === 'image')
.map((block: any) => block.file)
.filter((file) => file !== undefined)
// Clean up files
if (!isEmpty(files)) {
await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
}
await db.message_blocks.bulkDelete(blockIds)
}
// Remove message from topic
topic.messages.splice(messageIndex, 1)
await db.topics.update(topicId, { messages: topic.messages })
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to delete message ${messageId} from topic ${topicId}:`, error as Error)
throw error
}
}
async deleteMessagesByAskId(topicId: string, askId: string): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
const topic = await db.topics.get(topicId)
if (!topic) return
// Find all messages with the given askId
const messagesToDelete = topic.messages.filter((m) => m.askId === askId || m.id === askId)
const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || [])
// Delete blocks and handle files
if (blockIdsToDelete.length > 0) {
const blocks = await db.message_blocks.where('id').anyOf(blockIdsToDelete).toArray()
const files = blocks
.filter((block) => block.type === 'file' || block.type === 'image')
.map((block: any) => block.file)
.filter((file) => file !== undefined)
if (!isEmpty(files)) {
await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
}
await db.message_blocks.bulkDelete(blockIdsToDelete)
}
// Filter out deleted messages
const remainingMessages = topic.messages.filter((m) => m.askId !== askId && m.id !== askId)
await db.topics.update(topicId, { messages: remainingMessages })
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to delete messages with askId ${askId} from topic ${topicId}:`, error as Error)
throw error
}
}
// ============ Block Operations ============
async updateBlocks(blocks: MessageBlock[]): Promise<void> {
try {
if (blocks.length === 0) return
await db.message_blocks.bulkPut(blocks)
} catch (error) {
logger.error('Failed to update blocks:', error as Error)
throw error
}
}
async updateSingleBlock(blockId: string, updates: Partial<MessageBlock>): Promise<void> {
try {
await db.message_blocks.update(blockId, updates)
} catch (error) {
logger.error(`Failed to update block ${blockId}:`, error as Error)
throw error
}
}
async bulkAddBlocks(blocks: MessageBlock[]): Promise<void> {
try {
if (blocks.length === 0) return
await db.message_blocks.bulkAdd(blocks)
} catch (error) {
logger.error('Failed to bulk add blocks:', error as Error)
throw error
}
}
async deleteBlocks(blockIds: string[]): Promise<void> {
try {
if (blockIds.length === 0) return
// Get blocks to find associated files
const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
const files = blocks
.filter((block) => block.type === 'file' || block.type === 'image')
.map((block: any) => block.file)
.filter((file) => file !== undefined)
// Clean up files
if (!isEmpty(files)) {
await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
}
await db.message_blocks.bulkDelete(blockIds)
} catch (error) {
logger.error('Failed to delete blocks:', error as Error)
throw error
}
}
// ============ Batch Operations ============
async clearMessages(topicId: string): Promise<void> {
try {
await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => {
const topic = await db.topics.get(topicId)
if (!topic) return
// Get all block IDs
const blockIds = topic.messages.flatMap((m) => m.blocks || [])
// Delete blocks and handle files
if (blockIds.length > 0) {
const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray()
const files = blocks
.filter((block) => block.type === 'file' || block.type === 'image')
.map((block: any) => block.file)
.filter((file) => file !== undefined)
if (!isEmpty(files)) {
await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false)))
}
await db.message_blocks.bulkDelete(blockIds)
}
// Clear messages
await db.topics.update(topicId, { messages: [] })
})
store.dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) {
logger.error(`Failed to clear messages for topic ${topicId}:`, error as Error)
throw error
}
}
async topicExists(topicId: string): Promise<boolean> {
try {
const topic = await db.topics.get(topicId)
return !!topic
} catch (error) {
logger.error(`Failed to check if topic ${topicId} exists:`, error as Error)
return false
}
}
async ensureTopic(topicId: string): Promise<void> {
try {
const exists = await this.topicExists(topicId)
if (!exists) {
await db.topics.add({ id: topicId, messages: [] })
}
} catch (error) {
logger.error(`Failed to ensure topic ${topicId} exists:`, error as Error)
throw error
}
}
// ============ File Operations ============
async updateFileCount(fileId: string, delta: number): Promise<void> {
try {
await db.files
.where('id')
.equals(fileId)
.modify((f) => {
if (f) {
f.count = (f.count || 0) + delta
}
})
} catch (error) {
logger.error(`Failed to update file count for ${fileId}:`, error as Error)
throw error
}
}
async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise<void> {
try {
await db.transaction('rw', db.files, async () => {
for (const file of files) {
await this.updateFileCount(file.id, file.delta)
}
})
} catch (error) {
logger.error('Failed to update file counts:', error as Error)
throw error
}
}
}

View File

@ -0,0 +1,89 @@
# Unified Data Access Layer
This module provides a unified interface for accessing message data from different sources:
- **DexieMessageDataSource**: Local IndexedDB storage for regular chat messages
- **AgentMessageDataSource**: Backend IPC storage for agent session messages
## Architecture
```
dbService (Facade)
├── Determines data source based on topicId
├── Routes to DexieMessageDataSource (regular chats)
└── Routes to AgentMessageDataSource (agent sessions)
```
## Usage
```typescript
import { dbService } from '@renderer/services/db'
// Fetch messages (automatically routes to correct source)
const { messages, blocks } = await dbService.fetchMessages(topicId)
// Save a message exchange
await dbService.persistExchange(topicId, {
user: { message: userMsg, blocks: userBlocks },
assistant: { message: assistantMsg, blocks: assistantBlocks }
})
// Append a single message
await dbService.appendMessage(topicId, message, blocks)
// Check if topic exists
const exists = await dbService.topicExists(topicId)
```
## Topic ID Convention
- Regular chat topics: Any string ID (e.g., "uuid-1234-5678")
- Agent session topics: Prefixed with "agent-session:" (e.g., "agent-session:session-123")
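The helper functions exported from this module encode the convention:
```typescript
import { buildAgentSessionTopicId, extractSessionId, isAgentSessionTopicId } from '@renderer/services/db'

const topicId = buildAgentSessionTopicId('session-123') // 'agent-session:session-123'
isAgentSessionTopicId(topicId) // true
isAgentSessionTopicId('uuid-1234-5678') // false
extractSessionId(topicId) // 'session-123'
```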
## Key Features
1. **Transparent Routing**: The facade automatically routes to the appropriate data source
2. **Consistent API**: Same methods work for both regular chats and agent sessions
3. **Type Safety**: Full TypeScript support with proper interfaces
4. **Error Handling**: Comprehensive error logging and propagation
5. **Extensibility**: Easy to add new data sources (e.g., cloud storage)
## Implementation Status
### DexieMessageDataSource ✅
- Full CRUD operations for messages and blocks
- Transaction support
- File cleanup on deletion
- Redux state updates
### AgentMessageDataSource ✅
- Fetch messages from backend
- Persist message exchanges
- Limited update/delete operations (by design)
- IPC communication with backend
## Migration Guide
### Before (Direct DB access):
```typescript
// In thunks
if (isAgentSessionTopicId(topicId)) {
// Special handling for agent sessions
const messages = await window.electron.ipcRenderer.invoke(...)
} else {
// Regular DB access
const topic = await db.topics.get(topicId)
}
```
### After (Unified access):
```typescript
// In thunks
const { messages, blocks } = await dbService.fetchMessages(topicId)
// No need to check topic type!
```
## Next Steps
Phase 2: Update Redux thunks to use dbService
Phase 3: Update components to use unified hooks
Phase 4: Remove AgentSessionMessages component

View File

@ -0,0 +1,206 @@
# Rollback Strategy for Unified Database Service Migration
## Overview
This document outlines the rollback procedures for the unified database service migration. The migration uses feature flags to enable gradual rollout and quick rollback capabilities.
## Quick Rollback (< 1 minute)
### Via Browser Console
```javascript
// Disable the unified DB service immediately
localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false }))
location.reload()
```
### Via Code (Emergency)
```typescript
// In src/renderer/src/config/featureFlags.ts
export const featureFlags: FeatureFlags = {
USE_UNIFIED_DB_SERVICE: false // Change from true to false
}
```
## Rollback Triggers
Monitor these indicators to determine if rollback is needed:
### Critical Issues (Immediate Rollback)
- [ ] Data loss or corruption
- [ ] Application crashes on startup
- [ ] Complete failure to load messages
- [ ] Agent sessions completely broken
- [ ] Performance degradation > 50%
### Major Issues (Rollback within 1 hour)
- [ ] Intermittent message loading failures (> 10% error rate)
- [ ] Memory leaks detected
- [ ] Performance degradation 20-50%
- [ ] File upload/attachment issues
- [ ] Message editing/deletion not working
### Minor Issues (Consider Rollback)
- [ ] Performance degradation < 20%
- [ ] UI glitches or inconsistencies
- [ ] Non-critical features affected
- [ ] Increased error logs but functionality intact
## Rollback Procedures
### Level 1: Feature Flag Toggle (Immediate)
**When:** Any critical issue detected
**Time:** < 1 minute
**Data Impact:** None
1. Set feature flag to false:
```javascript
localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false }))
```
2. Reload application
3. Verify original functionality restored
4. Alert team about rollback
### Level 2: Code Revert (Quick)
**When:** Feature flag not sufficient or broken
**Time:** < 5 minutes
**Data Impact:** None
1. Revert to previous commit:
```bash
git revert HEAD # If just deployed
# or
git checkout <last-known-good-commit>
```
2. Rebuild and deploy:
```bash
yarn build:check
yarn build
```
3. Test core functionality
4. Document issue for investigation
### Level 3: Full Rollback (Planned)
**When:** Systemic issues discovered
**Time:** 30 minutes
**Data Impact:** Potential data migration needed
1. Notify all stakeholders
2. Export any critical data if needed
3. Restore from backup branch:
```bash
git checkout main
git branch -D feature/unified-db-service
git push origin --delete feature/unified-db-service
```
4. Clean up any migration artifacts:
- Remove `messageThunk.v2.ts`
- Remove `src/renderer/src/services/db/` if created
- Remove feature flags configuration
5. Run full test suite
6. Deploy clean version
## Pre-Rollback Checklist
Before initiating rollback:
1. **Capture Current State**
- [ ] Export performance metrics
- [ ] Save error logs
- [ ] Document specific failure scenarios
- [ ] Note affected user percentage
2. **Preserve Evidence**
- [ ] Take screenshots of errors
- [ ] Export browser console logs
- [ ] Save network traces if relevant
- [ ] Backup current localStorage
3. **Communication**
- [ ] Notify development team
- [ ] Update status page if applicable
- [ ] Prepare user communication if needed
## Post-Rollback Actions
After successful rollback:
1. **Verification**
- [ ] Test message loading (regular chat)
- [ ] Test agent sessions
- [ ] Verify file attachments work
- [ ] Check message editing/deletion
- [ ] Confirm no data loss
2. **Investigation**
- [ ] Analyze performance metrics
- [ ] Review error logs
- [ ] Identify root cause
- [ ] Create bug report
3. **Planning**
- [ ] Document lessons learned
- [ ] Update rollback procedures if needed
- [ ] Plan fixes for identified issues
- [ ] Schedule retry with fixes
## Monitoring Commands
### Check Feature Flag Status
```javascript
// In browser console
JSON.parse(localStorage.getItem('featureFlags') || '{}')
```
### View Performance Metrics
```javascript
// In browser console (if performance monitor is exposed)
performanceMonitor.getAllComparisons()
```
### Check Error Rate
```javascript
// Check application logs
loggerService.getLogs().filter(log => log.level === 'error' && log.context.includes('DbService'))
```
## Recovery Validation
After rollback, validate system health:
1. **Functional Tests**
```bash
yarn test
yarn test:e2e # If available
```
2. **Manual Validation**
- Create new chat conversation
- Send messages with attachments
- Edit existing messages
- Delete messages
- Start agent session
- Load historical messages
3. **Performance Check**
- Message load time < 500ms
- No memory leaks after 10 minutes
- CPU usage normal
- Network requests successful
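A rough console probe for the load-time criterion above, assuming the Redux store and `loadTopicMessagesThunk` are reachable from the console (e.g. temporarily exposed on `window` in a dev build); adjust to the actual debug setup:
```javascript
// Measure one forced reload of a topic's messages
const topicId = 'agent-session:session-123' // or any regular topic id
const t0 = performance.now()
await window.store.dispatch(window.loadTopicMessagesThunk(topicId, true))
console.log(`message load took ${(performance.now() - t0).toFixed(1)} ms`) // expect < 500 ms
```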
## Emergency Contacts
- **Tech Lead:** [Contact Info]
- **DevOps:** [Contact Info]
- **Product Owner:** [Contact Info]
## Rollback History
| Date | Version | Issue | Rollback Type | Resolution |
|------|---------|-------|---------------|------------|
| - | - | - | - | - |
## Notes
- Always prefer feature flag rollback first (least disruptive)
- Document any rollback in the history table above
- If multiple rollbacks needed, consider pausing migration
- Performance degradation baseline: original implementation metrics

View File

@ -0,0 +1,19 @@
/**
* Unified data access layer for messages
* Provides a consistent API for accessing messages from different sources
* (Dexie/IndexedDB for regular chats, IPC/Backend for agent sessions)
*/
// Export main service
export { DbService, dbService } from './DbService'
// Export types
export type { MessageDataSource, MessageExchange } from './types'
export {
  buildAgentSessionTopicId,
  extractSessionId,
  isAgentSessionTopicId
} from './types'
// Export implementations (for testing or direct access if needed)
export { AgentMessageDataSource } from './AgentMessageDataSource'
export { DexieMessageDataSource } from './DexieMessageDataSource'

View File

@ -0,0 +1,145 @@
import type { Message, MessageBlock } from '@renderer/types/newMessage'
/**
* Message exchange data structure for persisting user-assistant conversations
*/
export interface MessageExchange {
user?: {
message: Message
blocks: MessageBlock[]
}
assistant?: {
message: Message
blocks: MessageBlock[]
}
// For agent sessions
agentSessionId?: string
}
/**
* Unified interface for message data operations
* Implementations can be backed by Dexie, IPC, or other storage mechanisms
*/
export interface MessageDataSource {
// ============ Read Operations ============
/**
* Fetch all messages and blocks for a topic
*/
fetchMessages(
topicId: string,
forceReload?: boolean
): Promise<{
messages: Message[]
blocks: MessageBlock[]
}>
/**
* Get raw topic data (just id and messages)
*/
getRawTopic?(topicId: string): Promise<{ id: string; messages: Message[] } | undefined>
// ============ Write Operations ============
/**
* Persist a complete message exchange (user + assistant)
*/
persistExchange(topicId: string, exchange: MessageExchange): Promise<void>
/**
* Append a single message with its blocks
*/
appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise<void>
/**
* Update an existing message
*/
updateMessage(topicId: string, messageId: string, updates: Partial<Message>): Promise<void>
/**
* Update existing message and its blocks
*/
updateMessageAndBlocks(
topicId: string,
messageUpdates: Partial<Message> & Pick<Message, 'id'>,
blocksToUpdate: MessageBlock[]
): Promise<void>
/**
* Delete a single message and its blocks
*/
deleteMessage(topicId: string, messageId: string): Promise<void>
/**
* Delete messages by askId (user query + assistant responses)
*/
deleteMessagesByAskId(topicId: string, askId: string): Promise<void>
// ============ Block Operations ============
/**
* Update multiple blocks
*/
updateBlocks(blocks: MessageBlock[]): Promise<void>
/**
* Update single block
*/
updateSingleBlock?(blockId: string, updates: Partial<MessageBlock>): Promise<void>
/**
* Bulk add blocks (for cloning operations)
*/
bulkAddBlocks?(blocks: MessageBlock[]): Promise<void>
/**
* Delete multiple blocks
*/
deleteBlocks(blockIds: string[]): Promise<void>
// ============ Batch Operations ============
/**
* Clear all messages in a topic
*/
clearMessages(topicId: string): Promise<void>
/**
* Check if topic exists
*/
topicExists(topicId: string): Promise<boolean>
/**
* Create or ensure topic exists
*/
ensureTopic(topicId: string): Promise<void>
// ============ File Operations (Optional) ============
/**
* Update file reference count
*/
updateFileCount?(fileId: string, delta: number): Promise<void>
/**
* Update multiple file reference counts
*/
updateFileCounts?(files: Array<{ id: string; delta: number }>): Promise<void>
}
/**
* Type guard to check if a topic ID is for an agent session
*/
export function isAgentSessionTopicId(topicId: string): boolean {
return topicId.startsWith('agent-session:')
}
/**
* Extract session ID from agent session topic ID
*/
export function extractSessionId(topicId: string): string {
return topicId.replace('agent-session:', '')
}
/**
* Build agent session topic ID from session ID
*/
export function buildAgentSessionTopicId(sessionId: string): string {
return `agent-session:${sessionId}`
}

View File

@ -1,5 +1,6 @@
import { loggerService } from '@logger'
import { AiSdkToChunkAdapter } from '@renderer/aiCore/chunk/AiSdkToChunkAdapter'
import { featureFlags } from '@renderer/config/featureFlags'
import db from '@renderer/databases'
import FileManager from '@renderer/services/FileManager'
import { BlockManager } from '@renderer/services/messageStreaming/BlockManager'
@ -16,7 +17,7 @@ import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from
import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage'
import { uuid } from '@renderer/utils'
import { addAbortController } from '@renderer/utils/abortController'
import { isAgentSessionTopicId } from '@renderer/utils/agentSession'
import { buildAgentSessionTopicId, isAgentSessionTopicId } from '@renderer/utils/agentSession'
import {
createAssistantMessage,
createTranslationBlock,
@ -34,6 +35,7 @@ import { LRUCache } from 'lru-cache'
import type { AppDispatch, RootState } from '../index'
import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock'
import { newMessagesActions, selectMessagesForTopic } from '../newMessage'
import { loadTopicMessagesThunkV2 } from './messageThunk.v2'
const logger = loggerService.withContext('MessageThunk')
@ -782,6 +784,52 @@ export const sendMessage =
}
}
/**
* Loads agent session messages from the agent backend
*/
export const loadAgentSessionMessagesThunk =
(sessionId: string) => async (dispatch: AppDispatch, getState: () => RootState) => {
const topicId = buildAgentSessionTopicId(sessionId)
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// Fetch from agent backend
const historicalMessages = await window.electron?.ipcRenderer.invoke(IpcChannel.AgentMessage_GetHistory, {
sessionId
})
if (historicalMessages && Array.isArray(historicalMessages)) {
const messages: Message[] = []
const blocks: MessageBlock[] = []
for (const persistedMsg of historicalMessages) {
if (persistedMsg?.message) {
messages.push(persistedMsg.message)
if (persistedMsg.blocks && persistedMsg.blocks.length > 0) {
blocks.push(...persistedMsg.blocks)
}
}
}
// Update Redux store
if (blocks.length > 0) {
dispatch(upsertManyBlocks(blocks))
}
dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`)
} else {
dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] }))
}
} catch (error) {
logger.error(`Failed to load agent session messages for ${sessionId}:`, error as Error)
dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] }))
} finally {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
}
}
/**
* Loads messages and their blocks for a specific topic from the database
* and updates the Redux store.
@ -789,10 +837,26 @@ export const sendMessage =
export const loadTopicMessagesThunk =
(topicId: string, forceReload: boolean = false) =>
async (dispatch: AppDispatch, getState: () => RootState) => {
// Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return loadTopicMessagesThunkV2(topicId, forceReload)(dispatch, getState)
}
// Original implementation
const state = getState()
const topicMessagesExist = !!state.messages.messageIdsByTopic[topicId]
dispatch(newMessagesActions.setCurrentTopicId(topicId))
// Check if it's an agent session topic
if (isAgentSessionTopicId(topicId)) {
if (topicMessagesExist && !forceReload) {
return // Keep existing messages in memory
}
// Load from agent backend instead of local DB
const sessionId = topicId.replace('agent-session:', '')
return dispatch(loadAgentSessionMessagesThunk(sessionId))
}
if (topicMessagesExist && !forceReload) {
return
}

View File

@ -0,0 +1,274 @@
/**
* V2 implementations of message thunk functions using the unified DbService
* These implementations will be gradually rolled out using feature flags
*/
import { loggerService } from '@logger'
import { dbService } from '@renderer/services/db'
import type { Topic } from '@renderer/types'
import { TopicType } from '@renderer/types'
import type { Message, MessageBlock } from '@renderer/types/newMessage'
import { isAgentSessionTopicId } from '@renderer/utils/agentSession'
import type { AppDispatch, RootState } from '../index'
import { upsertManyBlocks } from '../messageBlock'
import { newMessagesActions } from '../newMessage'
const logger = loggerService.withContext('MessageThunkV2')
// =================================================================
// Phase 2.1 - Batch 1: Read-only operations (lowest risk)
// =================================================================
/**
* Load messages for a topic using unified DbService
* This is the V2 implementation that will replace the original
*/
export const loadTopicMessagesThunkV2 =
(topicId: string, forceReload: boolean = false) =>
async (dispatch: AppDispatch, getState: () => RootState) => {
const state = getState()
// Skip if already cached and not forcing reload
if (!forceReload && state.messages.messageIdsByTopic[topicId]) {
logger.info('Messages already cached for topic', { topicId })
return
}
try {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true }))
// Unified call - no need to check isAgentSessionTopicId
const { messages, blocks } = await dbService.fetchMessages(topicId)
logger.info('Loaded messages via DbService', {
topicId,
messageCount: messages.length,
blockCount: blocks.length
})
// Update Redux state with fetched data
if (blocks.length > 0) {
dispatch(upsertManyBlocks(blocks))
}
dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
} catch (error) {
logger.error(`Failed to load messages for topic ${topicId}:`, error as Error)
// Could dispatch an error action here if needed
} finally {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
dispatch(newMessagesActions.setTopicFulfilled({ topicId, fulfilled: true }))
}
}
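// Usage sketch (illustrative): the thunk is dispatched the same way for chat topics and
// agent sessions, since dbService routes internally; `store` and the ids below are assumptions.
//
//   store.dispatch(loadTopicMessagesThunkV2('agent-session:abc123'))
//   store.dispatch(loadTopicMessagesThunkV2(chatTopic.id, true)) // force reload a chat topic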
/**
* Get raw topic data using unified DbService
* Returns topic with messages array
*/
export const getRawTopicV2 = async (topicId: string): Promise<{ id: string; messages: Message[] } | undefined> => {
try {
const rawTopic = await dbService.getRawTopic(topicId)
logger.info('Retrieved raw topic via DbService', { topicId, found: !!rawTopic })
return rawTopic
} catch (error) {
logger.error('Failed to get raw topic:', { topicId, error })
return undefined
}
}
// =================================================================
// Phase 2.2 - Batch 2: Helper functions
// =================================================================
/**
* Get a full topic object with type information
* This builds on getRawTopicV2 to provide additional metadata
*/
export const getTopicV2 = async (topicId: string): Promise<Topic | undefined> => {
try {
const rawTopic = await dbService.getRawTopic(topicId)
if (!rawTopic) {
logger.info('Topic not found', { topicId })
return undefined
}
// Construct the full Topic object
const topic: Topic = {
id: rawTopic.id,
type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat,
messages: rawTopic.messages,
assistantId: '', // These fields would need to be fetched from appropriate source
name: '',
createdAt: Date.now(),
updatedAt: Date.now()
}
logger.info('Retrieved topic with type via DbService', {
topicId,
type: topic.type,
messageCount: topic.messages.length
})
return topic
} catch (error) {
logger.error('Failed to get topic:', { topicId, error })
return undefined
}
}
/**
* Update a file's reference count
* Applies only to the Dexie data source; it is a no-op for agent sessions
*/
export const updateFileCountV2 = async (
fileId: string,
delta: number,
deleteIfZero: boolean = false
): Promise<void> => {
try {
await dbService.updateFileCount(fileId, delta, deleteIfZero)
logger.info('Updated file count', { fileId, delta, deleteIfZero })
} catch (error) {
logger.error('Failed to update file count:', { fileId, delta, error })
throw error
}
}
// =================================================================
// Phase 2.3 - Batch 3: Delete operations
// =================================================================
/**
* Delete a single message from database
*/
export const deleteMessageFromDBV2 = async (topicId: string, messageId: string): Promise<void> => {
try {
await dbService.deleteMessage(topicId, messageId)
logger.info('Deleted message via DbService', { topicId, messageId })
} catch (error) {
logger.error('Failed to delete message:', { topicId, messageId, error })
throw error
}
}
/**
* Delete multiple messages from database
*/
export const deleteMessagesFromDBV2 = async (topicId: string, messageIds: string[]): Promise<void> => {
try {
await dbService.deleteMessages(topicId, messageIds)
logger.info('Deleted messages via DbService', { topicId, count: messageIds.length })
} catch (error) {
logger.error('Failed to delete messages:', { topicId, messageIds, error })
throw error
}
}
/**
* Clear all messages from a topic
*/
export const clearMessagesFromDBV2 = async (topicId: string): Promise<void> => {
try {
await dbService.clearMessages(topicId)
logger.info('Cleared all messages via DbService', { topicId })
} catch (error) {
logger.error('Failed to clear messages:', { topicId, error })
throw error
}
}
// =================================================================
// Phase 2.4 - Batch 4: Complex write operations
// =================================================================
/**
* Save a message and its blocks to the database
* Uses the unified interface, so no isAgentSessionTopicId check is needed
*/
export const saveMessageAndBlocksToDBV2 = async (
topicId: string,
message: Message,
blocks: MessageBlock[]
): Promise<void> => {
try {
// Direct call without conditional logic
await dbService.appendMessage(topicId, message, blocks)
logger.info('Saved message and blocks via DbService', {
topicId,
messageId: message.id,
blockCount: blocks.length
})
} catch (error) {
logger.error('Failed to save message and blocks:', { topicId, messageId: message.id, error })
throw error
}
}
/**
* Persist a message exchange (user + assistant messages)
*/
export const persistExchangeV2 = async (
topicId: string,
exchange: {
user?: { message: Message; blocks: MessageBlock[] }
assistant?: { message: Message; blocks: MessageBlock[] }
}
): Promise<void> => {
try {
await dbService.persistExchange(topicId, exchange)
logger.info('Persisted exchange via DbService', {
topicId,
hasUser: !!exchange.user,
hasAssistant: !!exchange.assistant
})
} catch (error) {
logger.error('Failed to persist exchange:', { topicId, error })
throw error
}
}
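// Usage sketch (illustrative): persisting a completed user/assistant turn once streaming
// finishes; the message and block values are assumed to come from the send pipeline.
//
//   await persistExchangeV2(topicId, {
//     user: { message: userMessage, blocks: userBlocks },
//     assistant: { message: assistantMessage, blocks: assistantBlocks }
//   })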
// Note: sendMessageV2 would be implemented here but it's more complex
// and would require more of the supporting code from messageThunk.ts
// =================================================================
// Phase 2.5 - Batch 5: Update operations
// =================================================================
/**
* Update a message in the database
*/
export const updateMessageV2 = async (topicId: string, messageId: string, updates: Partial<Message>): Promise<void> => {
try {
await dbService.updateMessage(topicId, messageId, updates)
logger.info('Updated message via DbService', { topicId, messageId })
} catch (error) {
logger.error('Failed to update message:', { topicId, messageId, error })
throw error
}
}
/**
* Update a single message block
*/
export const updateSingleBlockV2 = async (blockId: string, updates: Partial<MessageBlock>): Promise<void> => {
try {
await dbService.updateSingleBlock(blockId, updates)
logger.info('Updated single block via DbService', { blockId })
} catch (error) {
logger.error('Failed to update single block:', { blockId, error })
throw error
}
}
/**
* Bulk add message blocks
*/
export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise<void> => {
try {
await dbService.bulkAddBlocks(blocks)
logger.info('Bulk added blocks via DbService', { count: blocks.length })
} catch (error) {
logger.error('Failed to bulk add blocks:', { count: blocks.length, error })
throw error
}
}