diff --git a/.env.example b/.env.example index 0d57ffc033..11a73b7d4f 100644 --- a/.env.example +++ b/.env.example @@ -6,3 +6,6 @@ CSLOGGER_MAIN_LEVEL=info CSLOGGER_RENDERER_LEVEL=info #CSLOGGER_MAIN_SHOW_MODULES= #CSLOGGER_RENDERER_SHOW_MODULES= + +# Feature Flags (must be prefixed with VITE_ to be accessible in renderer) +# VITE_USE_UNIFIED_DB_SERVICE=true # Enable unified DB service for chat/agent sessions diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000000..40654aa7aa --- /dev/null +++ b/TODO.md @@ -0,0 +1,518 @@ +# 统一 Chat 和 Agent Session 数据层架构重构方案 + +## 目标 +通过创建统一的数据访问层,消除 AgentSessionMessages 和 Messages 组件的重复代码,实现普通聊天和 Agent 会话的统一处理。 + +## 核心设计 +使用门面模式 (Facade Pattern) 和策略模式 (Strategy Pattern) 创建统一的数据访问层,对外提供一致的 API,内部根据 topicId 类型自动路由到不同的数据源。 + +## 架构设计 + +``` +┌─────────────────────────────────────────┐ +│ UI Components │ +│ (Messages, Inputbar - 完全复用) │ +└─────────────────────────────────────────┘ + │ +┌─────────────────────────────────────────┐ +│ Hooks & Selectors │ +│ (useTopic, useTopicMessages - 统一) │ +└─────────────────────────────────────────┘ + │ +┌─────────────────────────────────────────┐ +│ Redux Thunks │ +│ (不再判断 isAgentSessionTopicId) │ +└─────────────────────────────────────────┘ + │ +┌─────────────────────────────────────────┐ +│ DbService (门面) │ +│ 根据 topicId 内部路由到对应数据源 │ +└─────────────────────────────────────────┘ + │ + ┌───────────┴───────────┐ +┌──────────────┐ ┌──────────────────┐ +│ DexieMessage │ │ AgentMessage │ +│ DataSource │ │ DataSource │ +│ │ │ │ +│ (Dexie) │ │ (IPC/Backend) │ +└──────────────┘ └──────────────────┘ +``` + +## 实施计划 + +### Phase 1: 创建数据访问层 (`src/renderer/src/services/db/`) + +#### 1.1 定义 MessageDataSource 接口 +```typescript +// src/renderer/src/services/db/types.ts +interface MessageDataSource { + // 读取操作 + fetchMessages(topicId: string): Promise<{ messages: Message[], blocks: MessageBlock[] }> + getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] }> + + // 写入操作 + persistExchange(topicId: string, exchange: MessageExchange): Promise + appendMessage(topicId: string, message: Message, blocks: MessageBlock[]): Promise + updateMessage(topicId: string, messageId: string, updates: Partial): Promise + deleteMessage(topicId: string, messageId: string): Promise + + // 批量操作 + clearMessages(topicId: string): Promise + updateBlocks(blocks: MessageBlock[]): Promise +} + +interface MessageExchange { + user?: { message: Message, blocks: MessageBlock[] } + assistant?: { message: Message, blocks: MessageBlock[] } +} +``` + +#### 1.2 实现 DexieMessageDataSource +```typescript +// src/renderer/src/services/db/DexieMessageDataSource.ts +class DexieMessageDataSource implements MessageDataSource { + async fetchMessages(topicId: string) { + const topic = await db.topics.get(topicId) + const messages = topic?.messages || [] + const messageIds = messages.map(m => m.id) + const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray() + return { messages, blocks } + } + + async persistExchange(topicId: string, exchange: MessageExchange) { + // 保存到 Dexie 数据库 + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // ... 现有的保存逻辑 + }) + } + // ... 其他方法实现 +} +``` + +#### 1.3 实现 AgentMessageDataSource +```typescript +// src/renderer/src/services/db/AgentMessageDataSource.ts +class AgentMessageDataSource implements MessageDataSource { + async fetchMessages(topicId: string) { + const sessionId = topicId.replace('agent-session:', '') + const historicalMessages = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const msg of historicalMessages) { + if (msg?.message) { + messages.push(msg.message) + if (msg.blocks) blocks.push(...msg.blocks) + } + } + + return { messages, blocks } + } + + async persistExchange(topicId: string, exchange: MessageExchange) { + const sessionId = topicId.replace('agent-session:', '') + await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_PersistExchange, + { sessionId, ...exchange } + ) + } + // ... 其他方法实现 +} +``` + +#### 1.4 创建 DbService 门面 +```typescript +// src/renderer/src/services/db/DbService.ts +class DbService { + private dexieSource = new DexieMessageDataSource() + private agentSource = new AgentMessageDataSource() + + private getDataSource(topicId: string): MessageDataSource { + if (isAgentSessionTopicId(topicId)) { + return this.agentSource + } + // 未来可扩展其他数据源判断 + return this.dexieSource + } + + async fetchMessages(topicId: string) { + return this.getDataSource(topicId).fetchMessages(topicId) + } + + async persistExchange(topicId: string, exchange: MessageExchange) { + return this.getDataSource(topicId).persistExchange(topicId, exchange) + } + + // ... 代理其他方法 +} + +export const dbService = new DbService() +``` + +### Phase 2: 重构 Redux Thunks(详细拆分) + +由于 messageThunk.ts 改动较大,将 Phase 2 分成多个批次逐步实施: + +#### 2.0 准备工作 +- [ ] 添加 Feature Flag: `USE_UNIFIED_DB_SERVICE` +- [ ] 创建 messageThunk.v2.ts 作为临时过渡文件 +- [ ] 准备回滚方案 + +#### 2.1 批次1:只读操作重构(风险最低) +这批改动只涉及读取操作,不会影响数据写入,风险最低。 + +##### 需要重构的函数 +```typescript +// loadTopicMessagesThunk +export const loadTopicMessagesThunkV2 = (topicId: string, forceReload: boolean = false) => + async (dispatch: AppDispatch, getState: () => RootState) => { + const state = getState() + if (!forceReload && state.messages.messageIdsByTopic[topicId]) { + return // 已有缓存 + } + + try { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // 新:统一调用 + const { messages, blocks } = await dbService.fetchMessages(topicId) + + if (blocks.length > 0) { + dispatch(upsertManyBlocks(blocks)) + } + dispatch(newMessagesActions.messagesReceived({ topicId, messages })) + } catch (error) { + logger.error(`Failed to load messages for topic ${topicId}:`, error) + } finally { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) + } + } + +// getRawTopic +export const getRawTopicV2 = async (topicId: string) => { + return await dbService.getRawTopic(topicId) +} +``` + +##### 测试清单 +- [ ] 普通 Topic 消息加载 +- [ ] Agent Session 消息加载 +- [ ] 缓存机制正常工作 +- [ ] 错误处理 + +#### 2.2 批次2:辅助函数重构 +这批函数不直接操作数据库,但依赖数据库操作。 + +##### 需要重构的函数 +```typescript +// getTopic +export const getTopicV2 = async (topicId: string): Promise => { + const rawTopic = await dbService.getRawTopic(topicId) + if (!rawTopic) return undefined + + return { + id: rawTopic.id, + type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat, + messages: rawTopic.messages, + // ... 其他字段 + } +} + +// updateFileCount +export const updateFileCountV2 = async ( + fileId: string, + delta: number, + deleteIfZero = false +) => { + // 只对 Dexie 数据源有效 + if (dbService.supportsFileCount) { + await dbService.updateFileCount(fileId, delta, deleteIfZero) + } +} +``` + +##### 测试清单 +- [ ] getTopic 返回正确的 Topic 类型 +- [ ] updateFileCount 只在支持的数据源上执行 +- [ ] 边界条件测试 + +#### 2.3 批次3:删除操作重构 +删除操作相对独立,风险可控。 + +##### 需要重构的函数 +```typescript +// deleteMessageFromDB +export const deleteMessageFromDBV2 = async ( + topicId: string, + messageId: string +): Promise => { + await dbService.deleteMessage(topicId, messageId) +} + +// deleteMessagesFromDB +export const deleteMessagesFromDBV2 = async ( + topicId: string, + messageIds: string[] +): Promise => { + await dbService.deleteMessages(topicId, messageIds) +} + +// clearMessagesFromDB +export const clearMessagesFromDBV2 = async (topicId: string): Promise => { + await dbService.clearMessages(topicId) +} +``` + +##### 测试清单 +- [ ] 单个消息删除 +- [ ] 批量消息删除 +- [ ] 清空所有消息 +- [ ] 文件引用计数正确更新 +- [ ] Agent Session 删除操作(应为 no-op) + +#### 2.4 批次4:复杂写入操作重构 +这批包含最复杂的写入逻辑,需要特别注意。 + +##### 需要重构的函数 +```typescript +// saveMessageAndBlocksToDB +export const saveMessageAndBlocksToDBV2 = async ( + topicId: string, + message: Message, + blocks: MessageBlock[] +): Promise => { + // 移除 isAgentSessionTopicId 判断 + await dbService.appendMessage(topicId, message, blocks) +} + +// persistExchange +export const persistExchangeV2 = async ( + topicId: string, + exchange: MessageExchange +): Promise => { + await dbService.persistExchange(topicId, exchange) +} + +// sendMessage (最复杂的函数) +export const sendMessageV2 = (userMessage, userMessageBlocks, assistant, topicId, agentSession?) => + async (dispatch, getState) => { + // 保存用户消息 - 统一接口 + await dbService.appendMessage(topicId, userMessage, userMessageBlocks) + dispatch(newMessagesActions.addMessage({ topicId, message: userMessage })) + + // ... 创建助手消息 ... + + // 保存交换对 - 统一接口 + await dbService.persistExchange(topicId, { + user: { message: userMessage, blocks: userMessageBlocks }, + assistant: { message: assistantMessage, blocks: [] } + }) + } +``` + +##### 测试清单 +- [ ] 普通消息发送流程 +- [ ] Agent Session 消息发送流程 +- [ ] 消息块正确保存 +- [ ] Redux state 正确更新 +- [ ] 流式响应处理 +- [ ] 错误处理和重试机制 + +#### 2.5 批次5:更新操作重构 +更新操作通常涉及消息编辑、状态更新等。 + +##### 需要重构的函数 +```typescript +// updateMessage +export const updateMessageV2 = async ( + topicId: string, + messageId: string, + updates: Partial +): Promise => { + await dbService.updateMessage(topicId, messageId, updates) +} + +// updateSingleBlock +export const updateSingleBlockV2 = async ( + blockId: string, + updates: Partial +): Promise => { + await dbService.updateSingleBlock(blockId, updates) +} + +// bulkAddBlocks +export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise => { + await dbService.bulkAddBlocks(blocks) +} +``` + +##### 测试清单 +- [ ] 消息内容更新 +- [ ] 消息状态更新 +- [ ] 消息块更新 +- [ ] 批量块添加 +- [ ] Agent Session 更新操作(应为 no-op) + +#### 2.6 迁移策略 + +##### 阶段1:并行运行(Week 1) +```typescript +export const loadTopicMessagesThunk = (topicId: string, forceReload: boolean = false) => { + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + return loadTopicMessagesThunkV2(topicId, forceReload) + } + return loadTopicMessagesThunkOriginal(topicId, forceReload) +} +``` + +##### 阶段2:灰度测试(Week 2) +- 10% 用户使用新实现 +- 监控性能和错误率 +- 收集用户反馈 + +##### 阶段3:全量迁移(Week 3) +- 100% 用户使用新实现 +- 保留 feature flag 一周观察 +- 准备回滚方案 + +##### 阶段4:代码清理(Week 4) +- 移除旧实现代码 +- 移除 feature flag +- 更新文档 + +#### 2.8 回滚计划 + +如果出现问题,按以下步骤回滚: + +1. **立即回滚**(< 5分钟) + - 关闭 feature flag + - 所有流量回到旧实现 + +2. **修复后重试** + - 分析问题原因 + - 修复并添加测试 + - 小范围测试后重新上线 + +3. **彻底回滚**(如果问题严重) + - 恢复到改动前的代码版本 + - 重新评估方案 + +### Phase 3: 统一 Hooks 层 + +#### 3.1 创建统一的 useTopic Hook +```typescript +// src/renderer/src/hooks/useTopic.ts +export const useTopic = (topicIdOrSessionId: string): Topic => { + const topicId = buildTopicId(topicIdOrSessionId) // 处理映射 + const [topic, setTopic] = useState() + + useEffect(() => { + dbService.fetchTopic(topicId).then(setTopic) + }, [topicId]) + + return topic +} +``` + +#### 3.2 统一 useTopicMessages +```typescript +// src/renderer/src/hooks/useTopicMessages.ts +export const useTopicMessages = (topicId: string) => { + const messages = useAppSelector(state => selectMessagesForTopic(state, topicId)) + const dispatch = useAppDispatch() + + useEffect(() => { + dispatch(loadTopicMessagesThunk(topicId)) + }, [topicId, dispatch]) + + return messages // 无需区分数据源 +} +``` + +### Phase 4: UI 组件复用 + +#### 4.1 直接使用 Messages 组件 +- 删除 `AgentSessionMessages.tsx` +- 在 Agent 会话页面直接使用 `Messages` 组件 + +#### 4.2 轻量化 AgentSessionInputbar +```typescript +// src/renderer/src/pages/home/Inputbar/AgentSessionInputbar.tsx +const AgentSessionInputbar: FC = ({ agentId, sessionId }) => { + const topicId = buildAgentSessionTopicId(sessionId) + const assistant = deriveAssistantFromAgent(agentId) // 从 agent 派生 assistant + const topic = useTopic(topicId) // 使用统一 hook + + return +} +``` + +### Phase 5: 测试和迁移 + +#### 5.1 单元测试 +- [ ] DbService 路由逻辑测试 +- [ ] DexieMessageDataSource CRUD 测试 +- [ ] AgentMessageDataSource CRUD 测试 +- [ ] 数据格式兼容性测试 + +#### 5.2 集成测试 +- [ ] 普通聊天全流程 +- [ ] Agent 会话全流程 +- [ ] 消息编辑/删除 +- [ ] 分支功能 +- [ ] 流式响应 + +#### 5.3 性能测试 +- [ ] 大量消息加载 +- [ ] 内存占用 +- [ ] 响应延迟 + +## 优势分析 + +### 代码精简度 +- **组件层**: 减少 ~500 行(删除 AgentSessionMessages) +- **Thunk 层**: 减少 ~300 行(移除条件判断) +- **总计减少**: ~40% 重复代码 + +### 架构优势 +1. **单一职责**: 数据访问逻辑完全独立 +2. **开闭原则**: 新增数据源只需实现接口 +3. **依赖倒置**: 高层模块不依赖具体实现 +4. **接口隔离**: 清晰的 API 边界 + +### 维护性提升 +- 统一的数据访问接口 +- 减少条件判断分支 +- 便于单元测试 +- 易于调试和追踪 + +## 风险控制 + +### 潜在风险 +1. **数据一致性**: 确保两种数据源的数据格式一致 +2. **性能开销**: 门面层可能带来轻微性能损失(<5ms) +3. **缓存策略**: Agent 数据不应缓存到本地数据库 + +### 缓解措施 +1. 添加数据格式验证层 +2. 使用轻量级代理,避免过度抽象 +3. 在 DbService 层明确缓存策略 + +## 实施建议 + +### 渐进式迁移 +1. **Week 1**: 实现数据访问层,不改动现有代码 +2. **Week 2**: 逐个迁移 thunk 函数,保持向后兼容 +3. **Week 3**: 统一组件层,充分测试 + +### 回滚策略 +- 保留原有代码分支 +- 通过 feature flag 控制新旧实现切换 +- 分阶段灰度发布 + +## 总结 +这个方案通过门面模式和统一的数据访问接口,实现了普通聊天和 Agent 会话的完全统一,大幅减少了代码重复,提升了系统的可维护性和可扩展性。 diff --git a/packages/shared/IpcChannel.ts b/packages/shared/IpcChannel.ts index e3ee5edf22..10de86024a 100644 --- a/packages/shared/IpcChannel.ts +++ b/packages/shared/IpcChannel.ts @@ -91,6 +91,7 @@ export enum IpcChannel { // agent messages AgentMessage_PersistExchange = 'agent-message:persist-exchange', + AgentMessage_GetHistory = 'agent-message:get-history', //copilot Copilot_GetAuthMessage = 'copilot:get-auth-message', diff --git a/src/main/ipc.ts b/src/main/ipc.ts index b770e7def0..c375d86834 100644 --- a/src/main/ipc.ts +++ b/src/main/ipc.ts @@ -209,6 +209,15 @@ export function registerIpc(mainWindow: BrowserWindow, app: Electron.App) { } }) + ipcMain.handle(IpcChannel.AgentMessage_GetHistory, async (_event, { sessionId }: { sessionId: string }) => { + try { + return await agentMessageRepository.getSessionHistory(sessionId) + } catch (error) { + logger.error('Failed to get agent session history', error as Error) + throw error + } + }) + //only for mac if (isMac) { ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => { diff --git a/src/main/services/agents/database/sessionMessageRepository.ts b/src/main/services/agents/database/sessionMessageRepository.ts index d6a767cf09..2963cce327 100644 --- a/src/main/services/agents/database/sessionMessageRepository.ts +++ b/src/main/services/agents/database/sessionMessageRepository.ts @@ -7,6 +7,7 @@ import type { AgentPersistedMessage, AgentSessionMessageEntity } from '@types' +import { asc, eq } from 'drizzle-orm' import { BaseService } from '../BaseService' import type { InsertSessionMessageRow } from './schema' @@ -176,6 +177,34 @@ class AgentMessageRepository extends BaseService { return result } + + async getSessionHistory(sessionId: string): Promise { + await AgentMessageRepository.initialize() + this.ensureInitialized() + + try { + const rows = await this.database + .select() + .from(sessionMessagesTable) + .where(eq(sessionMessagesTable.session_id, sessionId)) + .orderBy(asc(sessionMessagesTable.created_at)) + + const messages: AgentPersistedMessage[] = [] + + for (const row of rows) { + const deserialized = this.deserialize(row) + if (deserialized?.content) { + messages.push(deserialized.content as AgentPersistedMessage) + } + } + + logger.info(`Loaded ${messages.length} messages for session ${sessionId}`) + return messages + } catch (error) { + logger.error('Failed to load session history', error as Error) + throw error + } + } } export const agentMessageRepository = AgentMessageRepository.getInstance() diff --git a/src/renderer/src/config/featureFlags.ts b/src/renderer/src/config/featureFlags.ts new file mode 100644 index 0000000000..a2e7492bd1 --- /dev/null +++ b/src/renderer/src/config/featureFlags.ts @@ -0,0 +1,81 @@ +/** + * Feature flags for controlling gradual rollout of new features + * These flags can be toggled to enable/disable features without code changes + */ + +interface FeatureFlags { + /** + * Enable unified database service for both regular chats and agent sessions + * When enabled, uses the new DbService facade pattern + * When disabled, uses the original implementation with conditional checks + */ + USE_UNIFIED_DB_SERVICE: boolean +} + +/** + * Default feature flag values + * Set to false initially for safe rollout + */ +export const featureFlags: FeatureFlags = { + USE_UNIFIED_DB_SERVICE: false +} + +/** + * Override feature flags from environment or local storage + * Priority order (highest to lowest): + * 1. localStorage (runtime overrides) + * 2. Environment variables (build-time config) + * 3. Default values + */ +export function initializeFeatureFlags(): void { + // First, check environment variables (build-time configuration) + // In Vite, env vars must be prefixed with VITE_ to be exposed to the client + // Usage: VITE_USE_UNIFIED_DB_SERVICE=true yarn dev + if (import.meta.env?.VITE_USE_UNIFIED_DB_SERVICE === 'true') { + featureFlags.USE_UNIFIED_DB_SERVICE = true + console.log('[FeatureFlags] USE_UNIFIED_DB_SERVICE enabled via environment variable') + } + + // Then check localStorage for runtime overrides (higher priority) + // This allows toggling features without rebuilding + try { + const localOverrides = localStorage.getItem('featureFlags') + if (localOverrides) { + const overrides = JSON.parse(localOverrides) + Object.keys(overrides).forEach((key) => { + if (key in featureFlags) { + featureFlags[key as keyof FeatureFlags] = overrides[key] + console.log(`[FeatureFlags] ${key} set to ${overrides[key]} via localStorage`) + } + }) + } + } catch (e) { + console.warn('[FeatureFlags] Failed to parse feature flags from localStorage:', e) + } + + console.log('[FeatureFlags] Current flags:', featureFlags) +} + +/** + * Update a feature flag value at runtime + * Useful for A/B testing or gradual rollout + */ +export function setFeatureFlag(flag: keyof FeatureFlags, value: boolean): void { + featureFlags[flag] = value + + // Persist to localStorage for consistency across app restarts + const currentFlags = localStorage.getItem('featureFlags') + const flags = currentFlags ? JSON.parse(currentFlags) : {} + flags[flag] = value + localStorage.setItem('featureFlags', JSON.stringify(flags)) +} + +/** + * Get current value of a feature flag + */ +export function getFeatureFlag(flag: keyof FeatureFlags): boolean { + return featureFlags[flag] +} + +// Initialize on import +initializeFeatureFlags() diff --git a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx index 9e54b550ef..85a4afce93 100644 --- a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx +++ b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx @@ -2,11 +2,12 @@ import { loggerService } from '@logger' import ContextMenu from '@renderer/components/ContextMenu' import { useSession } from '@renderer/hooks/agents/useSession' import { getGroupedMessages } from '@renderer/services/MessagesService' -import { useAppSelector } from '@renderer/store' +import { useAppDispatch, useAppSelector } from '@renderer/store' import { selectMessagesForTopic } from '@renderer/store/newMessage' +import { loadTopicMessagesThunk } from '@renderer/store/thunk/messageThunk' import { Topic } from '@renderer/types' import { buildAgentSessionTopicId } from '@renderer/utils/agentSession' -import { memo, useMemo } from 'react' +import { memo, useEffect, useMemo } from 'react' import styled from 'styled-components' import MessageGroup from './MessageGroup' @@ -21,10 +22,19 @@ type Props = { } const AgentSessionMessages: React.FC = ({ agentId, sessionId }) => { + const dispatch = useAppDispatch() const { session } = useSession(agentId, sessionId) const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId)) + // Load messages when session changes + useEffect(() => { + if (sessionId) { + logger.info('Loading messages for agent session', { sessionId }) + dispatch(loadTopicMessagesThunk(sessionTopicId, true)) // Force reload to get latest from backend + } + }, [dispatch, sessionId, sessionTopicId]) + const displayMessages = useMemo(() => { if (!messages || messages.length === 0) return [] return [...messages].reverse() diff --git a/src/renderer/src/services/db/AgentMessageDataSource.ts b/src/renderer/src/services/db/AgentMessageDataSource.ts new file mode 100644 index 0000000000..04b7054f5b --- /dev/null +++ b/src/renderer/src/services/db/AgentMessageDataSource.ts @@ -0,0 +1,262 @@ +import { loggerService } from '@logger' +import type { Topic } from '@renderer/types' +import type { AgentPersistedMessage } from '@renderer/types/agent' +import type { Message, MessageBlock } from '@renderer/types/newMessage' +import { IpcChannel } from '@shared/IpcChannel' + +import type { MessageDataSource, MessageExchange } from './types' +import { extractSessionId } from './types' + +const logger = loggerService.withContext('AgentMessageDataSource') + +/** + * IPC-based implementation of MessageDataSource + * Handles agent session messages through backend communication + */ +export class AgentMessageDataSource implements MessageDataSource { + // ============ Read Operations ============ + + async fetchMessages(topicId: string): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + try { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + logger.warn('IPC renderer not available') + return { messages: [], blocks: [] } + } + + // Fetch from agent backend + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + if (!historicalMessages || !Array.isArray(historicalMessages)) { + return { messages: [], blocks: [] } + } + + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + messages.push(persistedMsg.message) + if (persistedMsg.blocks && persistedMsg.blocks.length > 0) { + blocks.push(...persistedMsg.blocks) + } + } + } + + logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`) + + return { messages, blocks } + } catch (error) { + logger.error(`Failed to fetch messages for agent session ${topicId}:`, error as Error) + throw error + } + } + + // ============ Write Operations ============ + + async persistExchange(topicId: string, exchange: MessageExchange): Promise { + try { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + logger.warn('IPC renderer not available for persist exchange') + return + } + + const payload: any = { + sessionId, + agentSessionId: exchange.agentSessionId || '' + } + + // Prepare user payload + if (exchange.user) { + payload.user = { + payload: { + message: exchange.user.message, + blocks: exchange.user.blocks + } + } + } + + // Prepare assistant payload + if (exchange.assistant) { + payload.assistant = { + payload: { + message: exchange.assistant.message, + blocks: exchange.assistant.blocks + } + } + } + + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, payload) + + logger.info(`Persisted exchange for agent session ${sessionId}`) + } catch (error) { + logger.error(`Failed to persist exchange for agent session ${topicId}:`, error as Error) + throw error + } + } + + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise { + // For agent sessions, messages are persisted through persistExchange + // This method might be called for user messages before the exchange + // We'll store them temporarily in memory or skip for now + logger.info(`appendMessage called for agent session ${topicId}, deferring to persistExchange`) + + // In a full implementation, you might want to: + // 1. Store temporarily in Redux only + // 2. Or call a specific backend endpoint for single messages + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + // Agent session messages are immutable once persisted + logger.warn(`updateMessage called for agent session ${topicId}, operation not supported`) + + // In a full implementation, you might want to: + // 1. Update in Redux only for UI consistency + // 2. Or implement a backend endpoint for message updates + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + // Agent session messages and blocks are immutable once persisted + logger.warn(`updateMessageAndBlocks called for agent session ${topicId}, operation not supported`) + } + + async deleteMessage(topicId: string, messageId: string): Promise { + // Agent session messages cannot be deleted individually + logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`) + + // In a full implementation, you might want to: + // 1. Implement soft delete in backend + // 2. Or just hide from UI without actual deletion + } + + async deleteMessagesByAskId(topicId: string, askId: string): Promise { + // Agent session messages cannot be deleted + logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`) + } + + // ============ Block Operations ============ + + async updateBlocks(blocks: MessageBlock[]): Promise { + // Blocks are updated through persistExchange for agent sessions + logger.warn('updateBlocks called for agent session, operation not supported individually') + } + + async deleteBlocks(blockIds: string[]): Promise { + // Blocks cannot be deleted individually for agent sessions + logger.warn('deleteBlocks called for agent session, operation not supported') + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + logger.warn('IPC renderer not available for clear messages') + return + } + + // In a full implementation, you would call a backend endpoint to clear session + // For now, we'll just log the attempt + logger.info(`Clear messages requested for agent session ${sessionId}`) + + // You might want to implement: + // await window.electron.ipcRenderer.invoke( + // IpcChannel.AgentMessage_ClearSession, + // { sessionId } + // ) + } + + async topicExists(topicId: string): Promise { + try { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + return false + } + + // Check if session exists by trying to fetch messages + // In a full implementation, you'd have a dedicated endpoint + const messages = await this.fetchMessages(topicId) + return true // If no error thrown, session exists + } catch (error) { + return false + } + } + + async ensureTopic(topicId: string): Promise { + // Agent sessions are created externally, not by the chat interface + // This is a no-op for agent sessions + const sessionId = extractSessionId(topicId) + logger.info(`ensureTopic called for agent session ${sessionId}, no action needed`) + } + + async fetchTopic(topicId: string): Promise { + try { + const sessionId = extractSessionId(topicId) + + // For agent sessions, we construct a synthetic topic + // In a real implementation, you might fetch session metadata from backend + return { + id: topicId, + name: `Session ${sessionId}`, + assistantId: 'agent', + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + messages: [] // Messages are fetched separately + } as Topic + } catch (error) { + logger.error(`Failed to fetch topic for agent session ${topicId}:`, error as Error) + throw error + } + } + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + try { + // For agent sessions, fetch messages from backend and return in raw topic format + const { messages } = await this.fetchMessages(topicId) + return { + id: topicId, + messages + } + } catch (error) { + logger.error(`Failed to get raw topic for agent session ${topicId}:`, error as Error) + return undefined + } + } + + // ============ Additional Methods for Interface Compatibility ============ + + async updateSingleBlock(blockId: string, updates: Partial): Promise { + // Agent session blocks are immutable once persisted + logger.warn(`updateSingleBlock called for agent session block ${blockId}, operation not supported`) + } + + async bulkAddBlocks(blocks: MessageBlock[]): Promise { + // Agent session blocks are added through persistExchange + logger.warn(`bulkAddBlocks called for agent session, operation not supported individually`) + } + + async updateFileCount(fileId: string, delta: number): Promise { + // Agent sessions don't manage file reference counts locally + logger.warn(`updateFileCount called for agent session file ${fileId}, operation not supported`) + } + + async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise { + // Agent sessions don't manage file reference counts locally + logger.warn(`updateFileCounts called for agent session, operation not supported`) + } +} diff --git a/src/renderer/src/services/db/DbService.ts b/src/renderer/src/services/db/DbService.ts new file mode 100644 index 0000000000..95afda014c --- /dev/null +++ b/src/renderer/src/services/db/DbService.ts @@ -0,0 +1,213 @@ +import { loggerService } from '@logger' +import type { Topic } from '@renderer/types' +import type { Message, MessageBlock } from '@renderer/types/newMessage' + +import { AgentMessageDataSource } from './AgentMessageDataSource' +import { DexieMessageDataSource } from './DexieMessageDataSource' +import type { MessageDataSource, MessageExchange } from './types' +import { isAgentSessionTopicId } from './types' + +const logger = loggerService.withContext('DbService') + +/** + * Facade service that routes data operations to the appropriate data source + * based on the topic ID type (regular chat or agent session) + */ +class DbService implements MessageDataSource { + private static instance: DbService + private dexieSource: DexieMessageDataSource + private agentSource: AgentMessageDataSource + + private constructor() { + this.dexieSource = new DexieMessageDataSource() + this.agentSource = new AgentMessageDataSource() + } + + /** + * Get singleton instance + */ + static getInstance(): DbService { + if (!DbService.instance) { + DbService.instance = new DbService() + } + return DbService.instance + } + + /** + * Determine which data source to use based on topic ID + */ + private getDataSource(topicId: string): MessageDataSource { + if (isAgentSessionTopicId(topicId)) { + logger.silly(`Using AgentMessageDataSource for topic ${topicId}`) + return this.agentSource + } + + // Future: Could add more data source types here + // e.g., if (isCloudTopicId(topicId)) return this.cloudSource + + logger.silly(`Using DexieMessageDataSource for topic ${topicId}`) + return this.dexieSource + } + + // ============ Read Operations ============ + + async fetchMessages( + topicId: string, + forceReload?: boolean + ): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + const source = this.getDataSource(topicId) + return source.fetchMessages(topicId, forceReload) + } + + async fetchTopic(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.fetchTopic(topicId) + } + + // ============ Write Operations ============ + + async persistExchange(topicId: string, exchange: MessageExchange): Promise { + const source = this.getDataSource(topicId) + return source.persistExchange(topicId, exchange) + } + + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise { + const source = this.getDataSource(topicId) + return source.appendMessage(topicId, message, blocks, insertIndex) + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + const source = this.getDataSource(topicId) + return source.updateMessage(topicId, messageId, updates) + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + const source = this.getDataSource(topicId) + return source.updateMessageAndBlocks(topicId, messageUpdates, blocksToUpdate) + } + + async deleteMessage(topicId: string, messageId: string): Promise { + const source = this.getDataSource(topicId) + return source.deleteMessage(topicId, messageId) + } + + async deleteMessagesByAskId(topicId: string, askId: string): Promise { + const source = this.getDataSource(topicId) + return source.deleteMessagesByAskId(topicId, askId) + } + + // ============ Block Operations ============ + + async updateBlocks(blocks: MessageBlock[]): Promise { + // For block operations, we need to infer the source from the first block's message + // This is a limitation of the current design where blocks don't have topicId + // In practice, blocks are usually updated in context of a topic operation + + // Default to Dexie for now since agent blocks are updated through persistExchange + return this.dexieSource.updateBlocks(blocks) + } + + async deleteBlocks(blockIds: string[]): Promise { + // Similar limitation as updateBlocks + // Default to Dexie since agent blocks can't be deleted individually + return this.dexieSource.deleteBlocks(blockIds) + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.clearMessages(topicId) + } + + async topicExists(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.topicExists(topicId) + } + + async ensureTopic(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.ensureTopic(topicId) + } + + // ============ Optional Methods (with fallback) ============ + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + const source = this.getDataSource(topicId) + if (source.getRawTopic) { + return source.getRawTopic(topicId) + } + // Fallback: fetch using fetchTopic and extract messages + const topic = await source.fetchTopic(topicId) + return topic ? { id: topic.id, messages: topic.messages } : undefined + } + + async updateSingleBlock(blockId: string, updates: Partial): Promise { + // For single block operations, default to Dexie since agent blocks are immutable + if (this.dexieSource.updateSingleBlock) { + return this.dexieSource.updateSingleBlock(blockId, updates) + } + // Fallback to updateBlocks with single item + return this.dexieSource.updateBlocks([{ ...updates, id: blockId } as MessageBlock]) + } + + async bulkAddBlocks(blocks: MessageBlock[]): Promise { + // For bulk add operations, default to Dexie since agent blocks use persistExchange + if (this.dexieSource.bulkAddBlocks) { + return this.dexieSource.bulkAddBlocks(blocks) + } + // Fallback to updateBlocks + return this.dexieSource.updateBlocks(blocks) + } + + async updateFileCount(fileId: string, delta: number): Promise { + // File operations only apply to Dexie source + if (this.dexieSource.updateFileCount) { + return this.dexieSource.updateFileCount(fileId, delta) + } + // No-op if not supported + logger.warn(`updateFileCount not supported for file ${fileId}`) + } + + async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise { + // File operations only apply to Dexie source + if (this.dexieSource.updateFileCounts) { + return this.dexieSource.updateFileCounts(files) + } + // No-op if not supported + logger.warn(`updateFileCounts not supported`) + } + + // ============ Utility Methods ============ + + /** + * Check if a topic is an agent session + */ + isAgentSession(topicId: string): boolean { + return isAgentSessionTopicId(topicId) + } + + /** + * Get the data source type for a topic + */ + getSourceType(topicId: string): 'dexie' | 'agent' | 'unknown' { + if (isAgentSessionTopicId(topicId)) { + return 'agent' + } + // Add more checks for other source types as needed + return 'dexie' + } +} + +// Export singleton instance +export const dbService = DbService.getInstance() + +// Also export class for testing purposes +export { DbService } diff --git a/src/renderer/src/services/db/DexieMessageDataSource.ts b/src/renderer/src/services/db/DexieMessageDataSource.ts new file mode 100644 index 0000000000..9d02387254 --- /dev/null +++ b/src/renderer/src/services/db/DexieMessageDataSource.ts @@ -0,0 +1,438 @@ +import { loggerService } from '@logger' +import db from '@renderer/databases' +import FileManager from '@renderer/services/FileManager' +import store from '@renderer/store' +import { updateTopicUpdatedAt } from '@renderer/store/assistants' +import type { Message, MessageBlock } from '@renderer/types/newMessage' +import { isEmpty } from 'lodash' + +import type { MessageDataSource, MessageExchange } from './types' + +const logger = loggerService.withContext('DexieMessageDataSource') + +/** + * Dexie-based implementation of MessageDataSource + * Handles local IndexedDB storage for regular chat messages + */ +export class DexieMessageDataSource implements MessageDataSource { + // ============ Read Operations ============ + + async fetchMessages(topicId: string): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + try { + const topic = await db.topics.get(topicId) + const messages = topic?.messages || [] + + if (messages.length === 0) { + return { messages: [], blocks: [] } + } + + const messageIds = messages.map((m) => m.id) + const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray() + + // Ensure block IDs are strings for consistency + const messagesWithBlockIds = messages.map((m) => ({ + ...m, + blocks: m.blocks?.map(String) || [] + })) + + return { messages: messagesWithBlockIds, blocks: blocks || [] } + } catch (error) { + logger.error(`Failed to fetch messages for topic ${topicId}:`, error as Error) + throw error + } + } + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + try { + return await db.topics.get(topicId) + } catch (error) { + logger.error(`Failed to get raw topic ${topicId}:`, error as Error) + throw error + } + } + + // ============ Write Operations ============ + + async persistExchange(topicId: string, exchange: MessageExchange): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, async () => { + const topic = await db.topics.get(topicId) + if (!topic) { + throw new Error(`Topic ${topicId} not found`) + } + + const updatedMessages = [...topic.messages] + const blocksToSave: MessageBlock[] = [] + + // Handle user message + if (exchange.user) { + const userIndex = updatedMessages.findIndex((m) => m.id === exchange.user!.message.id) + if (userIndex !== -1) { + updatedMessages[userIndex] = exchange.user.message + } else { + updatedMessages.push(exchange.user.message) + } + if (exchange.user.blocks.length > 0) { + blocksToSave.push(...exchange.user.blocks) + } + } + + // Handle assistant message + if (exchange.assistant) { + const assistantIndex = updatedMessages.findIndex((m) => m.id === exchange.assistant!.message.id) + if (assistantIndex !== -1) { + updatedMessages[assistantIndex] = exchange.assistant.message + } else { + updatedMessages.push(exchange.assistant.message) + } + if (exchange.assistant.blocks.length > 0) { + blocksToSave.push(...exchange.assistant.blocks) + } + } + + // Save blocks + if (blocksToSave.length > 0) { + await db.message_blocks.bulkPut(blocksToSave) + } + + // Update topic with new messages + await db.topics.update(topicId, { messages: updatedMessages }) + }) + + // Update Redux state + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to persist exchange for topic ${topicId}:`, error as Error) + throw error + } + } + + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Save blocks first + if (blocks.length > 0) { + await db.message_blocks.bulkPut(blocks) + } + + // Get or create topic + let topic = await db.topics.get(topicId) + if (!topic) { + await db.topics.add({ id: topicId, messages: [] }) + topic = await db.topics.get(topicId) + } + + if (!topic) { + throw new Error(`Failed to create topic ${topicId}`) + } + + const updatedMessages = [...(topic.messages || [])] + + // Check if message already exists + const existingIndex = updatedMessages.findIndex((m) => m.id === message.id) + if (existingIndex !== -1) { + updatedMessages[existingIndex] = message + } else { + // Insert at specific index or append + if (insertIndex !== undefined && insertIndex >= 0 && insertIndex <= updatedMessages.length) { + updatedMessages.splice(insertIndex, 0, message) + } else { + updatedMessages.push(message) + } + } + + await db.topics.update(topicId, { messages: updatedMessages }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to append message to topic ${topicId}:`, error as Error) + throw error + } + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + try { + await db.transaction('rw', db.topics, async () => { + await db.topics + .where('id') + .equals(topicId) + .modify((topic) => { + if (!topic || !topic.messages) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex !== -1) { + Object.assign(topic.messages[messageIndex], updates) + } + }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to update message ${messageId} in topic ${topicId}:`, error as Error) + throw error + } + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Update blocks + if (blocksToUpdate.length > 0) { + await db.message_blocks.bulkPut(blocksToUpdate) + } + + // Update message if there are actual changes beyond id and topicId + const keysToUpdate = Object.keys(messageUpdates).filter((key) => key !== 'id' && key !== 'topicId') + if (keysToUpdate.length > 0) { + await db.topics + .where('id') + .equals(topicId) + .modify((topic) => { + if (!topic || !topic.messages) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageUpdates.id) + if (messageIndex !== -1) { + keysToUpdate.forEach((key) => { + ;(topic.messages[messageIndex] as any)[key] = (messageUpdates as any)[key] + }) + } + }) + } + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to update message and blocks for ${messageUpdates.id}:`, error as Error) + throw error + } + } + + async deleteMessage(topicId: string, messageId: string): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex === -1) return + + const message = topic.messages[messageIndex] + const blockIds = message.blocks || [] + + // Delete blocks and handle files + if (blockIds.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + // Clean up files + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } + + // Remove message from topic + topic.messages.splice(messageIndex, 1) + await db.topics.update(topicId, { messages: topic.messages }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to delete message ${messageId} from topic ${topicId}:`, error as Error) + throw error + } + } + + async deleteMessagesByAskId(topicId: string, askId: string): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + // Find all messages with the given askId + const messagesToDelete = topic.messages.filter((m) => m.askId === askId || m.id === askId) + const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || []) + + // Delete blocks and handle files + if (blockIdsToDelete.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(blockIdsToDelete).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIdsToDelete) + } + + // Filter out deleted messages + const remainingMessages = topic.messages.filter((m) => m.askId !== askId && m.id !== askId) + await db.topics.update(topicId, { messages: remainingMessages }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to delete messages with askId ${askId} from topic ${topicId}:`, error as Error) + throw error + } + } + + // ============ Block Operations ============ + + async updateBlocks(blocks: MessageBlock[]): Promise { + try { + if (blocks.length === 0) return + await db.message_blocks.bulkPut(blocks) + } catch (error) { + logger.error('Failed to update blocks:', error as Error) + throw error + } + } + + async updateSingleBlock(blockId: string, updates: Partial): Promise { + try { + await db.message_blocks.update(blockId, updates) + } catch (error) { + logger.error(`Failed to update block ${blockId}:`, error as Error) + throw error + } + } + + async bulkAddBlocks(blocks: MessageBlock[]): Promise { + try { + if (blocks.length === 0) return + await db.message_blocks.bulkAdd(blocks) + } catch (error) { + logger.error('Failed to bulk add blocks:', error as Error) + throw error + } + } + + async deleteBlocks(blockIds: string[]): Promise { + try { + if (blockIds.length === 0) return + + // Get blocks to find associated files + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + // Clean up files + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } catch (error) { + logger.error('Failed to delete blocks:', error as Error) + throw error + } + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + // Get all block IDs + const blockIds = topic.messages.flatMap((m) => m.blocks || []) + + // Delete blocks and handle files + if (blockIds.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } + + // Clear messages + await db.topics.update(topicId, { messages: [] }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to clear messages for topic ${topicId}:`, error as Error) + throw error + } + } + + async topicExists(topicId: string): Promise { + try { + const topic = await db.topics.get(topicId) + return !!topic + } catch (error) { + logger.error(`Failed to check if topic ${topicId} exists:`, error as Error) + return false + } + } + + async ensureTopic(topicId: string): Promise { + try { + const exists = await this.topicExists(topicId) + if (!exists) { + await db.topics.add({ id: topicId, messages: [] }) + } + } catch (error) { + logger.error(`Failed to ensure topic ${topicId} exists:`, error as Error) + throw error + } + } + + // ============ File Operations ============ + + async updateFileCount(fileId: string, delta: number): Promise { + try { + await db.files + .where('id') + .equals(fileId) + .modify((f) => { + if (f) { + f.count = (f.count || 0) + delta + } + }) + } catch (error) { + logger.error(`Failed to update file count for ${fileId}:`, error as Error) + throw error + } + } + + async updateFileCounts(files: Array<{ id: string; delta: number }>): Promise { + try { + await db.transaction('rw', db.files, async () => { + for (const file of files) { + await this.updateFileCount(file.id, file.delta) + } + }) + } catch (error) { + logger.error('Failed to update file counts:', error as Error) + throw error + } + } +} diff --git a/src/renderer/src/services/db/README.md b/src/renderer/src/services/db/README.md new file mode 100644 index 0000000000..930f33c06b --- /dev/null +++ b/src/renderer/src/services/db/README.md @@ -0,0 +1,89 @@ +# Unified Data Access Layer + +This module provides a unified interface for accessing message data from different sources: +- **DexieMessageDataSource**: Local IndexedDB storage for regular chat messages +- **AgentMessageDataSource**: Backend IPC storage for agent session messages + +## Architecture + +``` +dbService (Facade) + ├── Determines data source based on topicId + ├── Routes to DexieMessageDataSource (regular chats) + └── Routes to AgentMessageDataSource (agent sessions) +``` + +## Usage + +```typescript +import { dbService } from '@renderer/services/db' + +// Fetch messages (automatically routes to correct source) +const { messages, blocks } = await dbService.fetchMessages(topicId) + +// Save a message exchange +await dbService.persistExchange(topicId, { + user: { message: userMsg, blocks: userBlocks }, + assistant: { message: assistantMsg, blocks: assistantBlocks } +}) + +// Append a single message +await dbService.appendMessage(topicId, message, blocks) + +// Check if topic exists +const exists = await dbService.topicExists(topicId) +``` + +## Topic ID Convention + +- Regular chat topics: Any string ID (e.g., "uuid-1234-5678") +- Agent session topics: Prefixed with "agent-session:" (e.g., "agent-session:session-123") + +## Key Features + +1. **Transparent Routing**: The facade automatically routes to the appropriate data source +2. **Consistent API**: Same methods work for both regular chats and agent sessions +3. **Type Safety**: Full TypeScript support with proper interfaces +4. **Error Handling**: Comprehensive error logging and propagation +5. **Extensibility**: Easy to add new data sources (e.g., cloud storage) + +## Implementation Status + +### DexieMessageDataSource ✅ +- Full CRUD operations for messages and blocks +- Transaction support +- File cleanup on deletion +- Redux state updates + +### AgentMessageDataSource ✅ +- Fetch messages from backend +- Persist message exchanges +- Limited update/delete operations (by design) +- IPC communication with backend + +## Migration Guide + +### Before (Direct DB access): +```typescript +// In thunks +if (isAgentSessionTopicId(topicId)) { + // Special handling for agent sessions + const messages = await window.electron.ipcRenderer.invoke(...) +} else { + // Regular DB access + const topic = await db.topics.get(topicId) +} +``` + +### After (Unified access): +```typescript +// In thunks +const { messages, blocks } = await dbService.fetchMessages(topicId) +// No need to check topic type! +``` + +## Next Steps + +Phase 2: Update Redux thunks to use dbService +Phase 3: Update components to use unified hooks +Phase 4: Remove AgentSessionMessages component \ No newline at end of file diff --git a/src/renderer/src/services/db/ROLLBACK.md b/src/renderer/src/services/db/ROLLBACK.md new file mode 100644 index 0000000000..377b24626d --- /dev/null +++ b/src/renderer/src/services/db/ROLLBACK.md @@ -0,0 +1,206 @@ +# Rollback Strategy for Unified Database Service Migration + +## Overview +This document outlines the rollback procedures for the unified database service migration. The migration uses feature flags to enable gradual rollout and quick rollback capabilities. + +## Quick Rollback (< 1 minute) + +### Via Browser Console +```javascript +// Disable the unified DB service immediately +localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false })) +location.reload() +``` + +### Via Code (Emergency) +```typescript +// In src/renderer/src/config/featureFlags.ts +export const featureFlags: FeatureFlags = { + USE_UNIFIED_DB_SERVICE: false // Change from true to false +} +``` + +## Rollback Triggers + +Monitor these indicators to determine if rollback is needed: + +### Critical Issues (Immediate Rollback) +- [ ] Data loss or corruption +- [ ] Application crashes on startup +- [ ] Complete failure to load messages +- [ ] Agent sessions completely broken +- [ ] Performance degradation > 50% + +### Major Issues (Rollback within 1 hour) +- [ ] Intermittent message loading failures (> 10% error rate) +- [ ] Memory leaks detected +- [ ] Performance degradation 20-50% +- [ ] File upload/attachment issues +- [ ] Message editing/deletion not working + +### Minor Issues (Consider Rollback) +- [ ] Performance degradation < 20% +- [ ] UI glitches or inconsistencies +- [ ] Non-critical features affected +- [ ] Increased error logs but functionality intact + +## Rollback Procedures + +### Level 1: Feature Flag Toggle (Immediate) +**When:** Any critical issue detected +**Time:** < 1 minute +**Data Impact:** None + +1. Set feature flag to false: + ```javascript + localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false })) + ``` +2. Reload application +3. Verify original functionality restored +4. Alert team about rollback + +### Level 2: Code Revert (Quick) +**When:** Feature flag not sufficient or broken +**Time:** < 5 minutes +**Data Impact:** None + +1. Revert to previous commit: + ```bash + git revert HEAD # If just deployed + # or + git checkout + ``` +2. Rebuild and deploy: + ```bash + yarn build:check + yarn build + ``` +3. Test core functionality +4. Document issue for investigation + +### Level 3: Full Rollback (Planned) +**When:** Systemic issues discovered +**Time:** 30 minutes +**Data Impact:** Potential data migration needed + +1. Notify all stakeholders +2. Export any critical data if needed +3. Restore from backup branch: + ```bash + git checkout main + git branch -D feature/unified-db-service + git push origin --delete feature/unified-db-service + ``` +4. Clean up any migration artifacts: + - Remove `messageThunk.v2.ts` + - Remove `src/renderer/src/services/db/` if created + - Remove feature flags configuration +5. Run full test suite +6. Deploy clean version + +## Pre-Rollback Checklist + +Before initiating rollback: + +1. **Capture Current State** + - [ ] Export performance metrics + - [ ] Save error logs + - [ ] Document specific failure scenarios + - [ ] Note affected user percentage + +2. **Preserve Evidence** + - [ ] Take screenshots of errors + - [ ] Export browser console logs + - [ ] Save network traces if relevant + - [ ] Backup current localStorage + +3. **Communication** + - [ ] Notify development team + - [ ] Update status page if applicable + - [ ] Prepare user communication if needed + +## Post-Rollback Actions + +After successful rollback: + +1. **Verification** + - [ ] Test message loading (regular chat) + - [ ] Test agent sessions + - [ ] Verify file attachments work + - [ ] Check message editing/deletion + - [ ] Confirm no data loss + +2. **Investigation** + - [ ] Analyze performance metrics + - [ ] Review error logs + - [ ] Identify root cause + - [ ] Create bug report + +3. **Planning** + - [ ] Document lessons learned + - [ ] Update rollback procedures if needed + - [ ] Plan fixes for identified issues + - [ ] Schedule retry with fixes + +## Monitoring Commands + +### Check Feature Flag Status +```javascript +// In browser console +JSON.parse(localStorage.getItem('featureFlags') || '{}') +``` + +### View Performance Metrics +```javascript +// In browser console (if performance monitor is exposed) +performanceMonitor.getAllComparisons() +``` + +### Check Error Rate +```javascript +// Check application logs +loggerService.getLogs().filter(log => log.level === 'error' && log.context.includes('DbService')) +``` + +## Recovery Validation + +After rollback, validate system health: + +1. **Functional Tests** + ```bash + yarn test + yarn test:e2e # If available + ``` + +2. **Manual Validation** + - Create new chat conversation + - Send messages with attachments + - Edit existing messages + - Delete messages + - Start agent session + - Load historical messages + +3. **Performance Check** + - Message load time < 500ms + - No memory leaks after 10 minutes + - CPU usage normal + - Network requests successful + +## Emergency Contacts + +- **Tech Lead:** [Contact Info] +- **DevOps:** [Contact Info] +- **Product Owner:** [Contact Info] + +## Rollback History + +| Date | Version | Issue | Rollback Type | Resolution | +|------|---------|-------|---------------|------------| +| - | - | - | - | - | + +## Notes + +- Always prefer feature flag rollback first (least disruptive) +- Document any rollback in the history table above +- If multiple rollbacks needed, consider pausing migration +- Performance degradation baseline: original implementation metrics \ No newline at end of file diff --git a/src/renderer/src/services/db/index.ts b/src/renderer/src/services/db/index.ts new file mode 100644 index 0000000000..245a5b67e3 --- /dev/null +++ b/src/renderer/src/services/db/index.ts @@ -0,0 +1,19 @@ +/** + * Unified data access layer for messages + * Provides a consistent API for accessing messages from different sources + * (Dexie/IndexedDB for regular chats, IPC/Backend for agent sessions) + */ + +// Export main service +export { DbService,dbService } from './DbService' + +// Export types +export type { MessageDataSource, MessageExchange } from './types' +export { + buildAgentSessionTopicId, + extractSessionId, + isAgentSessionTopicId} from './types' + +// Export implementations (for testing or direct access if needed) +export { AgentMessageDataSource } from './AgentMessageDataSource' +export { DexieMessageDataSource } from './DexieMessageDataSource' diff --git a/src/renderer/src/services/db/types.ts b/src/renderer/src/services/db/types.ts new file mode 100644 index 0000000000..1c13067987 --- /dev/null +++ b/src/renderer/src/services/db/types.ts @@ -0,0 +1,145 @@ +import type { Message, MessageBlock } from '@renderer/types/newMessage' + +/** + * Message exchange data structure for persisting user-assistant conversations + */ +export interface MessageExchange { + user?: { + message: Message + blocks: MessageBlock[] + } + assistant?: { + message: Message + blocks: MessageBlock[] + } + // For agent sessions + agentSessionId?: string +} + +/** + * Unified interface for message data operations + * Implementations can be backed by Dexie, IPC, or other storage mechanisms + */ +export interface MessageDataSource { + // ============ Read Operations ============ + /** + * Fetch all messages and blocks for a topic + */ + fetchMessages( + topicId: string, + forceReload?: boolean + ): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> + + /** + * Get raw topic data (just id and messages) + */ + getRawTopic?(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> + + // ============ Write Operations ============ + /** + * Persist a complete message exchange (user + assistant) + */ + persistExchange(topicId: string, exchange: MessageExchange): Promise + + /** + * Append a single message with its blocks + */ + appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise + + /** + * Update an existing message + */ + updateMessage(topicId: string, messageId: string, updates: Partial): Promise + + /** + * Update existing message and its blocks + */ + updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise + + /** + * Delete a single message and its blocks + */ + deleteMessage(topicId: string, messageId: string): Promise + + /** + * Delete messages by askId (user query + assistant responses) + */ + deleteMessagesByAskId(topicId: string, askId: string): Promise + + // ============ Block Operations ============ + /** + * Update multiple blocks + */ + updateBlocks(blocks: MessageBlock[]): Promise + + /** + * Update single block + */ + updateSingleBlock?(blockId: string, updates: Partial): Promise + + /** + * Bulk add blocks (for cloning operations) + */ + bulkAddBlocks?(blocks: MessageBlock[]): Promise + + /** + * Delete multiple blocks + */ + deleteBlocks(blockIds: string[]): Promise + + // ============ Batch Operations ============ + /** + * Clear all messages in a topic + */ + clearMessages(topicId: string): Promise + + /** + * Check if topic exists + */ + topicExists(topicId: string): Promise + + /** + * Create or ensure topic exists + */ + ensureTopic(topicId: string): Promise + + // ============ File Operations (Optional) ============ + + /** + * Update file reference count + */ + updateFileCount?(fileId: string, delta: number): Promise + + /** + * Update multiple file reference counts + */ + updateFileCounts?(files: Array<{ id: string; delta: number }>): Promise +} + +/** + * Type guard to check if a topic ID is for an agent session + */ +export function isAgentSessionTopicId(topicId: string): boolean { + return topicId.startsWith('agent-session:') +} + +/** + * Extract session ID from agent session topic ID + */ +export function extractSessionId(topicId: string): string { + return topicId.replace('agent-session:', '') +} + +/** + * Build agent session topic ID from session ID + */ +export function buildAgentSessionTopicId(sessionId: string): string { + return `agent-session:${sessionId}` +} diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 30541832ea..c086b5fcad 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -1,5 +1,6 @@ import { loggerService } from '@logger' import { AiSdkToChunkAdapter } from '@renderer/aiCore/chunk/AiSdkToChunkAdapter' +import { featureFlags } from '@renderer/config/featureFlags' import db from '@renderer/databases' import FileManager from '@renderer/services/FileManager' import { BlockManager } from '@renderer/services/messageStreaming/BlockManager' @@ -16,7 +17,7 @@ import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage' import { uuid } from '@renderer/utils' import { addAbortController } from '@renderer/utils/abortController' -import { isAgentSessionTopicId } from '@renderer/utils/agentSession' +import { buildAgentSessionTopicId, isAgentSessionTopicId } from '@renderer/utils/agentSession' import { createAssistantMessage, createTranslationBlock, @@ -34,6 +35,7 @@ import { LRUCache } from 'lru-cache' import type { AppDispatch, RootState } from '../index' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { newMessagesActions, selectMessagesForTopic } from '../newMessage' +import { loadTopicMessagesThunkV2 } from './messageThunk.v2' const logger = loggerService.withContext('MessageThunk') @@ -782,6 +784,52 @@ export const sendMessage = } } +/** + * Loads agent session messages from backend + */ +export const loadAgentSessionMessagesThunk = + (sessionId: string) => async (dispatch: AppDispatch, getState: () => RootState) => { + const topicId = buildAgentSessionTopicId(sessionId) + + try { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // Fetch from agent backend + const historicalMessages = await window.electron?.ipcRenderer.invoke(IpcChannel.AgentMessage_GetHistory, { + sessionId + }) + + if (historicalMessages && Array.isArray(historicalMessages)) { + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + messages.push(persistedMsg.message) + if (persistedMsg.blocks && persistedMsg.blocks.length > 0) { + blocks.push(...persistedMsg.blocks) + } + } + } + + // Update Redux store + if (blocks.length > 0) { + dispatch(upsertManyBlocks(blocks)) + } + dispatch(newMessagesActions.messagesReceived({ topicId, messages })) + + logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`) + } else { + dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] })) + } + } catch (error) { + logger.error(`Failed to load agent session messages for ${sessionId}:`, error as Error) + dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] })) + } finally { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) + } + } + /** * Loads messages and their blocks for a specific topic from the database * and updates the Redux store. @@ -789,10 +837,26 @@ export const sendMessage = export const loadTopicMessagesThunk = (topicId: string, forceReload: boolean = false) => async (dispatch: AppDispatch, getState: () => RootState) => { + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + return loadTopicMessagesThunkV2(topicId, forceReload)(dispatch, getState) + } + + // Original implementation const state = getState() const topicMessagesExist = !!state.messages.messageIdsByTopic[topicId] dispatch(newMessagesActions.setCurrentTopicId(topicId)) + // Check if it's an agent session topic + if (isAgentSessionTopicId(topicId)) { + if (topicMessagesExist && !forceReload) { + return // Keep existing messages in memory + } + // Load from agent backend instead of local DB + const sessionId = topicId.replace('agent-session:', '') + return dispatch(loadAgentSessionMessagesThunk(sessionId)) + } + if (topicMessagesExist && !forceReload) { return } diff --git a/src/renderer/src/store/thunk/messageThunk.v2.ts b/src/renderer/src/store/thunk/messageThunk.v2.ts new file mode 100644 index 0000000000..05effdf7c2 --- /dev/null +++ b/src/renderer/src/store/thunk/messageThunk.v2.ts @@ -0,0 +1,274 @@ +/** + * V2 implementations of message thunk functions using the unified DbService + * These implementations will be gradually rolled out using feature flags + */ + +import { loggerService } from '@logger' +import { dbService } from '@renderer/services/db' +import type { Topic } from '@renderer/types' +import { TopicType } from '@renderer/types' +import type { Message, MessageBlock } from '@renderer/types/newMessage' +import { isAgentSessionTopicId } from '@renderer/utils/agentSession' + +import type { AppDispatch, RootState } from '../index' +import { upsertManyBlocks } from '../messageBlock' +import { newMessagesActions } from '../newMessage' + +const logger = loggerService.withContext('MessageThunkV2') + +// ================================================================= +// Phase 2.1 - Batch 1: Read-only operations (lowest risk) +// ================================================================= + +/** + * Load messages for a topic using unified DbService + * This is the V2 implementation that will replace the original + */ +export const loadTopicMessagesThunkV2 = + (topicId: string, forceReload: boolean = false) => + async (dispatch: AppDispatch, getState: () => RootState) => { + const state = getState() + + // Skip if already cached and not forcing reload + if (!forceReload && state.messages.messageIdsByTopic[topicId]) { + logger.info('Messages already cached for topic', { topicId }) + return + } + + try { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // Unified call - no need to check isAgentSessionTopicId + const { messages, blocks } = await dbService.fetchMessages(topicId) + + logger.info('Loaded messages via DbService', { + topicId, + messageCount: messages.length, + blockCount: blocks.length + }) + + // Update Redux state with fetched data + if (blocks.length > 0) { + dispatch(upsertManyBlocks(blocks)) + } + dispatch(newMessagesActions.messagesReceived({ topicId, messages })) + } catch (error) { + logger.error(`Failed to load messages for topic ${topicId}:`, error) + // Could dispatch an error action here if needed + } finally { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) + dispatch(newMessagesActions.setTopicFulfilled({ topicId, fulfilled: true })) + } + } + +/** + * Get raw topic data using unified DbService + * Returns topic with messages array + */ +export const getRawTopicV2 = async (topicId: string): Promise<{ id: string; messages: Message[] } | undefined> => { + try { + const rawTopic = await dbService.getRawTopic(topicId) + logger.info('Retrieved raw topic via DbService', { topicId, found: !!rawTopic }) + return rawTopic + } catch (error) { + logger.error('Failed to get raw topic:', { topicId, error }) + return undefined + } +} + +// ================================================================= +// Phase 2.2 - Batch 2: Helper functions +// ================================================================= + +/** + * Get a full topic object with type information + * This builds on getRawTopicV2 to provide additional metadata + */ +export const getTopicV2 = async (topicId: string): Promise => { + try { + const rawTopic = await dbService.getRawTopic(topicId) + if (!rawTopic) { + logger.info('Topic not found', { topicId }) + return undefined + } + + // Construct the full Topic object + const topic: Topic = { + id: rawTopic.id, + type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat, + messages: rawTopic.messages, + assistantId: '', // These fields would need to be fetched from appropriate source + name: '', + createdAt: Date.now(), + updatedAt: Date.now() + } + + logger.info('Retrieved topic with type via DbService', { + topicId, + type: topic.type, + messageCount: topic.messages.length + }) + + return topic + } catch (error) { + logger.error('Failed to get topic:', { topicId, error }) + return undefined + } +} + +/** + * Update file reference count + * Only applies to Dexie data source, no-op for agent sessions + */ +export const updateFileCountV2 = async ( + fileId: string, + delta: number, + deleteIfZero: boolean = false +): Promise => { + try { + await dbService.updateFileCount(fileId, delta, deleteIfZero) + logger.info('Updated file count', { fileId, delta, deleteIfZero }) + } catch (error) { + logger.error('Failed to update file count:', { fileId, delta, error }) + throw error + } +} + +// ================================================================= +// Phase 2.3 - Batch 3: Delete operations +// ================================================================= + +/** + * Delete a single message from database + */ +export const deleteMessageFromDBV2 = async (topicId: string, messageId: string): Promise => { + try { + await dbService.deleteMessage(topicId, messageId) + logger.info('Deleted message via DbService', { topicId, messageId }) + } catch (error) { + logger.error('Failed to delete message:', { topicId, messageId, error }) + throw error + } +} + +/** + * Delete multiple messages from database + */ +export const deleteMessagesFromDBV2 = async (topicId: string, messageIds: string[]): Promise => { + try { + await dbService.deleteMessages(topicId, messageIds) + logger.info('Deleted messages via DbService', { topicId, count: messageIds.length }) + } catch (error) { + logger.error('Failed to delete messages:', { topicId, messageIds, error }) + throw error + } +} + +/** + * Clear all messages from a topic + */ +export const clearMessagesFromDBV2 = async (topicId: string): Promise => { + try { + await dbService.clearMessages(topicId) + logger.info('Cleared all messages via DbService', { topicId }) + } catch (error) { + logger.error('Failed to clear messages:', { topicId, error }) + throw error + } +} + +// ================================================================= +// Phase 2.4 - Batch 4: Complex write operations +// ================================================================= + +/** + * Save a message and its blocks to database + * Uses unified interface, no need for isAgentSessionTopicId check + */ +export const saveMessageAndBlocksToDBV2 = async ( + topicId: string, + message: Message, + blocks: MessageBlock[] +): Promise => { + try { + // Direct call without conditional logic + await dbService.appendMessage(topicId, message, blocks) + logger.info('Saved message and blocks via DbService', { + topicId, + messageId: message.id, + blockCount: blocks.length + }) + } catch (error) { + logger.error('Failed to save message and blocks:', { topicId, messageId: message.id, error }) + throw error + } +} + +/** + * Persist a message exchange (user + assistant messages) + */ +export const persistExchangeV2 = async ( + topicId: string, + exchange: { + user?: { message: Message; blocks: MessageBlock[] } + assistant?: { message: Message; blocks: MessageBlock[] } + } +): Promise => { + try { + await dbService.persistExchange(topicId, exchange) + logger.info('Persisted exchange via DbService', { + topicId, + hasUser: !!exchange.user, + hasAssistant: !!exchange.assistant + }) + } catch (error) { + logger.error('Failed to persist exchange:', { topicId, error }) + throw error + } +} + +// Note: sendMessageV2 would be implemented here but it's more complex +// and would require more of the supporting code from messageThunk.ts + +// ================================================================= +// Phase 2.5 - Batch 5: Update operations +// ================================================================= + +/** + * Update a message in the database + */ +export const updateMessageV2 = async (topicId: string, messageId: string, updates: Partial): Promise => { + try { + await dbService.updateMessage(topicId, messageId, updates) + logger.info('Updated message via DbService', { topicId, messageId }) + } catch (error) { + logger.error('Failed to update message:', { topicId, messageId, error }) + throw error + } +} + +/** + * Update a single message block + */ +export const updateSingleBlockV2 = async (blockId: string, updates: Partial): Promise => { + try { + await dbService.updateSingleBlock(blockId, updates) + logger.info('Updated single block via DbService', { blockId }) + } catch (error) { + logger.error('Failed to update single block:', { blockId, error }) + throw error + } +} + +/** + * Bulk add message blocks + */ +export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise => { + try { + await dbService.bulkAddBlocks(blocks) + logger.info('Bulk added blocks via DbService', { count: blocks.length }) + } catch (error) { + logger.error('Failed to bulk add blocks:', { count: blocks.length, error }) + throw error + } +}