feat(api): implement message branching API with tree structure support

- Add Topic and Message API endpoints for CRUD operations
  - Implement tree visualization queries (GET /topics/:id/tree)
  - Implement branch message queries with pagination (GET /topics/:id/messages)
  - Add multi-model response grouping via siblingsGroupId
  - Support topic forking from existing message nodes
  - Add INVALID_OPERATION error code for business rule violations
  - Update API design guidelines documentation
This commit is contained in:
fullex 2025-12-28 12:54:06 +08:00
parent 939100d495
commit 7faff7ad4b
13 changed files with 1453 additions and 19 deletions

View File

@ -146,6 +146,7 @@ Use standard HTTP status codes consistently:
| 201 Created | Successful POST | Return created resource |
| 204 No Content | Successful DELETE | No body |
| 400 Bad Request | Invalid request format | Malformed JSON |
| 400 Invalid Operation | Business rule violation | Delete root without cascade, cycle creation |
| 401 Unauthorized | Authentication required | Missing/invalid token |
| 403 Permission Denied | Insufficient permissions | Access denied to resource |
| 404 Not Found | Resource not found | Invalid ID |
@ -209,6 +210,14 @@ interface SerializedDataApiError {
status: 504,
details: { operation: 'fetch topics', timeoutMs: 3000 }
}
// 400 Invalid Operation
{
code: 'INVALID_OPERATION',
message: 'Invalid operation: delete root message - cascade=true required',
status: 400,
details: { operation: 'delete root message', reason: 'cascade=true required' }
}
```
Use `DataApiErrorFactory` utilities to create consistent errors:
@ -222,6 +231,7 @@ throw DataApiErrorFactory.validation({ name: ['Required'] })
throw DataApiErrorFactory.database(error, 'insert topic')
throw DataApiErrorFactory.timeout('fetch topics', 3000)
throw DataApiErrorFactory.dataInconsistent('Topic', 'parent reference broken')
throw DataApiErrorFactory.invalidOperation('delete root message', 'cascade=true required')
// Check if error is retryable
if (error instanceof DataApiError && error.isRetryable) {

View File

@ -58,6 +58,13 @@ export enum ErrorCode {
/** 403 - Authenticated but lacks required permissions */
PERMISSION_DENIED = 'PERMISSION_DENIED',
/**
* 400 - Operation is not valid in current state.
* Use when: deleting root message without cascade, moving node would create cycle,
* or any operation that violates business rules but isn't a validation error.
*/
INVALID_OPERATION = 'INVALID_OPERATION',
// ─────────────────────────────────────────────────────────────────
// Server errors (5xx) - Issues on the server side
// ─────────────────────────────────────────────────────────────────
@ -123,6 +130,7 @@ export const ERROR_STATUS_MAP: Record<ErrorCode, number> = {
[ErrorCode.VALIDATION_ERROR]: 422,
[ErrorCode.RATE_LIMIT_EXCEEDED]: 429,
[ErrorCode.PERMISSION_DENIED]: 403,
[ErrorCode.INVALID_OPERATION]: 400,
// Server errors (5xx)
[ErrorCode.INTERNAL_SERVER_ERROR]: 500,
@ -149,6 +157,7 @@ export const ERROR_MESSAGES: Record<ErrorCode, string> = {
[ErrorCode.VALIDATION_ERROR]: 'Validation error: Request data does not meet requirements',
[ErrorCode.RATE_LIMIT_EXCEEDED]: 'Rate limit exceeded: Too many requests',
[ErrorCode.PERMISSION_DENIED]: 'Permission denied: Insufficient permissions for this operation',
[ErrorCode.INVALID_OPERATION]: 'Invalid operation: Operation not allowed in current state',
[ErrorCode.INTERNAL_SERVER_ERROR]: 'Internal server error: An unexpected error occurred',
[ErrorCode.DATABASE_ERROR]: 'Database error: Failed to access or modify data',
@ -232,6 +241,14 @@ export interface PermissionDeniedErrorDetails {
resource?: string
}
/**
* Details for INVALID_OPERATION - what operation was invalid.
*/
export interface InvalidOperationErrorDetails {
operation: string
reason?: string
}
/**
* Details for RESOURCE_LOCKED - which resource is locked.
*/
@ -272,6 +289,7 @@ export type ErrorDetailsMap = {
[ErrorCode.TIMEOUT]: TimeoutErrorDetails
[ErrorCode.DATA_INCONSISTENT]: DataInconsistentErrorDetails
[ErrorCode.PERMISSION_DENIED]: PermissionDeniedErrorDetails
[ErrorCode.INVALID_OPERATION]: InvalidOperationErrorDetails
[ErrorCode.RESOURCE_LOCKED]: ResourceLockedErrorDetails
[ErrorCode.CONCURRENT_MODIFICATION]: ConcurrentModificationErrorDetails
[ErrorCode.INTERNAL_SERVER_ERROR]: InternalErrorDetails
@ -631,6 +649,29 @@ export class DataApiErrorFactory {
)
}
/**
* Create an invalid operation error.
* Use when an operation violates business rules but isn't a validation error.
* @param operation - Description of the invalid operation
* @param reason - Optional reason why the operation is invalid
* @param requestContext - Optional request context
* @returns DataApiError with INVALID_OPERATION code
*/
static invalidOperation(
operation: string,
reason?: string,
requestContext?: RequestContext
): DataApiError<ErrorCode.INVALID_OPERATION> {
const message = reason ? `Invalid operation: ${operation} - ${reason}` : `Invalid operation: ${operation}`
return new DataApiError(
ErrorCode.INVALID_OPERATION,
message,
ERROR_STATUS_MAP[ErrorCode.INVALID_OPERATION],
{ operation, reason },
requestContext
)
}
/**
* Create a data inconsistency error.
* @param resource - The resource with inconsistent data

View File

@ -86,6 +86,7 @@ export type {
DetailsForCode,
ErrorDetailsMap,
InternalErrorDetails,
InvalidOperationErrorDetails,
NotFoundErrorDetails,
PermissionDeniedErrorDetails,
RequestContext,

View File

@ -14,12 +14,15 @@
*
* // Domain DTOs directly from schema files
* import type { TestItem, CreateTestItemDto } from '@shared/data/api/schemas/test'
* import type { Topic, CreateTopicDto } from '@shared/data/api/schemas/topic'
* import type { Topic, CreateTopicDto } from '@shared/data/api/schemas/topics'
* import type { Message, CreateMessageDto } from '@shared/data/api/schemas/messages'
* ```
*/
import type { AssertValidSchemas } from '../apiTypes'
import type { MessageSchemas } from './messages'
import type { TestSchemas } from './test'
import type { TopicSchemas } from './topics'
/**
* Merged API Schemas - single source of truth for all API endpoints
@ -32,11 +35,5 @@ import type { TestSchemas } from './test'
* When adding a new domain:
* 1. Create the schema file (e.g., topic.ts)
* 2. Import and add to intersection below
*
* @example
* ```typescript
* import type { TopicSchemas } from './topic'
* export type ApiSchemas = AssertValidSchemas<TestSchemas & TopicSchemas>
* ```
*/
export type ApiSchemas = AssertValidSchemas<TestSchemas>
export type ApiSchemas = AssertValidSchemas<TestSchemas & TopicSchemas & MessageSchemas>

