diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index e290ec9de3..859329112e 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -17,7 +17,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { throw { status: 404, code: 'session_not_found', message: 'Session not found' } } - if (session.main_agent_id !== agentId) { + if (session.agent_id !== agentId) { throw { status: 404, code: 'session_not_found', message: 'Session not found for this agent' } } @@ -43,7 +43,7 @@ export const createMessageStream = async (req: Request, res: Response): Promise< res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') // Send initial connection event - res.write('data: {"type":"connected"}\n\n') + res.write('data: {"type":"start"}\n\n') const messageStream = sessionMessageService.createSessionMessageStream(session, messageData) diff --git a/src/main/apiServer/routes/agents/handlers/sessions.ts b/src/main/apiServer/routes/agents/handlers/sessions.ts index 7d23297c89..aa09be0f75 100644 --- a/src/main/apiServer/routes/agents/handlers/sessions.ts +++ b/src/main/apiServer/routes/agents/handlers/sessions.ts @@ -78,7 +78,6 @@ export const getSession = async (req: Request, res: Response): Promise } // Verify session belongs to the agent - if (session.main_agent_id !== agentId) { logger.warn(`Session ${sessionId} does not belong to agent ${agentId}`) return res.status(404).json({ error: { @@ -121,7 +120,7 @@ export const updateSession = async (req: Request, res: Response): Promise B[API Handler] + B --> C[SessionMessageService] + C --> D[Agent Service] + D --> E[Agent Process] + E --> F[Raw Agent Output] + F --> G[Transform to AI SDK] + G --> H[Emit AgentStreamEvent] + H --> I[SessionMessageService] + I --> J[Store in Database] + I --> K[Forward to Client] + K --> L[UI Rendering] +``` + +## Message Transformation Process + +### Step 1: Raw Agent Message Generation + +Each agent generates messages in its native format: + +**Claude Code Example:** +```typescript +// SDKMessage from Claude Code CLI +{ + type: 'assistant', + uuid: 'msg_123', + session_id: 'session_456', + message: { + role: 'assistant', + content: [ + { type: 'text', text: 'Hello, I can help...' }, + { type: 'tool_use', id: 'tool_1', name: 'read_file', input: {...} } + ] + } +} +``` + +### Step 2: Transformation to AI SDK Format + +The agent service transforms native messages to AI SDK `UIMessageChunk`: + +```typescript +// In ClaudeCodeService +const emitChunks = (sdkMessage: SDKMessage) => { + // Transform to AI SDK format + const chunks = transformSDKMessageToUIChunk(sdkMessage) + + for (const chunk of chunks) { + stream.emit('data', { + type: 'chunk', + chunk, // AI SDK format + rawAgentMessage: sdkMessage // Preserve original + }) + } +} +``` + +**Transformed AI SDK Chunk:** +```typescript +{ + type: 'text-delta', + id: 'msg_123', + delta: 'Hello, I can help...', + providerMetadata: { + claudeCode: { + originalSDKMessage: {...}, + uuid: 'msg_123', + session_id: 'session_456' + } + } +} +``` + +### Step 3: Session Message Processing + +The SessionMessageService collects and processes events: + +```typescript +// Collect streaming data +const streamedChunks: UIMessageChunk[] = [] +const rawAgentMessages: any[] = [] + +claudeStream.on('data', async (event: AgentStreamEvent) => { + switch (event.type) { + case 'chunk': + streamedChunks.push(event.chunk) + if (event.rawAgentMessage) { + rawAgentMessages.push(event.rawAgentMessage) + } + // Forward to client + sessionStream.emit('data', { type: 'chunk', chunk: event.chunk }) + break + + case 'complete': + // Store complete structured data + const content = { + aiSDKChunks: streamedChunks, + rawAgentMessages: rawAgentMessages, + agentResult: event.agentResult, + agentType: event.agentResult?.agentType || 'unknown' + } + // Save to database... + break + } +}) +``` + +### Step 4: Client Streaming + +The API handler converts events to Server-Sent Events (SSE): + +```typescript +// In API handler +messageStream.on('data', (event: any) => { + switch (event.type) { + case 'chunk': + // Send AI SDK chunk as SSE + res.write(`data: ${JSON.stringify(event.chunk)}\n\n`) + break + case 'complete': + res.write('data: [DONE]\n\n') + res.end() + break + } +}) +``` + +## Adding New Agent Types + +To add support for a new agent (e.g., OpenAI): + +### 1. Create Agent Service + +```typescript +class OpenAIService implements AgentServiceInterface { + invokeStream(prompt: string, cwd: string, sessionId?: string, options?: any): AgentStream { + const stream = new OpenAIStream() + + // Call OpenAI API + const openaiResponse = await openai.chat.completions.create({ + messages: [{ role: 'user', content: prompt }], + stream: true + }) + + // Transform OpenAI format to AI SDK + for await (const chunk of openaiResponse) { + const aiSDKChunk = transformOpenAIToAISDK(chunk) + stream.emit('data', { + type: 'chunk', + chunk: aiSDKChunk, + rawAgentMessage: chunk // Preserve OpenAI format + }) + } + + return stream + } +} +``` + +### 2. Create Transform Function + +```typescript +function transformOpenAIToAISDK(openaiChunk: OpenAIChunk): UIMessageChunk { + return { + type: 'text-delta', + id: openaiChunk.id, + delta: openaiChunk.choices[0].delta.content, + providerMetadata: { + openai: { + original: openaiChunk, + model: openaiChunk.model + } + } + } +} +``` + +### 3. Register Agent Type + +Update the agent type enum and factory: + +```typescript +export type AgentType = 'claude-code' | 'openai' | 'anthropic-api' + +function createAgentService(type: AgentType): AgentServiceInterface { + switch (type) { + case 'claude-code': + return new ClaudeCodeService() + case 'openai': + return new OpenAIService() + // ... + } +} +``` + +## Benefits of This Architecture + +1. **Extensibility**: Easy to add new agent types without modifying core logic +2. **Data Integrity**: Raw agent data is never lost during transformation +3. **Debugging**: Complete message history available for troubleshooting +4. **Performance**: Streaming support for real-time responses +5. **Type Safety**: Strong interfaces prevent runtime errors +6. **UI Consistency**: All agents provide data in standard AI SDK format + +## Key Interfaces Reference + +### AgentStreamEvent +```typescript +interface AgentStreamEvent { + type: 'chunk' | 'error' | 'complete' + chunk?: UIMessageChunk + rawAgentMessage?: any + error?: Error + agentResult?: any +} +``` + +### SessionMessageEntity +```typescript +interface SessionMessageEntity { + id: number + session_id: string + parent_id?: number + role: 'user' | 'assistant' | 'system' | 'tool' + type: string + content: string | SessionMessageContent + metadata?: Record + created_at: string + updated_at: string +} +``` + +### SessionMessageContent +```typescript +interface SessionMessageContent { + aiSDKChunks: UIMessageChunk[] + rawAgentMessages: any[] + agentResult?: any + agentType: string +} +``` + +## Testing Strategy + +### Unit Tests +- Test each transform function independently +- Verify event emission sequences +- Validate data structure preservation + +### Integration Tests +- Test complete flow from input to database +- Verify streaming behavior +- Test error handling and recovery + +### Agent-Specific Tests +- Validate agent-specific transformations +- Test edge cases for each agent type +- Verify metadata preservation + +## Future Enhancements + +1. **Message Replay**: Ability to replay sessions from stored raw messages +2. **Format Migration**: Tools to migrate between agent formats +3. **Analytics**: Aggregate metrics from raw agent data +4. **Caching**: Cache transformed chunks for performance +5. **Compression**: Compress raw messages for storage efficiency + +## Conclusion + +This architecture provides a robust, extensible foundation for handling messages from multiple AI agents while maintaining data integrity and providing a consistent interface for the UI. The separation of concerns between agent-specific logic and core message handling ensures the system can evolve to support new agents and features without breaking existing functionality. diff --git a/src/main/services/agents/BaseService.ts b/src/main/services/agents/BaseService.ts index 52d29afc69..53830df243 100644 --- a/src/main/services/agents/BaseService.ts +++ b/src/main/services/agents/BaseService.ts @@ -26,6 +26,7 @@ export abstract class BaseService { protected static db: ReturnType | null = null protected static isInitialized = false protected static initializationPromise: Promise | null = null + protected jsonFields: string[] = ['built_in_tools', 'mcps', 'configuration', 'accessible_paths'] /** * Initialize database with retry logic and proper error handling @@ -116,9 +117,8 @@ export abstract class BaseService { protected serializeJsonFields(data: any): any { const serialized = { ...data } - const jsonFields = ['built_in_tools', 'mcps', 'knowledges', 'configuration', 'accessible_paths', 'sub_agent_ids'] - for (const field of jsonFields) { + for (const field of this.jsonFields) { if (serialized[field] !== undefined) { serialized[field] = Array.isArray(serialized[field]) || typeof serialized[field] === 'object' @@ -134,9 +134,8 @@ export abstract class BaseService { if (!data) return data const deserialized = { ...data } - const jsonFields = ['built_in_tools', 'mcps', 'knowledges', 'configuration', 'accessible_paths', 'sub_agent_ids'] - for (const field of jsonFields) { + for (const field of this.jsonFields) { if (deserialized[field] && typeof deserialized[field] === 'string') { try { deserialized[field] = JSON.parse(deserialized[field]) diff --git a/src/main/services/agents/database/drizzle/0000_bizarre_la_nuit.sql b/src/main/services/agents/database/drizzle/0000_dry_luke_cage.sql similarity index 63% rename from src/main/services/agents/database/drizzle/0000_bizarre_la_nuit.sql rename to src/main/services/agents/database/drizzle/0000_dry_luke_cage.sql index 11e60f74c2..09f1c74c36 100644 --- a/src/main/services/agents/database/drizzle/0000_bizarre_la_nuit.sql +++ b/src/main/services/agents/database/drizzle/0000_dry_luke_cage.sql @@ -1,20 +1,33 @@ CREATE TABLE `agents` ( `id` text PRIMARY KEY NOT NULL, - `type` text DEFAULT 'claude-code' NOT NULL, + `type` text NOT NULL, `name` text NOT NULL, `description` text, - `avatar` text, + `accessible_paths` text, `instructions` text, `model` text NOT NULL, `plan_model` text, `small_model` text, - `built_in_tools` text, `mcps` text, - `knowledges` text, + `allowed_tools` text, `configuration` text, + `created_at` text NOT NULL, + `updated_at` text NOT NULL +); +--> statement-breakpoint +CREATE TABLE `sessions` ( + `id` text PRIMARY KEY NOT NULL, + `agent_id` text NOT NULL, + `name` text NOT NULL, + `description` text, `accessible_paths` text, - `permission_mode` text DEFAULT 'default', - `max_steps` integer DEFAULT 10, + `instructions` text, + `model` text NOT NULL, + `plan_model` text, + `small_model` text, + `mcps` text, + `allowed_tools` text, + `configuration` text, `created_at` text NOT NULL, `updated_at` text NOT NULL ); @@ -22,33 +35,9 @@ CREATE TABLE `agents` ( CREATE TABLE `session_messages` ( `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, `session_id` text NOT NULL, - `parent_id` integer, `role` text NOT NULL, - `type` text NOT NULL, `content` text NOT NULL, `metadata` text, `created_at` text NOT NULL, `updated_at` text NOT NULL ); ---> statement-breakpoint -CREATE TABLE `sessions` ( - `id` text PRIMARY KEY NOT NULL, - `name` text, - `main_agent_id` text NOT NULL, - `sub_agent_ids` text, - `user_goal` text, - `status` text DEFAULT 'idle' NOT NULL, - `external_session_id` text, - `model` text, - `plan_model` text, - `small_model` text, - `built_in_tools` text, - `mcps` text, - `knowledges` text, - `configuration` text, - `accessible_paths` text, - `permission_mode` text DEFAULT 'default', - `max_steps` integer DEFAULT 10, - `created_at` text NOT NULL, - `updated_at` text NOT NULL -); diff --git a/src/main/services/agents/database/drizzle/meta/0000_snapshot.json b/src/main/services/agents/database/drizzle/meta/0000_snapshot.json index ce9720e751..5ddcc15d4f 100644 --- a/src/main/services/agents/database/drizzle/meta/0000_snapshot.json +++ b/src/main/services/agents/database/drizzle/meta/0000_snapshot.json @@ -1,7 +1,7 @@ { "version": "6", "dialect": "sqlite", - "id": "c8b65142-dcf4-4d20-8f0e-a17625b34fa7", + "id": "7ef99575-0fcf-471c-9da7-77e5cf8de6a2", "prevId": "00000000-0000-0000-0000-000000000000", "tables": { "agents": { @@ -19,8 +19,7 @@ "type": "text", "primaryKey": false, "notNull": true, - "autoincrement": false, - "default": "'claude-code'" + "autoincrement": false }, "name": { "name": "name", @@ -36,8 +35,8 @@ "notNull": false, "autoincrement": false }, - "avatar": { - "name": "avatar", + "accessible_paths": { + "name": "accessible_paths", "type": "text", "primaryKey": false, "notNull": false, @@ -71,13 +70,6 @@ "notNull": false, "autoincrement": false }, - "built_in_tools": { - "name": "built_in_tools", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, "mcps": { "name": "mcps", "type": "text", @@ -85,8 +77,8 @@ "notNull": false, "autoincrement": false }, - "knowledges": { - "name": "knowledges", + "allowed_tools": { + "name": "allowed_tools", "type": "text", "primaryKey": false, "notNull": false, @@ -99,6 +91,58 @@ "notNull": false, "autoincrement": false }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "sessions": { + "name": "sessions", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "agent_id": { + "name": "agent_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, "accessible_paths": { "name": "accessible_paths", "type": "text", @@ -106,21 +150,54 @@ "notNull": false, "autoincrement": false }, - "permission_mode": { - "name": "permission_mode", + "instructions": { + "name": "instructions", "type": "text", "primaryKey": false, "notNull": false, - "autoincrement": false, - "default": "'default'" + "autoincrement": false }, - "max_steps": { - "name": "max_steps", - "type": "integer", + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "plan_model": { + "name": "plan_model", + "type": "text", "primaryKey": false, "notNull": false, - "autoincrement": false, - "default": 10 + "autoincrement": false + }, + "small_model": { + "name": "small_model", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "mcps": { + "name": "mcps", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "allowed_tools": { + "name": "allowed_tools", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "configuration": { + "name": "configuration", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false }, "created_at": { "name": "created_at", @@ -160,13 +237,6 @@ "notNull": true, "autoincrement": false }, - "parent_id": { - "name": "parent_id", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, "role": { "name": "role", "type": "text", @@ -174,13 +244,6 @@ "notNull": true, "autoincrement": false }, - "type": { - "name": "type", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, "content": { "name": "content", "type": "text", @@ -215,152 +278,6 @@ "compositePrimaryKeys": {}, "uniqueConstraints": {}, "checkConstraints": {} - }, - "sessions": { - "name": "sessions", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "name": { - "name": "name", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "main_agent_id": { - "name": "main_agent_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "sub_agent_ids": { - "name": "sub_agent_ids", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "user_goal": { - "name": "user_goal", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "status": { - "name": "status", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": "'idle'" - }, - "external_session_id": { - "name": "external_session_id", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "model": { - "name": "model", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "plan_model": { - "name": "plan_model", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "small_model": { - "name": "small_model", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "built_in_tools": { - "name": "built_in_tools", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "mcps": { - "name": "mcps", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "knowledges": { - "name": "knowledges", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "configuration": { - "name": "configuration", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "accessible_paths": { - "name": "accessible_paths", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "permission_mode": { - "name": "permission_mode", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": "'default'" - }, - "max_steps": { - "name": "max_steps", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 10 - }, - "created_at": { - "name": "created_at", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "updated_at": { - "name": "updated_at", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} } }, "views": {}, diff --git a/src/main/services/agents/database/drizzle/meta/_journal.json b/src/main/services/agents/database/drizzle/meta/_journal.json index 39a34ff84d..0f87e9a936 100644 --- a/src/main/services/agents/database/drizzle/meta/_journal.json +++ b/src/main/services/agents/database/drizzle/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "6", - "when": 1757946608023, - "tag": "0000_bizarre_la_nuit", + "when": 1758035192486, + "tag": "0000_dry_luke_cage", "breakpoints": true } ] diff --git a/src/main/services/agents/database/schema/agents.schema.ts b/src/main/services/agents/database/schema/agents.schema.ts index f92dd2787e..be8983c1fc 100644 --- a/src/main/services/agents/database/schema/agents.schema.ts +++ b/src/main/services/agents/database/schema/agents.schema.ts @@ -2,25 +2,26 @@ * Drizzle ORM schema for agents table */ -import { index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core' +import { index, sqliteTable, text } from 'drizzle-orm/sqlite-core' export const agentsTable = sqliteTable('agents', { id: text('id').primaryKey(), - type: text('type').notNull().default('claude-code'), + type: text('type').notNull(), name: text('name').notNull(), description: text('description'), - avatar: text('avatar'), + accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access + instructions: text('instructions'), + model: text('model').notNull(), // Main model ID (required) plan_model: text('plan_model'), // Optional plan/thinking model ID small_model: text('small_model'), // Optional small/fast model ID - built_in_tools: text('built_in_tools'), // JSON array of built-in tool IDs + mcps: text('mcps'), // JSON array of MCP tool IDs - knowledges: text('knowledges'), // JSON array of enabled knowledge base IDs - configuration: text('configuration'), // JSON, extensible settings like temperature, top_p - accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access - permission_mode: text('permission_mode').default('default'), // 'readOnly', 'acceptEdits', 'bypassPermissions' - max_steps: integer('max_steps').default(10), // Maximum number of steps the agent can take + allowed_tools: text('allowed_tools'), // JSON array of allowed tool IDs (whitelist) + + configuration: text('configuration'), // JSON, extensible settings + created_at: text('created_at').notNull(), updated_at: text('updated_at').notNull() }) @@ -28,10 +29,6 @@ export const agentsTable = sqliteTable('agents', { // Indexes for agents table export const agentsNameIdx = index('idx_agents_name').on(agentsTable.name) export const agentsTypeIdx = index('idx_agents_type').on(agentsTable.type) -export const agentsModelIdx = index('idx_agents_model').on(agentsTable.model) -export const agentsPlanModelIdx = index('idx_agents_plan_model').on(agentsTable.plan_model) -export const agentsSmallModelIdx = index('idx_agents_small_model').on(agentsTable.small_model) -export const agentsPermissionModeIdx = index('idx_agents_permission_mode').on(agentsTable.permission_mode) export const agentsCreatedAtIdx = index('idx_agents_created_at').on(agentsTable.created_at) export type AgentRow = typeof agentsTable.$inferSelect diff --git a/src/main/services/agents/database/schema/index.ts b/src/main/services/agents/database/schema/index.ts index b99c8e3104..f15f0c7014 100644 --- a/src/main/services/agents/database/schema/index.ts +++ b/src/main/services/agents/database/schema/index.ts @@ -4,3 +4,4 @@ export * from './agents.schema' export * from './sessions.schema' +export * from './messages.schema' diff --git a/src/main/services/agents/database/schema/messages.schema.ts b/src/main/services/agents/database/schema/messages.schema.ts new file mode 100644 index 0000000000..a544633da3 --- /dev/null +++ b/src/main/services/agents/database/schema/messages.schema.ts @@ -0,0 +1,28 @@ +import { foreignKey, index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core' +import { sessionsTable } from './sessions.schema' + +// session_messages table to log all messages, thoughts, actions, observations in a session +export const sessionMessagesTable = sqliteTable('session_messages', { + id: integer('id').primaryKey({ autoIncrement: true }), + session_id: text('session_id').notNull(), + role: text('role').notNull(), // 'user', 'agent', 'system', 'tool' + content: text('content').notNull(), // JSON structured data + metadata: text('metadata'), // JSON metadata (optional) + created_at: text('created_at').notNull(), + updated_at: text('updated_at').notNull() +}) + +// Indexes for session_messages table +export const sessionMessagesSessionIdIdx = index('idx_session_messages_session_id').on(sessionMessagesTable.session_id) +export const sessionMessagesCreatedAtIdx = index('idx_session_messages_created_at').on(sessionMessagesTable.created_at) +export const sessionMessagesUpdatedAtIdx = index('idx_session_messages_updated_at').on(sessionMessagesTable.updated_at) + +// Foreign keys for session_messages table +export const sessionMessagesFkSession = foreignKey({ + columns: [sessionMessagesTable.session_id], + foreignColumns: [sessionsTable.id], + name: 'fk_session_messages_session_id' +}).onDelete('cascade') + +export type SessionMessageRow = typeof sessionMessagesTable.$inferSelect +export type InsertSessionMessageRow = typeof sessionMessagesTable.$inferInsert diff --git a/src/main/services/agents/database/schema/sessions.schema.ts b/src/main/services/agents/database/schema/sessions.schema.ts index d564b8aa05..fee8983165 100644 --- a/src/main/services/agents/database/schema/sessions.schema.ts +++ b/src/main/services/agents/database/schema/sessions.schema.ts @@ -2,78 +2,42 @@ * Drizzle ORM schema for sessions and session_logs tables */ -import { foreignKey, index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core' +import { foreignKey, index, sqliteTable, text } from 'drizzle-orm/sqlite-core' +import { agentsTable } from './agents.schema' export const sessionsTable = sqliteTable('sessions', { id: text('id').primaryKey(), - name: text('name'), // Session name - main_agent_id: text('main_agent_id').notNull(), // Primary agent ID for the session - sub_agent_ids: text('sub_agent_ids'), // JSON array of sub-agent IDs involved in the session - user_goal: text('user_goal'), // Initial user goal for the session - status: text('status').notNull().default('idle'), // 'idle', 'running', 'completed', 'failed', 'stopped' - external_session_id: text('external_session_id'), // Agent session for external agent management/tracking - // AgentConfiguration fields that can override agent defaults - model: text('model'), // Main model ID (inherits from agent if null) + agent_id: text('agent_id').notNull(), // Primary agent ID for the session + name: text('name').notNull(), + description: text('description'), + accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access + + instructions: text('instructions'), + + model: text('model').notNull(), // Main model ID (required) plan_model: text('plan_model'), // Optional plan/thinking model ID small_model: text('small_model'), // Optional small/fast model ID - built_in_tools: text('built_in_tools'), // JSON array of built-in tool IDs + mcps: text('mcps'), // JSON array of MCP tool IDs - knowledges: text('knowledges'), // JSON array of enabled knowledge base IDs - configuration: text('configuration'), // JSON, extensible settings like temperature, top_p - accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access - permission_mode: text('permission_mode').default('default'), - max_steps: integer('max_steps').default(10), // Maximum number of steps the agent can take + allowed_tools: text('allowed_tools'), // JSON array of allowed tool IDs (whitelist) + + configuration: text('configuration'), // JSON, extensible settings + created_at: text('created_at').notNull(), updated_at: text('updated_at').notNull() }) -// Indexes for sessions table -export const sessionsNameIdx = index('idx_sessions_name').on(sessionsTable.name) -export const sessionsStatusIdx = index('idx_sessions_status').on(sessionsTable.status) -export const sessionsCreatedAtIdx = index('idx_sessions_created_at').on(sessionsTable.created_at) -export const sessionsExternalSessionIdIdx = index('idx_sessions_external_session_id').on( - sessionsTable.external_session_id -) -export const sessionsMainAgentIdIdx = index('idx_sessions_main_agent_id').on(sessionsTable.main_agent_id) -export const sessionsModelIdx = index('idx_sessions_model').on(sessionsTable.model) -export const sessionsPlanModelIdx = index('idx_sessions_plan_model').on(sessionsTable.plan_model) -export const sessionsSmallModelIdx = index('idx_sessions_small_model').on(sessionsTable.small_model) - -export const sessionMessagesTable = sqliteTable('session_messages', { - id: integer('id').primaryKey({ autoIncrement: true }), - session_id: text('session_id').notNull(), - parent_id: integer('parent_id'), // Foreign Key to session_logs.id, nullable for tree structure - role: text('role').notNull(), // 'user', 'agent', 'system', 'tool' - type: text('type').notNull(), // 'message', 'thought', 'action', 'observation', etc. - content: text('content').notNull(), // JSON structured data - metadata: text('metadata'), // JSON metadata (optional) - created_at: text('created_at').notNull(), - updated_at: text('updated_at').notNull() -}) - -// Indexes for session_messages table -export const sessionMessagesSessionIdIdx = index('idx_session_messages_session_id').on(sessionMessagesTable.session_id) -export const sessionMessagesParentIdIdx = index('idx_session_messages_parent_id').on(sessionMessagesTable.parent_id) -export const sessionMessagesRoleIdx = index('idx_session_messages_role').on(sessionMessagesTable.role) -export const sessionMessagesTypeIdx = index('idx_session_messages_type').on(sessionMessagesTable.type) -export const sessionMessagesCreatedAtIdx = index('idx_session_messages_created_at').on(sessionMessagesTable.created_at) -export const sessionMessagesUpdatedAtIdx = index('idx_session_messages_updated_at').on(sessionMessagesTable.updated_at) - -// Foreign keys for session_messages table -export const sessionMessagesFkSession = foreignKey({ - columns: [sessionMessagesTable.session_id], - foreignColumns: [sessionsTable.id], - name: 'fk_session_messages_session_id' +// Foreign keys for sessions table +export const sessionsFkAgent = foreignKey({ + columns: [sessionsTable.agent_id], + foreignColumns: [agentsTable.id], + name: 'fk_session_agent_id' }).onDelete('cascade') -export const sessionMessagesFkParent = foreignKey({ - columns: [sessionMessagesTable.parent_id], - foreignColumns: [sessionMessagesTable.id], - name: 'fk_session_messages_parent_id' -}) +// Indexes for sessions table +export const sessionsCreatedAtIdx = index('idx_sessions_created_at').on(sessionsTable.created_at) +export const sessionsMainAgentIdIdx = index('idx_sessions_agent_id').on(sessionsTable.agent_id) +export const sessionsModelIdx = index('idx_sessions_model').on(sessionsTable.model) export type SessionRow = typeof sessionsTable.$inferSelect export type InsertSessionRow = typeof sessionsTable.$inferInsert - -export type SessionMessageRow = typeof sessionMessagesTable.$inferSelect -export type InsertSessionMessageRow = typeof sessionMessagesTable.$inferInsert diff --git a/src/main/services/agents/interfaces/AgentStreamInterface.ts b/src/main/services/agents/interfaces/AgentStreamInterface.ts new file mode 100644 index 0000000000..39eeb84f8c --- /dev/null +++ b/src/main/services/agents/interfaces/AgentStreamInterface.ts @@ -0,0 +1,27 @@ +// Agent-agnostic streaming interface +// This interface should be implemented by all agent services + +import { EventEmitter } from 'node:events' + +import { UIMessageChunk } from 'ai' + +// Generic agent stream event that works with any agent type +export interface AgentStreamEvent { + type: 'chunk' | 'error' | 'complete' + chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption + rawAgentMessage?: any // Agent-specific raw message (SDKMessage for Claude Code, different for other agents) + error?: Error + agentResult?: any // Agent-specific result data +} + +// Agent stream interface that all agents should implement +export interface AgentStream extends EventEmitter { + emit(event: 'data', data: AgentStreamEvent): boolean + on(event: 'data', listener: (data: AgentStreamEvent) => void): this + once(event: 'data', listener: (data: AgentStreamEvent) => void): this +} + +// Base agent service interface +export interface AgentServiceInterface { + invoke(prompt: string, cwd: string, sessionId?: string, options?: any): AgentStream +} diff --git a/src/main/services/agents/services/AgentService.ts b/src/main/services/agents/services/AgentService.ts index 02a97f644a..9edca50301 100644 --- a/src/main/services/agents/services/AgentService.ts +++ b/src/main/services/agents/services/AgentService.ts @@ -1,50 +1,12 @@ import path from 'node:path' import { getDataPath } from '@main/utils' -import type { AgentEntity, AgentType, PermissionMode } from '@types' +import type { AgentEntity, CreateAgentRequest, GetAgentResponse, ListOptions, UpdateAgentRequest } from '@types' import { count, eq } from 'drizzle-orm' import { BaseService } from '../BaseService' import { type AgentRow, agentsTable, type InsertAgentRow } from '../database/schema' -// import { builtinTools } from './claudecode/tools' - -export interface CreateAgentRequest { - type: AgentType - name: string - description?: string - avatar?: string - instructions?: string - model: string - // plan_model?: string - // small_model?: string - // mcps?: string[] - // knowledges?: string[] - // configuration?: Record - accessible_paths?: string[] - permission_mode?: PermissionMode - max_steps?: number -} - -export interface UpdateAgentRequest { - name?: string - description?: string - avatar?: string - instructions?: string - model?: string - // plan_model?: string - // small_model?: string - // mcps?: string[] - // knowledges?: string[] - // configuration?: Record - accessible_paths?: string[] - permission_mode?: PermissionMode - max_steps?: number -} - -export interface ListAgentsOptions { - limit?: number - offset?: number -} +import { builtinTools } from './claudecode/tools' export class AgentService extends BaseService { private static instance: AgentService | null = null @@ -61,49 +23,36 @@ export class AgentService extends BaseService { } // Agent Methods - async createAgent(agentData: CreateAgentRequest): Promise { + async createAgent(req: CreateAgentRequest): Promise { this.ensureInitialized() const id = `agent_${Date.now()}_${Math.random().toString(36).substring(2, 11)}` const now = new Date().toISOString() - if (!agentData.accessible_paths || agentData.accessible_paths.length === 0) { + if (!req.accessible_paths || req.accessible_paths.length === 0) { const defaultPath = path.join(getDataPath(), 'agents', id) - agentData.accessible_paths = [defaultPath] + req.accessible_paths = [defaultPath] } - const serializedData = this.serializeJsonFields(agentData) + const serializedReq = this.serializeJsonFields(req) const insertData: InsertAgentRow = { id, - type: serializedData.type, - name: serializedData.name, - description: serializedData.description || null, - avatar: serializedData.avatar || null, - instructions: serializedData.instructions || null, - model: serializedData.model, - plan_model: serializedData.plan_model || null, - small_model: serializedData.small_model || null, - built_in_tools: serializedData.built_in_tools || null, - mcps: serializedData.mcps || null, - knowledges: serializedData.knowledges || null, - configuration: serializedData.configuration || null, - accessible_paths: serializedData.accessible_paths || null, - permission_mode: serializedData.permission_mode || 'default', - max_steps: serializedData.max_steps || 10, + type: req.type, + name: req.name || 'New Agent', + description: req.description, + instructions: req.instructions || 'You are a helpful assistant.', + model: req.model, + plan_model: req.plan_model, + small_model: req.small_model, + configuration: serializedReq.configuration, + accessible_paths: serializedReq.accessible_paths, created_at: now, updated_at: now } - if (serializedData.name === 'claude-code') { - // insertData.built_in_tools = JSON.stringify(builtinTools) - insertData.built_in_tools = JSON.stringify([]) - } - await this.database.insert(agentsTable).values(insertData) - const result = await this.database.select().from(agentsTable).where(eq(agentsTable.id, id)).limit(1) - if (!result[0]) { throw new Error('Failed to create agent') } @@ -112,7 +61,7 @@ export class AgentService extends BaseService { return agent } - async getAgent(id: string): Promise { + async getAgent(id: string): Promise { this.ensureInitialized() const result = await this.database.select().from(agentsTable).where(eq(agentsTable.id, id)).limit(1) @@ -121,18 +70,19 @@ export class AgentService extends BaseService { return null } - return this.deserializeJsonFields(result[0]) as AgentEntity + const agent = this.deserializeJsonFields(result[0]) as GetAgentResponse + if (agent.type === 'claude-code') { + agent.built_in_tools = builtinTools + } + + return agent } - async listAgents(options: ListAgentsOptions = {}): Promise<{ agents: AgentEntity[]; total: number }> { - this.ensureInitialized() + async listAgents(options: ListOptions = {}): Promise<{ agents: GetAgentResponse[]; total: number }> { + this.ensureInitialized() // Build query with pagination - // Get total count const totalResult = await this.database.select({ count: count() }).from(agentsTable) - const total = totalResult[0].count - - // Build query with pagination const baseQuery = this.database.select().from(agentsTable).orderBy(agentsTable.created_at) const result = @@ -142,12 +92,18 @@ export class AgentService extends BaseService { : await baseQuery.limit(options.limit) : await baseQuery - const agents = result.map((row) => this.deserializeJsonFields(row)) as AgentEntity[] + const agents = result.map((row) => this.deserializeJsonFields(row)) as GetAgentResponse[] - return { agents, total } + agents.forEach((agent) => { + if (agent.type === 'claude-code') { + agent.built_in_tools = builtinTools + } + }) + + return { agents, total: totalResult[0].count } } - async updateAgent(id: string, updates: UpdateAgentRequest): Promise { + async updateAgent(id: string, updates: UpdateAgentRequest): Promise { this.ensureInitialized() // Check if agent exists @@ -166,22 +122,15 @@ export class AgentService extends BaseService { // Only update fields that are provided if (serializedUpdates.name !== undefined) updateData.name = serializedUpdates.name if (serializedUpdates.description !== undefined) updateData.description = serializedUpdates.description - if (serializedUpdates.avatar !== undefined) updateData.avatar = serializedUpdates.avatar if (serializedUpdates.instructions !== undefined) updateData.instructions = serializedUpdates.instructions if (serializedUpdates.model !== undefined) updateData.model = serializedUpdates.model if (serializedUpdates.plan_model !== undefined) updateData.plan_model = serializedUpdates.plan_model if (serializedUpdates.small_model !== undefined) updateData.small_model = serializedUpdates.small_model - if (serializedUpdates.built_in_tools !== undefined) updateData.built_in_tools = serializedUpdates.built_in_tools if (serializedUpdates.mcps !== undefined) updateData.mcps = serializedUpdates.mcps - if (serializedUpdates.knowledges !== undefined) updateData.knowledges = serializedUpdates.knowledges if (serializedUpdates.configuration !== undefined) updateData.configuration = serializedUpdates.configuration if (serializedUpdates.accessible_paths !== undefined) updateData.accessible_paths = serializedUpdates.accessible_paths - if (serializedUpdates.permission_mode !== undefined) updateData.permission_mode = serializedUpdates.permission_mode - if (serializedUpdates.max_steps !== undefined) updateData.max_steps = serializedUpdates.max_steps - await this.database.update(agentsTable).set(updateData).where(eq(agentsTable.id, id)) - return await this.getAgent(id) } diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index 051bb771fe..ec98d112e3 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -1,7 +1,12 @@ import { EventEmitter } from 'node:events' import { loggerService } from '@logger' -import type { AgentSessionEntity, SessionMessageEntity } from '@types' +import type { + AgentSessionMessageEntity, + CreateSessionMessageRequest, + GetAgentSessionResponse, + ListOptions, +} from '@types' import { UIMessageChunk } from 'ai' import { count, eq } from 'drizzle-orm' @@ -11,27 +16,9 @@ import ClaudeCodeService from './claudecode' const logger = loggerService.withContext('SessionMessageService') -export interface CreateSessionMessageRequest { - session_id: string - parent_id?: number - role: 'user' | 'agent' | 'system' | 'tool' - type: string - content: string - metadata?: Record -} - -export interface UpdateSessionMessageRequest { - content?: Record - metadata?: Record -} - -export interface ListSessionMessagesOptions { - limit?: number - offset?: number -} - export class SessionMessageService extends BaseService { private static instance: SessionMessageService | null = null + private cc: ClaudeCodeService = new ClaudeCodeService() static getInstance(): SessionMessageService { if (!SessionMessageService.instance) { @@ -58,8 +45,8 @@ export class SessionMessageService extends BaseService { async listSessionMessages( sessionId: string, - options: ListSessionMessagesOptions = {} - ): Promise<{ messages: SessionMessageEntity[]; total: number }> { + options: ListOptions = {} + ): Promise<{ messages: AgentSessionMessageEntity[]; total: number }> { this.ensureInitialized() // Get total count @@ -84,128 +71,144 @@ export class SessionMessageService extends BaseService { : await baseQuery.limit(options.limit) : await baseQuery - const messages = result.map((row) => this.deserializeSessionMessage(row)) as SessionMessageEntity[] + const messages = result.map((row) => this.deserializeSessionMessage(row)) as AgentSessionMessageEntity[] return { messages, total } } - createSessionMessageStream(session: AgentSessionEntity, messageData: CreateSessionMessageRequest): EventEmitter { + createSessionMessageStream(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter { this.ensureInitialized() // Create a new EventEmitter to manage the session message lifecycle const sessionStream = new EventEmitter() - // Validate parent exists if specified - if (messageData.parent_id) { - this.sessionMessageExists(messageData.parent_id) - .then((exists) => { - if (!exists) { - process.nextTick(() => { - sessionStream.emit('data', { - type: 'error', - error: new Error(`Parent message with id ${messageData.parent_id} does not exist`) - }) - }) - return - } - - // Start the Claude Code stream after validation passes - this.startClaudeCodeStream(session, messageData, sessionStream) - }) - .catch((error) => { - process.nextTick(() => { - sessionStream.emit('data', { - type: 'error', - error: error as Error - }) - }) - }) - } else { - // No parent validation needed, start immediately - this.startClaudeCodeStream(session, messageData, sessionStream) - } + // No parent validation needed, start immediately + this.startClaudeCodeStream(session, messageData, sessionStream) return sessionStream } private startClaudeCodeStream( - session: AgentSessionEntity, - messageData: CreateSessionMessageRequest, + session: GetAgentSessionResponse, + req: CreateSessionMessageRequest, sessionStream: EventEmitter ): void { - const cc = new ClaudeCodeService() + const previousMessages = session.messages || [] + let session_id: string = '' + if (previousMessages.length > 0) { + session_id = previousMessages[0].session_id + } - // Create the streaming Claude Code invocation - const claudeStream = cc.invokeStream( - messageData.content, - session.accessible_paths[0], - session.external_session_id, - { - permissionMode: session.permission_mode, - maxTurns: session.max_steps - } - ) + logger.debug('Claude Code stream message data:', { message: req, session_id }) - let sessionMessage: SessionMessageEntity | null = null + // Create the streaming agent invocation (using invokeStream for streaming) + const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], session_id, { + permissionMode: session.configuration?.permissionMode || 'default', + maxTurns: session.configuration?.maxTurns || 10 + }) - // Handle Claude Code stream events + let sessionMessage: AgentSessionMessageEntity | null = null + const streamedChunks: UIMessageChunk[] = [] + const rawAgentMessages: any[] = [] // Generic agent messages storage + + // Handle agent stream events (agent-agnostic) claudeStream.on('data', async (event: any) => { try { switch (event.type) { case 'chunk': - // Forward UIMessageChunk directly - sessionStream.emit('data', { - type: 'chunk', - chunk: event.chunk as UIMessageChunk - }) + // Forward UIMessageChunk directly and collect raw agent messages + if (event.chunk) { + const chunk = event.chunk as UIMessageChunk + streamedChunks.push(chunk) + + // Collect raw agent message if available (agent-agnostic) + if (event.rawAgentMessage) { + rawAgentMessages.push(event.rawAgentMessage) + } + + sessionStream.emit('data', { + type: 'chunk', + chunk + }) + } else { + logger.warn('Received agent chunk event without chunk payload') + } break case 'error': sessionStream.emit('data', { type: 'error', - error: event.error + error: event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined) }) break case 'complete': { - // Save the final message to database when Claude Code completes - logger.info('Claude Code stream completed, saving message to database') + // Save the final message to database when agent completes + logger.info('Agent stream completed, saving message to database') - const now = new Date().toISOString() - const insertData: InsertSessionMessageRow = { - session_id: messageData.session_id, - parent_id: messageData.parent_id || null, - role: messageData.role, - type: messageData.type, - content: JSON.stringify(event.result), - metadata: messageData.metadata ? JSON.stringify(messageData.metadata) : null, - created_at: now, - updated_at: now + // Extract additional raw agent messages from agentResult if available + if (event.agentResult?.rawSDKMessages) { + rawAgentMessages.push(...event.agentResult.rawSDKMessages) } - const result = await this.database.insert(sessionMessagesTable).values(insertData).returning() - - if (result[0]) { - sessionMessage = this.deserializeSessionMessage(result[0]) as SessionMessageEntity - logger.info(`Session message saved with ID: ${sessionMessage.id}`) - - // Emit the complete event with the saved message - sessionStream.emit('data', { - type: 'complete', - result: event.result, - message: sessionMessage - }) - } else { - sessionStream.emit('data', { - type: 'error', - error: new Error('Failed to save session message to database') - }) + // Create structured content with both AI SDK format and raw data + const structuredContent = { + aiSDKChunks: streamedChunks, // For UI consumption + rawAgentMessages: rawAgentMessages, // Original agent-specific messages + agentResult: event.agentResult, // Complete result from the agent + agentType: event.agentResult?.agentType || 'unknown' // Store agent type for future reference } + + // const now = new Date().toISOString() + // const insertData: InsertSessionMessageRow = { + // session_id: req.session_id, + // parent_id: req.parent_id || null, + // role: req.role, + // type: req.type, + // content: JSON.stringify(structuredContent), + // metadata: req.metadata + // ? JSON.stringify({ + // ...req.metadata, + // chunkCount: streamedChunks.length, + // rawMessageCount: rawAgentMessages.length, + // agentType: event.agentResult?.agentType || 'unknown', + // completedAt: now + // }) + // : JSON.stringify({ + // chunkCount: streamedChunks.length, + // rawMessageCount: rawAgentMessages.length, + // agentType: event.agentResult?.agentType || 'unknown', + // completedAt: now + // }), + // created_at: now, + // updated_at: now + // } + + // const result = await this.database.insert(sessionMessagesTable).values(insertData).returning() + + // if (result[0]) { + // sessionMessage = this.deserializeSessionMessage(result[0]) as AgentSessionMessageEntity + // logger.info(`Session message saved with ID: ${sessionMessage.id}`) + + // // Emit the complete event with the saved message and structured data + // sessionStream.emit('data', { + // type: 'complete', + // result: structuredContent, + // message: sessionMessage + // }) + // } else { + // sessionStream.emit('data', { + // type: 'error', + // error: new Error('Failed to save session message to database') + // }) + // } break } default: - logger.warn('Unknown event type from Claude Code service:', { type: event.type }) + logger.warn('Unknown event type from Claude Code service:', { + type: event.type + }) break } } catch (error) { @@ -218,7 +221,7 @@ export class SessionMessageService extends BaseService { }) } - private deserializeSessionMessage(data: any): SessionMessageEntity { + private deserializeSessionMessage(data: any): AgentSessionMessageEntity { if (!data) return data const deserialized = { ...data } diff --git a/src/main/services/agents/services/SessionService.ts b/src/main/services/agents/services/SessionService.ts index dc61542aa5..6e13656e3d 100644 --- a/src/main/services/agents/services/SessionService.ts +++ b/src/main/services/agents/services/SessionService.ts @@ -1,52 +1,15 @@ -import type { AgentSessionEntity, PermissionMode, SessionStatus } from '@types' +import type { + AgentSessionEntity, + CreateSessionRequest, + GetAgentSessionResponse, + ListOptions, + UpdateSessionRequest +} from '@types' import { and, count, eq, type SQL } from 'drizzle-orm' import { BaseService } from '../BaseService' import { agentsTable, type InsertSessionRow, type SessionRow, sessionsTable } from '../database/schema' -export interface CreateSessionRequest { - name?: string - main_agent_id: string - sub_agent_ids?: string[] - user_goal?: string - status?: SessionStatus - external_session_id?: string - model?: string - plan_model?: string - small_model?: string - built_in_tools?: string[] - mcps?: string[] - knowledges?: string[] - configuration?: Record - accessible_paths?: string[] - permission_mode?: PermissionMode - max_steps?: number -} - -export interface UpdateSessionRequest { - name?: string - main_agent_id?: string - sub_agent_ids?: string[] - user_goal?: string - status?: SessionStatus - external_session_id?: string - model?: string - plan_model?: string - small_model?: string - built_in_tools?: string[] - mcps?: string[] - knowledges?: string[] - configuration?: Record - accessible_paths?: string[] - permission_mode?: PermissionMode - max_steps?: number -} - -export interface ListSessionsOptions { - limit?: number - offset?: number - status?: SessionStatus -} export class SessionService extends BaseService { private static instance: SessionService | null = null @@ -62,18 +25,14 @@ export class SessionService extends BaseService { await BaseService.initialize() } - async createSession(sessionData: CreateSessionRequest): Promise { + async createSession(req: CreateSessionRequest): Promise { this.ensureInitialized() // Validate agent exists - we'll need to import AgentService for this check // For now, we'll skip this validation to avoid circular dependencies // The database foreign key constraint will handle this - const agents = await this.database - .select() - .from(agentsTable) - .where(eq(agentsTable.id, sessionData.main_agent_id)) - .limit(1) + const agents = await this.database.select().from(agentsTable).where(eq(agentsTable.id, req.agent_id)).limit(1) if (!agents[0]) { throw new Error('Agent not found') } @@ -83,20 +42,9 @@ export class SessionService extends BaseService { const now = new Date().toISOString() // inherit configuration from agent by default, can be overridden by sessionData - sessionData = { - ...{ - model: agent.model, - plan_model: agent.plan_model, - small_model: agent.small_model, - mcps: agent.mcps, - knowledges: agent.knowledges, - configuration: agent.configuration, - accessible_paths: agent.accessible_paths, - permission_mode: agent.permission_mode, - max_steps: agent.max_steps, - status: 'idle' - }, - ...sessionData + const sessionData: Partial = { + ...agent, + ...req } const serializedData = this.serializeJsonFields(sessionData) @@ -104,20 +52,14 @@ export class SessionService extends BaseService { const insertData: InsertSessionRow = { id, name: serializedData.name || null, - main_agent_id: serializedData.main_agent_id, - sub_agent_ids: serializedData.sub_agent_ids || null, - user_goal: serializedData.user_goal || null, - status: serializedData.status || 'idle', - external_session_id: serializedData.external_session_id || null, + agent_id: serializedData.agent_id, + description: serializedData.description || null, model: serializedData.model || null, plan_model: serializedData.plan_model || null, small_model: serializedData.small_model || null, mcps: serializedData.mcps || null, - knowledges: serializedData.knowledges || null, configuration: serializedData.configuration || null, accessible_paths: serializedData.accessible_paths || null, - permission_mode: serializedData.permission_mode || 'readOnly', - max_steps: serializedData.max_steps || 10, created_at: now, updated_at: now } @@ -133,7 +75,7 @@ export class SessionService extends BaseService { return this.deserializeJsonFields(result[0]) as AgentSessionEntity } - async getSession(id: string): Promise { + async getSession(id: string): Promise { this.ensureInitialized() const result = await this.database.select().from(sessionsTable).where(eq(sessionsTable.id, id)).limit(1) @@ -142,7 +84,9 @@ export class SessionService extends BaseService { return null } - return this.deserializeJsonFields(result[0]) as AgentSessionEntity + const session = this.deserializeJsonFields(result[0]) as GetAgentSessionResponse + + return session } async getSessionWithAgent(id: string): Promise { @@ -155,17 +99,14 @@ export class SessionService extends BaseService { async listSessions( agentId?: string, - options: ListSessionsOptions = {} + options: ListOptions = {} ): Promise<{ sessions: AgentSessionEntity[]; total: number }> { this.ensureInitialized() // Build where conditions const whereConditions: SQL[] = [] if (agentId) { - whereConditions.push(eq(sessionsTable.main_agent_id, agentId)) - } - if (options.status) { - whereConditions.push(eq(sessionsTable.status, options.status)) + whereConditions.push(eq(sessionsTable.agent_id, agentId)) } const whereClause = @@ -190,12 +131,12 @@ export class SessionService extends BaseService { : await baseQuery.limit(options.limit) : await baseQuery - const sessions = result.map((row) => this.deserializeJsonFields(row)) as AgentSessionEntity[] + const sessions = result.map((row) => this.deserializeJsonFields(row)) as GetAgentSessionResponse[] return { sessions, total } } - async updateSession(id: string, updates: UpdateSessionRequest): Promise { + async updateSession(id: string, updates: UpdateSessionRequest): Promise { this.ensureInitialized() // Check if session exists @@ -216,46 +157,22 @@ export class SessionService extends BaseService { // Only update fields that are provided if (serializedUpdates.name !== undefined) updateData.name = serializedUpdates.name - if (serializedUpdates.main_agent_id !== undefined) updateData.main_agent_id = serializedUpdates.main_agent_id - if (serializedUpdates.sub_agent_ids !== undefined) updateData.sub_agent_ids = serializedUpdates.sub_agent_ids - if (serializedUpdates.user_goal !== undefined) updateData.user_goal = serializedUpdates.user_goal - if (serializedUpdates.status !== undefined) updateData.status = serializedUpdates.status - if (serializedUpdates.external_session_id !== undefined) - updateData.external_session_id = serializedUpdates.external_session_id + if (serializedUpdates.model !== undefined) updateData.model = serializedUpdates.model if (serializedUpdates.plan_model !== undefined) updateData.plan_model = serializedUpdates.plan_model if (serializedUpdates.small_model !== undefined) updateData.small_model = serializedUpdates.small_model - if (serializedUpdates.built_in_tools !== undefined) updateData.built_in_tools = serializedUpdates.built_in_tools + if (serializedUpdates.mcps !== undefined) updateData.mcps = serializedUpdates.mcps - if (serializedUpdates.knowledges !== undefined) updateData.knowledges = serializedUpdates.knowledges + if (serializedUpdates.configuration !== undefined) updateData.configuration = serializedUpdates.configuration if (serializedUpdates.accessible_paths !== undefined) updateData.accessible_paths = serializedUpdates.accessible_paths - if (serializedUpdates.permission_mode !== undefined) updateData.permission_mode = serializedUpdates.permission_mode - if (serializedUpdates.max_steps !== undefined) updateData.max_steps = serializedUpdates.max_steps await this.database.update(sessionsTable).set(updateData).where(eq(sessionsTable.id, id)) return await this.getSession(id) } - async updateSessionStatus(id: string, status: SessionStatus): Promise { - this.ensureInitialized() - - const now = new Date().toISOString() - - const result = await this.database - .update(sessionsTable) - .set({ status, updated_at: now }) - .where(eq(sessionsTable.id, id)) - - if (result.rowsAffected === 0) { - return null - } - - return await this.getSession(id) - } - async deleteSession(id: string): Promise { this.ensureInitialized() diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index aca42ef747..73b737c4f1 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -5,8 +5,8 @@ import { createRequire } from 'node:module' import { Options, SDKMessage } from '@anthropic-ai/claude-code' import { loggerService } from '@logger' -import { UIMessageChunk } from 'ai' +import { AgentServiceInterface, AgentStream, AgentStreamEvent } from '../../interfaces/AgentStreamInterface' import { transformSDKMessageToUIChunk } from './transform' const require_ = createRequire(import.meta.url) @@ -21,34 +21,13 @@ interface ClaudeCodeResult { exitCode?: number } -interface ClaudeCodeStreamEvent { - type: 'message' | 'error' | 'complete' - data?: any - error?: Error - result?: ClaudeCodeResult +class ClaudeCodeStream extends EventEmitter implements AgentStream { + declare emit: (event: 'data', data: AgentStreamEvent) => boolean + declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this + declare once: (event: 'data', listener: (data: AgentStreamEvent) => void) => this } -class ClaudeCodeStream extends EventEmitter { - declare emit: (event: 'data', data: ClaudeCodeStreamEvent) => boolean - declare on: (event: 'data', listener: (data: ClaudeCodeStreamEvent) => void) => this - declare once: (event: 'data', listener: (data: ClaudeCodeStreamEvent) => void) => this -} - -// AI SDK compatible stream events -interface AISDKStreamEvent { - type: 'chunk' | 'error' | 'complete' - chunk?: UIMessageChunk - error?: Error - result?: ClaudeCodeResult -} - -class AISDKStream extends EventEmitter { - declare emit: (event: 'data', data: AISDKStreamEvent) => boolean - declare on: (event: 'data', listener: (data: AISDKStreamEvent) => void) => this - declare once: (event: 'data', listener: (data: AISDKStreamEvent) => void) => this -} - -class ClaudeCodeService { +class ClaudeCodeService implements AgentServiceInterface { private claudeExecutablePath: string constructor() { @@ -56,48 +35,8 @@ class ClaudeCodeService { this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js') } - async invoke(prompt: string, cwd: string, session_id?: string, base?: Options): Promise { - // Ensure Electron behaves like Node for any child process that resolves to process.execPath - // process.env.ELECTRON_RUN_AS_NODE = '1' - - const args: string[] = [this.claudeExecutablePath, '--output-format', 'stream-json', '--verbose', 'cwd', cwd] - - if (session_id) { - args.push('--resume', session_id) - } - if (base?.maxTurns) { - args.push('--max-turns', base.maxTurns.toString()) - } - if (base?.permissionMode) { - args.push('--permission-mode', base.permissionMode) - } - - args.push('--print', prompt) - - logger.info('Spawning Claude Code process', { args, cwd }) - - const p = spawn(process.execPath, args, { - env: { ...process.env, ELECTRON_RUN_AS_NODE: '1' }, - stdio: ['pipe', 'pipe', 'pipe'], - shell: false, - detached: false - }) - - // Log process creation - logger.info('Process created', { pid: p.pid }) - - // Close stdin immediately since we're passing the prompt via --print - if (p.stdin) { - p.stdin.end() - logger.debug('Closed stdin') - } - - return this.setupProcessHandlers(p) - } - - invokeStream(prompt: string, cwd: string, session_id?: string, base?: Options): EventEmitter { - const aiStream = new AISDKStream() - const rawStream = new ClaudeCodeStream() + invoke(prompt: string, cwd: string, session_id?: string, base?: Options): AgentStream { + const aiStream = new ClaudeCodeStream() // Spawn process with same parameters as invoke const args: string[] = [this.claudeExecutablePath, '--output-format', 'stream-json', '--verbose'] @@ -132,8 +71,7 @@ class ClaudeCodeService { logger.debug('Closed stdin for streaming process') } - this.setupStreamingHandlers(p, rawStream) - this.setupAISDKTransform(rawStream, aiStream) + this.setupStreamingHandlers(p, aiStream) return aiStream } @@ -146,34 +84,59 @@ class ClaudeCodeService { let stderrData = '' const jsonOutput: any[] = [] let hasCompleted = false + let stdoutBuffer = '' const startTime = Date.now() + const emitChunks = (sdkMessage: SDKMessage) => { + jsonOutput.push(sdkMessage) + const chunks = transformSDKMessageToUIChunk(sdkMessage) + for (const chunk of chunks) { + stream.emit('data', { + type: 'chunk', + chunk, + rawAgentMessage: sdkMessage // Store Claude Code specific SDKMessage as generic agent message + }) + } + } + // Handle stdout with streaming events if (process.stdout) { process.stdout.setEncoding('utf8') process.stdout.on('data', (data: string) => { stdoutData += data + stdoutBuffer += data logger.debug('Streaming stdout chunk:', { length: data.length }) - // Parse JSON stream output line by line and emit events - const lines = data.split('\n') - for (const line of lines) { + let newlineIndex = stdoutBuffer.indexOf('\n') + while (newlineIndex !== -1) { + const line = stdoutBuffer.slice(0, newlineIndex) + stdoutBuffer = stdoutBuffer.slice(newlineIndex + 1) const trimmed = line.trim() - if (!trimmed) continue - try { - const parsed = JSON.parse(trimmed) - stream.emit('data', { type: 'message', data: parsed }) - logger.debug('Parsed JSON line', { parsed }) - } catch { - // If you expect NDJSON only, you may want to keep this in buffer instead of emitting. - stream.emit('data', { type: 'message', data: { text: trimmed } }) - logger.debug('Non-JSON line', { line: trimmed }) + if (trimmed) { + try { + const parsed = JSON.parse(trimmed) as SDKMessage + emitChunks(parsed) + logger.debug('Parsed JSON line', { parsed }) + } catch (error) { + logger.debug('Non-JSON line', { line: trimmed }) + } } + newlineIndex = stdoutBuffer.indexOf('\n') } }) process.stdout.on('end', () => { + const trimmed = stdoutBuffer.trim() + if (trimmed) { + try { + const parsed = JSON.parse(trimmed) as SDKMessage + emitChunks(parsed) + logger.debug('Parsed JSON line on stream end', { parsed }) + } catch (error) { + logger.debug('Non-JSON remainder on stdout end', { line: trimmed }) + } + } logger.debug('Streaming stdout ended') }) } @@ -183,12 +146,12 @@ class ClaudeCodeService { process.stderr.setEncoding('utf8') process.stderr.on('data', (data: string) => { stderrData += data - logger.warn('Streaming stderr chunk:', { data: data.trim() }) - - // Emit stderr as error events + const message = data.trim() + if (!message) return + logger.warn('Streaming stderr chunk:', { data: message }) stream.emit('data', { type: 'error', - data: { stderr: data.trim() } + error: new Error(message) }) }) @@ -225,10 +188,14 @@ class ClaudeCodeService { error } - // Emit completion event + // Emit completion event with agent-specific result stream.emit('data', { type: 'complete', - result + agentResult: { + ...result, + rawSDKMessages: jsonOutput, // Claude Code specific: all collected SDK messages + agentType: 'claude-code' // Identify the agent type + } }) } @@ -275,45 +242,6 @@ class ClaudeCodeService { process.on('error', () => clearTimeout(timeout)) } - /** - * Transform raw Claude Code stream events to AI SDK format - */ - private setupAISDKTransform(rawStream: ClaudeCodeStream, aiStream: AISDKStream): void { - rawStream.on('data', (event: ClaudeCodeStreamEvent) => { - try { - switch (event.type) { - case 'message': - // Transform SDKMessage to UIMessageChunk - if (event.data) { - const chunks = transformSDKMessageToUIChunk(event.data as SDKMessage) - for (const chunk of chunks) { - aiStream.emit('data', { type: 'chunk', chunk }) - } - } - break - - case 'error': - aiStream.emit('data', { type: 'error', error: event.error }) - break - - case 'complete': - aiStream.emit('data', { type: 'complete', result: event.result }) - break - - default: - logger.warn('Unknown raw stream event type:', { type: (event as any).type }) - break - } - } catch (error) { - logger.error('Error transforming stream event:', { error, event }) - aiStream.emit('data', { - type: 'error', - error: error instanceof Error ? error : new Error('Transform error') - }) - } - }) - } - /** * Set up process event handlers and return a promise that resolves with complete output */ diff --git a/src/main/services/agents/services/claudecode/transform.ts b/src/main/services/agents/services/claudecode/transform.ts index d5d1185dbb..31b8be0cf9 100644 --- a/src/main/services/agents/services/claudecode/transform.ts +++ b/src/main/services/agents/services/claudecode/transform.ts @@ -74,7 +74,14 @@ export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageC function sdkMessageToProviderMetadata(message: SDKMessage): ProviderMetadata { const meta: ProviderMetadata = { - raw: message as Record + raw: message as Record, + claudeCode: { + originalSDKMessage: JSON.parse(JSON.stringify(message)), // Serialize to ensure JSON compatibility + uuid: message.uuid || null, + session_id: message.session_id || null, + timestamp: new Date().toISOString(), + type: message.type + } } return meta } @@ -392,6 +399,10 @@ function handleResultMessage(message: Extract): }) } + // Always emit a finish chunk at the end + chunks.push({ + type: 'finish' + }) return chunks } diff --git a/src/main/services/agents/services/index.ts b/src/main/services/agents/services/index.ts index bde32ad84f..33bb352a04 100644 --- a/src/main/services/agents/services/index.ts +++ b/src/main/services/agents/services/index.ts @@ -16,10 +16,5 @@ export { sessionMessageService } from './SessionMessageService' export { sessionService } from './SessionService' // Type definitions for service requests and responses -export type { CreateAgentRequest, ListAgentsOptions, UpdateAgentRequest } from './AgentService' -export type { - CreateSessionMessageRequest, - ListSessionMessagesOptions, - UpdateSessionMessageRequest -} from './SessionMessageService' + export type { CreateSessionRequest, ListSessionsOptions, UpdateSessionRequest } from './SessionService' diff --git a/src/renderer/src/config/agent.ts b/src/renderer/src/config/agent.ts index 711ede4e18..eed0f226ac 100644 --- a/src/renderer/src/config/agent.ts +++ b/src/renderer/src/config/agent.ts @@ -1,9 +1,9 @@ -import { AgentConfiguration } from '@renderer/types' +import { AgentBase } from '@renderer/types' // base agent config. no default config for now. -const DEFAULT_AGENT_CONFIG: Omit = {} as const +const DEFAULT_AGENT_CONFIG: Omit = {} as const // no default config for now. -export const DEFAULT_CLAUDE_CODE_CONFIG: Omit = { +export const DEFAULT_CLAUDE_CODE_CONFIG: Omit = { ...DEFAULT_AGENT_CONFIG } as const diff --git a/src/renderer/src/hooks/agents/useUpdateAgent.ts b/src/renderer/src/hooks/agents/useUpdateAgent.ts index 01f23072ec..632d740fe7 100644 --- a/src/renderer/src/hooks/agents/useUpdateAgent.ts +++ b/src/renderer/src/hooks/agents/useUpdateAgent.ts @@ -7,8 +7,7 @@ export const useUpdateAgent = () => { // TODO: use api return useMutation({ // @ts-expect-error not-implemented - - mutationFn: async ({ id, ...payload }: Partial & { id: string }) => {}, + mutationFn: async ({}: Partial & { id: string }) => {}, onSuccess: (updated: AgentEntity) => { qc.setQueryData(['todos'], (old) => old ? old.map((t) => (t.id === updated.id ? updated : t)) : [] diff --git a/src/renderer/src/types/agent.ts b/src/renderer/src/types/agent.ts index 611f2d593a..f21275eb1e 100644 --- a/src/renderer/src/types/agent.ts +++ b/src/renderer/src/types/agent.ts @@ -2,10 +2,9 @@ * Database entity types for Agent, Session, and SessionMessage * Shared between main and renderer processes */ -import { TextStreamPart } from 'ai' -export type SessionStatus = 'idle' | 'running' | 'completed' | 'failed' | 'stopped' +import { TextStreamPart, UIMessageChunk, ModelMessage } from 'ai' export type PermissionMode = 'default' | 'acceptEdits' | 'bypassPermissions' | 'plan' -export type SessionMessageRole = 'assistant' | 'user' | 'system' | 'tool' +export type SessionMessageRole = ModelMessage['role'] export type AgentType = 'claude-code' export const isAgentType = (type: string): type is AgentType => { @@ -17,59 +16,101 @@ export type SessionMessageType = TextStreamPart>['type'] export interface Tool { id: string name: string - description: string - requirePermissions: boolean + description?: string + requirePermissions?: boolean +} + +export interface AgentConfiguration extends Record { + permission_mode: PermissionMode // Permission mode, default to 'default' + max_turns: number // Maximum number of interaction turns, default to 10 } // Shared configuration interface for both agents and sessions -export interface AgentConfiguration { +export interface AgentBase { + // Basic info + name?: string + description?: string + accessible_paths: string[] // Array of directory paths the agent can access + + // Instructions for the agent + instructions?: string // System prompt + + // Models model: string // Main Model ID (required) plan_model?: string // Optional plan/thinking model ID small_model?: string // Optional small/fast model ID - built_in_tools?: Tool[] // Array of built-in tool IDs + + // Tools mcps?: string[] // Array of MCP tool IDs - knowledges?: string[] // Array of enabled knowledge base IDs - configuration?: Record // Extensible settings like temperature, top_p - accessible_paths: string[] // Array of directory paths the agent can access - permission_mode: PermissionMode // Permission mode - max_steps: number // Maximum number of steps the agent can take + allowed_tools?: string[] // Array of allowed tool IDs (whitelist) + + // Configuration + configuration?: AgentConfiguration // Extensible settings like temperature, top_p, etc. } // Agent entity representing an autonomous agent configuration -export interface AgentEntity extends AgentConfiguration { +export interface AgentEntity extends AgentBase { id: string type: AgentType - name: string - description?: string - avatar?: string - instructions?: string // System prompt created_at: string updated_at: string } +export interface CreateAgentRequest extends AgentBase { + type: AgentType +} + +export interface UpdateAgentRequest extends Partial {} + +export interface GetAgentResponse extends AgentEntity { + built_in_tools?: Tool[] // Built-in tools available to the agent +} + +export interface ListOptions { + limit?: number + offset?: number +} + // AgentSession entity representing a conversation session with one or more agents -export interface AgentSessionEntity extends AgentConfiguration { +export interface AgentSessionEntity extends AgentBase { id: string - name?: string // Session name - main_agent_id: string // Primary agent ID for the session - sub_agent_ids?: string[] // Array of sub-agent IDs involved in the session - user_goal?: string // Initial user goal for the session - status: SessionStatus - external_session_id?: string // Agent session for external agent management/tracking - messages?: SessionMessageEntity[] // Hierarchical session messages + agent_id: string // Primary agent ID for the session + // sub_agent_ids?: string[] // Array of sub-agent IDs involved in the session + created_at: string updated_at: string } -// SessionMessage entity for tracking all agent activities -export interface SessionMessageEntity { +export interface CreateSessionRequest extends AgentBase { + agent_id: string // Primary agent ID for the session +} + +export interface UpdateSessionRequest extends Partial {} + +export interface GetAgentSessionResponse extends AgentSessionEntity { + built_in_tools?: Tool[] // Built-in tools available to the agent + messages: AgentSessionMessageEntity[] // Messages in the session +} + +// AgentSessionMessageEntity representing a message within a session +export interface AgentSessionMessageEntity { id: number // Auto-increment primary key session_id: string // Reference to session - parent_id?: number // For tree structure (e.g., tool calls under an action) - role: SessionMessageRole // 'user', 'agent', 'system', 'tool' - type: SessionMessageType // Type of log entry - content: string | Record // JSON structured data + role: ModelMessage['role'] // 'assistant' | 'user' | 'system' | 'tool' + content: ModelMessage metadata?: Record // Additional metadata (optional) created_at: string // ISO timestamp updated_at: string // ISO timestamp } + +export interface CreateSessionMessageRequest { + content: string +} + +// Structured content for session messages that preserves both AI SDK and raw data +export interface SessionMessageContent { + chunk: UIMessageChunk[] // UI-friendly AI SDK chunks for rendering + raw: any[] // Original agent-specific messages for data integrity (agent-agnostic) + agentResult?: any // Complete result from the underlying agent service + agentType: string // The type of agent that generated this message (e.g., 'claude-code', 'openai', etc.) +}