rewrite agents schema and types

This commit is contained in:
Vaayne 2025-09-16 23:55:31 +08:00
parent 697f7d1946
commit 669f60273c
21 changed files with 871 additions and 766 deletions

View File

@ -17,7 +17,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => {
throw { status: 404, code: 'session_not_found', message: 'Session not found' }
}
if (session.main_agent_id !== agentId) {
if (session.agent_id !== agentId) {
throw { status: 404, code: 'session_not_found', message: 'Session not found for this agent' }
}
@ -43,7 +43,7 @@ export const createMessageStream = async (req: Request, res: Response): Promise<
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
// Send initial connection event
res.write('data: {"type":"connected"}\n\n')
res.write('data: {"type":"start"}\n\n')
const messageStream = sessionMessageService.createSessionMessageStream(session, messageData)

View File

@ -78,7 +78,6 @@ export const getSession = async (req: Request, res: Response): Promise<Response>
}
// Verify session belongs to the agent
if (session.main_agent_id !== agentId) {
logger.warn(`Session ${sessionId} does not belong to agent ${agentId}`)
return res.status(404).json({
error: {
@ -121,7 +120,7 @@ export const updateSession = async (req: Request, res: Response): Promise<Respon
// First check if session exists and belongs to agent
const existingSession = await sessionService.getSession(sessionId)
if (!existingSession || existingSession.main_agent_id !== agentId) {
if (!existingSession || existingSession.agent_id !== agentId) {
logger.warn(`Session ${sessionId} not found for agent ${agentId}`)
return res.status(404).json({
error: {
@ -169,7 +168,7 @@ export const patchSession = async (req: Request, res: Response): Promise<Respons
// First check if session exists and belongs to agent
const existingSession = await sessionService.getSession(sessionId)
if (!existingSession || existingSession.main_agent_id !== agentId) {
if (!existingSession || existingSession.agent_id !== agentId) {
logger.warn(`Session ${sessionId} not found for agent ${agentId}`)
return res.status(404).json({
error: {
@ -215,7 +214,7 @@ export const deleteSession = async (req: Request, res: Response): Promise<Respon
// First check if session exists and belongs to agent
const existingSession = await sessionService.getSession(sessionId)
if (!existingSession || existingSession.main_agent_id !== agentId) {
if (!existingSession || existingSession.agent_id !== agentId) {
logger.warn(`Session ${sessionId} not found for agent ${agentId}`)
return res.status(404).json({
error: {

View File

@ -0,0 +1,341 @@
# Agent Message Architecture Design Document
## Overview
This document describes the architecture for handling agent messages in Cherry Studio, including how agent-specific messages are generated, transformed to AI SDK format, stored, and sent to the UI. The system is designed to be agent-agnostic, allowing multiple agent types (Claude Code, OpenAI, etc.) to integrate seamlessly.
## Core Design Principles
1. **Agent Agnosticism**: The core message handling system should work with any agent type without modification
2. **Data Preservation**: All raw agent data must be preserved alongside transformed UI-friendly formats
3. **Streaming First**: Support real-time streaming of agent responses to the UI
4. **Type Safety**: Strong TypeScript interfaces ensure consistency across the pipeline
## Architecture Components
### 1. Agent Service Layer
Each agent (e.g., ClaudeCodeService) implements the `AgentServiceInterface`:
```typescript
interface AgentServiceInterface {
invoke(prompt: string, cwd: string, sessionId?: string, options?: any): AgentStream
}
```
#### Responsibilities:
- Spawn and manage agent-specific processes (e.g., Claude Code CLI)
- Parse agent-specific output formats (e.g., SDKMessage for Claude Code)
- Transform agent messages to AI SDK format
- Emit standardized `AgentStreamEvent` objects
### 2. Agent Stream Events
The standardized event interface that all agents emit:
```typescript
interface AgentStreamEvent {
type: 'chunk' | 'error' | 'complete'
chunk?: UIMessageChunk // AI SDK format for UI
rawAgentMessage?: any // Agent-specific raw message
error?: Error
agentResult?: any // Complete agent-specific result
}
```
### 3. Session Message Service
The `SessionMessageService` acts as the orchestration layer:
#### Responsibilities:
- Manages session lifecycle and persistence
- Collects streaming chunks and raw agent messages
- Stores structured data in the database
- Forwards events to the API layer
### 4. Database Storage
Session messages are stored with complete structured data:
```typescript
interface SessionMessageContent {
aiSDKChunks: UIMessageChunk[] // UI-friendly format
rawAgentMessages: any[] // Original agent messages
agentResult?: any // Complete agent result
agentType: string // Agent identifier
}
```
## Data Flow
```mermaid
graph TD
A[User Input] --> B[API Handler]
B --> C[SessionMessageService]
C --> D[Agent Service]
D --> E[Agent Process]
E --> F[Raw Agent Output]
F --> G[Transform to AI SDK]
G --> H[Emit AgentStreamEvent]
H --> I[SessionMessageService]
I --> J[Store in Database]
I --> K[Forward to Client]
K --> L[UI Rendering]
```
## Message Transformation Process
### Step 1: Raw Agent Message Generation
Each agent generates messages in its native format:
**Claude Code Example:**
```typescript
// SDKMessage from Claude Code CLI
{
type: 'assistant',
uuid: 'msg_123',
session_id: 'session_456',
message: {
role: 'assistant',
content: [
{ type: 'text', text: 'Hello, I can help...' },
{ type: 'tool_use', id: 'tool_1', name: 'read_file', input: {...} }
]
}
}
```
### Step 2: Transformation to AI SDK Format
The agent service transforms native messages to AI SDK `UIMessageChunk`:
```typescript
// In ClaudeCodeService
const emitChunks = (sdkMessage: SDKMessage) => {
// Transform to AI SDK format
const chunks = transformSDKMessageToUIChunk(sdkMessage)
for (const chunk of chunks) {
stream.emit('data', {
type: 'chunk',
chunk, // AI SDK format
rawAgentMessage: sdkMessage // Preserve original
})
}
}
```
**Transformed AI SDK Chunk:**
```typescript
{
type: 'text-delta',
id: 'msg_123',
delta: 'Hello, I can help...',
providerMetadata: {
claudeCode: {
originalSDKMessage: {...},
uuid: 'msg_123',
session_id: 'session_456'
}
}
}
```
### Step 3: Session Message Processing
The SessionMessageService collects and processes events:
```typescript
// Collect streaming data
const streamedChunks: UIMessageChunk[] = []
const rawAgentMessages: any[] = []
claudeStream.on('data', async (event: AgentStreamEvent) => {
switch (event.type) {
case 'chunk':
streamedChunks.push(event.chunk)
if (event.rawAgentMessage) {
rawAgentMessages.push(event.rawAgentMessage)
}
// Forward to client
sessionStream.emit('data', { type: 'chunk', chunk: event.chunk })
break
case 'complete':
// Store complete structured data
const content = {
aiSDKChunks: streamedChunks,
rawAgentMessages: rawAgentMessages,
agentResult: event.agentResult,
agentType: event.agentResult?.agentType || 'unknown'
}
// Save to database...
break
}
})
```
### Step 4: Client Streaming
The API handler converts events to Server-Sent Events (SSE):
```typescript
// In API handler
messageStream.on('data', (event: any) => {
switch (event.type) {
case 'chunk':
// Send AI SDK chunk as SSE
res.write(`data: ${JSON.stringify(event.chunk)}\n\n`)
break
case 'complete':
res.write('data: [DONE]\n\n')
res.end()
break
}
})
```
## Adding New Agent Types
To add support for a new agent (e.g., OpenAI):
### 1. Create Agent Service
```typescript
class OpenAIService implements AgentServiceInterface {
invokeStream(prompt: string, cwd: string, sessionId?: string, options?: any): AgentStream {
const stream = new OpenAIStream()
// Call OpenAI API
const openaiResponse = await openai.chat.completions.create({
messages: [{ role: 'user', content: prompt }],
stream: true
})
// Transform OpenAI format to AI SDK
for await (const chunk of openaiResponse) {
const aiSDKChunk = transformOpenAIToAISDK(chunk)
stream.emit('data', {
type: 'chunk',
chunk: aiSDKChunk,
rawAgentMessage: chunk // Preserve OpenAI format
})
}
return stream
}
}
```
### 2. Create Transform Function
```typescript
function transformOpenAIToAISDK(openaiChunk: OpenAIChunk): UIMessageChunk {
return {
type: 'text-delta',
id: openaiChunk.id,
delta: openaiChunk.choices[0].delta.content,
providerMetadata: {
openai: {
original: openaiChunk,
model: openaiChunk.model
}
}
}
}
```
### 3. Register Agent Type
Update the agent type enum and factory:
```typescript
export type AgentType = 'claude-code' | 'openai' | 'anthropic-api'
function createAgentService(type: AgentType): AgentServiceInterface {
switch (type) {
case 'claude-code':
return new ClaudeCodeService()
case 'openai':
return new OpenAIService()
// ...
}
}
```
## Benefits of This Architecture
1. **Extensibility**: Easy to add new agent types without modifying core logic
2. **Data Integrity**: Raw agent data is never lost during transformation
3. **Debugging**: Complete message history available for troubleshooting
4. **Performance**: Streaming support for real-time responses
5. **Type Safety**: Strong interfaces prevent runtime errors
6. **UI Consistency**: All agents provide data in standard AI SDK format
## Key Interfaces Reference
### AgentStreamEvent
```typescript
interface AgentStreamEvent {
type: 'chunk' | 'error' | 'complete'
chunk?: UIMessageChunk
rawAgentMessage?: any
error?: Error
agentResult?: any
}
```
### SessionMessageEntity
```typescript
interface SessionMessageEntity {
id: number
session_id: string
parent_id?: number
role: 'user' | 'assistant' | 'system' | 'tool'
type: string
content: string | SessionMessageContent
metadata?: Record<string, any>
created_at: string
updated_at: string
}
```
### SessionMessageContent
```typescript
interface SessionMessageContent {
aiSDKChunks: UIMessageChunk[]
rawAgentMessages: any[]
agentResult?: any
agentType: string
}
```
## Testing Strategy
### Unit Tests
- Test each transform function independently
- Verify event emission sequences
- Validate data structure preservation
### Integration Tests
- Test complete flow from input to database
- Verify streaming behavior
- Test error handling and recovery
### Agent-Specific Tests
- Validate agent-specific transformations
- Test edge cases for each agent type
- Verify metadata preservation
## Future Enhancements
1. **Message Replay**: Ability to replay sessions from stored raw messages
2. **Format Migration**: Tools to migrate between agent formats
3. **Analytics**: Aggregate metrics from raw agent data
4. **Caching**: Cache transformed chunks for performance
5. **Compression**: Compress raw messages for storage efficiency
## Conclusion
This architecture provides a robust, extensible foundation for handling messages from multiple AI agents while maintaining data integrity and providing a consistent interface for the UI. The separation of concerns between agent-specific logic and core message handling ensures the system can evolve to support new agents and features without breaking existing functionality.

View File

@ -26,6 +26,7 @@ export abstract class BaseService {
protected static db: ReturnType<typeof drizzle> | null = null
protected static isInitialized = false
protected static initializationPromise: Promise<void> | null = null
protected jsonFields: string[] = ['built_in_tools', 'mcps', 'configuration', 'accessible_paths']
/**
* Initialize database with retry logic and proper error handling
@ -116,9 +117,8 @@ export abstract class BaseService {
protected serializeJsonFields(data: any): any {
const serialized = { ...data }
const jsonFields = ['built_in_tools', 'mcps', 'knowledges', 'configuration', 'accessible_paths', 'sub_agent_ids']
for (const field of jsonFields) {
for (const field of this.jsonFields) {
if (serialized[field] !== undefined) {
serialized[field] =
Array.isArray(serialized[field]) || typeof serialized[field] === 'object'
@ -134,9 +134,8 @@ export abstract class BaseService {
if (!data) return data
const deserialized = { ...data }
const jsonFields = ['built_in_tools', 'mcps', 'knowledges', 'configuration', 'accessible_paths', 'sub_agent_ids']
for (const field of jsonFields) {
for (const field of this.jsonFields) {
if (deserialized[field] && typeof deserialized[field] === 'string') {
try {
deserialized[field] = JSON.parse(deserialized[field])

View File

@ -1,20 +1,33 @@
CREATE TABLE `agents` (
`id` text PRIMARY KEY NOT NULL,
`type` text DEFAULT 'claude-code' NOT NULL,
`type` text NOT NULL,
`name` text NOT NULL,
`description` text,
`avatar` text,
`accessible_paths` text,
`instructions` text,
`model` text NOT NULL,
`plan_model` text,
`small_model` text,
`built_in_tools` text,
`mcps` text,
`knowledges` text,
`allowed_tools` text,
`configuration` text,
`created_at` text NOT NULL,
`updated_at` text NOT NULL
);
--> statement-breakpoint
CREATE TABLE `sessions` (
`id` text PRIMARY KEY NOT NULL,
`agent_id` text NOT NULL,
`name` text NOT NULL,
`description` text,
`accessible_paths` text,
`permission_mode` text DEFAULT 'default',
`max_steps` integer DEFAULT 10,
`instructions` text,
`model` text NOT NULL,
`plan_model` text,
`small_model` text,
`mcps` text,
`allowed_tools` text,
`configuration` text,
`created_at` text NOT NULL,
`updated_at` text NOT NULL
);
@ -22,33 +35,9 @@ CREATE TABLE `agents` (
CREATE TABLE `session_messages` (
`id` integer PRIMARY KEY AUTOINCREMENT NOT NULL,
`session_id` text NOT NULL,
`parent_id` integer,
`role` text NOT NULL,
`type` text NOT NULL,
`content` text NOT NULL,
`metadata` text,
`created_at` text NOT NULL,
`updated_at` text NOT NULL
);
--> statement-breakpoint
CREATE TABLE `sessions` (
`id` text PRIMARY KEY NOT NULL,
`name` text,
`main_agent_id` text NOT NULL,
`sub_agent_ids` text,
`user_goal` text,
`status` text DEFAULT 'idle' NOT NULL,
`external_session_id` text,
`model` text,
`plan_model` text,
`small_model` text,
`built_in_tools` text,
`mcps` text,
`knowledges` text,
`configuration` text,
`accessible_paths` text,
`permission_mode` text DEFAULT 'default',
`max_steps` integer DEFAULT 10,
`created_at` text NOT NULL,
`updated_at` text NOT NULL
);

View File

@ -1,7 +1,7 @@
{
"version": "6",
"dialect": "sqlite",
"id": "c8b65142-dcf4-4d20-8f0e-a17625b34fa7",
"id": "7ef99575-0fcf-471c-9da7-77e5cf8de6a2",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"agents": {
@ -19,8 +19,7 @@
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'claude-code'"
"autoincrement": false
},
"name": {
"name": "name",
@ -36,8 +35,8 @@
"notNull": false,
"autoincrement": false
},
"avatar": {
"name": "avatar",
"accessible_paths": {
"name": "accessible_paths",
"type": "text",
"primaryKey": false,
"notNull": false,
@ -71,13 +70,6 @@
"notNull": false,
"autoincrement": false
},
"built_in_tools": {
"name": "built_in_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"mcps": {
"name": "mcps",
"type": "text",
@ -85,8 +77,8 @@
"notNull": false,
"autoincrement": false
},
"knowledges": {
"name": "knowledges",
"allowed_tools": {
"name": "allowed_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
@ -99,6 +91,58 @@
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"sessions": {
"name": "sessions",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"agent_id": {
"name": "agent_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"accessible_paths": {
"name": "accessible_paths",
"type": "text",
@ -106,21 +150,54 @@
"notNull": false,
"autoincrement": false
},
"permission_mode": {
"name": "permission_mode",
"instructions": {
"name": "instructions",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": "'default'"
"autoincrement": false
},
"max_steps": {
"name": "max_steps",
"type": "integer",
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"plan_model": {
"name": "plan_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 10
"autoincrement": false
},
"small_model": {
"name": "small_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"mcps": {
"name": "mcps",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"allowed_tools": {
"name": "allowed_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"configuration": {
"name": "configuration",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
@ -160,13 +237,6 @@
"notNull": true,
"autoincrement": false
},
"parent_id": {
"name": "parent_id",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"role": {
"name": "role",
"type": "text",
@ -174,13 +244,6 @@
"notNull": true,
"autoincrement": false
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
@ -215,152 +278,6 @@
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"sessions": {
"name": "sessions",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"main_agent_id": {
"name": "main_agent_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"sub_agent_ids": {
"name": "sub_agent_ids",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"user_goal": {
"name": "user_goal",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'idle'"
},
"external_session_id": {
"name": "external_session_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"plan_model": {
"name": "plan_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"small_model": {
"name": "small_model",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"built_in_tools": {
"name": "built_in_tools",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"mcps": {
"name": "mcps",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"knowledges": {
"name": "knowledges",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"configuration": {
"name": "configuration",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"accessible_paths": {
"name": "accessible_paths",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"permission_mode": {
"name": "permission_mode",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": "'default'"
},
"max_steps": {
"name": "max_steps",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 10
},
"created_at": {
"name": "created_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},

View File

@ -5,8 +5,8 @@
{
"idx": 0,
"version": "6",
"when": 1757946608023,
"tag": "0000_bizarre_la_nuit",
"when": 1758035192486,
"tag": "0000_dry_luke_cage",
"breakpoints": true
}
]

View File

@ -2,25 +2,26 @@
* Drizzle ORM schema for agents table
*/
import { index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core'
import { index, sqliteTable, text } from 'drizzle-orm/sqlite-core'
export const agentsTable = sqliteTable('agents', {
id: text('id').primaryKey(),
type: text('type').notNull().default('claude-code'),
type: text('type').notNull(),
name: text('name').notNull(),
description: text('description'),
avatar: text('avatar'),
accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access
instructions: text('instructions'),
model: text('model').notNull(), // Main model ID (required)
plan_model: text('plan_model'), // Optional plan/thinking model ID
small_model: text('small_model'), // Optional small/fast model ID
built_in_tools: text('built_in_tools'), // JSON array of built-in tool IDs
mcps: text('mcps'), // JSON array of MCP tool IDs
knowledges: text('knowledges'), // JSON array of enabled knowledge base IDs
configuration: text('configuration'), // JSON, extensible settings like temperature, top_p
accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access
permission_mode: text('permission_mode').default('default'), // 'readOnly', 'acceptEdits', 'bypassPermissions'
max_steps: integer('max_steps').default(10), // Maximum number of steps the agent can take
allowed_tools: text('allowed_tools'), // JSON array of allowed tool IDs (whitelist)
configuration: text('configuration'), // JSON, extensible settings
created_at: text('created_at').notNull(),
updated_at: text('updated_at').notNull()
})
@ -28,10 +29,6 @@ export const agentsTable = sqliteTable('agents', {
// Indexes for agents table
export const agentsNameIdx = index('idx_agents_name').on(agentsTable.name)
export const agentsTypeIdx = index('idx_agents_type').on(agentsTable.type)
export const agentsModelIdx = index('idx_agents_model').on(agentsTable.model)
export const agentsPlanModelIdx = index('idx_agents_plan_model').on(agentsTable.plan_model)
export const agentsSmallModelIdx = index('idx_agents_small_model').on(agentsTable.small_model)
export const agentsPermissionModeIdx = index('idx_agents_permission_mode').on(agentsTable.permission_mode)
export const agentsCreatedAtIdx = index('idx_agents_created_at').on(agentsTable.created_at)
export type AgentRow = typeof agentsTable.$inferSelect

View File

@ -4,3 +4,4 @@
export * from './agents.schema'
export * from './sessions.schema'
export * from './messages.schema'

View File

@ -0,0 +1,28 @@
import { foreignKey, index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core'
import { sessionsTable } from './sessions.schema'
// session_messages table to log all messages, thoughts, actions, observations in a session
export const sessionMessagesTable = sqliteTable('session_messages', {
id: integer('id').primaryKey({ autoIncrement: true }),
session_id: text('session_id').notNull(),
role: text('role').notNull(), // 'user', 'agent', 'system', 'tool'
content: text('content').notNull(), // JSON structured data
metadata: text('metadata'), // JSON metadata (optional)
created_at: text('created_at').notNull(),
updated_at: text('updated_at').notNull()
})
// Indexes for session_messages table
export const sessionMessagesSessionIdIdx = index('idx_session_messages_session_id').on(sessionMessagesTable.session_id)
export const sessionMessagesCreatedAtIdx = index('idx_session_messages_created_at').on(sessionMessagesTable.created_at)
export const sessionMessagesUpdatedAtIdx = index('idx_session_messages_updated_at').on(sessionMessagesTable.updated_at)
// Foreign keys for session_messages table
export const sessionMessagesFkSession = foreignKey({
columns: [sessionMessagesTable.session_id],
foreignColumns: [sessionsTable.id],
name: 'fk_session_messages_session_id'
}).onDelete('cascade')
export type SessionMessageRow = typeof sessionMessagesTable.$inferSelect
export type InsertSessionMessageRow = typeof sessionMessagesTable.$inferInsert

View File

@ -2,78 +2,42 @@
* Drizzle ORM schema for sessions and session_logs tables
*/
import { foreignKey, index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core'
import { foreignKey, index, sqliteTable, text } from 'drizzle-orm/sqlite-core'
import { agentsTable } from './agents.schema'
export const sessionsTable = sqliteTable('sessions', {
id: text('id').primaryKey(),
name: text('name'), // Session name
main_agent_id: text('main_agent_id').notNull(), // Primary agent ID for the session
sub_agent_ids: text('sub_agent_ids'), // JSON array of sub-agent IDs involved in the session
user_goal: text('user_goal'), // Initial user goal for the session
status: text('status').notNull().default('idle'), // 'idle', 'running', 'completed', 'failed', 'stopped'
external_session_id: text('external_session_id'), // Agent session for external agent management/tracking
// AgentConfiguration fields that can override agent defaults
model: text('model'), // Main model ID (inherits from agent if null)
agent_id: text('agent_id').notNull(), // Primary agent ID for the session
name: text('name').notNull(),
description: text('description'),
accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access
instructions: text('instructions'),
model: text('model').notNull(), // Main model ID (required)
plan_model: text('plan_model'), // Optional plan/thinking model ID
small_model: text('small_model'), // Optional small/fast model ID
built_in_tools: text('built_in_tools'), // JSON array of built-in tool IDs
mcps: text('mcps'), // JSON array of MCP tool IDs
knowledges: text('knowledges'), // JSON array of enabled knowledge base IDs
configuration: text('configuration'), // JSON, extensible settings like temperature, top_p
accessible_paths: text('accessible_paths'), // JSON array of directory paths the agent can access
permission_mode: text('permission_mode').default('default'),
max_steps: integer('max_steps').default(10), // Maximum number of steps the agent can take
allowed_tools: text('allowed_tools'), // JSON array of allowed tool IDs (whitelist)
configuration: text('configuration'), // JSON, extensible settings
created_at: text('created_at').notNull(),
updated_at: text('updated_at').notNull()
})
// Indexes for sessions table
export const sessionsNameIdx = index('idx_sessions_name').on(sessionsTable.name)
export const sessionsStatusIdx = index('idx_sessions_status').on(sessionsTable.status)
export const sessionsCreatedAtIdx = index('idx_sessions_created_at').on(sessionsTable.created_at)
export const sessionsExternalSessionIdIdx = index('idx_sessions_external_session_id').on(
sessionsTable.external_session_id
)
export const sessionsMainAgentIdIdx = index('idx_sessions_main_agent_id').on(sessionsTable.main_agent_id)
export const sessionsModelIdx = index('idx_sessions_model').on(sessionsTable.model)
export const sessionsPlanModelIdx = index('idx_sessions_plan_model').on(sessionsTable.plan_model)
export const sessionsSmallModelIdx = index('idx_sessions_small_model').on(sessionsTable.small_model)
export const sessionMessagesTable = sqliteTable('session_messages', {
id: integer('id').primaryKey({ autoIncrement: true }),
session_id: text('session_id').notNull(),
parent_id: integer('parent_id'), // Foreign Key to session_logs.id, nullable for tree structure
role: text('role').notNull(), // 'user', 'agent', 'system', 'tool'
type: text('type').notNull(), // 'message', 'thought', 'action', 'observation', etc.
content: text('content').notNull(), // JSON structured data
metadata: text('metadata'), // JSON metadata (optional)
created_at: text('created_at').notNull(),
updated_at: text('updated_at').notNull()
})
// Indexes for session_messages table
export const sessionMessagesSessionIdIdx = index('idx_session_messages_session_id').on(sessionMessagesTable.session_id)
export const sessionMessagesParentIdIdx = index('idx_session_messages_parent_id').on(sessionMessagesTable.parent_id)
export const sessionMessagesRoleIdx = index('idx_session_messages_role').on(sessionMessagesTable.role)
export const sessionMessagesTypeIdx = index('idx_session_messages_type').on(sessionMessagesTable.type)
export const sessionMessagesCreatedAtIdx = index('idx_session_messages_created_at').on(sessionMessagesTable.created_at)
export const sessionMessagesUpdatedAtIdx = index('idx_session_messages_updated_at').on(sessionMessagesTable.updated_at)
// Foreign keys for session_messages table
export const sessionMessagesFkSession = foreignKey({
columns: [sessionMessagesTable.session_id],
foreignColumns: [sessionsTable.id],
name: 'fk_session_messages_session_id'
// Foreign keys for sessions table
export const sessionsFkAgent = foreignKey({
columns: [sessionsTable.agent_id],
foreignColumns: [agentsTable.id],
name: 'fk_session_agent_id'
}).onDelete('cascade')
export const sessionMessagesFkParent = foreignKey({
columns: [sessionMessagesTable.parent_id],
foreignColumns: [sessionMessagesTable.id],
name: 'fk_session_messages_parent_id'
})
// Indexes for sessions table
export const sessionsCreatedAtIdx = index('idx_sessions_created_at').on(sessionsTable.created_at)
export const sessionsMainAgentIdIdx = index('idx_sessions_agent_id').on(sessionsTable.agent_id)
export const sessionsModelIdx = index('idx_sessions_model').on(sessionsTable.model)
export type SessionRow = typeof sessionsTable.$inferSelect
export type InsertSessionRow = typeof sessionsTable.$inferInsert
export type SessionMessageRow = typeof sessionMessagesTable.$inferSelect
export type InsertSessionMessageRow = typeof sessionMessagesTable.$inferInsert

View File

@ -0,0 +1,27 @@
// Agent-agnostic streaming interface
// This interface should be implemented by all agent services
import { EventEmitter } from 'node:events'
import { UIMessageChunk } from 'ai'
// Generic agent stream event that works with any agent type
export interface AgentStreamEvent {
type: 'chunk' | 'error' | 'complete'
chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption
rawAgentMessage?: any // Agent-specific raw message (SDKMessage for Claude Code, different for other agents)
error?: Error
agentResult?: any // Agent-specific result data
}
// Agent stream interface that all agents should implement
export interface AgentStream extends EventEmitter {
emit(event: 'data', data: AgentStreamEvent): boolean
on(event: 'data', listener: (data: AgentStreamEvent) => void): this
once(event: 'data', listener: (data: AgentStreamEvent) => void): this
}
// Base agent service interface
export interface AgentServiceInterface {
invoke(prompt: string, cwd: string, sessionId?: string, options?: any): AgentStream
}

View File

@ -1,50 +1,12 @@
import path from 'node:path'
import { getDataPath } from '@main/utils'
import type { AgentEntity, AgentType, PermissionMode } from '@types'
import type { AgentEntity, CreateAgentRequest, GetAgentResponse, ListOptions, UpdateAgentRequest } from '@types'
import { count, eq } from 'drizzle-orm'
import { BaseService } from '../BaseService'
import { type AgentRow, agentsTable, type InsertAgentRow } from '../database/schema'
// import { builtinTools } from './claudecode/tools'
export interface CreateAgentRequest {
type: AgentType
name: string
description?: string
avatar?: string
instructions?: string
model: string
// plan_model?: string
// small_model?: string
// mcps?: string[]
// knowledges?: string[]
// configuration?: Record<string, any>
accessible_paths?: string[]
permission_mode?: PermissionMode
max_steps?: number
}
export interface UpdateAgentRequest {
name?: string
description?: string
avatar?: string
instructions?: string
model?: string
// plan_model?: string
// small_model?: string
// mcps?: string[]
// knowledges?: string[]
// configuration?: Record<string, any>
accessible_paths?: string[]
permission_mode?: PermissionMode
max_steps?: number
}
export interface ListAgentsOptions {
limit?: number
offset?: number
}
import { builtinTools } from './claudecode/tools'
export class AgentService extends BaseService {
private static instance: AgentService | null = null
@ -61,49 +23,36 @@ export class AgentService extends BaseService {
}
// Agent Methods
async createAgent(agentData: CreateAgentRequest): Promise<AgentEntity> {
async createAgent(req: CreateAgentRequest): Promise<AgentEntity> {
this.ensureInitialized()
const id = `agent_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`
const now = new Date().toISOString()
if (!agentData.accessible_paths || agentData.accessible_paths.length === 0) {
if (!req.accessible_paths || req.accessible_paths.length === 0) {
const defaultPath = path.join(getDataPath(), 'agents', id)
agentData.accessible_paths = [defaultPath]
req.accessible_paths = [defaultPath]
}
const serializedData = this.serializeJsonFields(agentData)
const serializedReq = this.serializeJsonFields(req)
const insertData: InsertAgentRow = {
id,
type: serializedData.type,
name: serializedData.name,
description: serializedData.description || null,
avatar: serializedData.avatar || null,
instructions: serializedData.instructions || null,
model: serializedData.model,
plan_model: serializedData.plan_model || null,
small_model: serializedData.small_model || null,
built_in_tools: serializedData.built_in_tools || null,
mcps: serializedData.mcps || null,
knowledges: serializedData.knowledges || null,
configuration: serializedData.configuration || null,
accessible_paths: serializedData.accessible_paths || null,
permission_mode: serializedData.permission_mode || 'default',
max_steps: serializedData.max_steps || 10,
type: req.type,
name: req.name || 'New Agent',
description: req.description,
instructions: req.instructions || 'You are a helpful assistant.',
model: req.model,
plan_model: req.plan_model,
small_model: req.small_model,
configuration: serializedReq.configuration,
accessible_paths: serializedReq.accessible_paths,
created_at: now,
updated_at: now
}
if (serializedData.name === 'claude-code') {
// insertData.built_in_tools = JSON.stringify(builtinTools)
insertData.built_in_tools = JSON.stringify([])
}
await this.database.insert(agentsTable).values(insertData)
const result = await this.database.select().from(agentsTable).where(eq(agentsTable.id, id)).limit(1)
if (!result[0]) {
throw new Error('Failed to create agent')
}
@ -112,7 +61,7 @@ export class AgentService extends BaseService {
return agent
}
async getAgent(id: string): Promise<AgentEntity | null> {
async getAgent(id: string): Promise<GetAgentResponse | null> {
this.ensureInitialized()
const result = await this.database.select().from(agentsTable).where(eq(agentsTable.id, id)).limit(1)
@ -121,18 +70,19 @@ export class AgentService extends BaseService {
return null
}
return this.deserializeJsonFields(result[0]) as AgentEntity
const agent = this.deserializeJsonFields(result[0]) as GetAgentResponse
if (agent.type === 'claude-code') {
agent.built_in_tools = builtinTools
}
return agent
}
async listAgents(options: ListAgentsOptions = {}): Promise<{ agents: AgentEntity[]; total: number }> {
this.ensureInitialized()
async listAgents(options: ListOptions = {}): Promise<{ agents: GetAgentResponse[]; total: number }> {
this.ensureInitialized() // Build query with pagination
// Get total count
const totalResult = await this.database.select({ count: count() }).from(agentsTable)
const total = totalResult[0].count
// Build query with pagination
const baseQuery = this.database.select().from(agentsTable).orderBy(agentsTable.created_at)
const result =
@ -142,12 +92,18 @@ export class AgentService extends BaseService {
: await baseQuery.limit(options.limit)
: await baseQuery
const agents = result.map((row) => this.deserializeJsonFields(row)) as AgentEntity[]
const agents = result.map((row) => this.deserializeJsonFields(row)) as GetAgentResponse[]
return { agents, total }
agents.forEach((agent) => {
if (agent.type === 'claude-code') {
agent.built_in_tools = builtinTools
}
})
return { agents, total: totalResult[0].count }
}
async updateAgent(id: string, updates: UpdateAgentRequest): Promise<AgentEntity | null> {
async updateAgent(id: string, updates: UpdateAgentRequest): Promise<GetAgentResponse | null> {
this.ensureInitialized()
// Check if agent exists
@ -166,22 +122,15 @@ export class AgentService extends BaseService {
// Only update fields that are provided
if (serializedUpdates.name !== undefined) updateData.name = serializedUpdates.name
if (serializedUpdates.description !== undefined) updateData.description = serializedUpdates.description
if (serializedUpdates.avatar !== undefined) updateData.avatar = serializedUpdates.avatar
if (serializedUpdates.instructions !== undefined) updateData.instructions = serializedUpdates.instructions
if (serializedUpdates.model !== undefined) updateData.model = serializedUpdates.model
if (serializedUpdates.plan_model !== undefined) updateData.plan_model = serializedUpdates.plan_model
if (serializedUpdates.small_model !== undefined) updateData.small_model = serializedUpdates.small_model
if (serializedUpdates.built_in_tools !== undefined) updateData.built_in_tools = serializedUpdates.built_in_tools
if (serializedUpdates.mcps !== undefined) updateData.mcps = serializedUpdates.mcps
if (serializedUpdates.knowledges !== undefined) updateData.knowledges = serializedUpdates.knowledges
if (serializedUpdates.configuration !== undefined) updateData.configuration = serializedUpdates.configuration
if (serializedUpdates.accessible_paths !== undefined)
updateData.accessible_paths = serializedUpdates.accessible_paths
if (serializedUpdates.permission_mode !== undefined) updateData.permission_mode = serializedUpdates.permission_mode
if (serializedUpdates.max_steps !== undefined) updateData.max_steps = serializedUpdates.max_steps
await this.database.update(agentsTable).set(updateData).where(eq(agentsTable.id, id))
return await this.getAgent(id)
}

View File

@ -1,7 +1,12 @@
import { EventEmitter } from 'node:events'
import { loggerService } from '@logger'
import type { AgentSessionEntity, SessionMessageEntity } from '@types'
import type {
AgentSessionMessageEntity,
CreateSessionMessageRequest,
GetAgentSessionResponse,
ListOptions,
} from '@types'
import { UIMessageChunk } from 'ai'
import { count, eq } from 'drizzle-orm'
@ -11,27 +16,9 @@ import ClaudeCodeService from './claudecode'
const logger = loggerService.withContext('SessionMessageService')
export interface CreateSessionMessageRequest {
session_id: string
parent_id?: number
role: 'user' | 'agent' | 'system' | 'tool'
type: string
content: string
metadata?: Record<string, any>
}
export interface UpdateSessionMessageRequest {
content?: Record<string, any>
metadata?: Record<string, any>
}
export interface ListSessionMessagesOptions {
limit?: number
offset?: number
}
export class SessionMessageService extends BaseService {
private static instance: SessionMessageService | null = null
private cc: ClaudeCodeService = new ClaudeCodeService()
static getInstance(): SessionMessageService {
if (!SessionMessageService.instance) {
@ -58,8 +45,8 @@ export class SessionMessageService extends BaseService {
async listSessionMessages(
sessionId: string,
options: ListSessionMessagesOptions = {}
): Promise<{ messages: SessionMessageEntity[]; total: number }> {
options: ListOptions = {}
): Promise<{ messages: AgentSessionMessageEntity[]; total: number }> {
this.ensureInitialized()
// Get total count
@ -84,128 +71,144 @@ export class SessionMessageService extends BaseService {
: await baseQuery.limit(options.limit)
: await baseQuery
const messages = result.map((row) => this.deserializeSessionMessage(row)) as SessionMessageEntity[]
const messages = result.map((row) => this.deserializeSessionMessage(row)) as AgentSessionMessageEntity[]
return { messages, total }
}
createSessionMessageStream(session: AgentSessionEntity, messageData: CreateSessionMessageRequest): EventEmitter {
createSessionMessageStream(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter {
this.ensureInitialized()
// Create a new EventEmitter to manage the session message lifecycle
const sessionStream = new EventEmitter()
// Validate parent exists if specified
if (messageData.parent_id) {
this.sessionMessageExists(messageData.parent_id)
.then((exists) => {
if (!exists) {
process.nextTick(() => {
sessionStream.emit('data', {
type: 'error',
error: new Error(`Parent message with id ${messageData.parent_id} does not exist`)
})
})
return
}
// Start the Claude Code stream after validation passes
this.startClaudeCodeStream(session, messageData, sessionStream)
})
.catch((error) => {
process.nextTick(() => {
sessionStream.emit('data', {
type: 'error',
error: error as Error
})
})
})
} else {
// No parent validation needed, start immediately
this.startClaudeCodeStream(session, messageData, sessionStream)
}
// No parent validation needed, start immediately
this.startClaudeCodeStream(session, messageData, sessionStream)
return sessionStream
}
private startClaudeCodeStream(
session: AgentSessionEntity,
messageData: CreateSessionMessageRequest,
session: GetAgentSessionResponse,
req: CreateSessionMessageRequest,
sessionStream: EventEmitter
): void {
const cc = new ClaudeCodeService()
const previousMessages = session.messages || []
let session_id: string = ''
if (previousMessages.length > 0) {
session_id = previousMessages[0].session_id
}
// Create the streaming Claude Code invocation
const claudeStream = cc.invokeStream(
messageData.content,
session.accessible_paths[0],
session.external_session_id,
{
permissionMode: session.permission_mode,
maxTurns: session.max_steps
}
)
logger.debug('Claude Code stream message data:', { message: req, session_id })
let sessionMessage: SessionMessageEntity | null = null
// Create the streaming agent invocation (using invokeStream for streaming)
const claudeStream = this.cc.invoke(req.content, session.accessible_paths[0], session_id, {
permissionMode: session.configuration?.permissionMode || 'default',
maxTurns: session.configuration?.maxTurns || 10
})
// Handle Claude Code stream events
let sessionMessage: AgentSessionMessageEntity | null = null
const streamedChunks: UIMessageChunk[] = []
const rawAgentMessages: any[] = [] // Generic agent messages storage
// Handle agent stream events (agent-agnostic)
claudeStream.on('data', async (event: any) => {
try {
switch (event.type) {
case 'chunk':
// Forward UIMessageChunk directly
sessionStream.emit('data', {
type: 'chunk',
chunk: event.chunk as UIMessageChunk
})
// Forward UIMessageChunk directly and collect raw agent messages
if (event.chunk) {
const chunk = event.chunk as UIMessageChunk
streamedChunks.push(chunk)
// Collect raw agent message if available (agent-agnostic)
if (event.rawAgentMessage) {
rawAgentMessages.push(event.rawAgentMessage)
}
sessionStream.emit('data', {
type: 'chunk',
chunk
})
} else {
logger.warn('Received agent chunk event without chunk payload')
}
break
case 'error':
sessionStream.emit('data', {
type: 'error',
error: event.error
error: event.error || (event.data?.stderr ? new Error(event.data.stderr) : undefined)
})
break
case 'complete': {
// Save the final message to database when Claude Code completes
logger.info('Claude Code stream completed, saving message to database')
// Save the final message to database when agent completes
logger.info('Agent stream completed, saving message to database')
const now = new Date().toISOString()
const insertData: InsertSessionMessageRow = {
session_id: messageData.session_id,
parent_id: messageData.parent_id || null,
role: messageData.role,
type: messageData.type,
content: JSON.stringify(event.result),
metadata: messageData.metadata ? JSON.stringify(messageData.metadata) : null,
created_at: now,
updated_at: now
// Extract additional raw agent messages from agentResult if available
if (event.agentResult?.rawSDKMessages) {
rawAgentMessages.push(...event.agentResult.rawSDKMessages)
}
const result = await this.database.insert(sessionMessagesTable).values(insertData).returning()
if (result[0]) {
sessionMessage = this.deserializeSessionMessage(result[0]) as SessionMessageEntity
logger.info(`Session message saved with ID: ${sessionMessage.id}`)
// Emit the complete event with the saved message
sessionStream.emit('data', {
type: 'complete',
result: event.result,
message: sessionMessage
})
} else {
sessionStream.emit('data', {
type: 'error',
error: new Error('Failed to save session message to database')
})
// Create structured content with both AI SDK format and raw data
const structuredContent = {
aiSDKChunks: streamedChunks, // For UI consumption
rawAgentMessages: rawAgentMessages, // Original agent-specific messages
agentResult: event.agentResult, // Complete result from the agent
agentType: event.agentResult?.agentType || 'unknown' // Store agent type for future reference
}
// const now = new Date().toISOString()
// const insertData: InsertSessionMessageRow = {
// session_id: req.session_id,
// parent_id: req.parent_id || null,
// role: req.role,
// type: req.type,
// content: JSON.stringify(structuredContent),
// metadata: req.metadata
// ? JSON.stringify({
// ...req.metadata,
// chunkCount: streamedChunks.length,
// rawMessageCount: rawAgentMessages.length,
// agentType: event.agentResult?.agentType || 'unknown',
// completedAt: now
// })
// : JSON.stringify({
// chunkCount: streamedChunks.length,
// rawMessageCount: rawAgentMessages.length,
// agentType: event.agentResult?.agentType || 'unknown',
// completedAt: now
// }),
// created_at: now,
// updated_at: now
// }
// const result = await this.database.insert(sessionMessagesTable).values(insertData).returning()
// if (result[0]) {
// sessionMessage = this.deserializeSessionMessage(result[0]) as AgentSessionMessageEntity
// logger.info(`Session message saved with ID: ${sessionMessage.id}`)
// // Emit the complete event with the saved message and structured data
// sessionStream.emit('data', {
// type: 'complete',
// result: structuredContent,
// message: sessionMessage
// })
// } else {
// sessionStream.emit('data', {
// type: 'error',
// error: new Error('Failed to save session message to database')
// })
// }
break
}
default:
logger.warn('Unknown event type from Claude Code service:', { type: event.type })
logger.warn('Unknown event type from Claude Code service:', {
type: event.type
})
break
}
} catch (error) {
@ -218,7 +221,7 @@ export class SessionMessageService extends BaseService {
})
}
private deserializeSessionMessage(data: any): SessionMessageEntity {
private deserializeSessionMessage(data: any): AgentSessionMessageEntity {
if (!data) return data
const deserialized = { ...data }

View File

@ -1,52 +1,15 @@
import type { AgentSessionEntity, PermissionMode, SessionStatus } from '@types'
import type {
AgentSessionEntity,
CreateSessionRequest,
GetAgentSessionResponse,
ListOptions,
UpdateSessionRequest
} from '@types'
import { and, count, eq, type SQL } from 'drizzle-orm'
import { BaseService } from '../BaseService'
import { agentsTable, type InsertSessionRow, type SessionRow, sessionsTable } from '../database/schema'
export interface CreateSessionRequest {
name?: string
main_agent_id: string
sub_agent_ids?: string[]
user_goal?: string
status?: SessionStatus
external_session_id?: string
model?: string
plan_model?: string
small_model?: string
built_in_tools?: string[]
mcps?: string[]
knowledges?: string[]
configuration?: Record<string, any>
accessible_paths?: string[]
permission_mode?: PermissionMode
max_steps?: number
}
export interface UpdateSessionRequest {
name?: string
main_agent_id?: string
sub_agent_ids?: string[]
user_goal?: string
status?: SessionStatus
external_session_id?: string
model?: string
plan_model?: string
small_model?: string
built_in_tools?: string[]
mcps?: string[]
knowledges?: string[]
configuration?: Record<string, any>
accessible_paths?: string[]
permission_mode?: PermissionMode
max_steps?: number
}
export interface ListSessionsOptions {
limit?: number
offset?: number
status?: SessionStatus
}
export class SessionService extends BaseService {
private static instance: SessionService | null = null
@ -62,18 +25,14 @@ export class SessionService extends BaseService {
await BaseService.initialize()
}
async createSession(sessionData: CreateSessionRequest): Promise<AgentSessionEntity> {
async createSession(req: CreateSessionRequest): Promise<AgentSessionEntity> {
this.ensureInitialized()
// Validate agent exists - we'll need to import AgentService for this check
// For now, we'll skip this validation to avoid circular dependencies
// The database foreign key constraint will handle this
const agents = await this.database
.select()
.from(agentsTable)
.where(eq(agentsTable.id, sessionData.main_agent_id))
.limit(1)
const agents = await this.database.select().from(agentsTable).where(eq(agentsTable.id, req.agent_id)).limit(1)
if (!agents[0]) {
throw new Error('Agent not found')
}
@ -83,20 +42,9 @@ export class SessionService extends BaseService {
const now = new Date().toISOString()
// inherit configuration from agent by default, can be overridden by sessionData
sessionData = {
...{
model: agent.model,
plan_model: agent.plan_model,
small_model: agent.small_model,
mcps: agent.mcps,
knowledges: agent.knowledges,
configuration: agent.configuration,
accessible_paths: agent.accessible_paths,
permission_mode: agent.permission_mode,
max_steps: agent.max_steps,
status: 'idle'
},
...sessionData
const sessionData: Partial<CreateSessionRequest> = {
...agent,
...req
}
const serializedData = this.serializeJsonFields(sessionData)
@ -104,20 +52,14 @@ export class SessionService extends BaseService {
const insertData: InsertSessionRow = {
id,
name: serializedData.name || null,
main_agent_id: serializedData.main_agent_id,
sub_agent_ids: serializedData.sub_agent_ids || null,
user_goal: serializedData.user_goal || null,
status: serializedData.status || 'idle',
external_session_id: serializedData.external_session_id || null,
agent_id: serializedData.agent_id,
description: serializedData.description || null,
model: serializedData.model || null,
plan_model: serializedData.plan_model || null,
small_model: serializedData.small_model || null,
mcps: serializedData.mcps || null,
knowledges: serializedData.knowledges || null,
configuration: serializedData.configuration || null,
accessible_paths: serializedData.accessible_paths || null,
permission_mode: serializedData.permission_mode || 'readOnly',
max_steps: serializedData.max_steps || 10,
created_at: now,
updated_at: now
}
@ -133,7 +75,7 @@ export class SessionService extends BaseService {
return this.deserializeJsonFields(result[0]) as AgentSessionEntity
}
async getSession(id: string): Promise<AgentSessionEntity | null> {
async getSession(id: string): Promise<GetAgentSessionResponse | null> {
this.ensureInitialized()
const result = await this.database.select().from(sessionsTable).where(eq(sessionsTable.id, id)).limit(1)
@ -142,7 +84,9 @@ export class SessionService extends BaseService {
return null
}
return this.deserializeJsonFields(result[0]) as AgentSessionEntity
const session = this.deserializeJsonFields(result[0]) as GetAgentSessionResponse
return session
}
async getSessionWithAgent(id: string): Promise<any | null> {
@ -155,17 +99,14 @@ export class SessionService extends BaseService {
async listSessions(
agentId?: string,
options: ListSessionsOptions = {}
options: ListOptions = {}
): Promise<{ sessions: AgentSessionEntity[]; total: number }> {
this.ensureInitialized()
// Build where conditions
const whereConditions: SQL[] = []
if (agentId) {
whereConditions.push(eq(sessionsTable.main_agent_id, agentId))
}
if (options.status) {
whereConditions.push(eq(sessionsTable.status, options.status))
whereConditions.push(eq(sessionsTable.agent_id, agentId))
}
const whereClause =
@ -190,12 +131,12 @@ export class SessionService extends BaseService {
: await baseQuery.limit(options.limit)
: await baseQuery
const sessions = result.map((row) => this.deserializeJsonFields(row)) as AgentSessionEntity[]
const sessions = result.map((row) => this.deserializeJsonFields(row)) as GetAgentSessionResponse[]
return { sessions, total }
}
async updateSession(id: string, updates: UpdateSessionRequest): Promise<AgentSessionEntity | null> {
async updateSession(id: string, updates: UpdateSessionRequest): Promise<GetAgentSessionResponse | null> {
this.ensureInitialized()
// Check if session exists
@ -216,46 +157,22 @@ export class SessionService extends BaseService {
// Only update fields that are provided
if (serializedUpdates.name !== undefined) updateData.name = serializedUpdates.name
if (serializedUpdates.main_agent_id !== undefined) updateData.main_agent_id = serializedUpdates.main_agent_id
if (serializedUpdates.sub_agent_ids !== undefined) updateData.sub_agent_ids = serializedUpdates.sub_agent_ids
if (serializedUpdates.user_goal !== undefined) updateData.user_goal = serializedUpdates.user_goal
if (serializedUpdates.status !== undefined) updateData.status = serializedUpdates.status
if (serializedUpdates.external_session_id !== undefined)
updateData.external_session_id = serializedUpdates.external_session_id
if (serializedUpdates.model !== undefined) updateData.model = serializedUpdates.model
if (serializedUpdates.plan_model !== undefined) updateData.plan_model = serializedUpdates.plan_model
if (serializedUpdates.small_model !== undefined) updateData.small_model = serializedUpdates.small_model
if (serializedUpdates.built_in_tools !== undefined) updateData.built_in_tools = serializedUpdates.built_in_tools
if (serializedUpdates.mcps !== undefined) updateData.mcps = serializedUpdates.mcps
if (serializedUpdates.knowledges !== undefined) updateData.knowledges = serializedUpdates.knowledges
if (serializedUpdates.configuration !== undefined) updateData.configuration = serializedUpdates.configuration
if (serializedUpdates.accessible_paths !== undefined)
updateData.accessible_paths = serializedUpdates.accessible_paths
if (serializedUpdates.permission_mode !== undefined) updateData.permission_mode = serializedUpdates.permission_mode
if (serializedUpdates.max_steps !== undefined) updateData.max_steps = serializedUpdates.max_steps
await this.database.update(sessionsTable).set(updateData).where(eq(sessionsTable.id, id))
return await this.getSession(id)
}
async updateSessionStatus(id: string, status: SessionStatus): Promise<AgentSessionEntity | null> {
this.ensureInitialized()
const now = new Date().toISOString()
const result = await this.database
.update(sessionsTable)
.set({ status, updated_at: now })
.where(eq(sessionsTable.id, id))
if (result.rowsAffected === 0) {
return null
}
return await this.getSession(id)
}
async deleteSession(id: string): Promise<boolean> {
this.ensureInitialized()

View File

@ -5,8 +5,8 @@ import { createRequire } from 'node:module'
import { Options, SDKMessage } from '@anthropic-ai/claude-code'
import { loggerService } from '@logger'
import { UIMessageChunk } from 'ai'
import { AgentServiceInterface, AgentStream, AgentStreamEvent } from '../../interfaces/AgentStreamInterface'
import { transformSDKMessageToUIChunk } from './transform'
const require_ = createRequire(import.meta.url)
@ -21,34 +21,13 @@ interface ClaudeCodeResult {
exitCode?: number
}
interface ClaudeCodeStreamEvent {
type: 'message' | 'error' | 'complete'
data?: any
error?: Error
result?: ClaudeCodeResult
class ClaudeCodeStream extends EventEmitter implements AgentStream {
declare emit: (event: 'data', data: AgentStreamEvent) => boolean
declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this
declare once: (event: 'data', listener: (data: AgentStreamEvent) => void) => this
}
class ClaudeCodeStream extends EventEmitter {
declare emit: (event: 'data', data: ClaudeCodeStreamEvent) => boolean
declare on: (event: 'data', listener: (data: ClaudeCodeStreamEvent) => void) => this
declare once: (event: 'data', listener: (data: ClaudeCodeStreamEvent) => void) => this
}
// AI SDK compatible stream events
interface AISDKStreamEvent {
type: 'chunk' | 'error' | 'complete'
chunk?: UIMessageChunk
error?: Error
result?: ClaudeCodeResult
}
class AISDKStream extends EventEmitter {
declare emit: (event: 'data', data: AISDKStreamEvent) => boolean
declare on: (event: 'data', listener: (data: AISDKStreamEvent) => void) => this
declare once: (event: 'data', listener: (data: AISDKStreamEvent) => void) => this
}
class ClaudeCodeService {
class ClaudeCodeService implements AgentServiceInterface {
private claudeExecutablePath: string
constructor() {
@ -56,48 +35,8 @@ class ClaudeCodeService {
this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js')
}
async invoke(prompt: string, cwd: string, session_id?: string, base?: Options): Promise<ClaudeCodeResult> {
// Ensure Electron behaves like Node for any child process that resolves to process.execPath
// process.env.ELECTRON_RUN_AS_NODE = '1'
const args: string[] = [this.claudeExecutablePath, '--output-format', 'stream-json', '--verbose', 'cwd', cwd]
if (session_id) {
args.push('--resume', session_id)
}
if (base?.maxTurns) {
args.push('--max-turns', base.maxTurns.toString())
}
if (base?.permissionMode) {
args.push('--permission-mode', base.permissionMode)
}
args.push('--print', prompt)
logger.info('Spawning Claude Code process', { args, cwd })
const p = spawn(process.execPath, args, {
env: { ...process.env, ELECTRON_RUN_AS_NODE: '1' },
stdio: ['pipe', 'pipe', 'pipe'],
shell: false,
detached: false
})
// Log process creation
logger.info('Process created', { pid: p.pid })
// Close stdin immediately since we're passing the prompt via --print
if (p.stdin) {
p.stdin.end()
logger.debug('Closed stdin')
}
return this.setupProcessHandlers(p)
}
invokeStream(prompt: string, cwd: string, session_id?: string, base?: Options): EventEmitter {
const aiStream = new AISDKStream()
const rawStream = new ClaudeCodeStream()
invoke(prompt: string, cwd: string, session_id?: string, base?: Options): AgentStream {
const aiStream = new ClaudeCodeStream()
// Spawn process with same parameters as invoke
const args: string[] = [this.claudeExecutablePath, '--output-format', 'stream-json', '--verbose']
@ -132,8 +71,7 @@ class ClaudeCodeService {
logger.debug('Closed stdin for streaming process')
}
this.setupStreamingHandlers(p, rawStream)
this.setupAISDKTransform(rawStream, aiStream)
this.setupStreamingHandlers(p, aiStream)
return aiStream
}
@ -146,34 +84,59 @@ class ClaudeCodeService {
let stderrData = ''
const jsonOutput: any[] = []
let hasCompleted = false
let stdoutBuffer = ''
const startTime = Date.now()
const emitChunks = (sdkMessage: SDKMessage) => {
jsonOutput.push(sdkMessage)
const chunks = transformSDKMessageToUIChunk(sdkMessage)
for (const chunk of chunks) {
stream.emit('data', {
type: 'chunk',
chunk,
rawAgentMessage: sdkMessage // Store Claude Code specific SDKMessage as generic agent message
})
}
}
// Handle stdout with streaming events
if (process.stdout) {
process.stdout.setEncoding('utf8')
process.stdout.on('data', (data: string) => {
stdoutData += data
stdoutBuffer += data
logger.debug('Streaming stdout chunk:', { length: data.length })
// Parse JSON stream output line by line and emit events
const lines = data.split('\n')
for (const line of lines) {
let newlineIndex = stdoutBuffer.indexOf('\n')
while (newlineIndex !== -1) {
const line = stdoutBuffer.slice(0, newlineIndex)
stdoutBuffer = stdoutBuffer.slice(newlineIndex + 1)
const trimmed = line.trim()
if (!trimmed) continue
try {
const parsed = JSON.parse(trimmed)
stream.emit('data', { type: 'message', data: parsed })
logger.debug('Parsed JSON line', { parsed })
} catch {
// If you expect NDJSON only, you may want to keep this in buffer instead of emitting.
stream.emit('data', { type: 'message', data: { text: trimmed } })
logger.debug('Non-JSON line', { line: trimmed })
if (trimmed) {
try {
const parsed = JSON.parse(trimmed) as SDKMessage
emitChunks(parsed)
logger.debug('Parsed JSON line', { parsed })
} catch (error) {
logger.debug('Non-JSON line', { line: trimmed })
}
}
newlineIndex = stdoutBuffer.indexOf('\n')
}
})
process.stdout.on('end', () => {
const trimmed = stdoutBuffer.trim()
if (trimmed) {
try {
const parsed = JSON.parse(trimmed) as SDKMessage
emitChunks(parsed)
logger.debug('Parsed JSON line on stream end', { parsed })
} catch (error) {
logger.debug('Non-JSON remainder on stdout end', { line: trimmed })
}
}
logger.debug('Streaming stdout ended')
})
}
@ -183,12 +146,12 @@ class ClaudeCodeService {
process.stderr.setEncoding('utf8')
process.stderr.on('data', (data: string) => {
stderrData += data
logger.warn('Streaming stderr chunk:', { data: data.trim() })
// Emit stderr as error events
const message = data.trim()
if (!message) return
logger.warn('Streaming stderr chunk:', { data: message })
stream.emit('data', {
type: 'error',
data: { stderr: data.trim() }
error: new Error(message)
})
})
@ -225,10 +188,14 @@ class ClaudeCodeService {
error
}
// Emit completion event
// Emit completion event with agent-specific result
stream.emit('data', {
type: 'complete',
result
agentResult: {
...result,
rawSDKMessages: jsonOutput, // Claude Code specific: all collected SDK messages
agentType: 'claude-code' // Identify the agent type
}
})
}
@ -275,45 +242,6 @@ class ClaudeCodeService {
process.on('error', () => clearTimeout(timeout))
}
/**
* Transform raw Claude Code stream events to AI SDK format
*/
private setupAISDKTransform(rawStream: ClaudeCodeStream, aiStream: AISDKStream): void {
rawStream.on('data', (event: ClaudeCodeStreamEvent) => {
try {
switch (event.type) {
case 'message':
// Transform SDKMessage to UIMessageChunk
if (event.data) {
const chunks = transformSDKMessageToUIChunk(event.data as SDKMessage)
for (const chunk of chunks) {
aiStream.emit('data', { type: 'chunk', chunk })
}
}
break
case 'error':
aiStream.emit('data', { type: 'error', error: event.error })
break
case 'complete':
aiStream.emit('data', { type: 'complete', result: event.result })
break
default:
logger.warn('Unknown raw stream event type:', { type: (event as any).type })
break
}
} catch (error) {
logger.error('Error transforming stream event:', { error, event })
aiStream.emit('data', {
type: 'error',
error: error instanceof Error ? error : new Error('Transform error')
})
}
})
}
/**
* Set up process event handlers and return a promise that resolves with complete output
*/

View File

@ -74,7 +74,14 @@ export function transformSDKMessageToUIChunk(sdkMessage: SDKMessage): UIMessageC
function sdkMessageToProviderMetadata(message: SDKMessage): ProviderMetadata {
const meta: ProviderMetadata = {
raw: message as Record<string, any>
raw: message as Record<string, any>,
claudeCode: {
originalSDKMessage: JSON.parse(JSON.stringify(message)), // Serialize to ensure JSON compatibility
uuid: message.uuid || null,
session_id: message.session_id || null,
timestamp: new Date().toISOString(),
type: message.type
}
}
return meta
}
@ -392,6 +399,10 @@ function handleResultMessage(message: Extract<SDKMessage, { type: 'result' }>):
})
}
// Always emit a finish chunk at the end
chunks.push({
type: 'finish'
})
return chunks
}

View File

@ -16,10 +16,5 @@ export { sessionMessageService } from './SessionMessageService'
export { sessionService } from './SessionService'
// Type definitions for service requests and responses
export type { CreateAgentRequest, ListAgentsOptions, UpdateAgentRequest } from './AgentService'
export type {
CreateSessionMessageRequest,
ListSessionMessagesOptions,
UpdateSessionMessageRequest
} from './SessionMessageService'
export type { CreateSessionRequest, ListSessionsOptions, UpdateSessionRequest } from './SessionService'

View File

@ -1,9 +1,9 @@
import { AgentConfiguration } from '@renderer/types'
import { AgentBase } from '@renderer/types'
// base agent config. no default config for now.
const DEFAULT_AGENT_CONFIG: Omit<AgentConfiguration, 'model'> = {} as const
const DEFAULT_AGENT_CONFIG: Omit<AgentBase, 'model'> = {} as const
// no default config for now.
export const DEFAULT_CLAUDE_CODE_CONFIG: Omit<AgentConfiguration, 'model'> = {
export const DEFAULT_CLAUDE_CODE_CONFIG: Omit<AgentBase, 'model'> = {
...DEFAULT_AGENT_CONFIG
} as const

View File

@ -7,8 +7,7 @@ export const useUpdateAgent = () => {
// TODO: use api
return useMutation({
// @ts-expect-error not-implemented
mutationFn: async ({ id, ...payload }: Partial<AgentEntity> & { id: string }) => {},
mutationFn: async ({}: Partial<AgentEntity> & { id: string }) => {},
onSuccess: (updated: AgentEntity) => {
qc.setQueryData<AgentEntity[]>(['todos'], (old) =>
old ? old.map((t) => (t.id === updated.id ? updated : t)) : []

View File

@ -2,10 +2,9 @@
* Database entity types for Agent, Session, and SessionMessage
* Shared between main and renderer processes
*/
import { TextStreamPart } from 'ai'
export type SessionStatus = 'idle' | 'running' | 'completed' | 'failed' | 'stopped'
import { TextStreamPart, UIMessageChunk, ModelMessage } from 'ai'
export type PermissionMode = 'default' | 'acceptEdits' | 'bypassPermissions' | 'plan'
export type SessionMessageRole = 'assistant' | 'user' | 'system' | 'tool'
export type SessionMessageRole = ModelMessage['role']
export type AgentType = 'claude-code'
export const isAgentType = (type: string): type is AgentType => {
@ -17,59 +16,101 @@ export type SessionMessageType = TextStreamPart<Record<string, any>>['type']
export interface Tool {
id: string
name: string
description: string
requirePermissions: boolean
description?: string
requirePermissions?: boolean
}
export interface AgentConfiguration extends Record<string, any> {
permission_mode: PermissionMode // Permission mode, default to 'default'
max_turns: number // Maximum number of interaction turns, default to 10
}
// Shared configuration interface for both agents and sessions
export interface AgentConfiguration {
export interface AgentBase {
// Basic info
name?: string
description?: string
accessible_paths: string[] // Array of directory paths the agent can access
// Instructions for the agent
instructions?: string // System prompt
// Models
model: string // Main Model ID (required)
plan_model?: string // Optional plan/thinking model ID
small_model?: string // Optional small/fast model ID
built_in_tools?: Tool[] // Array of built-in tool IDs
// Tools
mcps?: string[] // Array of MCP tool IDs
knowledges?: string[] // Array of enabled knowledge base IDs
configuration?: Record<string, any> // Extensible settings like temperature, top_p
accessible_paths: string[] // Array of directory paths the agent can access
permission_mode: PermissionMode // Permission mode
max_steps: number // Maximum number of steps the agent can take
allowed_tools?: string[] // Array of allowed tool IDs (whitelist)
// Configuration
configuration?: AgentConfiguration // Extensible settings like temperature, top_p, etc.
}
// Agent entity representing an autonomous agent configuration
export interface AgentEntity extends AgentConfiguration {
export interface AgentEntity extends AgentBase {
id: string
type: AgentType
name: string
description?: string
avatar?: string
instructions?: string // System prompt
created_at: string
updated_at: string
}
export interface CreateAgentRequest extends AgentBase {
type: AgentType
}
export interface UpdateAgentRequest extends Partial<AgentBase> {}
export interface GetAgentResponse extends AgentEntity {
built_in_tools?: Tool[] // Built-in tools available to the agent
}
export interface ListOptions {
limit?: number
offset?: number
}
// AgentSession entity representing a conversation session with one or more agents
export interface AgentSessionEntity extends AgentConfiguration {
export interface AgentSessionEntity extends AgentBase {
id: string
name?: string // Session name
main_agent_id: string // Primary agent ID for the session
sub_agent_ids?: string[] // Array of sub-agent IDs involved in the session
user_goal?: string // Initial user goal for the session
status: SessionStatus
external_session_id?: string // Agent session for external agent management/tracking
messages?: SessionMessageEntity[] // Hierarchical session messages
agent_id: string // Primary agent ID for the session
// sub_agent_ids?: string[] // Array of sub-agent IDs involved in the session
created_at: string
updated_at: string
}
// SessionMessage entity for tracking all agent activities
export interface SessionMessageEntity {
export interface CreateSessionRequest extends AgentBase {
agent_id: string // Primary agent ID for the session
}
export interface UpdateSessionRequest extends Partial<AgentBase> {}
export interface GetAgentSessionResponse extends AgentSessionEntity {
built_in_tools?: Tool[] // Built-in tools available to the agent
messages: AgentSessionMessageEntity[] // Messages in the session
}
// AgentSessionMessageEntity representing a message within a session
export interface AgentSessionMessageEntity {
id: number // Auto-increment primary key
session_id: string // Reference to session
parent_id?: number // For tree structure (e.g., tool calls under an action)
role: SessionMessageRole // 'user', 'agent', 'system', 'tool'
type: SessionMessageType // Type of log entry
content: string | Record<string, any> // JSON structured data
role: ModelMessage['role'] // 'assistant' | 'user' | 'system' | 'tool'
content: ModelMessage
metadata?: Record<string, any> // Additional metadata (optional)
created_at: string // ISO timestamp
updated_at: string // ISO timestamp
}
export interface CreateSessionMessageRequest {
content: string
}
// Structured content for session messages that preserves both AI SDK and raw data
export interface SessionMessageContent {
chunk: UIMessageChunk[] // UI-friendly AI SDK chunks for rendering
raw: any[] // Original agent-specific messages for data integrity (agent-agnostic)
agentResult?: any // Complete result from the underlying agent service
agentType: string // The type of agent that generated this message (e.g., 'claude-code', 'openai', etc.)
}