View File

@ -0,0 +1,175 @@
/**
* Message API Schema definitions
*
* Contains all message-related endpoints for tree operations and message management.
* Includes endpoints for tree visualization and conversation view.
*/
import type {
BranchMessagesResponse,
Message,
MessageData,
MessageRole,
MessageStats,
TreeResponse
} from '@shared/data/types/message'
import type { AssistantMeta, ModelMeta } from '@shared/data/types/meta'
// ============================================================================
// DTOs
// ============================================================================
/**
* DTO for creating a new message
*/
export interface CreateMessageDto {
/** Parent message ID (null for root) */
parentId: string | null
/** Message role */
role: MessageRole
/** Message content */
data: MessageData
/** Message status */
status?: 'success' | 'error' | 'paused'
/** Siblings group ID (0 = normal, >0 = multi-model group) */
siblingsGroupId?: number
/** Assistant ID */
assistantId?: string
/** Preserved assistant info */
assistantMeta?: AssistantMeta
/** Model identifier */
modelId?: string
/** Preserved model info */
modelMeta?: ModelMeta
/** Trace ID */
traceId?: string
/** Statistics */
stats?: MessageStats
}
/**
* DTO for updating an existing message
*/
export interface UpdateMessageDto {
/** Updated message content */
data?: MessageData
/** Move message to new parent */
parentId?: string | null
/** Change siblings group */
siblingsGroupId?: number
/** Update status */
status?: 'success' | 'error' | 'paused'
}
/**
* Response for delete operation
*/
export interface DeleteMessageResponse {
/** IDs of deleted messages */
deletedIds: string[]
/** IDs of reparented children (only when cascade=false) */
reparentedIds?: string[]
}
// ============================================================================
// Query Parameters
// ============================================================================
/**
* Query parameters for GET /topics/:id/tree
*/
export interface TreeQueryParams {
/** Root node ID (defaults to tree root) */
rootId?: string
/** End node ID (defaults to topic.activeNodeId) */
nodeId?: string
/** Depth to expand beyond active path (-1 = all, 0 = path only, 1+ = layers) */
depth?: number
}
/**
* Query parameters for GET /topics/:id/messages
*/
export interface BranchMessagesQueryParams {
/** End node ID (defaults to topic.activeNodeId) */
nodeId?: string
/** Pagination cursor: return messages before this node */
beforeNodeId?: string
/** Number of messages to return */
limit?: number
/** Whether to include siblingsGroup in response */
includeSiblings?: boolean
}
// ============================================================================
// API Schema Definitions
// ============================================================================
/**
* Message API Schema definitions
*
* Organized by domain responsibility:
* - /topics/:id/tree - Tree visualization
* - /topics/:id/messages - Branch messages for conversation
* - /messages/:id - Individual message operations
*/
export interface MessageSchemas {
/**
* Tree query endpoint for visualization
* @example GET /topics/abc123/tree?depth=1
*/
'/topics/:topicId/tree': {
/** Get tree structure for visualization */
GET: {
params: { topicId: string }
query?: TreeQueryParams
response: TreeResponse
}
}
/**
* Branch messages endpoint for conversation view
* @example GET /topics/abc123/messages?limit=20
* @example POST /topics/abc123/messages { "parentId": "msg1", "role": "user", "data": {...} }
*/
'/topics/:topicId/messages': {
/** Get messages along active branch with pagination */
GET: {
params: { topicId: string }
query?: BranchMessagesQueryParams
response: BranchMessagesResponse
}
/** Create a new message in the topic */
POST: {
params: { topicId: string }
body: CreateMessageDto
response: Message
}
}
/**
* Individual message endpoint
* @example GET /messages/msg123
* @example PATCH /messages/msg123 { "data": {...} }
* @example DELETE /messages/msg123?cascade=true
*/
'/messages/:id': {
/** Get a single message by ID */
GET: {
params: { id: string }
response: Message
}
/** Update a message (content, move to new parent, etc.) */
PATCH: {
params: { id: string }
body: UpdateMessageDto
response: Message
}
/** Delete a message (cascade=true deletes descendants, cascade=false reparents children) */
DELETE: {
params: { id: string }
query?: { cascade?: boolean }
response: DeleteMessageResponse
}
}
}

