diff --git a/.env.example b/.env.example index 0d57ffc033..11a73b7d4f 100644 --- a/.env.example +++ b/.env.example @@ -6,3 +6,6 @@ CSLOGGER_MAIN_LEVEL=info CSLOGGER_RENDERER_LEVEL=info #CSLOGGER_MAIN_SHOW_MODULES= #CSLOGGER_RENDERER_SHOW_MODULES= + +# Feature Flags (must be prefixed with VITE_ to be accessible in renderer) +# VITE_USE_UNIFIED_DB_SERVICE=true # Enable unified DB service for chat/agent sessions diff --git a/STREAMING_STATE_SOLUTION.md b/STREAMING_STATE_SOLUTION.md new file mode 100644 index 0000000000..f8aa87dcfe --- /dev/null +++ b/STREAMING_STATE_SOLUTION.md @@ -0,0 +1,249 @@ +# Agent Session 流式状态保持方案 + +## 问题描述 +Agent会话中发送消息后,如果在响应过程中切换会话: +1. 消息内容不丢失了(已修复)✅ +2. 但是pending/processing状态丢失了 ❌ +3. loading状态丢失了 ❌ +4. 导致无法显示"暂停"按钮,无法中止正在进行的响应 + +## 问题分析 + +### 现状 +```javascript +// AgentSessionInputbar.tsx +const streamingAskIds = useMemo(() => { + // 检查消息的 status === 'processing' || 'pending' + // 切换会话后这些状态丢失了 +}, [topicMessages]) + +const canAbort = loading && streamingAskIds.length > 0 +// loading 状态也丢失了 +``` + +### 根本原因 +1. **消息保存时机问题**: + - 用户消息立即保存(状态为success) + - 助手消息创建时是pending状态 + - 但保存到后端时可能已经是最终状态 + +2. **状态管理问题**: + - loading状态只在Redux中,不持久化 + - 切换会话时Redux被清空 + - 重新加载时无法知道是否有正在进行的响应 + +## 解决方案 + +### 方案一:全局流式状态管理器(推荐)✅ + +创建一个全局的流式状态管理器,独立于Redux,跨会话保持状态。 + +```typescript +// src/renderer/src/services/StreamingStateManager.ts +class StreamingStateManager { + // 记录正在进行的流式响应 + private streamingSessions = new Map() + + startStreaming(topicId: string, askId: string, assistantMessageId: string, agentSession?: any) { + this.streamingSessions.set(topicId, { + topicId, + askId, + assistantMessageId, + startTime: Date.now(), + agentSession + }) + } + + stopStreaming(topicId: string) { + this.streamingSessions.delete(topicId) + } + + isStreaming(topicId: string): boolean { + return this.streamingSessions.has(topicId) + } + + getStreamingInfo(topicId: string) { + return this.streamingSessions.get(topicId) + } + + // 获取所有正在流式的会话 + getAllStreaming() { + return Array.from(this.streamingSessions.values()) + } + + // 清理超时的流式状态(防止内存泄漏) + cleanupStale(maxAge = 5 * 60 * 1000) { // 5分钟 + const now = Date.now() + for (const [topicId, info] of this.streamingSessions) { + if (now - info.startTime > maxAge) { + this.streamingSessions.delete(topicId) + } + } + } +} + +export const streamingStateManager = new StreamingStateManager() +``` + +**集成点**: + +1. **开始流式时**: +```typescript +// messageThunk.ts - fetchAndProcessAgentResponseImpl +streamingStateManager.startStreaming( + topicId, + userMessageId, + assistantMessage.id, + agentSession +) +``` + +2. **结束流式时**: +```typescript +// callbacks.ts - onComplete +streamingStateManager.stopStreaming(topicId) +``` + +3. **UI使用**: +```typescript +// AgentSessionInputbar.tsx +const isStreaming = streamingStateManager.isStreaming(sessionTopicId) +const streamingInfo = streamingStateManager.getStreamingInfo(sessionTopicId) + +const canAbort = isStreaming && streamingInfo?.askId +``` + +### 方案二:增强消息持久化(备选) + +修改消息保存逻辑,保留流式状态: + +```typescript +// AgentMessageDataSource.ts +async appendMessage(topicId: string, message: Message, blocks: MessageBlock[]) { + // 保存时保留 pending/processing 状态 + const messageToSave = { + ...message, + // 如果是助手消息且状态是pending,保持这个状态 + status: message.status === 'pending' ? 'pending' : message.status + } + + // ... 保存逻辑 +} + +// 加载时恢复状态 +async fetchMessages(topicId: string) { + const { messages, blocks } = // ... 从后端加载 + + // 检查是否有未完成的消息 + for (const msg of messages) { + if (msg.status === 'pending' || msg.status === 'processing') { + // 恢复loading状态 + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // 可能需要重新启动流式处理或标记为失败 + } + } +} +``` + +### 方案三:Session级别状态存储(简单但有限) + +在localStorage或sessionStorage中保存流式状态: + +```typescript +// 保存流式状态 +const saveStreamingState = (topicId: string, state: any) => { + const states = JSON.parse(localStorage.getItem('streamingStates') || '{}') + states[topicId] = { + ...state, + timestamp: Date.now() + } + localStorage.setItem('streamingStates', JSON.stringify(states)) +} + +// 恢复流式状态 +const getStreamingState = (topicId: string) => { + const states = JSON.parse(localStorage.getItem('streamingStates') || '{}') + const state = states[topicId] + + // 检查是否过期(比如超过5分钟) + if (state && Date.now() - state.timestamp < 5 * 60 * 1000) { + return state + } + + // 清理过期状态 + delete states[topicId] + localStorage.setItem('streamingStates', JSON.stringify(states)) + return null +} +``` + +## 推荐实施步骤 + +### 步骤1:实现StreamingStateManager +1. 创建全局状态管理器 +2. 在开始/结束流式时更新状态 +3. 添加定期清理机制 + +### 步骤2:更新messageThunk.ts +1. 在`fetchAndProcessAgentResponseImpl`开始时注册流式状态 +2. 在完成/错误/中止时清除状态 +3. 确保所有退出路径都清理状态 + +### 步骤3:更新UI组件 +1. 修改`AgentSessionInputbar.tsx`使用StreamingStateManager +2. 不再依赖消息的status字段判断流式状态 +3. 使用全局状态判断是否显示暂停按钮 + +### 步骤4:处理边界情况 +1. 页面刷新时的状态恢复 +2. 网络中断的处理 +3. 超时自动清理 + +## 测试验证 + +### 测试场景 +1. **正常流式**: + - 发送消息 + - 观察流式响应 + - 验证暂停按钮显示 + +2. **切换会话**: + - 发送消息开始流式 + - 立即切换到其他会话 + - 切回来验证暂停按钮仍然显示 + - 可以正确暂停 + +3. **刷新页面**: + - 流式过程中刷新 + - 验证状态是否合理处理(显示失败或继续) + +4. **超时清理**: + - 模拟长时间流式 + - 验证超时后状态被清理 + +## 优势对比 + +| 方案 | 优点 | 缺点 | +|------|------|------| +| 全局状态管理器 | • 简单可靠
• 跨会话工作
• 易于调试 | • 需要额外内存
• 页面刷新丢失 | +| 增强持久化 | • 数据一致性好
• 页面刷新可恢复 | • 实现复杂
• 需要后端配合 | +| Session存储 | • 实现简单
• 可跨页面刷新 | • 容量限制
• 需要清理逻辑 | + +## 建议 +推荐使用**方案一:全局流式状态管理器**,因为: +1. 实现简单,不需要修改后端 +2. 可以快速解决当前问题 +3. 易于扩展和维护 +4. 对现有代码改动最小 + +如果需要页面刷新后也能恢复状态,可以结合方案三,将关键信息保存到localStorage。 \ No newline at end of file diff --git a/packages/shared/IpcChannel.ts b/packages/shared/IpcChannel.ts index e3ee5edf22..10de86024a 100644 --- a/packages/shared/IpcChannel.ts +++ b/packages/shared/IpcChannel.ts @@ -91,6 +91,7 @@ export enum IpcChannel { // agent messages AgentMessage_PersistExchange = 'agent-message:persist-exchange', + AgentMessage_GetHistory = 'agent-message:get-history', //copilot Copilot_GetAuthMessage = 'copilot:get-auth-message', diff --git a/src/main/ipc.ts b/src/main/ipc.ts index b770e7def0..c375d86834 100644 --- a/src/main/ipc.ts +++ b/src/main/ipc.ts @@ -209,6 +209,15 @@ export function registerIpc(mainWindow: BrowserWindow, app: Electron.App) { } }) + ipcMain.handle(IpcChannel.AgentMessage_GetHistory, async (_event, { sessionId }: { sessionId: string }) => { + try { + return await agentMessageRepository.getSessionHistory(sessionId) + } catch (error) { + logger.error('Failed to get agent session history', error as Error) + throw error + } + }) + //only for mac if (isMac) { ipcMain.handle(IpcChannel.App_MacIsProcessTrusted, (): boolean => { diff --git a/src/main/services/agents/database/sessionMessageRepository.ts b/src/main/services/agents/database/sessionMessageRepository.ts index d6a767cf09..2963cce327 100644 --- a/src/main/services/agents/database/sessionMessageRepository.ts +++ b/src/main/services/agents/database/sessionMessageRepository.ts @@ -7,6 +7,7 @@ import type { AgentPersistedMessage, AgentSessionMessageEntity } from '@types' +import { asc, eq } from 'drizzle-orm' import { BaseService } from '../BaseService' import type { InsertSessionMessageRow } from './schema' @@ -176,6 +177,34 @@ class AgentMessageRepository extends BaseService { return result } + + async getSessionHistory(sessionId: string): Promise { + await AgentMessageRepository.initialize() + this.ensureInitialized() + + try { + const rows = await this.database + .select() + .from(sessionMessagesTable) + .where(eq(sessionMessagesTable.session_id, sessionId)) + .orderBy(asc(sessionMessagesTable.created_at)) + + const messages: AgentPersistedMessage[] = [] + + for (const row of rows) { + const deserialized = this.deserialize(row) + if (deserialized?.content) { + messages.push(deserialized.content as AgentPersistedMessage) + } + } + + logger.info(`Loaded ${messages.length} messages for session ${sessionId}`) + return messages + } catch (error) { + logger.error('Failed to load session history', error as Error) + throw error + } + } } export const agentMessageRepository = AgentMessageRepository.getInstance() diff --git a/src/renderer/src/config/featureFlags.ts b/src/renderer/src/config/featureFlags.ts new file mode 100644 index 0000000000..a2e7492bd1 --- /dev/null +++ b/src/renderer/src/config/featureFlags.ts @@ -0,0 +1,81 @@ +/** + * Feature flags for controlling gradual rollout of new features + * These flags can be toggled to enable/disable features without code changes + */ + +interface FeatureFlags { + /** + * Enable unified database service for both regular chats and agent sessions + * When enabled, uses the new DbService facade pattern + * When disabled, uses the original implementation with conditional checks + */ + USE_UNIFIED_DB_SERVICE: boolean +} + +/** + * Default feature flag values + * Set to false initially for safe rollout + */ +export const featureFlags: FeatureFlags = { + USE_UNIFIED_DB_SERVICE: false +} + +/** + * Override feature flags from environment or local storage + * Priority order (highest to lowest): + * 1. localStorage (runtime overrides) + * 2. Environment variables (build-time config) + * 3. Default values + */ +export function initializeFeatureFlags(): void { + // First, check environment variables (build-time configuration) + // In Vite, env vars must be prefixed with VITE_ to be exposed to the client + // Usage: VITE_USE_UNIFIED_DB_SERVICE=true yarn dev + if (import.meta.env?.VITE_USE_UNIFIED_DB_SERVICE === 'true') { + featureFlags.USE_UNIFIED_DB_SERVICE = true + console.log('[FeatureFlags] USE_UNIFIED_DB_SERVICE enabled via environment variable') + } + + // Then check localStorage for runtime overrides (higher priority) + // This allows toggling features without rebuilding + try { + const localOverrides = localStorage.getItem('featureFlags') + if (localOverrides) { + const overrides = JSON.parse(localOverrides) + Object.keys(overrides).forEach((key) => { + if (key in featureFlags) { + featureFlags[key as keyof FeatureFlags] = overrides[key] + console.log(`[FeatureFlags] ${key} set to ${overrides[key]} via localStorage`) + } + }) + } + } catch (e) { + console.warn('[FeatureFlags] Failed to parse feature flags from localStorage:', e) + } + + console.log('[FeatureFlags] Current flags:', featureFlags) +} + +/** + * Update a feature flag value at runtime + * Useful for A/B testing or gradual rollout + */ +export function setFeatureFlag(flag: keyof FeatureFlags, value: boolean): void { + featureFlags[flag] = value + + // Persist to localStorage for consistency across app restarts + const currentFlags = localStorage.getItem('featureFlags') + const flags = currentFlags ? JSON.parse(currentFlags) : {} + flags[flag] = value + localStorage.setItem('featureFlags', JSON.stringify(flags)) +} + +/** + * Get current value of a feature flag + */ +export function getFeatureFlag(flag: keyof FeatureFlags): boolean { + return featureFlags[flag] +} + +// Initialize on import +initializeFeatureFlags() diff --git a/src/renderer/src/hooks/agents/useSession.ts b/src/renderer/src/hooks/agents/useSession.ts index ba54447c52..ded5b9aabe 100644 --- a/src/renderer/src/hooks/agents/useSession.ts +++ b/src/renderer/src/hooks/agents/useSession.ts @@ -1,9 +1,8 @@ import { useAppDispatch } from '@renderer/store' -import { removeManyBlocks, upsertManyBlocks } from '@renderer/store/messageBlock' -import { newMessagesActions } from '@renderer/store/newMessage' -import { AgentPersistedMessage, UpdateSessionForm } from '@renderer/types' +import { loadTopicMessagesThunk } from '@renderer/store/thunk/messageThunk' +import { UpdateSessionForm } from '@renderer/types' import { buildAgentSessionTopicId } from '@renderer/utils/agentSession' -import { useCallback, useEffect, useMemo, useRef } from 'react' +import { useCallback, useEffect, useMemo } from 'react' import { useTranslation } from 'react-i18next' import useSWR from 'swr' @@ -15,7 +14,6 @@ export const useSession = (agentId: string, sessionId: string) => { const key = client.getSessionPaths(agentId).withId(sessionId) const dispatch = useAppDispatch() const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) - const blockIdsRef = useRef([]) const fetcher = async () => { const data = await client.getSession(agentId, sessionId) @@ -23,37 +21,15 @@ export const useSession = (agentId: string, sessionId: string) => { } const { data, error, isLoading, mutate } = useSWR(key, fetcher) + // Use loadTopicMessagesThunk to load messages (with caching mechanism) + // This ensures messages are preserved when switching between sessions/tabs useEffect(() => { - const messages = data?.messages ?? [] - if (!messages.length) { - dispatch(newMessagesActions.messagesReceived({ topicId: sessionTopicId, messages: [] })) - blockIdsRef.current = [] - return + if (sessionId) { + // loadTopicMessagesThunk will check if messages already exist in Redux + // and skip loading if they do (unless forceReload is true) + dispatch(loadTopicMessagesThunk(sessionTopicId)) } - - const persistedEntries = messages - .map((entity) => entity.content as AgentPersistedMessage | undefined) - .filter((entry): entry is AgentPersistedMessage => Boolean(entry)) - - const allBlocks = persistedEntries.flatMap((entry) => entry.blocks) - if (allBlocks.length > 0) { - dispatch(upsertManyBlocks(allBlocks)) - } - - blockIdsRef.current = allBlocks.map((block) => block.id) - - const messageRecords = persistedEntries.map((entry) => entry.message) - dispatch(newMessagesActions.messagesReceived({ topicId: sessionTopicId, messages: messageRecords })) - }, [data?.messages, dispatch, sessionTopicId]) - - useEffect(() => { - return () => { - if (blockIdsRef.current.length > 0) { - dispatch(removeManyBlocks(blockIdsRef.current)) - } - dispatch(newMessagesActions.clearTopicMessages(sessionTopicId)) - } - }, [dispatch, sessionTopicId]) + }, [dispatch, sessionId, sessionTopicId]) const updateSession = useCallback( async (form: UpdateSessionForm) => { diff --git a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx index 9e54b550ef..90b284d6c4 100644 --- a/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx +++ b/src/renderer/src/pages/home/Messages/AgentSessionMessages.tsx @@ -1,10 +1,9 @@ import { loggerService } from '@logger' import ContextMenu from '@renderer/components/ContextMenu' import { useSession } from '@renderer/hooks/agents/useSession' +import { useTopicMessages } from '@renderer/hooks/useMessageOperations' import { getGroupedMessages } from '@renderer/services/MessagesService' -import { useAppSelector } from '@renderer/store' -import { selectMessagesForTopic } from '@renderer/store/newMessage' -import { Topic } from '@renderer/types' +import { type Topic, TopicType } from '@renderer/types' import { buildAgentSessionTopicId } from '@renderer/utils/agentSession' import { memo, useMemo } from 'react' import styled from 'styled-components' @@ -23,7 +22,8 @@ type Props = { const AgentSessionMessages: React.FC = ({ agentId, sessionId }) => { const { session } = useSession(agentId, sessionId) const sessionTopicId = useMemo(() => buildAgentSessionTopicId(sessionId), [sessionId]) - const messages = useAppSelector((state) => selectMessagesForTopic(state, sessionTopicId)) + // Use the same hook as Messages.tsx for consistent behavior + const messages = useTopicMessages(sessionTopicId) const displayMessages = useMemo(() => { if (!messages || messages.length === 0) return [] @@ -43,6 +43,7 @@ const AgentSessionMessages: React.FC = ({ agentId, sessionId }) => { const derivedTopic = useMemo( () => ({ id: sessionTopicId, + type: TopicType.Session, assistantId: sessionAssistantId, name: sessionName, createdAt: sessionCreatedAt, diff --git a/src/renderer/src/services/db/AgentMessageDataSource.ts b/src/renderer/src/services/db/AgentMessageDataSource.ts new file mode 100644 index 0000000000..94fa248e37 --- /dev/null +++ b/src/renderer/src/services/db/AgentMessageDataSource.ts @@ -0,0 +1,478 @@ +import { loggerService } from '@logger' +import type { AgentPersistedMessage } from '@renderer/types/agent' +import type { Message, MessageBlock } from '@renderer/types/newMessage' +import { IpcChannel } from '@shared/IpcChannel' +import { throttle } from 'lodash' +import { LRUCache } from 'lru-cache' + +import type { MessageDataSource } from './types' +import { extractSessionId } from './types' + +const logger = loggerService.withContext('AgentMessageDataSource') + +/** + * Streaming message cache to track messages being streamed + * Key: messageId, Value: { message, blocks, isComplete } + */ +const streamingMessageCache = new LRUCache< + string, + { + message: Message + blocks: MessageBlock[] + isComplete: boolean + sessionId: string + } +>({ + max: 100, + ttl: 1000 * 60 * 5 // 5 minutes +}) + +/** + * Throttled persisters for each message to batch updates during streaming + */ +const messagePersistThrottlers = new LRUCache>({ + max: 100, + ttl: 1000 * 60 * 5 +}) + +/** + * IPC-based implementation of MessageDataSource + * Handles agent session messages through backend communication + */ +export class AgentMessageDataSource implements MessageDataSource { + // ============ Helper Methods ============ + + /** + * Get or create a throttled persister for a message + */ + private getMessagePersister(messageId: string): ReturnType { + if (!messagePersistThrottlers.has(messageId)) { + const persister = throttle(async () => { + const cached = streamingMessageCache.get(messageId) + if (!cached) return + + const { message, blocks, sessionId, isComplete } = cached + + try { + // Persist to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(message.role === 'user' + ? { user: { payload: { message, blocks } } } + : { assistant: { payload: { message, blocks } } }) + }) + + logger.debug(`Persisted ${isComplete ? 'complete' : 'streaming'} message ${messageId} to backend`) + + // Clean up if complete + if (isComplete) { + streamingMessageCache.delete(messageId) + messagePersistThrottlers.delete(messageId) + } + } catch (error) { + logger.error(`Failed to persist message ${messageId}:`, error as Error) + } + }, 500) // Throttle to 500ms for agent messages (less frequent than chat) + + messagePersistThrottlers.set(messageId, persister) + } + + return messagePersistThrottlers.get(messageId)! + } + + /** + * Check if a message is in streaming state based on status + */ + private isMessageStreaming(message: Partial): boolean { + return message.status?.includes('ing') ?? false + } + + /** + * Clean up resources for a message + */ + private cleanupMessage(messageId: string): void { + streamingMessageCache.delete(messageId) + const throttler = messagePersistThrottlers.get(messageId) + if (throttler) { + throttler.cancel() + messagePersistThrottlers.delete(messageId) + } + } + + // ============ Read Operations ============ + + async fetchMessages(topicId: string): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + try { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + logger.warn('IPC renderer not available') + return { messages: [], blocks: [] } + } + + // Fetch from agent backend + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + if (!historicalMessages || !Array.isArray(historicalMessages)) { + return { messages: [], blocks: [] } + } + + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + messages.push(persistedMsg.message) + if (persistedMsg.blocks && persistedMsg.blocks.length > 0) { + blocks.push(...persistedMsg.blocks) + } + } + } + + logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`) + + return { messages, blocks } + } catch (error) { + logger.error(`Failed to fetch messages for agent session ${topicId}:`, error as Error) + throw error + } + } + + // ============ Write Operations ============ + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], _insertIndex?: number): Promise { + const sessionId = extractSessionId(topicId) + if (!sessionId) { + throw new Error(`Invalid agent session topicId: ${topicId}`) + } + + try { + const isStreaming = this.isMessageStreaming(message) + + // Always persist immediately for visibility in UI + const payload: AgentPersistedMessage = { + message, + blocks + } + + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(message.role === 'user' ? { user: { payload } } : { assistant: { payload } }) + }) + + logger.info(`Saved ${message.role} message for agent session ${sessionId}`, { + messageId: message.id, + blockCount: blocks.length, + status: message.status, + isStreaming + }) + + // If streaming, also set up cache for throttled updates + if (isStreaming && message.role === 'assistant') { + streamingMessageCache.set(message.id, { + message, + blocks, + isComplete: false, + sessionId + }) + + // Set up throttled persister for future updates + this.getMessagePersister(message.id) + + logger.debug(`Set up streaming cache for message ${message.id}`) + } else { + // Clean up any streaming cache for non-streaming messages + this.cleanupMessage(message.id) + } + } catch (error) { + logger.error(`Failed to save message for agent session ${topicId}:`, error as Error) + throw error + } + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + const sessionId = extractSessionId(topicId) + if (!sessionId) { + throw new Error(`Invalid agent session topicId: ${topicId}`) + } + + try { + // Fetch current message from backend to merge updates + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageId) + if (!existingMessage?.message) { + logger.warn(`Message ${messageId} not found in agent session ${sessionId}`) + return + } + + // Merge updates with existing message + const updatedMessage = { ...existingMessage.message, ...updates } + + // Save updated message back to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(updatedMessage.role === 'user' + ? { user: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } } + : { assistant: { payload: { message: updatedMessage, blocks: existingMessage.blocks || [] } } }) + }) + + logger.info(`Updated message ${messageId} in agent session ${sessionId}`) + } catch (error) { + logger.error(`Failed to update message ${messageId} in agent session ${topicId}:`, error as Error) + throw error + } + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + const sessionId = extractSessionId(topicId) + if (!sessionId) { + throw new Error(`Invalid agent session topicId: ${topicId}`) + } + + try { + const isStreaming = this.isMessageStreaming(messageUpdates) + + // Check if we have cached data for this message + const cached = streamingMessageCache.get(messageUpdates.id) + + if (isStreaming) { + // During streaming, update cache and trigger throttled persist + let currentMessage: Message + let currentBlocks: MessageBlock[] + + if (cached) { + // Update existing cached message + currentMessage = { ...cached.message, ...messageUpdates } + // Merge blocks - use new blocks if provided, otherwise keep cached + currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks + } else { + // First streaming update - fetch from backend or create new + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) + + if (existingMessage?.message) { + currentMessage = { ...existingMessage.message, ...messageUpdates } + currentBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || [] + } else { + // New message + if (!messageUpdates.topicId || !messageUpdates.role) { + logger.warn(`Incomplete message data for streaming message ${messageUpdates.id}`) + return + } + currentMessage = messageUpdates as Message + currentBlocks = blocksToUpdate + } + } + + // Update cache + streamingMessageCache.set(messageUpdates.id, { + message: currentMessage, + blocks: currentBlocks, + isComplete: false, + sessionId + }) + + // Trigger throttled persist + const persister = this.getMessagePersister(messageUpdates.id) + persister() + + logger.debug(`Updated streaming cache for message ${messageUpdates.id}`, { + status: messageUpdates.status, + blockCount: currentBlocks.length + }) + } else { + // Not streaming - persist immediately + let finalMessage: Message + let finalBlocks: MessageBlock[] + + if (cached) { + // Use cached data as base + finalMessage = { ...cached.message, ...messageUpdates } + finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : cached.blocks + } else { + // Fetch from backend if no cache + const historicalMessages: AgentPersistedMessage[] = await window.electron.ipcRenderer.invoke( + IpcChannel.AgentMessage_GetHistory, + { sessionId } + ) + + const existingMessage = historicalMessages?.find((pm) => pm.message?.id === messageUpdates.id) + + if (existingMessage?.message) { + finalMessage = { ...existingMessage.message, ...messageUpdates } + finalBlocks = blocksToUpdate.length > 0 ? blocksToUpdate : existingMessage.blocks || [] + } else { + if (!messageUpdates.topicId || !messageUpdates.role) { + logger.warn(`Incomplete message data for ${messageUpdates.id}`) + return + } + finalMessage = messageUpdates as Message + finalBlocks = blocksToUpdate + } + } + + // Mark as complete in cache if it was streaming + if (cached) { + streamingMessageCache.set(messageUpdates.id, { + message: finalMessage, + blocks: finalBlocks, + isComplete: true, + sessionId + }) + } + + // Persist to backend + await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { + sessionId, + agentSessionId: '', + ...(finalMessage.role === 'user' + ? { user: { payload: { message: finalMessage, blocks: finalBlocks } } } + : { assistant: { payload: { message: finalMessage, blocks: finalBlocks } } }) + }) + + logger.info(`Persisted complete message ${messageUpdates.id} for agent session ${sessionId}`, { + status: finalMessage.status, + blockCount: finalBlocks.length + }) + + // Clean up + this.cleanupMessage(messageUpdates.id) + } + } catch (error) { + logger.error(`Failed to update message and blocks for agent session ${topicId}:`, error as Error) + throw error + } + } + + async deleteMessage(topicId: string, _messageId: string): Promise { + // Agent session messages cannot be deleted individually + logger.warn(`deleteMessage called for agent session ${topicId}, operation not supported`) + + // In a full implementation, you might want to: + // 1. Implement soft delete in backend + // 2. Or just hide from UI without actual deletion + } + + async deleteMessages(topicId: string, _messageIds: string[]): Promise { + // Agent session messages cannot be deleted in batch + logger.warn(`deleteMessages called for agent session ${topicId}, operation not supported`) + + // In a full implementation, you might want to: + // 1. Implement batch soft delete in backend + // 2. Update local state accordingly + } + + async deleteMessagesByAskId(topicId: string, _askId: string): Promise { + // Agent session messages cannot be deleted + logger.warn(`deleteMessagesByAskId called for agent session ${topicId}, operation not supported`) + } + + // ============ Block Operations ============ + + async updateBlocks(_blocks: MessageBlock[]): Promise { + // Blocks are updated through persistExchange for agent sessions + logger.warn('updateBlocks called for agent session, operation not supported individually') + } + + async deleteBlocks(_blockIds: string[]): Promise { + // Blocks cannot be deleted individually for agent sessions + logger.warn('deleteBlocks called for agent session, operation not supported') + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + logger.warn('IPC renderer not available for clear messages') + return + } + + // In a full implementation, you would call a backend endpoint to clear session + // For now, we'll just log the attempt + logger.info(`Clear messages requested for agent session ${sessionId}`) + + // You might want to implement: + // await window.electron.ipcRenderer.invoke( + // IpcChannel.AgentMessage_ClearSession, + // { sessionId } + // ) + } + + async topicExists(topicId: string): Promise { + try { + const sessionId = extractSessionId(topicId) + + if (!window.electron?.ipcRenderer) { + return false + } + return sessionId != null + } catch (error) { + return false + } + } + + async ensureTopic(topicId: string): Promise { + // Agent sessions are created externally, not by the chat interface + // This is a no-op for agent sessions + const sessionId = extractSessionId(topicId) + logger.info(`ensureTopic called for agent session ${sessionId}, no action needed`) + } + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + try { + // For agent sessions, fetch messages from backend and return in raw topic format + const { messages } = await this.fetchMessages(topicId) + return { + id: topicId, + messages + } + } catch (error) { + logger.error(`Failed to get raw topic for agent session ${topicId}:`, error as Error) + return undefined + } + } + + // ============ Additional Methods for Interface Compatibility ============ + + async updateSingleBlock(blockId: string, _updates: Partial): Promise { + // Agent session blocks are immutable once persisted + logger.warn(`updateSingleBlock called for agent session block ${blockId}, operation not supported`) + } + + async bulkAddBlocks(_blocks: MessageBlock[]): Promise { + // Agent session blocks are added through persistExchange + logger.warn(`bulkAddBlocks called for agent session, operation not supported individually`) + } + + async updateFileCount(fileId: string, _delta: number, _deleteIfZero?: boolean): Promise { + // Agent sessions don't manage file reference counts locally + logger.warn(`updateFileCount called for agent session file ${fileId}, operation not supported`) + } + + async updateFileCounts(_files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise { + // Agent sessions don't manage file reference counts locally + logger.warn(`updateFileCounts called for agent session, operation not supported`) + } +} diff --git a/src/renderer/src/services/db/DbService.ts b/src/renderer/src/services/db/DbService.ts new file mode 100644 index 0000000000..4dab69adc6 --- /dev/null +++ b/src/renderer/src/services/db/DbService.ts @@ -0,0 +1,196 @@ +import { loggerService } from '@logger' +import type { Message, MessageBlock } from '@renderer/types/newMessage' + +import { AgentMessageDataSource } from './AgentMessageDataSource' +import { DexieMessageDataSource } from './DexieMessageDataSource' +import type { MessageDataSource } from './types' +import { isAgentSessionTopicId } from './types' + +const logger = loggerService.withContext('DbService') + +/** + * Facade service that routes data operations to the appropriate data source + * based on the topic ID type (regular chat or agent session) + */ +class DbService implements MessageDataSource { + private static instance: DbService + private dexieSource: DexieMessageDataSource + private agentSource: AgentMessageDataSource + + private constructor() { + this.dexieSource = new DexieMessageDataSource() + this.agentSource = new AgentMessageDataSource() + } + + /** + * Get singleton instance + */ + static getInstance(): DbService { + if (!DbService.instance) { + DbService.instance = new DbService() + } + return DbService.instance + } + + /** + * Determine which data source to use based on topic ID + */ + private getDataSource(topicId: string): MessageDataSource { + if (isAgentSessionTopicId(topicId)) { + logger.silly(`Using AgentMessageDataSource for topic ${topicId}`) + return this.agentSource + } + + // Future: Could add more data source types here + // e.g., if (isCloudTopicId(topicId)) return this.cloudSource + + logger.silly(`Using DexieMessageDataSource for topic ${topicId}`) + return this.dexieSource + } + + // ============ Read Operations ============ + + async fetchMessages( + topicId: string, + forceReload?: boolean + ): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + const source = this.getDataSource(topicId) + return source.fetchMessages(topicId, forceReload) + } + + // ============ Write Operations ============ + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise { + const source = this.getDataSource(topicId) + return source.appendMessage(topicId, message, blocks, insertIndex) + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + const source = this.getDataSource(topicId) + return source.updateMessage(topicId, messageId, updates) + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + const source = this.getDataSource(topicId) + return source.updateMessageAndBlocks(topicId, messageUpdates, blocksToUpdate) + } + + async deleteMessage(topicId: string, messageId: string): Promise { + const source = this.getDataSource(topicId) + return source.deleteMessage(topicId, messageId) + } + + async deleteMessages(topicId: string, messageIds: string[]): Promise { + const source = this.getDataSource(topicId) + return source.deleteMessages(topicId, messageIds) + } + + // ============ Block Operations ============ + + async updateBlocks(blocks: MessageBlock[]): Promise { + // For block operations, we need to infer the source from the first block's message + // This is a limitation of the current design where blocks don't have topicId + // In practice, blocks are usually updated in context of a topic operation + + // Default to Dexie for now since agent blocks are updated through persistExchange + return this.dexieSource.updateBlocks(blocks) + } + + async deleteBlocks(blockIds: string[]): Promise { + // Similar limitation as updateBlocks + // Default to Dexie since agent blocks can't be deleted individually + return this.dexieSource.deleteBlocks(blockIds) + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.clearMessages(topicId) + } + + async topicExists(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.topicExists(topicId) + } + + async ensureTopic(topicId: string): Promise { + const source = this.getDataSource(topicId) + return source.ensureTopic(topicId) + } + + // ============ Optional Methods (with fallback) ============ + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + const source = this.getDataSource(topicId) + return source.getRawTopic(topicId) + } + + async updateSingleBlock(blockId: string, updates: Partial): Promise { + // For single block operations, default to Dexie since agent blocks are immutable + if (this.dexieSource.updateSingleBlock) { + return this.dexieSource.updateSingleBlock(blockId, updates) + } + // Fallback to updateBlocks with single item + return this.dexieSource.updateBlocks([{ ...updates, id: blockId } as MessageBlock]) + } + + async bulkAddBlocks(blocks: MessageBlock[]): Promise { + // For bulk add operations, default to Dexie since agent blocks use persistExchange + if (this.dexieSource.bulkAddBlocks) { + return this.dexieSource.bulkAddBlocks(blocks) + } + // Fallback to updateBlocks + return this.dexieSource.updateBlocks(blocks) + } + + async updateFileCount(fileId: string, delta: number, deleteIfZero: boolean = false): Promise { + // File operations only apply to Dexie source + if (this.dexieSource.updateFileCount) { + return this.dexieSource.updateFileCount(fileId, delta, deleteIfZero) + } + // No-op if not supported + logger.warn(`updateFileCount not supported for file ${fileId}`) + } + + async updateFileCounts(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise { + // File operations only apply to Dexie source + if (this.dexieSource.updateFileCounts) { + return this.dexieSource.updateFileCounts(files) + } + // No-op if not supported + logger.warn(`updateFileCounts not supported`) + } + + // ============ Utility Methods ============ + + /** + * Check if a topic is an agent session + */ + isAgentSession(topicId: string): boolean { + return isAgentSessionTopicId(topicId) + } + + /** + * Get the data source type for a topic + */ + getSourceType(topicId: string): 'dexie' | 'agent' | 'unknown' { + if (isAgentSessionTopicId(topicId)) { + return 'agent' + } + // Add more checks for other source types as needed + return 'dexie' + } +} + +// Export singleton instance +export const dbService = DbService.getInstance() + +// Also export class for testing purposes +export { DbService } diff --git a/src/renderer/src/services/db/DexieMessageDataSource.ts b/src/renderer/src/services/db/DexieMessageDataSource.ts new file mode 100644 index 0000000000..a8d3679840 --- /dev/null +++ b/src/renderer/src/services/db/DexieMessageDataSource.ts @@ -0,0 +1,406 @@ +import { loggerService } from '@logger' +import db from '@renderer/databases' +import FileManager from '@renderer/services/FileManager' +import store from '@renderer/store' +import { updateTopicUpdatedAt } from '@renderer/store/assistants' +import type { Message, MessageBlock } from '@renderer/types/newMessage' +import { isEmpty } from 'lodash' + +import type { MessageDataSource } from './types' + +const logger = loggerService.withContext('DexieMessageDataSource') + +/** + * Dexie-based implementation of MessageDataSource + * Handles local IndexedDB storage for regular chat messages + */ +export class DexieMessageDataSource implements MessageDataSource { + // ============ Read Operations ============ + + async fetchMessages(topicId: string): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> { + try { + const topic = await db.topics.get(topicId) + if (!topic) { + await db.topics.add({ id: topicId, messages: [] }) + } + const messages = topic?.messages || [] + + if (messages.length === 0) { + return { messages: [], blocks: [] } + } + + const messageIds = messages.map((m) => m.id) + const blocks = await db.message_blocks.where('messageId').anyOf(messageIds).toArray() + + // Ensure block IDs are strings for consistency + const messagesWithBlockIds = messages.map((m) => ({ + ...m, + blocks: m.blocks?.map(String) || [] + })) + + return { messages: messagesWithBlockIds, blocks: blocks || [] } + } catch (error) { + logger.error(`Failed to fetch messages for topic ${topicId}:`, error as Error) + throw error + } + } + + async getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> { + try { + return await db.topics.get(topicId) + } catch (error) { + logger.error(`Failed to get raw topic ${topicId}:`, error as Error) + throw error + } + } + + // ============ Write Operations ============ + async appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Save blocks first + if (blocks.length > 0) { + await db.message_blocks.bulkPut(blocks) + } + + // Get or create topic + let topic = await db.topics.get(topicId) + if (!topic) { + await db.topics.add({ id: topicId, messages: [] }) + topic = await db.topics.get(topicId) + } + + if (!topic) { + throw new Error(`Failed to create topic ${topicId}`) + } + + const updatedMessages = [...(topic.messages || [])] + + // Check if message already exists + const existingIndex = updatedMessages.findIndex((m) => m.id === message.id) + if (existingIndex !== -1) { + updatedMessages[existingIndex] = message + } else { + // Insert at specific index or append + if (insertIndex !== undefined && insertIndex >= 0 && insertIndex <= updatedMessages.length) { + updatedMessages.splice(insertIndex, 0, message) + } else { + updatedMessages.push(message) + } + } + + await db.topics.update(topicId, { messages: updatedMessages }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to append message to topic ${topicId}:`, error as Error) + throw error + } + } + + async updateMessage(topicId: string, messageId: string, updates: Partial): Promise { + try { + await db.transaction('rw', db.topics, async () => { + await db.topics + .where('id') + .equals(topicId) + .modify((topic) => { + if (!topic || !topic.messages) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex !== -1) { + Object.assign(topic.messages[messageIndex], updates) + } + }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to update message ${messageId} in topic ${topicId}:`, error as Error) + throw error + } + } + + async updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Update blocks + if (blocksToUpdate.length > 0) { + await db.message_blocks.bulkPut(blocksToUpdate) + } + + // Update message if there are actual changes beyond id and topicId + const keysToUpdate = Object.keys(messageUpdates).filter((key) => key !== 'id' && key !== 'topicId') + if (keysToUpdate.length > 0) { + await db.topics + .where('id') + .equals(topicId) + .modify((topic) => { + if (!topic || !topic.messages) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageUpdates.id) + if (messageIndex !== -1) { + keysToUpdate.forEach((key) => { + ;(topic.messages[messageIndex] as any)[key] = (messageUpdates as any)[key] + }) + } + }) + } + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to update message and blocks for ${messageUpdates.id}:`, error as Error) + throw error + } + } + + async deleteMessage(topicId: string, messageId: string): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex === -1) return + + const message = topic.messages[messageIndex] + const blockIds = message.blocks || [] + + // Delete blocks and handle files + if (blockIds.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + // Clean up files + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } + + // Remove message from topic + topic.messages.splice(messageIndex, 1) + await db.topics.update(topicId, { messages: topic.messages }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to delete message ${messageId} from topic ${topicId}:`, error as Error) + throw error + } + } + + async deleteMessages(topicId: string, messageIds: string[]): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + // Collect all block IDs from messages to be deleted + const allBlockIds: string[] = [] + const messagesToDelete: Message[] = [] + + for (const messageId of messageIds) { + const message = topic.messages.find((m) => m.id === messageId) + if (message) { + messagesToDelete.push(message) + if (message.blocks && message.blocks.length > 0) { + allBlockIds.push(...message.blocks) + } + } + } + + // Delete blocks and handle files + if (allBlockIds.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(allBlockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + // Clean up files + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + await db.message_blocks.bulkDelete(allBlockIds) + } + + // Remove messages from topic + const remainingMessages = topic.messages.filter((m) => !messageIds.includes(m.id)) + await db.topics.update(topicId, { messages: remainingMessages }) + }) + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to delete messages from topic ${topicId}:`, error as Error) + throw error + } + } + + // ============ Block Operations ============ + + async updateBlocks(blocks: MessageBlock[]): Promise { + try { + if (blocks.length === 0) return + await db.message_blocks.bulkPut(blocks) + } catch (error) { + logger.error('Failed to update blocks:', error as Error) + throw error + } + } + + async updateSingleBlock(blockId: string, updates: Partial): Promise { + try { + await db.message_blocks.update(blockId, updates) + } catch (error) { + logger.error(`Failed to update block ${blockId}:`, error as Error) + throw error + } + } + + async bulkAddBlocks(blocks: MessageBlock[]): Promise { + try { + if (blocks.length === 0) return + await db.message_blocks.bulkAdd(blocks) + } catch (error) { + logger.error('Failed to bulk add blocks:', error as Error) + throw error + } + } + + async deleteBlocks(blockIds: string[]): Promise { + try { + if (blockIds.length === 0) return + + // Get blocks to find associated files + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + // Clean up files + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } catch (error) { + logger.error('Failed to delete blocks:', error as Error) + throw error + } + } + + // ============ Batch Operations ============ + + async clearMessages(topicId: string): Promise { + try { + await db.transaction('rw', db.topics, db.message_blocks, db.files, async () => { + const topic = await db.topics.get(topicId) + if (!topic) return + + // Get all block IDs + const blockIds = topic.messages.flatMap((m) => m.blocks || []) + + // Delete blocks and handle files + if (blockIds.length > 0) { + const blocks = await db.message_blocks.where('id').anyOf(blockIds).toArray() + const files = blocks + .filter((block) => block.type === 'file' || block.type === 'image') + .map((block: any) => block.file) + .filter((file) => file !== undefined) + + if (!isEmpty(files)) { + await Promise.all(files.map((file) => FileManager.deleteFile(file.id, false))) + } + + await db.message_blocks.bulkDelete(blockIds) + } + + // Clear messages + await db.topics.update(topicId, { messages: [] }) + }) + + store.dispatch(updateTopicUpdatedAt({ topicId })) + } catch (error) { + logger.error(`Failed to clear messages for topic ${topicId}:`, error as Error) + throw error + } + } + + async topicExists(topicId: string): Promise { + try { + const topic = await db.topics.get(topicId) + return !!topic + } catch (error) { + logger.error(`Failed to check if topic ${topicId} exists:`, error as Error) + return false + } + } + + async ensureTopic(topicId: string): Promise { + try { + const exists = await this.topicExists(topicId) + if (!exists) { + await db.topics.add({ id: topicId, messages: [] }) + } + } catch (error) { + logger.error(`Failed to ensure topic ${topicId} exists:`, error as Error) + throw error + } + } + + // ============ File Operations ============ + + async updateFileCount(fileId: string, delta: number, deleteIfZero: boolean = false): Promise { + try { + await db.transaction('rw', db.files, async () => { + const file = await db.files.get(fileId) + + if (!file) { + logger.warn(`File ${fileId} not found for count update`) + return + } + + const newCount = (file.count || 0) + delta + + if (newCount <= 0 && deleteIfZero) { + // Delete the file when count reaches 0 or below + await FileManager.deleteFile(fileId, false) + await db.files.delete(fileId) + logger.info(`Deleted file ${fileId} as reference count reached ${newCount}`) + } else { + // Update the count + await db.files.update(fileId, { count: Math.max(0, newCount) }) + logger.debug(`Updated file ${fileId} count to ${Math.max(0, newCount)}`) + } + }) + } catch (error) { + logger.error(`Failed to update file count for ${fileId}:`, error as Error) + throw error + } + } + + async updateFileCounts(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise { + try { + for (const file of files) { + await this.updateFileCount(file.id, file.delta, file.deleteIfZero || false) + } + } catch (error) { + logger.error('Failed to update file counts:', error as Error) + throw error + } + } +} diff --git a/src/renderer/src/services/db/README.md b/src/renderer/src/services/db/README.md new file mode 100644 index 0000000000..930f33c06b --- /dev/null +++ b/src/renderer/src/services/db/README.md @@ -0,0 +1,89 @@ +# Unified Data Access Layer + +This module provides a unified interface for accessing message data from different sources: +- **DexieMessageDataSource**: Local IndexedDB storage for regular chat messages +- **AgentMessageDataSource**: Backend IPC storage for agent session messages + +## Architecture + +``` +dbService (Facade) + ├── Determines data source based on topicId + ├── Routes to DexieMessageDataSource (regular chats) + └── Routes to AgentMessageDataSource (agent sessions) +``` + +## Usage + +```typescript +import { dbService } from '@renderer/services/db' + +// Fetch messages (automatically routes to correct source) +const { messages, blocks } = await dbService.fetchMessages(topicId) + +// Save a message exchange +await dbService.persistExchange(topicId, { + user: { message: userMsg, blocks: userBlocks }, + assistant: { message: assistantMsg, blocks: assistantBlocks } +}) + +// Append a single message +await dbService.appendMessage(topicId, message, blocks) + +// Check if topic exists +const exists = await dbService.topicExists(topicId) +``` + +## Topic ID Convention + +- Regular chat topics: Any string ID (e.g., "uuid-1234-5678") +- Agent session topics: Prefixed with "agent-session:" (e.g., "agent-session:session-123") + +## Key Features + +1. **Transparent Routing**: The facade automatically routes to the appropriate data source +2. **Consistent API**: Same methods work for both regular chats and agent sessions +3. **Type Safety**: Full TypeScript support with proper interfaces +4. **Error Handling**: Comprehensive error logging and propagation +5. **Extensibility**: Easy to add new data sources (e.g., cloud storage) + +## Implementation Status + +### DexieMessageDataSource ✅ +- Full CRUD operations for messages and blocks +- Transaction support +- File cleanup on deletion +- Redux state updates + +### AgentMessageDataSource ✅ +- Fetch messages from backend +- Persist message exchanges +- Limited update/delete operations (by design) +- IPC communication with backend + +## Migration Guide + +### Before (Direct DB access): +```typescript +// In thunks +if (isAgentSessionTopicId(topicId)) { + // Special handling for agent sessions + const messages = await window.electron.ipcRenderer.invoke(...) +} else { + // Regular DB access + const topic = await db.topics.get(topicId) +} +``` + +### After (Unified access): +```typescript +// In thunks +const { messages, blocks } = await dbService.fetchMessages(topicId) +// No need to check topic type! +``` + +## Next Steps + +Phase 2: Update Redux thunks to use dbService +Phase 3: Update components to use unified hooks +Phase 4: Remove AgentSessionMessages component \ No newline at end of file diff --git a/src/renderer/src/services/db/index.ts b/src/renderer/src/services/db/index.ts new file mode 100644 index 0000000000..245a5b67e3 --- /dev/null +++ b/src/renderer/src/services/db/index.ts @@ -0,0 +1,19 @@ +/** + * Unified data access layer for messages + * Provides a consistent API for accessing messages from different sources + * (Dexie/IndexedDB for regular chats, IPC/Backend for agent sessions) + */ + +// Export main service +export { DbService,dbService } from './DbService' + +// Export types +export type { MessageDataSource, MessageExchange } from './types' +export { + buildAgentSessionTopicId, + extractSessionId, + isAgentSessionTopicId} from './types' + +// Export implementations (for testing or direct access if needed) +export { AgentMessageDataSource } from './AgentMessageDataSource' +export { DexieMessageDataSource } from './DexieMessageDataSource' diff --git a/src/renderer/src/services/db/types.ts b/src/renderer/src/services/db/types.ts new file mode 100644 index 0000000000..3852bcde21 --- /dev/null +++ b/src/renderer/src/services/db/types.ts @@ -0,0 +1,143 @@ +import type { Message, MessageBlock } from '@renderer/types/newMessage' + +/** + * Message exchange data structure for persisting user-assistant conversations + */ +export interface MessageExchange { + user?: { + message: Message + blocks: MessageBlock[] + } + assistant?: { + message: Message + blocks: MessageBlock[] + } + // For agent sessions + agentSessionId?: string +} + +/** + * Unified interface for message data operations + * Implementations can be backed by Dexie, IPC, or other storage mechanisms + */ +export interface MessageDataSource { + // ============ Read Operations ============ + /** + * Fetch all messages and blocks for a topic + */ + fetchMessages( + topicId: string, + forceReload?: boolean + ): Promise<{ + messages: Message[] + blocks: MessageBlock[] + }> + + /** + * Get raw topic data (just id and messages) + */ + getRawTopic(topicId: string): Promise<{ id: string; messages: Message[] } | undefined> + + // ============ Write Operations ============ + /** + * Append a single message with its blocks + */ + appendMessage(topicId: string, message: Message, blocks: MessageBlock[], insertIndex?: number): Promise + + /** + * Update an existing message + */ + updateMessage(topicId: string, messageId: string, updates: Partial): Promise + + /** + * Update existing message and its blocks + */ + updateMessageAndBlocks( + topicId: string, + messageUpdates: Partial & Pick, + blocksToUpdate: MessageBlock[] + ): Promise + + /** + * Delete a single message and its blocks + */ + deleteMessage(topicId: string, messageId: string): Promise + + /** + * Delete multiple messages and their blocks + */ + deleteMessages(topicId: string, messageIds: string[]): Promise + + // ============ Block Operations ============ + /** + * Update multiple blocks + */ + updateBlocks(blocks: MessageBlock[]): Promise + + /** + * Update single block + */ + updateSingleBlock?(blockId: string, updates: Partial): Promise + + /** + * Bulk add blocks (for cloning operations) + */ + bulkAddBlocks?(blocks: MessageBlock[]): Promise + + /** + * Delete multiple blocks + */ + deleteBlocks(blockIds: string[]): Promise + + // ============ Batch Operations ============ + /** + * Clear all messages in a topic + */ + clearMessages(topicId: string): Promise + + /** + * Check if topic exists + */ + topicExists(topicId: string): Promise + + /** + * Create or ensure topic exists + */ + ensureTopic(topicId: string): Promise + + // ============ File Operations (Optional) ============ + + /** + * Update file reference count + * @param fileId - The file ID to update + * @param delta - The change in reference count (positive or negative) + * @param deleteIfZero - Whether to delete the file when count reaches 0 + */ + updateFileCount?(fileId: string, delta: number, deleteIfZero?: boolean): Promise + + /** + * Update multiple file reference counts + */ + updateFileCounts?(files: Array<{ id: string; delta: number; deleteIfZero?: boolean }>): Promise +} + +/** + * Type guard to check if a topic ID is for an agent session + */ +export function isAgentSessionTopicId(topicId: string): boolean { + return topicId.startsWith('agent-session:') +} + +/** + * Extract session ID from agent session topic ID + */ +export function extractSessionId(topicId: string): string { + return topicId.replace('agent-session:', '') +} + +/** + * Build agent session topic ID from session ID + */ +export function buildAgentSessionTopicId(sessionId: string): string { + return `agent-session:${sessionId}` +} diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index 30541832ea..c86823999a 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -1,5 +1,6 @@ import { loggerService } from '@logger' import { AiSdkToChunkAdapter } from '@renderer/aiCore/chunk/AiSdkToChunkAdapter' +import { featureFlags } from '@renderer/config/featureFlags' import db from '@renderer/databases' import FileManager from '@renderer/services/FileManager' import { BlockManager } from '@renderer/services/messageStreaming/BlockManager' @@ -10,13 +11,12 @@ import { createStreamProcessor, type StreamProcessorCallbacks } from '@renderer/ import store from '@renderer/store' import { updateTopicUpdatedAt } from '@renderer/store/assistants' import { type ApiServerConfig, type Assistant, type FileMetadata, type Model, type Topic } from '@renderer/types' -import type { AgentPersistedMessage } from '@renderer/types/agent' import { ChunkType } from '@renderer/types/chunk' import type { FileMessageBlock, ImageMessageBlock, Message, MessageBlock } from '@renderer/types/newMessage' import { AssistantMessageStatus, MessageBlockStatus, MessageBlockType } from '@renderer/types/newMessage' import { uuid } from '@renderer/utils' import { addAbortController } from '@renderer/utils/abortController' -import { isAgentSessionTopicId } from '@renderer/utils/agentSession' +import { buildAgentSessionTopicId, isAgentSessionTopicId } from '@renderer/utils/agentSession' import { createAssistantMessage, createTranslationBlock, @@ -34,6 +34,18 @@ import { LRUCache } from 'lru-cache' import type { AppDispatch, RootState } from '../index' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { newMessagesActions, selectMessagesForTopic } from '../newMessage' +import { + bulkAddBlocksV2, + clearMessagesFromDBV2, + deleteMessageFromDBV2, + deleteMessagesFromDBV2, + loadTopicMessagesThunkV2, + saveMessageAndBlocksToDBV2, + updateBlocksV2, + updateFileCountV2, + updateMessageV2, + updateSingleBlockV2 +} from './messageThunk.v2' const logger = loggerService.withContext('MessageThunk') @@ -190,12 +202,23 @@ const createAgentMessageStream = async ( } // TODO: 后续可以将db操作移到Listener Middleware中 export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => { + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + return saveMessageAndBlocksToDBV2(message.topicId, message, blocks, messageIndex) + } + + // Original implementation try { if (isAgentSessionTopicId(message.topicId)) { return } if (blocks.length > 0) { - await db.message_blocks.bulkPut(blocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateBlocksV2(blocks) + } else { + await db.message_blocks.bulkPut(blocks) + } } const topic = await db.topics.get(message.topicId) if (topic) { @@ -232,7 +255,12 @@ const updateExistingMessageAndBlocksInDB = async ( await db.transaction('rw', db.topics, db.message_blocks, async () => { // Always update blocks if provided if (updatedBlocks.length > 0) { - await db.message_blocks.bulkPut(updatedBlocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateBlocksV2(updatedBlocks) + } else { + await db.message_blocks.bulkPut(updatedBlocks) + } } // Check if there are message properties to update beyond id and topicId @@ -301,7 +329,12 @@ const getBlockThrottler = (id: string) => { }) blockUpdateRafs.set(id, rafId) - await db.message_blocks.update(id, blockUpdate) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateSingleBlockV2(id, blockUpdate) + } else { + await db.message_blocks.update(id, blockUpdate) + } }, 150) blockUpdateThrottlers.set(id, throttler) @@ -470,13 +503,17 @@ const fetchAndProcessAgentResponseImpl = async ( text: Promise.resolve('') }) - await persistAgentExchange({ - getState, - agentSession, - userMessageId, - assistantMessageId: assistantMessage.id, - latestAgentSessionId - }) + // No longer need persistAgentExchange here since: + // 1. User message is already saved via appendMessage when created + // 2. Assistant message is saved via appendMessage when created + // 3. Updates during streaming are saved via updateMessageAndBlocks + // This eliminates the duplicate save issue + + // Only persist the agentSessionId update if it changed + if (latestAgentSessionId) { + logger.info(`Agent session ID updated to: ${latestAgentSessionId}`) + // In the future, you might want to update some session metadata here + } } catch (error: any) { logger.error('Error in fetchAndProcessAgentResponseImpl:', error) try { @@ -489,73 +526,9 @@ const fetchAndProcessAgentResponseImpl = async ( } } -interface PersistAgentExchangeParams { - getState: () => RootState - agentSession: AgentSessionContext - userMessageId: string - assistantMessageId: string - latestAgentSessionId: string -} - -const persistAgentExchange = async ({ - getState, - agentSession, - userMessageId, - assistantMessageId, - latestAgentSessionId -}: PersistAgentExchangeParams) => { - if (!window.electron?.ipcRenderer) { - return - } - - try { - const state = getState() - const userMessage = state.messages.entities[userMessageId] - const assistantMessage = state.messages.entities[assistantMessageId] - - if (!userMessage || !assistantMessage) { - logger.warn('persistAgentExchange: missing user or assistant message entity') - return - } - - const userPersistedPayload = createPersistedMessagePayload(userMessage, state) - const assistantPersistedPayload = createPersistedMessagePayload(assistantMessage, state) - - await window.electron.ipcRenderer.invoke(IpcChannel.AgentMessage_PersistExchange, { - sessionId: agentSession.sessionId, - agentSessionId: latestAgentSessionId || '', - user: userPersistedPayload ? { payload: userPersistedPayload } : undefined, - assistant: assistantPersistedPayload ? { payload: assistantPersistedPayload } : undefined - }) - } catch (error) { - logger.warn('Failed to persist agent exchange', error as Error) - } -} - -const createPersistedMessagePayload = ( - message: Message | undefined, - state: RootState -): AgentPersistedMessage | undefined => { - if (!message) { - return undefined - } - - try { - const clonedMessage = JSON.parse(JSON.stringify(message)) as Message - const blockEntities = (message.blocks || []) - .map((blockId) => state.messageBlocks.entities[blockId]) - .filter((block): block is MessageBlock => Boolean(block)) - .map((block) => JSON.parse(JSON.stringify(block)) as MessageBlock) - - return { - message: clonedMessage, - blocks: blockEntities - } - } catch (error) { - logger.warn('Failed to build persisted payload for message', error as Error) - return undefined - } -} +// Removed persistAgentExchange and createPersistedMessagePayload functions +// These are no longer needed since messages are saved immediately via appendMessage +// and updated during streaming via updateMessageAndBlocks // --- Helper Function for Multi-Model Dispatch --- // 多模型创建和发送请求的逻辑,用于用户消息多模型发送和重发 @@ -782,6 +755,52 @@ export const sendMessage = } } +/** + * Loads agent session messages from backend + */ +export const loadAgentSessionMessagesThunk = + (sessionId: string) => async (dispatch: AppDispatch, getState: () => RootState) => { + const topicId = buildAgentSessionTopicId(sessionId) + + try { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // Fetch from agent backend + const historicalMessages = await window.electron?.ipcRenderer.invoke(IpcChannel.AgentMessage_GetHistory, { + sessionId + }) + + if (historicalMessages && Array.isArray(historicalMessages)) { + const messages: Message[] = [] + const blocks: MessageBlock[] = [] + + for (const persistedMsg of historicalMessages) { + if (persistedMsg?.message) { + messages.push(persistedMsg.message) + if (persistedMsg.blocks && persistedMsg.blocks.length > 0) { + blocks.push(...persistedMsg.blocks) + } + } + } + + // Update Redux store + if (blocks.length > 0) { + dispatch(upsertManyBlocks(blocks)) + } + dispatch(newMessagesActions.messagesReceived({ topicId, messages })) + + logger.info(`Loaded ${messages.length} messages for agent session ${sessionId}`) + } else { + dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] })) + } + } catch (error) { + logger.error(`Failed to load agent session messages for ${sessionId}:`, error as Error) + dispatch(newMessagesActions.messagesReceived({ topicId, messages: [] })) + } finally { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) + } + } + /** * Loads messages and their blocks for a specific topic from the database * and updates the Redux store. @@ -789,10 +808,26 @@ export const sendMessage = export const loadTopicMessagesThunk = (topicId: string, forceReload: boolean = false) => async (dispatch: AppDispatch, getState: () => RootState) => { + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + return loadTopicMessagesThunkV2(topicId, forceReload)(dispatch, getState) + } + + // Original implementation const state = getState() const topicMessagesExist = !!state.messages.messageIdsByTopic[topicId] dispatch(newMessagesActions.setCurrentTopicId(topicId)) + // Check if it's an agent session topic + if (isAgentSessionTopicId(topicId)) { + if (topicMessagesExist && !forceReload) { + return // Keep existing messages in memory + } + // Load from agent backend instead of local DB + const sessionId = topicId.replace('agent-session:', '') + return dispatch(loadAgentSessionMessagesThunk(sessionId)) + } + if (topicMessagesExist && !forceReload) { return } @@ -843,12 +878,19 @@ export const deleteSingleMessageThunk = try { dispatch(newMessagesActions.removeMessage({ topicId, messageId })) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.message_blocks.bulkDelete(blockIdsToDelete) - const topic = await db.topics.get(topicId) - if (topic) { - const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) - await db.topics.update(topicId, { messages: finalMessagesToSave }) - dispatch(updateTopicUpdatedAt({ topicId })) + + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await deleteMessageFromDBV2(topicId, messageId) + } else { + // Original implementation + await db.message_blocks.bulkDelete(blockIdsToDelete) + const topic = await db.topics.get(topicId) + if (topic) { + const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) + await db.topics.update(topicId, { messages: finalMessagesToSave }) + dispatch(updateTopicUpdatedAt({ topicId })) + } } } catch (error) { logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error) @@ -883,16 +925,24 @@ export const deleteMessageGroupThunk = } const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || []) + const messageIdsToDelete = messagesToDelete.map((m) => m.id) try { dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.message_blocks.bulkDelete(blockIdsToDelete) - const topic = await db.topics.get(topicId) - if (topic) { - const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) - await db.topics.update(topicId, { messages: finalMessagesToSave }) - dispatch(updateTopicUpdatedAt({ topicId })) + + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await deleteMessagesFromDBV2(topicId, messageIdsToDelete) + } else { + // Original implementation + await db.message_blocks.bulkDelete(blockIdsToDelete) + const topic = await db.topics.get(topicId) + if (topic) { + const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) + await db.topics.update(topicId, { messages: finalMessagesToSave }) + dispatch(updateTopicUpdatedAt({ topicId })) + } } } catch (error) { logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error) @@ -919,10 +969,16 @@ export const clearTopicMessagesThunk = dispatch(newMessagesActions.clearTopicMessages(topicId)) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.topics.update(topicId, { messages: [] }) - dispatch(updateTopicUpdatedAt({ topicId })) - if (blockIdsToDelete.length > 0) { - await db.message_blocks.bulkDelete(blockIdsToDelete) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await clearMessagesFromDBV2(topicId) + } else { + // Original implementation + await db.topics.update(topicId, { messages: [] }) + dispatch(updateTopicUpdatedAt({ topicId })) + if (blockIdsToDelete.length > 0) { + await db.message_blocks.bulkDelete(blockIdsToDelete) + } } } catch (error) { logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error) @@ -1245,7 +1301,12 @@ export const updateTranslationBlockThunk = dispatch(updateOneBlock({ id: blockId, changes })) // 更新数据库 - await db.message_blocks.update(blockId, changes) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateSingleBlockV2(blockId, changes) + } else { + await db.message_blocks.update(blockId, changes) + } // Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`) } catch (error) { logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error) @@ -1458,20 +1519,33 @@ export const cloneMessagesToNewTopicThunk = // Add the NEW blocks if (clonedBlocks.length > 0) { - await db.message_blocks.bulkAdd(clonedBlocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await bulkAddBlocksV2(clonedBlocks) + } else { + await db.message_blocks.bulkAdd(clonedBlocks) + } } // Update file counts const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()] - for (const file of uniqueFiles) { - await db.files - .where('id') - .equals(file.id) - .modify((f) => { - if (f) { - // Ensure file exists before modifying - f.count = (f.count || 0) + 1 - } - }) + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + // Use V2 implementation for file count updates + for (const file of uniqueFiles) { + await updateFileCountV2(file.id, 1, false) + } + } else { + // Original implementation + for (const file of uniqueFiles) { + await db.files + .where('id') + .equals(file.id) + .modify((f) => { + if (f) { + // Ensure file exists before modifying + f.count = (f.count || 0) + 1 + } + }) + } } }) @@ -1525,33 +1599,46 @@ export const updateMessageAndBlocksThunk = } // 2. 更新数据库 (在事务中) - await db.transaction('rw', db.topics, db.message_blocks, async () => { - // Only update topic.messages if there were actual message changes - if (messageUpdates && Object.keys(messageUpdates).length > 0) { - const topic = await db.topics.get(topicId) - if (topic && topic.messages) { - const messageIndex = topic.messages.findIndex((m) => m.id === messageId) - if (messageIndex !== -1) { - Object.assign(topic.messages[messageIndex], messageUpdates) - await db.topics.update(topicId, { messages: topic.messages }) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + // Update message properties if provided + if (messageUpdates && Object.keys(messageUpdates).length > 0 && messageId) { + await updateMessageV2(topicId, messageId, messageUpdates) + } + // Update blocks if provided + if (blockUpdatesList.length > 0) { + await updateBlocksV2(blockUpdatesList) + } + } else { + // Original implementation with transaction + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Only update topic.messages if there were actual message changes + if (messageUpdates && Object.keys(messageUpdates).length > 0) { + const topic = await db.topics.get(topicId) + if (topic && topic.messages) { + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex !== -1) { + Object.assign(topic.messages[messageIndex], messageUpdates) + await db.topics.update(topicId, { messages: topic.messages }) + } else { + logger.error( + `[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` + ) + throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) + } } else { logger.error( - `[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` + `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.` ) - throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) + throw new Error(`Topic ${topicId} not found or empty for message property update.`) } - } else { - logger.error( - `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.` - ) - throw new Error(`Topic ${topicId} not found or empty for message property update.`) } - } - if (blockUpdatesList.length > 0) { - await db.message_blocks.bulkPut(blockUpdatesList) - } - }) + if (blockUpdatesList.length > 0) { + await db.message_blocks.bulkPut(blockUpdatesList) + } + }) + } dispatch(updateTopicUpdatedAt({ topicId })) } catch (error) { diff --git a/src/renderer/src/store/thunk/messageThunk.v2.ts b/src/renderer/src/store/thunk/messageThunk.v2.ts new file mode 100644 index 0000000000..e017c64e9e --- /dev/null +++ b/src/renderer/src/store/thunk/messageThunk.v2.ts @@ -0,0 +1,228 @@ +/** + * V2 implementations of message thunk functions using the unified DbService + * These implementations will be gradually rolled out using feature flags + */ + +import { loggerService } from '@logger' +import { dbService } from '@renderer/services/db' +import type { Message, MessageBlock } from '@renderer/types/newMessage' + +import type { AppDispatch, RootState } from '../index' +import { upsertManyBlocks } from '../messageBlock' +import { newMessagesActions } from '../newMessage' + +const logger = loggerService.withContext('MessageThunkV2') + +// ================================================================= +// Phase 2.1 - Batch 1: Read-only operations (lowest risk) +// ================================================================= + +/** + * Load messages for a topic using unified DbService + * This is the V2 implementation that will replace the original + */ +export const loadTopicMessagesThunkV2 = + (topicId: string, forceReload: boolean = false) => + async (dispatch: AppDispatch, getState: () => RootState) => { + const state = getState() + + // Skip if already cached and not forcing reload + if (!forceReload && state.messages.messageIdsByTopic[topicId]) { + logger.info('Messages already cached for topic', { topicId }) + return + } + + try { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: true })) + + // Unified call - no need to check isAgentSessionTopicId + const { messages, blocks } = await dbService.fetchMessages(topicId) + + logger.info('Loaded messages via DbService', { + topicId, + messageCount: messages.length, + blockCount: blocks.length + }) + + // Update Redux state with fetched data + if (blocks.length > 0) { + dispatch(upsertManyBlocks(blocks)) + } + dispatch(newMessagesActions.messagesReceived({ topicId, messages })) + } catch (error) { + logger.error(`Failed to load messages for topic ${topicId}:`, error as Error) + // Could dispatch an error action here if needed + } finally { + dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) + dispatch(newMessagesActions.setTopicFulfilled({ topicId, fulfilled: true })) + } + } + +/** + * Get raw topic data using unified DbService + * Returns topic with messages array + */ +export const getRawTopicV2 = async (topicId: string): Promise<{ id: string; messages: Message[] } | undefined> => { + try { + const rawTopic = await dbService.getRawTopic(topicId) + logger.info('Retrieved raw topic via DbService', { topicId, found: !!rawTopic }) + return rawTopic + } catch (error) { + logger.error('Failed to get raw topic:', { topicId, error }) + return undefined + } +} + +// ================================================================= +// Phase 2.2 - Batch 2: Helper functions +// ================================================================= + +/** + * Update file reference count + * Only applies to Dexie data source, no-op for agent sessions + */ +export const updateFileCountV2 = async ( + fileId: string, + delta: number, + deleteIfZero: boolean = false +): Promise => { + try { + // Pass all parameters to dbService, including deleteIfZero + await dbService.updateFileCount(fileId, delta, deleteIfZero) + logger.info('Updated file count', { fileId, delta, deleteIfZero }) + } catch (error) { + logger.error('Failed to update file count:', { fileId, delta, error }) + throw error + } +} + +// ================================================================= +// Phase 2.3 - Batch 3: Delete operations +// ================================================================= + +/** + * Delete a single message from database + */ +export const deleteMessageFromDBV2 = async (topicId: string, messageId: string): Promise => { + try { + await dbService.deleteMessage(topicId, messageId) + logger.info('Deleted message via DbService', { topicId, messageId }) + } catch (error) { + logger.error('Failed to delete message:', { topicId, messageId, error }) + throw error + } +} + +/** + * Delete multiple messages from database + */ +export const deleteMessagesFromDBV2 = async (topicId: string, messageIds: string[]): Promise => { + try { + await dbService.deleteMessages(topicId, messageIds) + logger.info('Deleted messages via DbService', { topicId, count: messageIds.length }) + } catch (error) { + logger.error('Failed to delete messages:', { topicId, messageIds, error }) + throw error + } +} + +/** + * Clear all messages from a topic + */ +export const clearMessagesFromDBV2 = async (topicId: string): Promise => { + try { + await dbService.clearMessages(topicId) + logger.info('Cleared all messages via DbService', { topicId }) + } catch (error) { + logger.error('Failed to clear messages:', { topicId, error }) + throw error + } +} + +// ================================================================= +// Phase 2.4 - Batch 4: Complex write operations +// ================================================================= + +/** + * Save a message and its blocks to database + * Uses unified interface, no need for isAgentSessionTopicId check + */ +export const saveMessageAndBlocksToDBV2 = async ( + topicId: string, + message: Message, + blocks: MessageBlock[], + messageIndex: number = -1 +): Promise => { + try { + // Direct call without conditional logic, now with messageIndex + await dbService.appendMessage(topicId, message, blocks, messageIndex) + logger.info('Saved message and blocks via DbService', { + topicId, + messageId: message.id, + blockCount: blocks.length, + messageIndex + }) + } catch (error) { + logger.error('Failed to save message and blocks:', { topicId, messageId: message.id, error }) + throw error + } +} + +// Note: sendMessageV2 would be implemented here but it's more complex +// and would require more of the supporting code from messageThunk.ts + +// ================================================================= +// Phase 2.5 - Batch 5: Update operations +// ================================================================= + +/** + * Update a message in the database + */ +export const updateMessageV2 = async (topicId: string, messageId: string, updates: Partial): Promise => { + try { + await dbService.updateMessage(topicId, messageId, updates) + logger.info('Updated message via DbService', { topicId, messageId }) + } catch (error) { + logger.error('Failed to update message:', { topicId, messageId, error }) + throw error + } +} + +/** + * Update a single message block + */ +export const updateSingleBlockV2 = async (blockId: string, updates: Partial): Promise => { + try { + await dbService.updateSingleBlock(blockId, updates) + logger.info('Updated single block via DbService', { blockId }) + } catch (error) { + logger.error('Failed to update single block:', { blockId, error }) + throw error + } +} + +/** + * Bulk add message blocks (for new blocks) + */ +export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise => { + try { + await dbService.bulkAddBlocks(blocks) + logger.info('Bulk added blocks via DbService', { count: blocks.length }) + } catch (error) { + logger.error('Failed to bulk add blocks:', { count: blocks.length, error }) + throw error + } +} + +/** + * Update multiple message blocks (upsert operation) + */ +export const updateBlocksV2 = async (blocks: MessageBlock[]): Promise => { + try { + await dbService.updateBlocks(blocks) + logger.info('Updated blocks via DbService', { count: blocks.length }) + } catch (error) { + logger.error('Failed to update blocks:', { count: blocks.length, error }) + throw error + } +} diff --git a/src/renderer/src/types/index.ts b/src/renderer/src/types/index.ts index be91d56bb2..fc6b8da85a 100644 --- a/src/renderer/src/types/index.ts +++ b/src/renderer/src/types/index.ts @@ -198,8 +198,14 @@ export type Metrics = { time_thinking_millsec?: number } +export enum TopicType { + Chat = 'chat', + Session = 'session' +} + export type Topic = { id: string + type: TopicType assistantId: string name: string createdAt: string