From a17a198912bcd6d399b1a41b9c47fac041f7e155 Mon Sep 17 00:00:00 2001 From: suyao Date: Mon, 22 Sep 2025 20:40:53 +0800 Subject: [PATCH] Add V2 database service integration with feature flag support - Integrate V2 implementations for message operations (save, update, delete, clear) with feature flag control - Add topic creation fallback in DexieMessageDataSource when loading non-existent topics - Create integration status documentation tracking completed and pending V2 migrations - Update Topic type to include TopicType enum for proper topic classification --- .../src/services/db/DexieMessageDataSource.ts | 3 + .../src/services/db/INTEGRATION_STATUS.md | 145 +++++++++++++ src/renderer/src/store/thunk/messageThunk.ts | 190 +++++++++++++----- .../src/store/thunk/messageThunk.v2.ts | 19 +- src/renderer/src/types/index.ts | 6 + 5 files changed, 307 insertions(+), 56 deletions(-) create mode 100644 src/renderer/src/services/db/INTEGRATION_STATUS.md diff --git a/src/renderer/src/services/db/DexieMessageDataSource.ts b/src/renderer/src/services/db/DexieMessageDataSource.ts index 9d02387254..236cb39d7b 100644 --- a/src/renderer/src/services/db/DexieMessageDataSource.ts +++ b/src/renderer/src/services/db/DexieMessageDataSource.ts @@ -23,6 +23,9 @@ export class DexieMessageDataSource implements MessageDataSource { }> { try { const topic = await db.topics.get(topicId) + if (!topic) { + await db.topics.add({ id: topicId, messages: [] }) + } const messages = topic?.messages || [] if (messages.length === 0) { diff --git a/src/renderer/src/services/db/INTEGRATION_STATUS.md b/src/renderer/src/services/db/INTEGRATION_STATUS.md new file mode 100644 index 0000000000..cc5836335b --- /dev/null +++ b/src/renderer/src/services/db/INTEGRATION_STATUS.md @@ -0,0 +1,145 @@ +# V2 Database Service Integration Status + +## Overview +The unified database service (DbService) has been successfully integrated into messageThunk.ts with feature flag support. This allows gradual rollout and easy rollback if issues occur. + +## Feature Flag Control +```javascript +// Enable V2 implementation +VITE_USE_UNIFIED_DB_SERVICE=true yarn dev + +// Or via browser console +localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: true })) +location.reload() +``` + +## Integration Status + +### ✅ Completed Integrations + +#### Phase 2.1 - Read Operations (STABLE - Tested by user) +- **loadTopicMessagesThunk** → `loadTopicMessagesThunkV2` + - Location: messageThunk.ts:843 + - Status: ✅ STABLE (confirmed by user) + - Handles both regular topics and agent sessions + +#### Phase 2.2 - Helper Functions +- **updateFileCount** → `updateFileCountV2` + - Location: messageThunk.ts:1596 + - Status: ✅ Integrated + - Used in cloneMessagesToNewTopicThunk + +#### Phase 2.3 - Delete Operations +- **deleteSingleMessageThunk** → `deleteMessageFromDBV2` + - Location: messageThunk.ts:931 + - Status: ✅ Integrated + +- **deleteMessageGroupThunk** → `deleteMessagesFromDBV2` + - Location: messageThunk.ts:988 + - Status: ✅ Integrated + +- **clearTopicMessagesThunk** → `clearMessagesFromDBV2` + - Location: messageThunk.ts:1039 + - Status: ✅ Integrated + +#### Phase 2.4 - Write Operations +- **saveMessageAndBlocksToDB** → `saveMessageAndBlocksToDBV2` + - Location: messageThunk.ts:209 + - Status: ✅ Integrated + - Used in sendMessage, branches, and resends + +#### Phase 2.5 - Update Operations +- **updateSingleBlock** → `updateSingleBlockV2` + - Location: messageThunk.ts:326, 1351 + - Status: ✅ Integrated + - Used in throttled block updates and translation updates + +- **bulkAddBlocks** → `bulkAddBlocksV2` + - Location: messageThunk.ts:1587 + - Status: ✅ Integrated + - Used in cloneMessagesToNewTopicThunk + +- **updateBlocks (bulkPut)** → `updateBlocksV2` + - Location: messageThunk.ts:221, 259, 1684 + - Status: ✅ Integrated + - Used in saveMessageAndBlocksToDB, updateExistingMessageAndBlocksInDB, updateMessageAndBlocksThunk + +- **updateMessage** → `updateMessageV2` + - Location: messageThunk.ts:1669 + - Status: ✅ Integrated + - Used in updateMessageAndBlocksThunk + +## Not Yet Integrated + +### Functions Available but Not Used +These V2 functions exist but haven't been integrated yet as their usage patterns are different: + +- **getRawTopicV2** - Available but not directly replacing db.topics.get() calls +- **getTopicV2** - Available but not directly replacing db.topics.get() calls +- **persistExchangeV2** - Available for future use with message exchanges + +### Complex Operations Still Using Original Implementation +These operations involve complex transactions and topic management that would need careful refactoring: + +1. **Topic message list updates** (db.topics.update with messages array) + - Used after delete operations + - Used in resendMessageThunk + - Used in regenerateAssistantMessageThunk + +2. **Transaction-based operations** + - cloneMessagesToNewTopicThunk (partial integration) + - initiateTranslationThunk + - removeBlocksThunk + +## Testing Checklist + +### High Priority (Core Operations) +- [x] Load messages for regular topic +- [x] Load messages for agent session +- [ ] Send message in regular chat +- [ ] Send message in agent session +- [ ] Delete single message +- [ ] Delete message group +- [ ] Clear all messages + +### Medium Priority (Edit Operations) +- [ ] Update message content +- [ ] Update message blocks +- [ ] Update translation blocks +- [ ] File reference counting + +### Low Priority (Advanced Features) +- [ ] Clone messages to new topic +- [ ] Resend messages +- [ ] Regenerate assistant messages +- [ ] Multi-model responses + +## Next Steps + +1. **Test Current Integrations** + - Enable feature flag and test all integrated operations + - Monitor for any errors or performance issues + - Verify data consistency + +2. **Phase 3 Consideration** + - Consider refactoring complex topic update operations + - Evaluate if persistExchangeV2 should be used for user+assistant pairs + - Plan migration of remaining db.topics operations + +3. **Performance Monitoring** + - Compare load times between original and V2 + - Check memory usage with large message histories + - Verify agent session performance + +## Rollback Instructions +If issues occur, disable the feature flag immediately: +```javascript +localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false })) +location.reload() +``` + +## Notes +- All V2 implementations maintain backward compatibility +- Agent session operations (IPC-based) are handled transparently +- File operations only apply to Dexie storage, not agent sessions +- Feature flag allows gradual rollout and A/B testing \ No newline at end of file diff --git a/src/renderer/src/store/thunk/messageThunk.ts b/src/renderer/src/store/thunk/messageThunk.ts index c086b5fcad..76df28625d 100644 --- a/src/renderer/src/store/thunk/messageThunk.ts +++ b/src/renderer/src/store/thunk/messageThunk.ts @@ -35,7 +35,18 @@ import { LRUCache } from 'lru-cache' import type { AppDispatch, RootState } from '../index' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { newMessagesActions, selectMessagesForTopic } from '../newMessage' -import { loadTopicMessagesThunkV2 } from './messageThunk.v2' +import { + bulkAddBlocksV2, + clearMessagesFromDBV2, + deleteMessageFromDBV2, + deleteMessagesFromDBV2, + loadTopicMessagesThunkV2, + saveMessageAndBlocksToDBV2, + updateBlocksV2, + updateFileCountV2, + updateMessageV2, + updateSingleBlockV2 +} from './messageThunk.v2' const logger = loggerService.withContext('MessageThunk') @@ -192,12 +203,23 @@ const createAgentMessageStream = async ( } // TODO: 后续可以将db操作移到Listener Middleware中 export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => { + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + return saveMessageAndBlocksToDBV2(message.topicId, message, blocks) + } + + // Original implementation try { if (isAgentSessionTopicId(message.topicId)) { return } if (blocks.length > 0) { - await db.message_blocks.bulkPut(blocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateBlocksV2(blocks) + } else { + await db.message_blocks.bulkPut(blocks) + } } const topic = await db.topics.get(message.topicId) if (topic) { @@ -234,7 +256,12 @@ const updateExistingMessageAndBlocksInDB = async ( await db.transaction('rw', db.topics, db.message_blocks, async () => { // Always update blocks if provided if (updatedBlocks.length > 0) { - await db.message_blocks.bulkPut(updatedBlocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateBlocksV2(updatedBlocks) + } else { + await db.message_blocks.bulkPut(updatedBlocks) + } } // Check if there are message properties to update beyond id and topicId @@ -303,7 +330,12 @@ const getBlockThrottler = (id: string) => { }) blockUpdateRafs.set(id, rafId) - await db.message_blocks.update(id, blockUpdate) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateSingleBlockV2(id, blockUpdate) + } else { + await db.message_blocks.update(id, blockUpdate) + } }, 150) blockUpdateThrottlers.set(id, throttler) @@ -907,12 +939,19 @@ export const deleteSingleMessageThunk = try { dispatch(newMessagesActions.removeMessage({ topicId, messageId })) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.message_blocks.bulkDelete(blockIdsToDelete) - const topic = await db.topics.get(topicId) - if (topic) { - const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) - await db.topics.update(topicId, { messages: finalMessagesToSave }) - dispatch(updateTopicUpdatedAt({ topicId })) + + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await deleteMessageFromDBV2(topicId, messageId) + } else { + // Original implementation + await db.message_blocks.bulkDelete(blockIdsToDelete) + const topic = await db.topics.get(topicId) + if (topic) { + const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) + await db.topics.update(topicId, { messages: finalMessagesToSave }) + dispatch(updateTopicUpdatedAt({ topicId })) + } } } catch (error) { logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error) @@ -947,16 +986,24 @@ export const deleteMessageGroupThunk = } const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || []) + const messageIdsToDelete = messagesToDelete.map((m) => m.id) try { dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.message_blocks.bulkDelete(blockIdsToDelete) - const topic = await db.topics.get(topicId) - if (topic) { - const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) - await db.topics.update(topicId, { messages: finalMessagesToSave }) - dispatch(updateTopicUpdatedAt({ topicId })) + + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await deleteMessagesFromDBV2(topicId, messageIdsToDelete) + } else { + // Original implementation + await db.message_blocks.bulkDelete(blockIdsToDelete) + const topic = await db.topics.get(topicId) + if (topic) { + const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) + await db.topics.update(topicId, { messages: finalMessagesToSave }) + dispatch(updateTopicUpdatedAt({ topicId })) + } } } catch (error) { logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error) @@ -983,10 +1030,16 @@ export const clearTopicMessagesThunk = dispatch(newMessagesActions.clearTopicMessages(topicId)) cleanupMultipleBlocks(dispatch, blockIdsToDelete) - await db.topics.update(topicId, { messages: [] }) - dispatch(updateTopicUpdatedAt({ topicId })) - if (blockIdsToDelete.length > 0) { - await db.message_blocks.bulkDelete(blockIdsToDelete) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await clearMessagesFromDBV2(topicId) + } else { + // Original implementation + await db.topics.update(topicId, { messages: [] }) + dispatch(updateTopicUpdatedAt({ topicId })) + if (blockIdsToDelete.length > 0) { + await db.message_blocks.bulkDelete(blockIdsToDelete) + } } } catch (error) { logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error) @@ -1309,7 +1362,12 @@ export const updateTranslationBlockThunk = dispatch(updateOneBlock({ id: blockId, changes })) // 更新数据库 - await db.message_blocks.update(blockId, changes) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await updateSingleBlockV2(blockId, changes) + } else { + await db.message_blocks.update(blockId, changes) + } // Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`) } catch (error) { logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error) @@ -1522,20 +1580,33 @@ export const cloneMessagesToNewTopicThunk = // Add the NEW blocks if (clonedBlocks.length > 0) { - await db.message_blocks.bulkAdd(clonedBlocks) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + await bulkAddBlocksV2(clonedBlocks) + } else { + await db.message_blocks.bulkAdd(clonedBlocks) + } } // Update file counts const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()] - for (const file of uniqueFiles) { - await db.files - .where('id') - .equals(file.id) - .modify((f) => { - if (f) { - // Ensure file exists before modifying - f.count = (f.count || 0) + 1 - } - }) + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + // Use V2 implementation for file count updates + for (const file of uniqueFiles) { + await updateFileCountV2(file.id, 1, false) + } + } else { + // Original implementation + for (const file of uniqueFiles) { + await db.files + .where('id') + .equals(file.id) + .modify((f) => { + if (f) { + // Ensure file exists before modifying + f.count = (f.count || 0) + 1 + } + }) + } } }) @@ -1589,33 +1660,46 @@ export const updateMessageAndBlocksThunk = } // 2. 更新数据库 (在事务中) - await db.transaction('rw', db.topics, db.message_blocks, async () => { - // Only update topic.messages if there were actual message changes - if (messageUpdates && Object.keys(messageUpdates).length > 0) { - const topic = await db.topics.get(topicId) - if (topic && topic.messages) { - const messageIndex = topic.messages.findIndex((m) => m.id === messageId) - if (messageIndex !== -1) { - Object.assign(topic.messages[messageIndex], messageUpdates) - await db.topics.update(topicId, { messages: topic.messages }) + // Use V2 implementation if feature flag is enabled + if (featureFlags.USE_UNIFIED_DB_SERVICE) { + // Update message properties if provided + if (messageUpdates && Object.keys(messageUpdates).length > 0 && messageId) { + await updateMessageV2(topicId, messageId, messageUpdates) + } + // Update blocks if provided + if (blockUpdatesList.length > 0) { + await updateBlocksV2(blockUpdatesList) + } + } else { + // Original implementation with transaction + await db.transaction('rw', db.topics, db.message_blocks, async () => { + // Only update topic.messages if there were actual message changes + if (messageUpdates && Object.keys(messageUpdates).length > 0) { + const topic = await db.topics.get(topicId) + if (topic && topic.messages) { + const messageIndex = topic.messages.findIndex((m) => m.id === messageId) + if (messageIndex !== -1) { + Object.assign(topic.messages[messageIndex], messageUpdates) + await db.topics.update(topicId, { messages: topic.messages }) + } else { + logger.error( + `[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` + ) + throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) + } } else { logger.error( - `[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` + `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.` ) - throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) + throw new Error(`Topic ${topicId} not found or empty for message property update.`) } - } else { - logger.error( - `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.` - ) - throw new Error(`Topic ${topicId} not found or empty for message property update.`) } - } - if (blockUpdatesList.length > 0) { - await db.message_blocks.bulkPut(blockUpdatesList) - } - }) + if (blockUpdatesList.length > 0) { + await db.message_blocks.bulkPut(blockUpdatesList) + } + }) + } dispatch(updateTopicUpdatedAt({ topicId })) } catch (error) { diff --git a/src/renderer/src/store/thunk/messageThunk.v2.ts b/src/renderer/src/store/thunk/messageThunk.v2.ts index 05effdf7c2..4aef651918 100644 --- a/src/renderer/src/store/thunk/messageThunk.v2.ts +++ b/src/renderer/src/store/thunk/messageThunk.v2.ts @@ -53,7 +53,7 @@ export const loadTopicMessagesThunkV2 = } dispatch(newMessagesActions.messagesReceived({ topicId, messages })) } catch (error) { - logger.error(`Failed to load messages for topic ${topicId}:`, error) + logger.error(`Failed to load messages for topic ${topicId}:`, error as Error) // Could dispatch an error action here if needed } finally { dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) @@ -95,7 +95,7 @@ export const getTopicV2 = async (topicId: string): Promise => // Construct the full Topic object const topic: Topic = { id: rawTopic.id, - type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat, + type: isAgentSessionTopicId(topicId) ? TopicType.Session : TopicType.Chat, messages: rawTopic.messages, assistantId: '', // These fields would need to be fetched from appropriate source name: '', @@ -261,7 +261,7 @@ export const updateSingleBlockV2 = async (blockId: string, updates: Partial => { try { @@ -272,3 +272,16 @@ export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise => throw error } } + +/** + * Update multiple message blocks (upsert operation) + */ +export const updateBlocksV2 = async (blocks: MessageBlock[]): Promise => { + try { + await dbService.updateBlocks(blocks) + logger.info('Updated blocks via DbService', { count: blocks.length }) + } catch (error) { + logger.error('Failed to update blocks:', { count: blocks.length, error }) + throw error + } +} diff --git a/src/renderer/src/types/index.ts b/src/renderer/src/types/index.ts index 733672d043..e89d25463e 100644 --- a/src/renderer/src/types/index.ts +++ b/src/renderer/src/types/index.ts @@ -198,8 +198,14 @@ export type Metrics = { time_thinking_millsec?: number } +export enum TopicType { + Chat = 'chat', + Session = 'session' +} + export type Topic = { id: string + type: TopicType assistantId: string name: string createdAt: string