View File

@ -0,0 +1,133 @@
/**
* Topic API Schema definitions
*
* Contains all topic-related endpoints for CRUD operations and branch switching.
*/
import type { AssistantMeta } from '@shared/data/types/meta'
import type { Topic } from '@shared/data/types/topic'
// ============================================================================
// DTOs
// ============================================================================
/**
* DTO for creating a new topic
*/
export interface CreateTopicDto {
/** Topic name */
name?: string
/** Associated assistant ID */
assistantId?: string
/** Preserved assistant info */
assistantMeta?: AssistantMeta
/** Topic-specific prompt */
prompt?: string
/** Group ID for organization */
groupId?: string
/**
* Source node ID for fork operation.
* When provided, copies the path from root to this node into the new topic.
*/
sourceNodeId?: string
}
/**
* DTO for updating an existing topic
*/
export interface UpdateTopicDto {
/** Updated topic name */
name?: string
/** Mark name as manually edited */
isNameManuallyEdited?: boolean
/** Updated assistant ID */
assistantId?: string
/** Updated assistant meta */
assistantMeta?: AssistantMeta
/** Updated prompt */
prompt?: string
/** Updated group ID */
groupId?: string
/** Updated sort order */
sortOrder?: number
/** Updated pin state */
isPinned?: boolean
/** Updated pin order */
pinnedOrder?: number
}
/**
* DTO for setting active node
*/
export interface SetActiveNodeDto {
/** Node ID to set as active */
nodeId: string
}
/**
* Response for active node update
*/
export interface ActiveNodeResponse {
/** The new active node ID */
activeNodeId: string
}
// ============================================================================
// API Schema Definitions
// ============================================================================
/**
* Topic API Schema definitions
*/
export interface TopicSchemas {
/**
* Topics collection endpoint
* @example POST /topics { "name": "New Topic", "assistantId": "asst_123" }
*/
'/topics': {
/** Create a new topic (optionally fork from existing node) */
POST: {
body: CreateTopicDto
response: Topic
}
}
/**
* Individual topic endpoint
* @example GET /topics/abc123
* @example PATCH /topics/abc123 { "name": "Updated Name" }
* @example DELETE /topics/abc123
*/
'/topics/:id': {
/** Get a topic by ID */
GET: {
params: { id: string }
response: Topic
}
/** Update a topic */
PATCH: {
params: { id: string }
body: UpdateTopicDto
response: Topic
}
/** Delete a topic and all its messages */
DELETE: {
params: { id: string }
response: void
}
}
/**
* Active node sub-resource endpoint
* High-frequency operation for branch switching
* @example PUT /topics/abc123/active-node { "nodeId": "msg456" }
*/
'/topics/:id/active-node': {
/** Set the active node for a topic */
PUT: {
params: { id: string }
body: SetActiveNodeDto
response: ActiveNodeResponse
}
}
}

View File

@ -169,3 +169,136 @@ export type MessageDataBlock =
| VideoBlock
| ErrorBlock
| CompactBlock
// ============================================================================
// Message Entity Types
// ============================================================================
import type { AssistantMeta, ModelMeta } from './meta'
/**
* Message role - user, assistant, or system
*/
export type MessageRole = 'user' | 'assistant' | 'system'
/**
* Message status - final state after processing
*/
export type MessageStatus = 'success' | 'error' | 'paused'
/**
* Complete message entity as stored in database
*/
export interface Message {
/** Message ID (UUIDv7) */
id: string
/** Topic ID this message belongs to */
topicId: string
/** Parent message ID (null for root) */
parentId: string | null
/** Message role */
role: MessageRole
/** Message content (blocks, mentions, etc.) */
data: MessageData
/** Searchable text extracted from data.blocks */
searchableText?: string | null
/** Message status */
status: MessageStatus
/** Siblings group ID (0 = normal branch, >0 = multi-model response group) */
siblingsGroupId: number
/** Assistant ID */
assistantId?: string | null
/** Preserved assistant info for display */
assistantMeta?: AssistantMeta | null
/** Model identifier */
modelId?: string | null
/** Preserved model info (provider, name) */
modelMeta?: ModelMeta | null
/** Trace ID for tracking */
traceId?: string | null
/** Statistics: token usage, performance metrics */
stats?: MessageStats | null
/** Creation timestamp (ISO string) */
createdAt: string
/** Last update timestamp (ISO string) */
updatedAt: string
}
// ============================================================================
// Tree Structure Types
// ============================================================================
/**
* Lightweight tree node for tree visualization (ReactFlow)
* Contains only essential display info, not full message content
*/
export interface TreeNode {
/** Message ID */
id: string
/** Parent message ID (null for root, omitted in SiblingsGroup.nodes) */
parentId?: string | null
/** Message role */
role: MessageRole
/** Content preview (first 50 characters) */
preview: string
/** Model identifier */
modelId?: string | null
/** Model display info */
modelMeta?: ModelMeta | null
/** Message status */
status: MessageStatus
/** Creation timestamp (ISO string) */
createdAt: string
/** Whether this node has children (for expand indicator) */
hasChildren: boolean
}
/**
* Group of sibling nodes with same parentId and siblingsGroupId
* Used for multi-model responses in tree view
*/
export interface SiblingsGroup {
/** Parent message ID */
parentId: string
/** Siblings group ID (non-zero) */
siblingsGroupId: number
/** Nodes in this group (parentId omitted to avoid redundancy) */
nodes: Omit<TreeNode, 'parentId'>[]
}
/**
* Tree query response structure
*/
export interface TreeResponse {
/** Regular nodes (siblingsGroupId = 0) */
nodes: TreeNode[]
/** Multi-model response groups (siblingsGroupId != 0) */
siblingsGroups: SiblingsGroup[]
/** Current active node ID */
activeNodeId: string | null
}
// ============================================================================
// Branch Message Types
// ============================================================================
/**
* Message with optional siblings group for conversation view
* Used in GET /topics/:id/messages response
*/
export interface BranchMessage {
/** The message itself */
message: Message
/** Other messages in the same siblings group (only when siblingsGroupId != 0 and includeSiblings=true) */
siblingsGroup?: Message[]
}
/**
* Branch messages response structure
*/
export interface BranchMessagesResponse {
/** Messages in root-to-leaf order */
messages: BranchMessage[]
/** Current active node ID */
activeNodeId: string | null
}

