Merge branch 'feat/agents-new' of github.com:CherryHQ/cherry-studio into feat/agents-new

This commit is contained in:
icarus 2025-09-18 19:04:14 +08:00
commit eaf302bb40
8 changed files with 415 additions and 136 deletions

View File

@ -0,0 +1 @@
ALTER TABLE `session_messages` ADD `agent_session_id` text DEFAULT '';

View File

@ -0,0 +1,339 @@
{
"version": "6",
"dialect": "sqlite",
"id": "dabab6db-a2cd-4e96-b06e-6cb87d445a87",
"prevId": "35efb412-0230-4767-9c76-7b7c4d40369f",
"tables": {
"agents": {
"name": "agents",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"accessible_paths": {
"name": "accessible_paths",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"instructions": {
"name": "instructions",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"plan_model": {
"name": "plan_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"small_model": {
"name": "small_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"mcps": {
"name": "mcps",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"allowed_tools": {
"name": "allowed_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"configuration": {
"name": "configuration",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"session_messages": {
"name": "session_messages",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": true,
"notNull": true,
"autoincrement": true
},
"session_id": {
"name": "session_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"role": {
"name": "role",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"agent_session_id": {
"name": "agent_session_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": "''"
},
"metadata": {
"name": "metadata",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"migrations": {
"name": "migrations",
"columns": {
"version": {
"name": "version",
"type": "integer",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"tag": {
"name": "tag",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"executed_at": {
"name": "executed_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"sessions": {
"name": "sessions",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"agent_type": {
"name": "agent_type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"agent_id": {
"name": "agent_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"accessible_paths": {
"name": "accessible_paths",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"instructions": {
"name": "instructions",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"plan_model": {
"name": "plan_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"small_model": {
"name": "small_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"mcps": {
"name": "mcps",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"allowed_tools": {
"name": "allowed_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"configuration": {
"name": "configuration",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}

View File

@ -8,6 +8,13 @@
"when": 1758091173882, "when": 1758091173882,
"tag": "0000_confused_wendigo", "tag": "0000_confused_wendigo",
"breakpoints": true "breakpoints": true
},
{
"idx": 1,
"version": "6",
"when": 1758187378775,
"tag": "0001_woozy_captain_flint",
"breakpoints": true
} }
] ]
} }

View File

@ -35,9 +35,6 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
logger.info(`Creating streaming message for session: ${sessionId}`) logger.info(`Creating streaming message for session: ${sessionId}`)
logger.debug('Streaming message data:', messageData) logger.debug('Streaming message data:', messageData)
// Step 1: Save user message first
const userMessage = await sessionMessageService.saveUserMessage(sessionId, messageData.content)
// Set SSE headers // Set SSE headers
res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache') res.setHeader('Cache-Control', 'no-cache')
@ -45,7 +42,8 @@ export const createMessage = async (req: Request, res: Response): Promise<void>
res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const messageStream = sessionMessageService.createSessionMessage(session, messageData, userMessage.id)
const messageStream = sessionMessageService.createSessionMessage(session, messageData)
// Track stream lifecycle so we keep the SSE connection open until persistence finishes // Track stream lifecycle so we keep the SSE connection open until persistence finishes
let responseEnded = false let responseEnded = false

View File

@ -8,6 +8,7 @@ export const sessionMessagesTable = sqliteTable('session_messages', {
session_id: text('session_id').notNull(), session_id: text('session_id').notNull(),
role: text('role').notNull(), // 'user', 'agent', 'system', 'tool' role: text('role').notNull(), // 'user', 'agent', 'system', 'tool'
content: text('content').notNull(), // JSON structured data content: text('content').notNull(), // JSON structured data
agent_session_id: text('agent_session_id').default(''),
metadata: text('metadata'), // JSON metadata (optional) metadata: text('metadata'), // JSON metadata (optional)
created_at: text('created_at').notNull(), created_at: text('created_at').notNull(),
updated_at: text('updated_at').notNull() updated_at: text('updated_at').notNull()

View File

@ -1,6 +1,5 @@
import { EventEmitter } from 'node:events' import { EventEmitter } from 'node:events'
import { PermissionMode } from '@anthropic-ai/claude-code'
import { loggerService } from '@logger' import { loggerService } from '@logger'
import type { import type {
AgentSessionMessageEntity, AgentSessionMessageEntity,
@ -33,17 +32,6 @@ export async function chunksToModelMessages(
return convertToModelMessages(uiMessages) // -> ModelMessage[] return convertToModelMessages(uiMessages) // -> ModelMessage[]
} }
// Utility function to normalize content to ModelMessage
function normalizeModelMessage(content: string | ModelMessage): ModelMessage {
if (typeof content === 'string') {
return {
role: 'user',
content: content
}
}
return content
}
// Ensure errors emitted through SSE are serializable // Ensure errors emitted through SSE are serializable
function serializeError(error: unknown): { message: string; name?: string; stack?: string } { function serializeError(error: unknown): { message: string; name?: string; stack?: string } {
if (error instanceof Error) { if (error instanceof Error) {
@ -63,52 +51,15 @@ function serializeError(error: unknown): { message: string; name?: string; stack
} }
} }
// Interface for persistence context
interface PersistContext {
session: GetAgentSessionResponse
accumulator: ChunkAccumulator
userMessageId: number
}
// Chunk accumulator class to collect and reconstruct streaming data // Chunk accumulator class to collect and reconstruct streaming data
class ChunkAccumulator { class ChunkAccumulator {
private streamedChunks: UIMessageChunk[] = [] private streamedChunks: UIMessageChunk[] = []
private rawAgentMessages: any[] = []
private agentResult: any = null
private agentType: string = 'unknown' private agentType: string = 'unknown'
private uniqueIds: Set<string> = new Set()
addChunk(chunk: UIMessageChunk): void { addChunk(chunk: UIMessageChunk): void {
this.streamedChunks.push(chunk) this.streamedChunks.push(chunk)
} }
addRawMessage(message: any): void {
if (message.uuid && this.uniqueIds.has(message.uuid)) {
// Duplicate message based on uuid; skip adding
return
}
if (message.uuid) {
this.uniqueIds.add(message.uuid)
}
this.rawAgentMessages.push(message)
}
setAgentResult(result: any): void {
this.agentResult = result
if (result?.agentType) {
this.agentType = result.agentType
}
}
buildStructuredContent() {
return {
aiSDKChunks: this.streamedChunks,
rawAgentMessages: this.rawAgentMessages,
agentResult: this.agentResult,
agentType: this.agentType
}
}
// Create a ReadableStream from accumulated chunks // Create a ReadableStream from accumulated chunks
createChunkStream(): ReadableStream<UIMessageChunk> { createChunkStream(): ReadableStream<UIMessageChunk> {
const chunks = [...this.streamedChunks] const chunks = [...this.streamedChunks]
@ -162,14 +113,6 @@ class ChunkAccumulator {
return message as ModelMessage return message as ModelMessage
} }
getChunkCount(): number {
return this.streamedChunks.length
}
getRawMessageCount(): number {
return this.rawAgentMessages.length
}
getAgentType(): string { getAgentType(): string {
return this.agentType return this.agentType
} }
@ -235,68 +178,65 @@ export class SessionMessageService extends BaseService {
return { messages, total } return { messages, total }
} }
async saveUserMessage(sessionId: string, content: ModelMessage | string): Promise<AgentSessionMessageEntity> { async saveUserMessage(
tx: any,
sessionId: string,
prompt: string,
agentSessionId: string
): Promise<AgentSessionMessageEntity> {
this.ensureInitialized() this.ensureInitialized()
const now = new Date().toISOString() const now = new Date().toISOString()
const userContent: ModelMessage = normalizeModelMessage(content)
const insertData: InsertSessionMessageRow = { const insertData: InsertSessionMessageRow = {
session_id: sessionId, session_id: sessionId,
role: 'user', role: 'user',
content: JSON.stringify(userContent), content: prompt,
metadata: JSON.stringify({ agent_session_id: agentSessionId,
timestamp: now,
source: 'api'
}),
created_at: now, created_at: now,
updated_at: now updated_at: now
} }
const [saved] = await this.database.insert(sessionMessagesTable).values(insertData).returning() const [saved] = await tx.insert(sessionMessagesTable).values(insertData).returning()
return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity return this.deserializeSessionMessage(saved) as AgentSessionMessageEntity
} }
createSessionMessage( createSessionMessage(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter {
session: GetAgentSessionResponse,
messageData: CreateSessionMessageRequest,
userMessageId: number
): EventEmitter {
this.ensureInitialized() this.ensureInitialized()
// Create a new EventEmitter to manage the session message lifecycle // Create a new EventEmitter to manage the session message lifecycle
const sessionStream = new EventEmitter() const sessionStream = new EventEmitter()
// No parent validation needed, start immediately // No parent validation needed, start immediately
this.startSessionMessageStream(session, messageData, sessionStream, userMessageId) this.startSessionMessageStream(session, messageData, sessionStream)
return sessionStream return sessionStream
} }
private startSessionMessageStream( private async startSessionMessageStream(
session: GetAgentSessionResponse, session: GetAgentSessionResponse,
req: CreateSessionMessageRequest, req: CreateSessionMessageRequest,
sessionStream: EventEmitter, sessionStream: EventEmitter
userMessageId: number ): Promise<void> {
): void {
const previousMessages = session.messages || [] const previousMessages = session.messages || []
let session_id: string = '' let agentSessionId: string = ''
if (previousMessages.length > 0) { if (previousMessages.length > 0) {
session_id = previousMessages[0].session_id agentSessionId = previousMessages[previousMessages.length - 1].agent_session_id
} }
logger.debug('Session Message stream message data:', { message: req, session_id }) logger.debug('Session Message stream message data:', { message: req, session_id: agentSessionId })
if (session.agent_type !== 'claude-code') { if (session.agent_type !== 'claude-code') {
// TODO: Implement support for other agent types
logger.error('Unsupported agent type for streaming:', { agent_type: session.agent_type }) logger.error('Unsupported agent type for streaming:', { agent_type: session.agent_type })
throw new Error('Unsupported agent type for streaming') throw new Error('Unsupported agent type for streaming')
} }
let newAgentSessionId = ''
// Create the streaming agent invocation (using invokeStream for streaming) // Create the streaming agent invocation (using invokeStream for streaming)
const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], session_id, { const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], agentSessionId, {
permissionMode: (session.configuration?.permissionMode as PermissionMode) || 'default', permissionMode: session.configuration?.permission_mode,
maxTurns: (session.configuration?.maxTurns as number) || 10 maxTurns: session.configuration?.max_turns
}) })
// Use chunk accumulator to manage streaming data // Use chunk accumulator to manage streaming data
@ -310,12 +250,10 @@ export class SessionMessageService extends BaseService {
// Forward UIMessageChunk directly and collect raw agent messages // Forward UIMessageChunk directly and collect raw agent messages
if (event.chunk) { if (event.chunk) {
const chunk = event.chunk as UIMessageChunk const chunk = event.chunk as UIMessageChunk
accumulator.addChunk(chunk) if (chunk.type === 'start' && chunk.messageId) {
newAgentSessionId = chunk.messageId
// Collect raw agent message if available (agent-agnostic)
if (event.rawAgentMessage) {
accumulator.addRawMessage(event.rawAgentMessage)
} }
accumulator.addChunk(chunk)
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'chunk', type: 'chunk',
@ -328,27 +266,10 @@ export class SessionMessageService extends BaseService {
case 'error': { case 'error': {
const underlyingError = event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined) const underlyingError = event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined)
const persistScheduled = accumulator.getChunkCount() > 0
if (persistScheduled) {
// Try to save partial state with error metadata when possible
accumulator.setAgentResult({
error: serializeError(underlyingError),
agentType: 'claude-code',
incomplete: true
})
void this.persistSessionMessageAsync({
session,
accumulator,
userMessageId
})
}
sessionStream.emit('data', { sessionStream.emit('data', {
type: 'error', type: 'error',
error: serializeError(underlyingError), error: serializeError(underlyingError)
persistScheduled
}) })
// Always emit a finish chunk at the end // Always emit a finish chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
@ -358,19 +279,15 @@ export class SessionMessageService extends BaseService {
} }
case 'complete': { case 'complete': {
// Extract additional raw agent messages from agentResult if available
if (event.agentResult?.rawSDKMessages) {
event.agentResult.rawSDKMessages.forEach((msg: any) => accumulator.addRawMessage(msg))
}
// Set the agent result in the accumulator
accumulator.setAgentResult(event.agentResult)
// Then handle async persistence // Then handle async persistence
void this.persistSessionMessageAsync({ this.database.transaction(async (tx) => {
session, await this.saveUserMessage(tx, session.id, req.content, newAgentSessionId)
accumulator, await this.persistSessionMessageAsync({
userMessageId tx,
session,
accumulator,
agentSessionId: newAgentSessionId
})
}) })
// Always emit a finish chunk at the end // Always emit a finish chunk at the end
sessionStream.emit('data', { sessionStream.emit('data', {
@ -395,7 +312,17 @@ export class SessionMessageService extends BaseService {
}) })
} }
private async persistSessionMessageAsync({ session, accumulator, userMessageId }: PersistContext) { private async persistSessionMessageAsync({
tx,
session,
accumulator,
agentSessionId
}: {
tx: any
session: GetAgentSessionResponse
accumulator: ChunkAccumulator
agentSessionId: string
}) {
if (!session?.id) { if (!session?.id) {
const missingSessionError = new Error('Missing session_id for persisted message') const missingSessionError = new Error('Missing session_id for persisted message')
logger.error('error persisting session message', { error: missingSessionError }) logger.error('error persisting session message', { error: missingSessionError })
@ -404,7 +331,6 @@ export class SessionMessageService extends BaseService {
const sessionId = session.id const sessionId = session.id
const now = new Date().toISOString() const now = new Date().toISOString()
const structured = accumulator.buildStructuredContent()
try { try {
// Use chunksToModelMessages to convert chunks to ModelMessages // Use chunksToModelMessages to convert chunks to ModelMessages
@ -413,24 +339,16 @@ export class SessionMessageService extends BaseService {
const modelMessage = const modelMessage =
modelMessages.length > 0 ? modelMessages[modelMessages.length - 1] : accumulator.toModelMessage('assistant') modelMessages.length > 0 ? modelMessages[modelMessages.length - 1] : accumulator.toModelMessage('assistant')
const metadata = {
userMessageId,
chunkCount: accumulator.getChunkCount(),
rawMessageCount: accumulator.getRawMessageCount(),
agentType: accumulator.getAgentType(),
completedAt: now
}
const insertData: InsertSessionMessageRow = { const insertData: InsertSessionMessageRow = {
session_id: sessionId, session_id: sessionId,
role: 'assistant', role: 'assistant',
content: JSON.stringify({ modelMessage, ...structured }), content: JSON.stringify(modelMessage),
metadata: JSON.stringify(metadata), agent_session_id: agentSessionId,
created_at: now, created_at: now,
updated_at: now updated_at: now
} }
await this.database.insert(sessionMessagesTable).values(insertData).returning() await tx.insert(sessionMessagesTable).values(insertData).returning()
logger.debug('Success Persisted session message') logger.debug('Success Persisted session message')
} catch (error) { } catch (error) {
logger.error('Failed to persist session message', { error }) logger.error('Failed to persist session message', { error })

View File

@ -53,7 +53,7 @@ class ClaudeCodeService implements AgentServiceInterface {
logger.info('Starting Claude Code SDK query', { logger.info('Starting Claude Code SDK query', {
prompt, prompt,
options: { cwd, maxTurns: options.maxTurns, permissionMode: options.permissionMode } options
}) })
// Start async processing // Start async processing
@ -62,6 +62,20 @@ class ClaudeCodeService implements AgentServiceInterface {
return aiStream return aiStream
} }
private async *userMessages(prompt: string) {
{
yield {
type: 'user' as const,
parent_tool_use_id: null,
session_id: '',
message: {
role: 'user' as const,
content: prompt
}
}
}
}
/** /**
* Process SDK query and emit stream events * Process SDK query and emit stream events
*/ */
@ -73,7 +87,7 @@ class ClaudeCodeService implements AgentServiceInterface {
try { try {
// Process streaming responses using SDK query // Process streaming responses using SDK query
for await (const message of query({ for await (const message of query({
prompt, prompt: this.userMessages(prompt),
options options
})) { })) {
if (hasCompleted) break if (hasCompleted) break

View File

@ -97,6 +97,7 @@ export const AgentSessionMessageEntitySchema = z.object({
// manual defined. may not synced with ai sdk definition // manual defined. may not synced with ai sdk definition
role: z.enum(['assistant', 'user', 'system', 'tool']), // 'assistant' | 'user' | 'system' | 'tool' role: z.enum(['assistant', 'user', 'system', 'tool']), // 'assistant' | 'user' | 'system' | 'tool'
content: modelMessageSchema, content: modelMessageSchema,
agent_session_id: z.string(), // agent session id, use to resume agent session
metadata: z.record(z.string(), z.any()).optional(), // Additional metadata (optional) metadata: z.record(z.string(), z.any()).optional(), // Additional metadata (optional)
created_at: z.iso.datetime(), // ISO timestamp created_at: z.iso.datetime(), // ISO timestamp
updated_at: z.iso.datetime() // ISO timestamp updated_at: z.iso.datetime() // ISO timestamp