Add V2 database service integration with feature flag support

- Integrate V2 implementations for message operations (save, update, delete, clear) with feature flag control
- Add topic creation fallback in DexieMessageDataSource when loading non-existent topics
- Create integration status documentation tracking completed and pending V2 migrations
- Update Topic type to include TopicType enum for proper topic classification
This commit is contained in:
suyao 2025-09-22 20:40:53 +08:00
parent 7e369bef00
commit a17a198912
No known key found for this signature in database
5 changed files with 307 additions and 56 deletions

View File

@ -23,6 +23,9 @@ export class DexieMessageDataSource implements MessageDataSource {
}> { }> {
try { try {
const topic = await db.topics.get(topicId) const topic = await db.topics.get(topicId)
if (!topic) {
await db.topics.add({ id: topicId, messages: [] })
}
const messages = topic?.messages || [] const messages = topic?.messages || []
if (messages.length === 0) { if (messages.length === 0) {

View File

@ -0,0 +1,145 @@
# V2 Database Service Integration Status
## Overview
The unified database service (DbService) has been successfully integrated into messageThunk.ts with feature flag support. This allows gradual rollout and easy rollback if issues occur.
## Feature Flag Control
```javascript
// Enable V2 implementation
VITE_USE_UNIFIED_DB_SERVICE=true yarn dev
// Or via browser console
localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: true }))
location.reload()
```
## Integration Status
### ✅ Completed Integrations
#### Phase 2.1 - Read Operations (STABLE - Tested by user)
- **loadTopicMessagesThunk**`loadTopicMessagesThunkV2`
- Location: messageThunk.ts:843
- Status: ✅ STABLE (confirmed by user)
- Handles both regular topics and agent sessions
#### Phase 2.2 - Helper Functions
- **updateFileCount**`updateFileCountV2`
- Location: messageThunk.ts:1596
- Status: ✅ Integrated
- Used in cloneMessagesToNewTopicThunk
#### Phase 2.3 - Delete Operations
- **deleteSingleMessageThunk**`deleteMessageFromDBV2`
- Location: messageThunk.ts:931
- Status: ✅ Integrated
- **deleteMessageGroupThunk**`deleteMessagesFromDBV2`
- Location: messageThunk.ts:988
- Status: ✅ Integrated
- **clearTopicMessagesThunk**`clearMessagesFromDBV2`
- Location: messageThunk.ts:1039
- Status: ✅ Integrated
#### Phase 2.4 - Write Operations
- **saveMessageAndBlocksToDB**`saveMessageAndBlocksToDBV2`
- Location: messageThunk.ts:209
- Status: ✅ Integrated
- Used in sendMessage, branches, and resends
#### Phase 2.5 - Update Operations
- **updateSingleBlock**`updateSingleBlockV2`
- Location: messageThunk.ts:326, 1351
- Status: ✅ Integrated
- Used in throttled block updates and translation updates
- **bulkAddBlocks**`bulkAddBlocksV2`
- Location: messageThunk.ts:1587
- Status: ✅ Integrated
- Used in cloneMessagesToNewTopicThunk
- **updateBlocks (bulkPut)**`updateBlocksV2`
- Location: messageThunk.ts:221, 259, 1684
- Status: ✅ Integrated
- Used in saveMessageAndBlocksToDB, updateExistingMessageAndBlocksInDB, updateMessageAndBlocksThunk
- **updateMessage**`updateMessageV2`
- Location: messageThunk.ts:1669
- Status: ✅ Integrated
- Used in updateMessageAndBlocksThunk
## Not Yet Integrated
### Functions Available but Not Used
These V2 functions exist but haven't been integrated yet as their usage patterns are different:
- **getRawTopicV2** - Available but not directly replacing db.topics.get() calls
- **getTopicV2** - Available but not directly replacing db.topics.get() calls
- **persistExchangeV2** - Available for future use with message exchanges
### Complex Operations Still Using Original Implementation
These operations involve complex transactions and topic management that would need careful refactoring:
1. **Topic message list updates** (db.topics.update with messages array)
- Used after delete operations
- Used in resendMessageThunk
- Used in regenerateAssistantMessageThunk
2. **Transaction-based operations**
- cloneMessagesToNewTopicThunk (partial integration)
- initiateTranslationThunk
- removeBlocksThunk
## Testing Checklist
### High Priority (Core Operations)
- [x] Load messages for regular topic
- [x] Load messages for agent session
- [ ] Send message in regular chat
- [ ] Send message in agent session
- [ ] Delete single message
- [ ] Delete message group
- [ ] Clear all messages
### Medium Priority (Edit Operations)
- [ ] Update message content
- [ ] Update message blocks
- [ ] Update translation blocks
- [ ] File reference counting
### Low Priority (Advanced Features)
- [ ] Clone messages to new topic
- [ ] Resend messages
- [ ] Regenerate assistant messages
- [ ] Multi-model responses
## Next Steps
1. **Test Current Integrations**
- Enable feature flag and test all integrated operations
- Monitor for any errors or performance issues
- Verify data consistency
2. **Phase 3 Consideration**
- Consider refactoring complex topic update operations
- Evaluate if persistExchangeV2 should be used for user+assistant pairs
- Plan migration of remaining db.topics operations
3. **Performance Monitoring**
- Compare load times between original and V2
- Check memory usage with large message histories
- Verify agent session performance
## Rollback Instructions
If issues occur, disable the feature flag immediately:
```javascript
localStorage.setItem('featureFlags', JSON.stringify({ USE_UNIFIED_DB_SERVICE: false }))
location.reload()
```
## Notes
- All V2 implementations maintain backward compatibility
- Agent session operations (IPC-based) are handled transparently
- File operations only apply to Dexie storage, not agent sessions
- Feature flag allows gradual rollout and A/B testing

View File

@ -35,7 +35,18 @@ import { LRUCache } from 'lru-cache'
import type { AppDispatch, RootState } from '../index' import type { AppDispatch, RootState } from '../index'
import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock' import { removeManyBlocks, updateOneBlock, upsertManyBlocks, upsertOneBlock } from '../messageBlock'
import { newMessagesActions, selectMessagesForTopic } from '../newMessage' import { newMessagesActions, selectMessagesForTopic } from '../newMessage'
import { loadTopicMessagesThunkV2 } from './messageThunk.v2' import {
bulkAddBlocksV2,
clearMessagesFromDBV2,
deleteMessageFromDBV2,
deleteMessagesFromDBV2,
loadTopicMessagesThunkV2,
saveMessageAndBlocksToDBV2,
updateBlocksV2,
updateFileCountV2,
updateMessageV2,
updateSingleBlockV2
} from './messageThunk.v2'
const logger = loggerService.withContext('MessageThunk') const logger = loggerService.withContext('MessageThunk')
@ -192,12 +203,23 @@ const createAgentMessageStream = async (
} }
// TODO: 后续可以将db操作移到Listener Middleware中 // TODO: 后续可以将db操作移到Listener Middleware中
export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => { export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => {
// Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
return saveMessageAndBlocksToDBV2(message.topicId, message, blocks)
}
// Original implementation
try { try {
if (isAgentSessionTopicId(message.topicId)) { if (isAgentSessionTopicId(message.topicId)) {
return return
} }
if (blocks.length > 0) { if (blocks.length > 0) {
await db.message_blocks.bulkPut(blocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateBlocksV2(blocks)
} else {
await db.message_blocks.bulkPut(blocks)
}
} }
const topic = await db.topics.get(message.topicId) const topic = await db.topics.get(message.topicId)
if (topic) { if (topic) {
@ -234,7 +256,12 @@ const updateExistingMessageAndBlocksInDB = async (
await db.transaction('rw', db.topics, db.message_blocks, async () => { await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Always update blocks if provided // Always update blocks if provided
if (updatedBlocks.length > 0) { if (updatedBlocks.length > 0) {
await db.message_blocks.bulkPut(updatedBlocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateBlocksV2(updatedBlocks)
} else {
await db.message_blocks.bulkPut(updatedBlocks)
}
} }
// Check if there are message properties to update beyond id and topicId // Check if there are message properties to update beyond id and topicId
@ -303,7 +330,12 @@ const getBlockThrottler = (id: string) => {
}) })
blockUpdateRafs.set(id, rafId) blockUpdateRafs.set(id, rafId)
await db.message_blocks.update(id, blockUpdate) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateSingleBlockV2(id, blockUpdate)
} else {
await db.message_blocks.update(id, blockUpdate)
}
}, 150) }, 150)
blockUpdateThrottlers.set(id, throttler) blockUpdateThrottlers.set(id, throttler)
@ -907,12 +939,19 @@ export const deleteSingleMessageThunk =
try { try {
dispatch(newMessagesActions.removeMessage({ topicId, messageId })) dispatch(newMessagesActions.removeMessage({ topicId, messageId }))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) // Use V2 implementation if feature flag is enabled
if (topic) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) await deleteMessageFromDBV2(topicId, messageId)
await db.topics.update(topicId, { messages: finalMessagesToSave }) } else {
dispatch(updateTopicUpdatedAt({ topicId })) // Original implementation
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)
await db.topics.update(topicId, { messages: finalMessagesToSave })
dispatch(updateTopicUpdatedAt({ topicId }))
}
} }
} catch (error) { } catch (error) {
logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error) logger.error(`[deleteSingleMessage] Failed to delete message ${messageId}:`, error as Error)
@ -947,16 +986,24 @@ export const deleteMessageGroupThunk =
} }
const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || []) const blockIdsToDelete = messagesToDelete.flatMap((m) => m.blocks || [])
const messageIdsToDelete = messagesToDelete.map((m) => m.id)
try { try {
dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId })) dispatch(newMessagesActions.removeMessagesByAskId({ topicId, askId }))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId) // Use V2 implementation if feature flag is enabled
if (topic) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId) await deleteMessagesFromDBV2(topicId, messageIdsToDelete)
await db.topics.update(topicId, { messages: finalMessagesToSave }) } else {
dispatch(updateTopicUpdatedAt({ topicId })) // Original implementation
await db.message_blocks.bulkDelete(blockIdsToDelete)
const topic = await db.topics.get(topicId)
if (topic) {
const finalMessagesToSave = selectMessagesForTopic(getState(), topicId)
await db.topics.update(topicId, { messages: finalMessagesToSave })
dispatch(updateTopicUpdatedAt({ topicId }))
}
} }
} catch (error) { } catch (error) {
logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error) logger.error(`[deleteMessageGroup] Failed to delete messages with askId ${askId}:`, error as Error)
@ -983,10 +1030,16 @@ export const clearTopicMessagesThunk =
dispatch(newMessagesActions.clearTopicMessages(topicId)) dispatch(newMessagesActions.clearTopicMessages(topicId))
cleanupMultipleBlocks(dispatch, blockIdsToDelete) cleanupMultipleBlocks(dispatch, blockIdsToDelete)
await db.topics.update(topicId, { messages: [] }) // Use V2 implementation if feature flag is enabled
dispatch(updateTopicUpdatedAt({ topicId })) if (featureFlags.USE_UNIFIED_DB_SERVICE) {
if (blockIdsToDelete.length > 0) { await clearMessagesFromDBV2(topicId)
await db.message_blocks.bulkDelete(blockIdsToDelete) } else {
// Original implementation
await db.topics.update(topicId, { messages: [] })
dispatch(updateTopicUpdatedAt({ topicId }))
if (blockIdsToDelete.length > 0) {
await db.message_blocks.bulkDelete(blockIdsToDelete)
}
} }
} catch (error) { } catch (error) {
logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error) logger.error(`[clearTopicMessagesThunk] Failed to clear messages for topic ${topicId}:`, error as Error)
@ -1309,7 +1362,12 @@ export const updateTranslationBlockThunk =
dispatch(updateOneBlock({ id: blockId, changes })) dispatch(updateOneBlock({ id: blockId, changes }))
// 更新数据库 // 更新数据库
await db.message_blocks.update(blockId, changes) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await updateSingleBlockV2(blockId, changes)
} else {
await db.message_blocks.update(blockId, changes)
}
// Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`) // Logger.log(`[updateTranslationBlockThunk] Successfully updated translation block ${blockId}.`)
} catch (error) { } catch (error) {
logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error) logger.error(`[updateTranslationBlockThunk] Failed to update translation block ${blockId}:`, error as Error)
@ -1522,20 +1580,33 @@ export const cloneMessagesToNewTopicThunk =
// Add the NEW blocks // Add the NEW blocks
if (clonedBlocks.length > 0) { if (clonedBlocks.length > 0) {
await db.message_blocks.bulkAdd(clonedBlocks) // Use V2 implementation if feature flag is enabled
if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await bulkAddBlocksV2(clonedBlocks)
} else {
await db.message_blocks.bulkAdd(clonedBlocks)
}
} }
// Update file counts // Update file counts
const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()] const uniqueFiles = [...new Map(filesToUpdateCount.map((f) => [f.id, f])).values()]
for (const file of uniqueFiles) { if (featureFlags.USE_UNIFIED_DB_SERVICE) {
await db.files // Use V2 implementation for file count updates
.where('id') for (const file of uniqueFiles) {
.equals(file.id) await updateFileCountV2(file.id, 1, false)
.modify((f) => { }
if (f) { } else {
// Ensure file exists before modifying // Original implementation
f.count = (f.count || 0) + 1 for (const file of uniqueFiles) {
} await db.files
}) .where('id')
.equals(file.id)
.modify((f) => {
if (f) {
// Ensure file exists before modifying
f.count = (f.count || 0) + 1
}
})
}
} }
}) })
@ -1589,33 +1660,46 @@ export const updateMessageAndBlocksThunk =
} }
// 2. 更新数据库 (在事务中) // 2. 更新数据库 (在事务中)
await db.transaction('rw', db.topics, db.message_blocks, async () => { // Use V2 implementation if feature flag is enabled
// Only update topic.messages if there were actual message changes if (featureFlags.USE_UNIFIED_DB_SERVICE) {
if (messageUpdates && Object.keys(messageUpdates).length > 0) { // Update message properties if provided
const topic = await db.topics.get(topicId) if (messageUpdates && Object.keys(messageUpdates).length > 0 && messageId) {
if (topic && topic.messages) { await updateMessageV2(topicId, messageId, messageUpdates)
const messageIndex = topic.messages.findIndex((m) => m.id === messageId) }
if (messageIndex !== -1) { // Update blocks if provided
Object.assign(topic.messages[messageIndex], messageUpdates) if (blockUpdatesList.length > 0) {
await db.topics.update(topicId, { messages: topic.messages }) await updateBlocksV2(blockUpdatesList)
}
} else {
// Original implementation with transaction
await db.transaction('rw', db.topics, db.message_blocks, async () => {
// Only update topic.messages if there were actual message changes
if (messageUpdates && Object.keys(messageUpdates).length > 0) {
const topic = await db.topics.get(topicId)
if (topic && topic.messages) {
const messageIndex = topic.messages.findIndex((m) => m.id === messageId)
if (messageIndex !== -1) {
Object.assign(topic.messages[messageIndex], messageUpdates)
await db.topics.update(topicId, { messages: topic.messages })
} else {
logger.error(
`[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.`
)
throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`)
}
} else { } else {
logger.error( logger.error(
`[updateMessageAndBlocksThunk] Message ${messageId} not found in DB topic ${topicId} for property update.` `[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.`
) )
throw new Error(`Message ${messageId} not found in DB topic ${topicId} for property update.`) throw new Error(`Topic ${topicId} not found or empty for message property update.`)
} }
} else {
logger.error(
`[updateMessageAndBlocksThunk] Topic ${topicId} not found or empty for message property update.`
)
throw new Error(`Topic ${topicId} not found or empty for message property update.`)
} }
}
if (blockUpdatesList.length > 0) { if (blockUpdatesList.length > 0) {
await db.message_blocks.bulkPut(blockUpdatesList) await db.message_blocks.bulkPut(blockUpdatesList)
} }
}) })
}
dispatch(updateTopicUpdatedAt({ topicId })) dispatch(updateTopicUpdatedAt({ topicId }))
} catch (error) { } catch (error) {

View File

@ -53,7 +53,7 @@ export const loadTopicMessagesThunkV2 =
} }
dispatch(newMessagesActions.messagesReceived({ topicId, messages })) dispatch(newMessagesActions.messagesReceived({ topicId, messages }))
} catch (error) { } catch (error) {
logger.error(`Failed to load messages for topic ${topicId}:`, error) logger.error(`Failed to load messages for topic ${topicId}:`, error as Error)
// Could dispatch an error action here if needed // Could dispatch an error action here if needed
} finally { } finally {
dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false })) dispatch(newMessagesActions.setTopicLoading({ topicId, loading: false }))
@ -95,7 +95,7 @@ export const getTopicV2 = async (topicId: string): Promise<Topic | undefined> =>
// Construct the full Topic object // Construct the full Topic object
const topic: Topic = { const topic: Topic = {
id: rawTopic.id, id: rawTopic.id,
type: isAgentSessionTopicId(topicId) ? TopicType.AgentSession : TopicType.Chat, type: isAgentSessionTopicId(topicId) ? TopicType.Session : TopicType.Chat,
messages: rawTopic.messages, messages: rawTopic.messages,
assistantId: '', // These fields would need to be fetched from appropriate source assistantId: '', // These fields would need to be fetched from appropriate source
name: '', name: '',
@ -261,7 +261,7 @@ export const updateSingleBlockV2 = async (blockId: string, updates: Partial<Mess
} }
/** /**
* Bulk add message blocks * Bulk add message blocks (for new blocks)
*/ */
export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise<void> => { export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise<void> => {
try { try {
@ -272,3 +272,16 @@ export const bulkAddBlocksV2 = async (blocks: MessageBlock[]): Promise<void> =>
throw error throw error
} }
} }
/**
* Update multiple message blocks (upsert operation)
*/
export const updateBlocksV2 = async (blocks: MessageBlock[]): Promise<void> => {
try {
await dbService.updateBlocks(blocks)
logger.info('Updated blocks via DbService', { count: blocks.length })
} catch (error) {
logger.error('Failed to update blocks:', { count: blocks.length, error })
throw error
}
}

View File

@ -198,8 +198,14 @@ export type Metrics = {
time_thinking_millsec?: number time_thinking_millsec?: number
} }
export enum TopicType {
Chat = 'chat',
Session = 'session'
}
export type Topic = { export type Topic = {
id: string id: string
type: TopicType
assistantId: string assistantId: string
name: string name: string
createdAt: string createdAt: string