View File

@ -0,0 +1,40 @@
/**
* Topic entity types
*
* Topics are containers for messages and belong to assistants.
* They can be organized into groups and have tags for categorization.
*/
import type { AssistantMeta } from './meta'
/**
* Complete topic entity as stored in database
*/
export interface Topic {
/** Topic ID */
id: string
/** Topic name */
name?: string | null
/** Whether the name was manually edited by user */
isNameManuallyEdited: boolean
/** Associated assistant ID */
assistantId?: string | null
/** Preserved assistant info for display when assistant is deleted */
assistantMeta?: AssistantMeta | null
/** Topic-specific prompt override */
prompt?: string | null
/** Active node ID in the message tree */
activeNodeId?: string | null
/** Group ID for organization */
groupId?: string | null
/** Sort order within group */
sortOrder: number
/** Whether topic is pinned */
isPinned: boolean
/** Pinned order */
pinnedOrder: number
/** Creation timestamp (ISO string) */
createdAt: string
/** Last update timestamp (ISO string) */
updatedAt: string
}

View File

@ -6,21 +6,15 @@
*
* Handler files are organized by domain:
* - test.ts - Test API handlers
*
* @example Adding a new domain:
* ```typescript
* import { topicHandlers } from './topic'
*
* export const apiHandlers: ApiImplementation = {
* ...testHandlers,
* ...topicHandlers // Add new domain handlers here
* }
* ```
* - topics.ts - Topic API handlers
* - messages.ts - Message API handlers
*/
import type { ApiImplementation } from '@shared/data/api/apiTypes'
import { messageHandlers } from './messages'
import { testHandlers } from './test'
import { topicHandlers } from './topics'
/**
* Complete API handlers implementation
@ -30,5 +24,7 @@ import { testHandlers } from './test'
* TypeScript ensures exhaustive coverage - missing handlers cause compile errors.
*/
export const apiHandlers: ApiImplementation = {
...testHandlers
...testHandlers,
...topicHandlers,
...messageHandlers
}

View File

@ -0,0 +1,69 @@
/**
* Message API Handlers
*
* Implements all message-related API endpoints including:
* - Tree visualization queries
* - Branch message queries with pagination
* - Message CRUD operations
*/
import { messageService } from '@data/services/MessageService'
import type { ApiHandler, ApiMethods } from '@shared/data/api/apiTypes'
import type { BranchMessagesQueryParams, MessageSchemas, TreeQueryParams } from '@shared/data/api/schemas/messages'
/**
* Handler type for a specific message endpoint
*/
type MessageHandler<Path extends keyof MessageSchemas, Method extends ApiMethods<Path>> = ApiHandler<Path, Method>
/**
* Message API handlers implementation
*/
export const messageHandlers: {
[Path in keyof MessageSchemas]: {
[Method in keyof MessageSchemas[Path]]: MessageHandler<Path, Method & ApiMethods<Path>>
}
} = {
'/topics/:topicId/tree': {
GET: async ({ params, query }) => {
const q = (query || {}) as TreeQueryParams
return await messageService.getTree(params.topicId, {
rootId: q.rootId,
nodeId: q.nodeId,
depth: q.depth
})
}
},
'/topics/:topicId/messages': {
GET: async ({ params, query }) => {
const q = (query || {}) as BranchMessagesQueryParams
return await messageService.getBranchMessages(params.topicId, {
nodeId: q.nodeId,
beforeNodeId: q.beforeNodeId,
limit: q.limit,
includeSiblings: q.includeSiblings
})
},
POST: async ({ params, body }) => {
return await messageService.create(params.topicId, body)
}
},
'/messages/:id': {
GET: async ({ params }) => {
return await messageService.getById(params.id)
},
PATCH: async ({ params, body }) => {
return await messageService.update(params.id, body)
},
DELETE: async ({ params, query }) => {
const q = (query || {}) as { cascade?: boolean }
const cascade = q.cascade ?? false
return await messageService.delete(params.id, cascade)
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Topic API Handlers
*
* Implements all topic-related API endpoints including:
* - Topic CRUD operations
* - Active node switching for branch navigation
*/
import { topicService } from '@data/services/TopicService'
import type { ApiHandler, ApiMethods } from '@shared/data/api/apiTypes'
import type { TopicSchemas } from '@shared/data/api/schemas/topics'
/**
* Handler type for a specific topic endpoint
*/
type TopicHandler<Path extends keyof TopicSchemas, Method extends ApiMethods<Path>> = ApiHandler<Path, Method>
/**
* Topic API handlers implementation
*/
export const topicHandlers: {
[Path in keyof TopicSchemas]: {
[Method in keyof TopicSchemas[Path]]: TopicHandler<Path, Method & ApiMethods<Path>>
}
} = {
'/topics': {
POST: async ({ body }) => {
return await topicService.create(body)
}
},
'/topics/:id': {
GET: async ({ params }) => {
return await topicService.getById(params.id)
},
PATCH: async ({ params, body }) => {
return await topicService.update(params.id, body)
},
DELETE: async ({ params }) => {
await topicService.delete(params.id)
return undefined
}
},
'/topics/:id/active-node': {
PUT: async ({ params, body }) => {
return await topicService.setActiveNode(params.id, body.nodeId)
}
}
}

View File

@ -0,0 +1,543 @@
/**
* Message Service - handles message CRUD and tree operations
*
* Provides business logic for:
* - Tree visualization queries
* - Branch message queries with pagination
* - Message CRUD with tree structure maintenance
* - Cascade delete and reparenting
*/
import { dbService } from '@data/db/DbService'
import { messageTable } from '@data/db/schemas/message'
import { topicTable } from '@data/db/schemas/topic'
import { loggerService } from '@logger'
import { DataApiErrorFactory } from '@shared/data/api'
import type { CreateMessageDto, UpdateMessageDto } from '@shared/data/api/schemas/messages'
import type {
BranchMessage,
BranchMessagesResponse,
Message,
SiblingsGroup,
TreeNode,
TreeResponse
} from '@shared/data/types/message'
import { and, eq, inArray, isNull, sql } from 'drizzle-orm'
import { v7 as uuidv7 } from 'uuid'
const logger = loggerService.withContext('MessageService')
/**
* Preview length for tree nodes
*/
const PREVIEW_LENGTH = 50
/**
* Default pagination limit
*/
const DEFAULT_LIMIT = 20
/**
* Convert database row to Message entity
*/
function rowToMessage(row: typeof messageTable.$inferSelect): Message {
return {
id: row.id,
topicId: row.topicId,
parentId: row.parentId,
role: row.role as Message['role'],
data: row.data,
searchableText: row.searchableText,
status: row.status as Message['status'],
siblingsGroupId: row.siblingsGroupId ?? 0,
assistantId: row.assistantId,
assistantMeta: row.assistantMeta,
modelId: row.modelId,
modelMeta: row.modelMeta,
traceId: row.traceId,
stats: row.stats,
createdAt: row.createdAt ? new Date(row.createdAt).toISOString() : new Date().toISOString(),
updatedAt: row.updatedAt ? new Date(row.updatedAt).toISOString() : new Date().toISOString()
}
}
/**
* Extract preview text from message data
*/
function extractPreview(message: Message): string {
const blocks = message.data?.blocks || []
for (const block of blocks) {
if ('content' in block && typeof block.content === 'string') {
const text = block.content.trim()
if (text.length > 0) {
return text.length > PREVIEW_LENGTH ? text.substring(0, PREVIEW_LENGTH) + '...' : text
}
}
}
return ''
}
/**
* Convert Message to TreeNode
*/
function messageToTreeNode(message: Message, hasChildren: boolean): TreeNode {
return {
id: message.id,
parentId: message.parentId,
role: message.role === 'system' ? 'assistant' : message.role,
preview: extractPreview(message),
modelId: message.modelId,
modelMeta: message.modelMeta,
status: message.status,
createdAt: message.createdAt,
hasChildren
}
}
export class MessageService {
private static instance: MessageService
private constructor() {}
public static getInstance(): MessageService {
if (!MessageService.instance) {
MessageService.instance = new MessageService()
}
return MessageService.instance
}
/**
* Get tree structure for visualization
*/
async getTree(
topicId: string,
options: { rootId?: string; nodeId?: string; depth?: number } = {}
): Promise<TreeResponse> {
const db = dbService.getDb()
const { depth = 1 } = options
// Get topic to verify existence and get activeNodeId
const [topic] = await db.select().from(topicTable).where(eq(topicTable.id, topicId)).limit(1)
if (!topic) {
throw DataApiErrorFactory.notFound('Topic', topicId)
}
const activeNodeId = options.nodeId || topic.activeNodeId
// Get all messages for this topic
const allMessages = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.topicId, topicId), isNull(messageTable.deletedAt)))
if (allMessages.length === 0) {
return { nodes: [], siblingsGroups: [], activeNodeId: null }
}
const messagesById = new Map<string, Message>()
const childrenMap = new Map<string, string[]>()
for (const row of allMessages) {
const message = rowToMessage(row)
messagesById.set(message.id, message)
const parentId = message.parentId || 'root'
if (!childrenMap.has(parentId)) {
childrenMap.set(parentId, [])
}
childrenMap.get(parentId)!.push(message.id)
}
// Find root node(s) and build active path
const rootIds = childrenMap.get('root') || []
const rootId = options.rootId || rootIds[0]
// Build path from rootId to activeNodeId
const activePath = new Set<string>()
if (activeNodeId) {
let currentId: string | null = activeNodeId
while (currentId) {
activePath.add(currentId)
const message = messagesById.get(currentId)
currentId = message?.parentId || null
}
}
// Collect nodes based on depth
const resultNodes: TreeNode[] = []
const siblingsGroups: SiblingsGroup[] = []
const visitedGroups = new Set<string>()
const collectNodes = (nodeId: string, currentDepth: number, isOnActivePath: boolean) => {
const message = messagesById.get(nodeId)
if (!message) return
const children = childrenMap.get(nodeId) || []
const hasChildren = children.length > 0
// Check if this message is part of a siblings group
if (message.siblingsGroupId !== 0) {
const groupKey = `${message.parentId}-${message.siblingsGroupId}`
if (!visitedGroups.has(groupKey)) {
visitedGroups.add(groupKey)
// Find all siblings in this group
const parentChildren = childrenMap.get(message.parentId || 'root') || []
const groupMembers = parentChildren
.map((id) => messagesById.get(id)!)
.filter((m) => m.siblingsGroupId === message.siblingsGroupId)
if (groupMembers.length > 1) {
siblingsGroups.push({
parentId: message.parentId!,
siblingsGroupId: message.siblingsGroupId,
nodes: groupMembers.map((m) => {
const memberChildren = childrenMap.get(m.id) || []
const node = messageToTreeNode(m, memberChildren.length > 0)
const { parentId: _parentId, ...rest } = node
void _parentId // Intentionally unused - removing parentId from TreeNode for SiblingsGroup
return rest
})
})
} else {
// Single member, add as regular node
resultNodes.push(messageToTreeNode(message, hasChildren))
}
}
} else {
resultNodes.push(messageToTreeNode(message, hasChildren))
}
// Recurse to children
const shouldExpand = isOnActivePath || (depth === -1 ? true : currentDepth < depth)
if (shouldExpand) {
for (const childId of children) {
const childOnPath = activePath.has(childId)
collectNodes(childId, isOnActivePath ? 0 : currentDepth + 1, childOnPath)
}
}
}
// Start from root
if (rootId) {
collectNodes(rootId, 0, activePath.has(rootId))
}
return {
nodes: resultNodes,
siblingsGroups,
activeNodeId
}
}
/**
* Get branch messages for conversation view
*/
async getBranchMessages(
topicId: string,
options: { nodeId?: string; beforeNodeId?: string; limit?: number; includeSiblings?: boolean } = {}
): Promise<BranchMessagesResponse> {
const db = dbService.getDb()
const { limit = DEFAULT_LIMIT, includeSiblings = true } = options
// Get topic
const [topic] = await db.select().from(topicTable).where(eq(topicTable.id, topicId)).limit(1)
if (!topic) {
throw DataApiErrorFactory.notFound('Topic', topicId)
}
// Get all messages for this topic
const allMessages = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.topicId, topicId), isNull(messageTable.deletedAt)))
if (allMessages.length === 0) {
return { messages: [], activeNodeId: null }
}
// Check for data inconsistency
if (!topic.activeNodeId) {
throw DataApiErrorFactory.dataInconsistent('Topic', 'has messages but no active node')
}
const nodeId = options.nodeId || topic.activeNodeId
const messagesById = new Map<string, Message>()
for (const row of allMessages) {
messagesById.set(row.id, rowToMessage(row))
}
// Build path from root to nodeId
const path: string[] = []
let currentId: string | null = nodeId
while (currentId) {
path.unshift(currentId)
const message = messagesById.get(currentId)
if (!message) {
throw DataApiErrorFactory.notFound('Message', currentId)
}
currentId = message.parentId
}
// Apply pagination
let startIndex = 0
if (options.beforeNodeId) {
const beforeIndex = path.indexOf(options.beforeNodeId)
if (beforeIndex === -1) {
throw DataApiErrorFactory.notFound('Message', options.beforeNodeId)
}
startIndex = Math.max(0, beforeIndex - limit)
} else {
startIndex = Math.max(0, path.length - limit)
}
const endIndex = options.beforeNodeId ? path.indexOf(options.beforeNodeId) : path.length
const resultPath = path.slice(startIndex, endIndex)
// Build result with optional siblings
const result: BranchMessage[] = []
for (const msgId of resultPath) {
const message = messagesById.get(msgId)!
let siblingsGroup: Message[] | undefined
if (includeSiblings && message.siblingsGroupId !== 0) {
// Find siblings with same parentId and siblingsGroupId
siblingsGroup = allMessages
.filter(
(row) =>
row.parentId === message.parentId &&
row.siblingsGroupId === message.siblingsGroupId &&
row.id !== message.id
)
.map(rowToMessage)
}
result.push({
message,
siblingsGroup
})
}
return {
messages: result,
activeNodeId: topic.activeNodeId
}
}
/**
* Get a single message by ID
*/
async getById(id: string): Promise<Message> {
const db = dbService.getDb()
const [row] = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.id, id), isNull(messageTable.deletedAt)))
.limit(1)
if (!row) {
throw DataApiErrorFactory.notFound('Message', id)
}
return rowToMessage(row)
}
/**
* Create a new message
*/
async create(topicId: string, dto: CreateMessageDto): Promise<Message> {
const db = dbService.getDb()
// Verify topic exists
const [topic] = await db.select().from(topicTable).where(eq(topicTable.id, topicId)).limit(1)
if (!topic) {
throw DataApiErrorFactory.notFound('Topic', topicId)
}
// Verify parent exists if specified
if (dto.parentId) {
const [parent] = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.id, dto.parentId), isNull(messageTable.deletedAt)))
.limit(1)
if (!parent) {
throw DataApiErrorFactory.notFound('Message', dto.parentId)
}
}
const now = Date.now()
const id = uuidv7()
await db.insert(messageTable).values({
id,
topicId,
parentId: dto.parentId,
role: dto.role,
data: dto.data,
status: dto.status || 'success',
siblingsGroupId: dto.siblingsGroupId ?? 0,
assistantId: dto.assistantId,
assistantMeta: dto.assistantMeta,
modelId: dto.modelId,
modelMeta: dto.modelMeta,
traceId: dto.traceId,
stats: dto.stats,
createdAt: now,
updatedAt: now
})
logger.info('Created message', { id, topicId, role: dto.role })
return this.getById(id)
}
/**
* Update a message
*/
async update(id: string, dto: UpdateMessageDto): Promise<Message> {
const db = dbService.getDb()
// Get existing message
const existing = await this.getById(id)
// Check for cycle if moving to new parent
if (dto.parentId !== undefined && dto.parentId !== existing.parentId) {
if (dto.parentId !== null) {
// Check that new parent is not a descendant
const descendants = await this.getDescendantIds(id)
if (descendants.includes(dto.parentId)) {
throw DataApiErrorFactory.invalidOperation('move message', 'would create cycle')
}
// Verify new parent exists
const [parent] = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.id, dto.parentId), isNull(messageTable.deletedAt)))
.limit(1)
if (!parent) {
throw DataApiErrorFactory.notFound('Message', dto.parentId)
}
}
}
// Build update object
const updates: Partial<typeof messageTable.$inferInsert> = {
updatedAt: Date.now()
}
if (dto.data !== undefined) updates.data = dto.data
if (dto.parentId !== undefined) updates.parentId = dto.parentId
if (dto.siblingsGroupId !== undefined) updates.siblingsGroupId = dto.siblingsGroupId
if (dto.status !== undefined) updates.status = dto.status
await db.update(messageTable).set(updates).where(eq(messageTable.id, id))
logger.info('Updated message', { id, changes: Object.keys(dto) })
return this.getById(id)
}
/**
* Delete a message
*/
async delete(id: string, cascade: boolean = false): Promise<{ deletedIds: string[]; reparentedIds?: string[] }> {
const db = dbService.getDb()
// Get the message
const message = await this.getById(id)
// Check if it's a root message
const isRoot = message.parentId === null
if (isRoot && !cascade) {
throw DataApiErrorFactory.invalidOperation('delete root message', 'cascade=true required')
}
const now = Date.now()
if (cascade) {
// Get all descendants
const descendantIds = await this.getDescendantIds(id)
const allIds = [id, ...descendantIds]
// Soft delete all
await db.update(messageTable).set({ deletedAt: now }).where(inArray(messageTable.id, allIds))
logger.info('Cascade deleted messages', { rootId: id, count: allIds.length })
return { deletedIds: allIds }
} else {
// Reparent children to this message's parent
const children = await db
.select({ id: messageTable.id })
.from(messageTable)
.where(and(eq(messageTable.parentId, id), isNull(messageTable.deletedAt)))
const childIds = children.map((c) => c.id)
if (childIds.length > 0) {
await db
.update(messageTable)
.set({ parentId: message.parentId, updatedAt: now })
.where(inArray(messageTable.id, childIds))
}
// Soft delete this message
await db.update(messageTable).set({ deletedAt: now }).where(eq(messageTable.id, id))
logger.info('Deleted message with reparenting', { id, reparentedCount: childIds.length })
return { deletedIds: [id], reparentedIds: childIds }
}
}
/**
* Get all descendant IDs of a message
*/
private async getDescendantIds(id: string): Promise<string[]> {
const db = dbService.getDb()
// Use recursive query to get all descendants
const result = await db.all<{ id: string }>(sql`
WITH RECURSIVE descendants AS (
SELECT id FROM message WHERE parent_id = ${id} AND deleted_at IS NULL
UNION ALL
SELECT m.id FROM message m
INNER JOIN descendants d ON m.parent_id = d.id
WHERE m.deleted_at IS NULL
)
SELECT id FROM descendants
`)
return result.map((r) => r.id)
}
/**
* Get path from root to a node
*/
async getPathToNode(nodeId: string): Promise<Message[]> {
const path: Message[] = []
let currentId: string | null = nodeId
while (currentId) {
const message = await this.getById(currentId)
path.unshift(message)
currentId = message.parentId
}
return path
}
}
export const messageService = MessageService.getInstance()

View File

@ -0,0 +1,244 @@
/**
* Topic Service - handles topic CRUD and branch switching
*
* Provides business logic for:
* - Topic CRUD operations
* - Fork from existing conversation
* - Active node switching
*/
import { dbService } from '@data/db/DbService'
import { messageTable } from '@data/db/schemas/message'
import { topicTable } from '@data/db/schemas/topic'
import { loggerService } from '@logger'
import { DataApiErrorFactory } from '@shared/data/api'
import type { CreateTopicDto, UpdateTopicDto } from '@shared/data/api/schemas/topics'
import type { Topic } from '@shared/data/types/topic'
import { and, eq, isNull } from 'drizzle-orm'
import { v4 as uuidv4, v7 as uuidv7 } from 'uuid'
import { messageService } from './MessageService'
const logger = loggerService.withContext('TopicService')
/**
* Convert database row to Topic entity
*/
function rowToTopic(row: typeof topicTable.$inferSelect): Topic {
return {
id: row.id,
name: row.name,
isNameManuallyEdited: row.isNameManuallyEdited ?? false,
assistantId: row.assistantId,
assistantMeta: row.assistantMeta,
prompt: row.prompt,
activeNodeId: row.activeNodeId,
groupId: row.groupId,
sortOrder: row.sortOrder ?? 0,
isPinned: row.isPinned ?? false,
pinnedOrder: row.pinnedOrder ?? 0,
createdAt: row.createdAt ? new Date(row.createdAt).toISOString() : new Date().toISOString(),
updatedAt: row.updatedAt ? new Date(row.updatedAt).toISOString() : new Date().toISOString()
}
}
export class TopicService {
private static instance: TopicService
private constructor() {}
public static getInstance(): TopicService {
if (!TopicService.instance) {
TopicService.instance = new TopicService()
}
return TopicService.instance
}
/**
* Get a topic by ID
*/
async getById(id: string): Promise<Topic> {
const db = dbService.getDb()
const [row] = await db
.select()
.from(topicTable)
.where(and(eq(topicTable.id, id), isNull(topicTable.deletedAt)))
.limit(1)
if (!row) {
throw DataApiErrorFactory.notFound('Topic', id)
}
return rowToTopic(row)
}
/**
* Create a new topic
*/
async create(dto: CreateTopicDto): Promise<Topic> {
const db = dbService.getDb()
const now = Date.now()
const id = uuidv4()
// If forking from existing node, copy the path
let activeNodeId: string | null = null
if (dto.sourceNodeId) {
// Verify source node exists
try {
await messageService.getById(dto.sourceNodeId)
} catch {
throw DataApiErrorFactory.notFound('Message', dto.sourceNodeId)
}
// Get path from root to source node
const path = await messageService.getPathToNode(dto.sourceNodeId)
// Create new topic first
await db.insert(topicTable).values({
id,
name: dto.name,
assistantId: dto.assistantId,
assistantMeta: dto.assistantMeta,
prompt: dto.prompt,
groupId: dto.groupId,
createdAt: now,
updatedAt: now
})
// Copy messages with new IDs
const idMapping = new Map<string, string>()
for (const message of path) {
const newId = uuidv7()
const newParentId = message.parentId ? idMapping.get(message.parentId) || null : null
idMapping.set(message.id, newId)
await db.insert(messageTable).values({
id: newId,
topicId: id,
parentId: newParentId,
role: message.role,
data: message.data,
status: message.status,
siblingsGroupId: 0, // Simplify multi-model to normal node
assistantId: message.assistantId,
assistantMeta: message.assistantMeta,
modelId: message.modelId,
modelMeta: message.modelMeta,
traceId: null, // Clear trace ID
stats: null, // Clear stats
createdAt: now,
updatedAt: now
})
// Last node becomes the active node
activeNodeId = newId
}
// Update topic with active node
await db.update(topicTable).set({ activeNodeId }).where(eq(topicTable.id, id))
logger.info('Created topic by forking', { id, sourceNodeId: dto.sourceNodeId, messageCount: path.length })
} else {
// Create empty topic
await db.insert(topicTable).values({
id,
name: dto.name,
assistantId: dto.assistantId,
assistantMeta: dto.assistantMeta,
prompt: dto.prompt,
groupId: dto.groupId,
createdAt: now,
updatedAt: now
})
logger.info('Created empty topic', { id })
}
return this.getById(id)
}
/**
* Update a topic
*/
async update(id: string, dto: UpdateTopicDto): Promise<Topic> {
const db = dbService.getDb()
// Verify topic exists
await this.getById(id)
// Build update object
const updates: Partial<typeof topicTable.$inferInsert> = {
updatedAt: Date.now()
}
if (dto.name !== undefined) updates.name = dto.name
if (dto.isNameManuallyEdited !== undefined) updates.isNameManuallyEdited = dto.isNameManuallyEdited
if (dto.assistantId !== undefined) updates.assistantId = dto.assistantId
if (dto.assistantMeta !== undefined) updates.assistantMeta = dto.assistantMeta
if (dto.prompt !== undefined) updates.prompt = dto.prompt
if (dto.groupId !== undefined) updates.groupId = dto.groupId
if (dto.sortOrder !== undefined) updates.sortOrder = dto.sortOrder
if (dto.isPinned !== undefined) updates.isPinned = dto.isPinned
if (dto.pinnedOrder !== undefined) updates.pinnedOrder = dto.pinnedOrder
await db.update(topicTable).set(updates).where(eq(topicTable.id, id))
logger.info('Updated topic', { id, changes: Object.keys(dto) })
return this.getById(id)
}
/**
* Delete a topic and all its messages
*/
async delete(id: string): Promise<void> {
const db = dbService.getDb()
// Verify topic exists
await this.getById(id)
const now = Date.now()
// Soft delete all messages
await db.update(messageTable).set({ deletedAt: now }).where(eq(messageTable.topicId, id))
// Soft delete topic
await db.update(topicTable).set({ deletedAt: now }).where(eq(topicTable.id, id))
logger.info('Deleted topic', { id })
}
/**
* Set the active node for a topic
*/
async setActiveNode(topicId: string, nodeId: string): Promise<{ activeNodeId: string }> {
const db = dbService.getDb()
// Verify topic exists
await this.getById(topicId)
// Verify node exists and belongs to this topic
const [message] = await db
.select()
.from(messageTable)
.where(and(eq(messageTable.id, nodeId), eq(messageTable.topicId, topicId), isNull(messageTable.deletedAt)))
.limit(1)
if (!message) {
throw DataApiErrorFactory.notFound('Message', nodeId)
}
// Update active node
await db.update(topicTable).set({ activeNodeId: nodeId, updatedAt: Date.now() }).where(eq(topicTable.id, topicId))
logger.info('Set active node', { topicId, nodeId })
return { activeNodeId: nodeId }
}
}
export const topicService = TopicService.getInstance()