Comprehensive guide to understanding and extending Z.E.N.
System Overview
Z.E.N. is a workflow engine. Workflows are short YAML files; nodes are prompts, bash commands, or sub-agent calls; the runtime fires them in dependency order against pluggable providers (Claude, Codex, Pi) and pluggable surfaces (Web UI, Telegram, Slack, GitHub). The same engine powers the CLI, the daemon, and the dispatch path from external agents like Claude Code, OpenClaw, and Hermes.
Core Architecture
┌─────────────────────────────────────────────┐
│  Platform Adapters (Web UI, Telegram,       │
│  GitHub, Slack, CLI)                        │
│  • IPlatformAdapter interface               │
│  • Web: SSE streaming + REST API            │
│  • Others: Platform-specific messaging      │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│  Orchestrator                               │
│  • Route slash commands → Command Handler   │
│  • Route AI queries → Assistant Clients     │
│  • Manage session lifecycle                 │
│  • Stream responses back to platforms       │
│  • Emit workflow events to Web UI           │
└──────────────┬──────────────────────────────┘
               │
       ┌───────┼────────┐
       │       │        │
       ▼       ▼        ▼
┌───────────┐ ┌───────────────┐ ┌───────────────────┐
│ Command   │ │ Provider      │ │ Isolation         │
│ Handler   │ │ Clients       │ │ Providers         │
│           │ │               │ │                   │
│ (Slash    │ │ IAssistant-   │ │ IIsolationProvider│
│ commands) │ │ Client        │ │ (worktree, etc.)  │
└─────┬─────┘ └───────┬───────┘ └─────────┬─────────┘
      │               │                   │
      └───────────────┼───────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│  SQLite (default) / PostgreSQL (7 Tables)   │
│  • Codebases • Conversations • Sessions     │
│  • Isolation Envs • Workflow Runs           │
│  • Workflow Events • Messages               │
└─────────────────────────────────────────────┘
Key Design Principles
- Interface-driven: Both platform adapters and AI clients implement strict interfaces for swappability
- Streaming-first: All AI responses stream through async generators for real-time delivery
- Session persistence: AI sessions survive container restarts via database storage
- Generic commands: Users define commands in Git-versioned markdown files, not hardcoded
- Platform-specific streaming: Each platform controls whether to stream or batch responses
Adding Platform Adapters
Platform adapters connect messaging platforms to the orchestrator. Implement the IPlatformAdapter interface to add new platforms.
IPlatformAdapter Interface
Location: packages/core/src/types/index.ts
export interface IPlatformAdapter {
// Send a message to the platform (optional metadata for message type hints)
sendMessage(conversationId: string, message: string, metadata?: MessageMetadata): Promise<void>;
// Ensure responses go to a thread, creating one if needed
// Returns the thread's conversation ID (may be same as original)
ensureThread(originalConversationId: string, messageContext?: unknown): Promise<string>;
// Get the configured streaming mode
getStreamingMode(): 'stream' | 'batch';
// Get the platform type identifier
getPlatformType(): string;
// Start the platform adapter (e.g., begin polling, start webhook server)
start(): Promise<void>;
// Stop the platform adapter gracefully
stop(): void;
// Optional: Send a structured event (e.g., Web UI rich data)
sendStructuredEvent?(conversationId: string, event: MessageChunk): Promise<void>;
// Optional: Retract previously streamed text (workflow routing intercept)
emitRetract?(conversationId: string): Promise<void>;
}
Implementation Guide
1. Create adapter file: packages/adapters/src/chat/your-platform/adapter.ts (or forge/ depending on category)
2. Implement the interface:
import type { IPlatformAdapter } from '@zen/core';
export class YourPlatformAdapter implements IPlatformAdapter {
private streamingMode: 'stream' | 'batch';
constructor(config: YourPlatformConfig, mode: 'stream' | 'batch' = 'stream') {
this.streamingMode = mode;
// Initialize your platform SDK/client
}
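async sendMessage(conversationId: string, message: string): Promise<void> {
// Platform-specific message sending logic
// Handle message length limits, formatting, etc.
}
async ensureThread(originalConversationId: string): Promise<string> {
// Required by IPlatformAdapter; platforms without threads can return the original ID unchanged
return originalConversationId;
}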
getStreamingMode(): 'stream' | 'batch' {
return this.streamingMode;
}
getPlatformType(): string {
return 'your-platform'; // Used as platform_type in database
}
async start(): Promise<void> {
// Start polling, webhook server, WebSocket connection, etc.
// Example: this.client.startPolling();
}
stop(): void {
// Cleanup: stop polling, close connections
}
}
3. Register in main app: packages/server/src/index.ts
import { YourPlatformAdapter } from './adapters/your-platform';
// Read environment variables
const yourPlatformToken = process.env.YOUR_PLATFORM_TOKEN;
const yourPlatformMode = (process.env.YOUR_PLATFORM_STREAMING_MODE || 'stream') as
| 'stream'
| 'batch';
if (yourPlatformToken) {
const adapter = new YourPlatformAdapter(yourPlatformToken, yourPlatformMode);
// Set up message handler
adapter.onMessage(async (conversationId, message) => {
await handleMessage(adapter, conversationId, message);
});
await adapter.start();
log.info({ platform: 'your-platform' }, 'adapter_started');
}
4. Add environment variables: .env.example
# Your Platform
YOUR_PLATFORM_TOKEN=<token>
YOUR_PLATFORM_STREAMING_MODE=stream # stream | batch
Platform-Specific Considerations
Conversation ID Format
Each platform must provide a unique, stable conversation ID:
- Web UI: User-provided string or auto-generated UUID
- Telegram: chat_id (e.g., "123456789")
- GitHub: owner/repo#issue_number (e.g., "user/repo#42")
- Slack: thread_ts or channel_id+thread_ts
- CLI: cli-{timestamp}-{random} (e.g., "cli-1737400000-abc123")
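The formats listed above can be produced by small pure helpers. The following is a minimal sketch; the function names are hypothetical and the real adapters may derive these IDs differently (Slack is omitted because the text allows two forms).

```typescript
// Hypothetical helpers illustrating the ID formats listed above.

/** GitHub: owner/repo#issue_number, e.g. "user/repo#42". */
function githubConversationId(owner: string, repo: string, issueNumber: number): string {
  return `${owner}/${repo}#${issueNumber}`;
}

/** Telegram: the chat_id, stringified so it can be stored as a text key. */
function telegramConversationId(chatId: number | string): string {
  return String(chatId);
}

/** CLI: cli-{timestamp}-{random}. Both parts are passed in so the result is deterministic. */
function cliConversationId(timestamp: number, randomSuffix: string): string {
  return `cli-${timestamp}-${randomSuffix}`;
}
```

Keeping these helpers pure (no clock or random source inside) makes the "unique, stable" requirement easy to check in unit tests.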
Message Length Limits
Handle platform-specific message limits in sendMessage():
async sendMessage(conversationId: string, message: string): Promise<void> {
const MAX_LENGTH = 4096; // Telegram's limit
if (message.length <= MAX_LENGTH) {
await this.client.sendMessage(conversationId, message);
} else {
// Split long messages intelligently (by lines, paragraphs, etc.)
const chunks = splitMessage(message, MAX_LENGTH);
for (const chunk of chunks) {
await this.client.sendMessage(conversationId, chunk);
}
}
}
Reference: packages/adapters/src/chat/telegram/adapter.ts
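The example above calls a splitMessage helper without showing it. One possible implementation is sketched below, assuming the goal is to break on line boundaries where possible and hard-cut only lines that exceed the limit on their own. This is an illustration, not the Telegram adapter's actual code.

```typescript
// Hypothetical implementation of the splitMessage helper referenced above.
function splitMessage(message: string, maxLength: number): string[] {
  if (maxLength <= 0) {
    throw new Error('maxLength must be positive');
  }
  const chunks: string[] = [];
  let current: string | null = null; // null means "no chunk in progress"

  for (const line of message.split('\n')) {
    if (line.length > maxLength) {
      // Flush what we have, then hard-cut the oversized line into maxLength pieces.
      if (current !== null) {
        chunks.push(current);
        current = null;
      }
      for (let i = 0; i < line.length; i += maxLength) {
        chunks.push(line.slice(i, i + maxLength));
      }
      continue;
    }
    const candidate: string = current === null ? line : `${current}\n${line}`;
    if (candidate.length > maxLength && current !== null) {
      // Adding this line would overflow: close the chunk and start a new one.
      chunks.push(current);
      current = line;
    } else {
      current = candidate;
    }
  }
  if (current !== null) {
    chunks.push(current);
  }
  return chunks;
}
```

Every returned chunk is at most maxLength characters long, so each one can be passed to the platform client as-is.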
Server-Sent Events (SSE)
SSE (Web UI pattern):
// Web adapter maintains SSE connections per conversation
registerStream(conversationId: string, stream: SSEWriter): void {
this.streams.set(conversationId, stream);
}
async sendMessage(conversationId: string, message: string): Promise<void> {
const stream = this.streams.get(conversationId);
if (stream && !stream.closed) {
await stream.writeSSE({ data: JSON.stringify({ type: 'text', content: message }) });
} else {
// Buffer messages if client disconnected (reconnection recovery)
this.messageBuffer.set(conversationId, [
...(this.messageBuffer.get(conversationId) ?? []),
message,
]);
}
}
// Structured events for tool calls, workflow progress, errors
async sendStructuredEvent(conversationId: string, event: MessageChunk): Promise<void> {
await this.emitSSE(conversationId, JSON.stringify(event));
}
Benefits:
- Real-time streaming without polling overhead
- Automatic reconnection handling in browser
- Message buffering during disconnections
- Structured events (tool calls, workflow progress, lock state)
Reference: packages/server/src/integrations/web
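The buffering behaviour described above (queue messages while the client is disconnected, replay them on reconnect) might look like the following sketch. The MessageBuffer class, its size cap, and the drain method are assumptions for illustration; the Web adapter's real buffer may differ.

```typescript
// Hypothetical in-memory buffer for messages that arrive while an SSE stream is closed.
class MessageBuffer {
  private readonly buffers = new Map<string, string[]>();
  private readonly maxPerConversation: number;

  constructor(maxPerConversation: number = 100) {
    this.maxPerConversation = maxPerConversation;
  }

  /** Queue a message for a conversation whose stream is currently unavailable. */
  push(conversationId: string, message: string): void {
    const queue = this.buffers.get(conversationId) ?? [];
    queue.push(message);
    // Drop the oldest entries so a long disconnect cannot grow memory without bound.
    while (queue.length > this.maxPerConversation) {
      queue.shift();
    }
    this.buffers.set(conversationId, queue);
  }

  /** Return buffered messages in arrival order and clear them. */
  drain(conversationId: string): string[] {
    const queue = this.buffers.get(conversationId) ?? [];
    this.buffers.delete(conversationId);
    return queue;
  }
}
```

Under this sketch, a registerStream implementation would call drain() right after storing the new stream and write each returned message before resuming live delivery.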
Polling vs Webhooks
Polling (Telegram pattern):
async start(): Promise<void> {
this.bot.on('message', async (ctx) => {
const conversationId = this.getConversationId(ctx);
const message = ctx.message.text;
await this.onMessageHandler(conversationId, message);
});
await this.bot.launch({ dropPendingUpdates: true });
}
Webhooks (GitHub pattern):
// In packages/server/src/index.ts, add route
app.post('/webhooks/your-platform', async (req, res) => {
const signature = req.headers['x-signature'];
const payload = req.body;
await adapter.handleWebhook(payload, signature);
res.sendStatus(200);
});
// In adapter
async handleWebhook(payload: any, signature: string): Promise<void> {
// Verify signature
if (!this.verifySignature(payload, signature)) return;
// Parse event, extract conversationId and message
const { conversationId, message } = this.parseEvent(payload);
// Route to orchestrator
await handleMessage(this, conversationId, message);
}
Reference: packages/adapters/src/forge/github/adapter.ts
Adding Provider Clients
Provider clients wrap AI SDKs and expose a unified streaming interface. Implement the IAssistantClient interface to add a new assistant.
IAssistantClient Interface
Location: packages/core/src/types/index.ts
export interface IAssistantClient {
// Send a query and get streaming response
sendQuery(prompt: string, cwd: string, resumeSessionId?: string): AsyncGenerator<MessageChunk>;
// Get the assistant type identifier
getType(): string;
}
MessageChunk Types
interface MessageChunk {
type: 'assistant' | 'result' | 'system' | 'tool' | 'thinking';
content?: string; // Text content for assistant/system/thinking
sessionId?: string; // Session ID for result type
toolName?: string; // Tool name for tool type
toolInput?: Record<string, unknown>; // Tool parameters
}
Implementation Guide
1. Create client file: packages/core/src/clients/your-assistant.ts
2. Implement the interface:
import type { IAssistantClient, MessageChunk } from '../types';
export class YourAssistantClient implements IAssistantClient {
async *sendQuery(
prompt: string,
cwd: string,
resumeSessionId?: string
): AsyncGenerator<MessageChunk> {
// Initialize or resume session
let session;
if (resumeSessionId) {
log.info({ sessionId: resumeSessionId }, 'session_resumed');
session = await this.resumeSession(resumeSessionId);
} else {
log.info({ cwd }, 'session_started');
session = await this.startSession(cwd);
}
// Send query to AI and stream responses
for await (const event of this.sdk.streamQuery(session, prompt)) {
// Map SDK events to MessageChunk types
if (event.type === 'text_response') {
yield { type: 'assistant', content: event.text };
} else if (event.type === 'tool_call') {
yield {
type: 'tool',
toolName: event.tool,
toolInput: event.parameters,
};
} else if (event.type === 'thinking') {
yield { type: 'thinking', content: event.reasoning };
}
}
// Yield session ID for persistence
yield { type: 'result', sessionId: session.id };
}
getType(): string {
return 'your-assistant';
}
}
3. Register in factory: packages/core/src/clients/factory.ts
import { YourAssistantClient } from './your-assistant';
export function getAssistantClient(type: string): IAssistantClient {
switch (type) {
case 'claude':
return new ClaudeClient();
case 'codex':
return new CodexClient();
case 'your-assistant':
return new YourAssistantClient();
default:
throw new Error(`Unknown assistant type: ${type}`);
}
}
4. Add environment variables: .env.example
# Your Assistant
YOUR_ASSISTANT_API_KEY=<key>
YOUR_ASSISTANT_MODEL=<model-name>
Session Management
Key concepts:
- Immutable sessions: Sessions are never modified; transitions create new linked sessions
- Audit trail: Each session stores parent_session_id (previous session) and transition_reason (why created)
- State machine: Explicit TransitionTrigger types define all transition reasons
- Session ID persistence: Store assistant_session_id in the database to resume context
Transition triggers (packages/core/src/state/session-transitions.ts):
- first-message - No existing session
- plan-to-execute - Plan phase completed, starting execution (creates new session immediately)
- isolation-changed, codebase-changed, reset-requested, etc. - Deactivate current session
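To make the trigger list concrete, here is a minimal sketch of how the two helpers named in this section, detectPlanToExecuteTransition and shouldCreateNewSession, could behave. Only the trigger names come from the text; the union is incomplete (the source list ends in "etc."), and the sets of plan and execute command names are assumptions.

```typescript
// Trigger names come from the list above; the union is incomplete by design.
type TransitionTrigger =
  | 'first-message'
  | 'plan-to-execute'
  | 'isolation-changed'
  | 'codebase-changed'
  | 'reset-requested';

// Assumption: which command names count as "plan" and "execute" steps.
const PLAN_COMMANDS: string[] = ['plan', 'plan-feature'];
const EXECUTE_COMMANDS: string[] = ['execute'];

/** Returns 'plan-to-execute' when an execute command directly follows a plan command. */
function detectPlanToExecuteTransition(
  commandName: string | undefined,
  lastCommand: string | undefined
): TransitionTrigger | null {
  if (commandName === undefined || lastCommand === undefined) {
    return null;
  }
  const isExecute = EXECUTE_COMMANDS.indexOf(commandName) !== -1;
  const followsPlan = PLAN_COMMANDS.indexOf(lastCommand) !== -1;
  return isExecute && followsPlan ? 'plan-to-execute' : null;
}

/** Triggers that create a session immediately; the others only deactivate the current one. */
function shouldCreateNewSession(trigger: TransitionTrigger): boolean {
  return trigger === 'first-message' || trigger === 'plan-to-execute';
}
```

Splitting detection from the create-or-deactivate decision keeps the state machine explicit: a new trigger only needs a new union member and a decision in shouldCreateNewSession.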
Orchestrator logic (packages/core/src/orchestrator/orchestrator.ts):
// Detect plan-to-execute transition
const trigger = detectPlanToExecuteTransition(commandName, session?.metadata?.lastCommand);
if (trigger && shouldCreateNewSession(trigger)) {
// Transition to new session (links to previous via parent_session_id)
session = await sessionDb.transitionSession(conversationId, trigger, {...});
} else if (!session) {
// No session exists - create one
session = await sessionDb.transitionSession(conversationId, 'first-message', {...});
} else {
// Resume existing session
log.info({ sessionId: session.id }, 'session_resumed');
}
Streaming Event Mapping
Different SDKs use different event types. Map them to MessageChunk types:
Claude Code SDK (packages/core/src/clients/claude.ts):
for await (const msg of query({ prompt, options })) {
if (msg.type === 'assistant') {
for (const block of msg.message.content) {
if (block.type === 'text') {
yield { type: 'assistant', content: block.text };
} else if (block.type === 'tool_use') {
yield {
type: 'tool',
toolName: block.name,
toolInput: block.input,
};
}
}
} else if (msg.type === 'result') {
yield { type: 'result', sessionId: msg.session_id };
}
}
Codex SDK (packages/core/src/clients/codex.ts):
for await (const event of result.events) {
if (event.type === 'item.completed') {
switch (event.item.type) {
case 'agent_message':
yield { type: 'assistant', content: event.item.text };
break;
case 'command_execution':
yield { type: 'tool', toolName: event.item.command };
break;
case 'reasoning':
yield { type: 'thinking', content: event.item.text };
break;
}
} else if (event.type === 'turn.completed') {
yield { type: 'result', sessionId: thread.id };
break; // CRITICAL: Exit loop on turn completion
}
}
Error Handling
Wrap SDK calls in try-catch:
try {
for await (const event of this.sdk.streamQuery(...)) {
yield mapEventToChunk(event);
}
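} catch (error) {
log.error({ err: error }, 'query_failed');
// Caught values are typed unknown in strict TypeScript, so narrow before reading .message
const reason = error instanceof Error ? error.message : String(error);
throw new Error(`Query failed: ${reason}`);
}
Handle SDK-specific errors: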
if (event.type === 'error') {
// Log but don't crash - some errors are non-fatal
log.error({ message: event.message }, 'stream_error');
// Only yield user-facing errors
if (!event.message.includes('internal')) {
yield { type: 'system', content: `Warning: ${event.message}` };
}
}
Isolation Providers
Isolation providers create isolated working environments (worktrees, containers, VMs) for concurrent workflows. The default implementation uses git worktrees when the workflow opts in via worktree.enabled: true; non-git workflows run in a plain sandbox under ~/.zen/workspaces/.
IIsolationProvider Interface
Location: packages/isolation/src/types.ts
export interface IIsolationProvider {
readonly providerType: string;
create(request: IsolationRequest): Promise<IsolatedEnvironment>;
destroy(envId: string, options?: DestroyOptions | WorktreeDestroyOptions): Promise<DestroyResult>;
get(envId: string): Promise<IsolatedEnvironment | null>;
list(codebaseId: string): Promise<IsolatedEnvironment[]>;
adopt?(path: string): Promise<IsolatedEnvironment | null>;
healthCheck(envId: string): Promise<boolean>;
}
Request & Response Types
interface IsolationRequest {
codebaseId: string;
canonicalRepoPath: string; // Main repo path, never a worktree
workflowType: 'issue' | 'pr' | 'review' | 'thread' | 'task';
identifier: string; // "42", "feature-auth", etc.
prBranch?: string; // PR branch name (for adoption and same-repo PRs)
prSha?: string; // For reproducible PR reviews
isForkPR?: boolean; // True if PR is from a fork
}
interface IsolatedEnvironment {
id: string; // Worktree path (for worktree provider)
provider: 'worktree' | 'container' | 'vm' | 'remote';
workingPath: string; // Where AI should work
branchName?: string;
status: 'active' | 'suspended' | 'destroyed';
createdAt: Date;
metadata: Record<string, unknown>;
}
interface DestroyResult {
worktreeRemoved: boolean; // Primary operation succeeded
branchDeleted: boolean; // Branch cleanup succeeded (true if no branch requested)
directoryClean: boolean; // No orphan files remain
warnings: string[]; // Non-fatal issues during cleanup
}
WorktreeProvider Implementation
Location: packages/isolation/src/providers/worktree.ts
export class WorktreeProvider implements IIsolationProvider {
readonly providerType = 'worktree';
async create(request: IsolationRequest): Promise<IsolatedEnvironment> {
// 1. Check for existing worktree (adoption)
// 2. Generate branch name from workflowType + identifier
// 3. Create git worktree at computed path
// 4. Return IsolatedEnvironment
}
async destroy(envId: string, options?: WorktreeDestroyOptions): Promise<DestroyResult> {
// git worktree remove <path> [--force]
// git branch -D <branchName> (if provided, tracked via result)
// Returns DestroyResult with warnings for partial failures
}
}
Branch Naming Convention
| Workflow | Identifier | Generated Branch |
|---|---|---|
| issue | "42" | issue-42 |
| pr (same-repo) | "123" | feature/auth (actual branch) |
| pr (fork) | "123" | pr-123-review |
| task | "my-feature" | task-my-feature |
| thread | "C123:ts.123" | thread-a1b2c3d4 (8-char hash) |
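The table above can be read as a small pure function. The sketch below is an assumption-laden illustration: generateBranchName and shortHash are hypothetical names, the hash is a stand-in (FNV-1a style over UTF-16 code units) because the text does not say which hash produces the 8-character suffix, and the handling of the review workflow type and of a same-repo PR without a known branch is a guess.

```typescript
type WorkflowType = 'issue' | 'pr' | 'review' | 'thread' | 'task';

interface BranchNameInput {
  workflowType: WorkflowType;
  identifier: string;
  prBranch?: string;
  isForkPR?: boolean;
}

// Stand-in hash rendered as 8 hex characters; the project's real hash is not specified.
function shortHash(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    // Multiply by the FNV prime 16777619 via shifts, keeping the result in 32 bits.
    hash = (hash + ((hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24))) >>> 0;
  }
  return ('00000000' + hash.toString(16)).slice(-8);
}

function generateBranchName(input: BranchNameInput): string {
  switch (input.workflowType) {
    case 'issue':
      return `issue-${input.identifier}`;
    case 'task':
      return `task-${input.identifier}`;
    case 'thread':
      return `thread-${shortHash(input.identifier)}`;
    case 'pr':
      // Same-repo PRs reuse the actual branch; fork PRs get a dedicated review branch.
      if (!input.isForkPR && input.prBranch !== undefined) {
        return input.prBranch;
      }
      return `pr-${input.identifier}-review`;
    default:
      // Assumption: 'review' is not in the table, so fall back to type-identifier.
      return `${input.workflowType}-${input.identifier}`;
  }
}
```

Hashing the thread identifier keeps characters such as ":" and "." out of the branch name while still mapping the same thread to the same branch.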
Storage Location
PRIMARY: ~/.zen/workspaces/<owner>/<repo>/worktrees/<branch>/
FALLBACK: ~/.zen/worktrees/<owner>/<repo>/<branch>/ (for repos not registered under workspaces/)
DOCKER: /.zen/workspaces/<owner>/<repo>/worktrees/<branch>/
Path resolution:
- Project registered under workspaces/? -> ~/.zen/workspaces/<owner>/<repo>/worktrees/<branch>/
- Fallback -> ~/.zen/worktrees/<owner>/<repo>/<branch>/
- Docker detected? -> /.zen/ prefix instead of ~/.zen/
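The resolution rules above reduce to two independent choices: the base prefix and the directory layout. A minimal sketch follows, with hypothetical names (resolveWorktreePath, WorktreePathInput); it assumes that Docker changes only the prefix, including for the fallback layout, which the text does not state explicitly.

```typescript
interface WorktreePathInput {
  owner: string;
  repo: string;
  branch: string;
  registeredUnderWorkspaces: boolean;
  inDocker: boolean;
  homeDir: string; // passed in rather than read from the environment, to keep the function pure
}

function resolveWorktreePath(input: WorktreePathInput): string {
  // Docker uses /.zen; everything else lives under the user's home directory.
  const base = input.inDocker ? '/.zen' : `${input.homeDir}/.zen`;
  if (input.registeredUnderWorkspaces) {
    return `${base}/workspaces/${input.owner}/${input.repo}/worktrees/${input.branch}/`;
  }
  // Fallback layout for repos not registered under workspaces/.
  return `${base}/worktrees/${input.owner}/${input.repo}/${input.branch}/`;
}
```

For example, a registered repo user/repo with branch issue-42 and home directory /home/dev resolves to /home/dev/.zen/workspaces/user/repo/worktrees/issue-42/.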
Usage Pattern
GitHub adapter (packages/adapters/src/forge/github/adapter.ts):
const provider = getIsolationProvider();
// On @bot mention
const env = await provider.create({
codebaseId: codebase.id,
canonicalRepoPath: repoPath,
workflowType: isPR ? 'pr' : 'issue',
identifier: String(number),
prBranch: prHeadBranch,
prSha: prHeadSha,
});
// Update conversation
await db.updateConversation(conv.id, {
cwd: env.workingPath,
isolation_env_id: env.id,
isolation_provider: env.provider,
});
// On issue/PR close
await provider.destroy(isolationEnvId);
Command handler (/worktree create):
const provider = getIsolationProvider();
const env = await provider.create({
workflowType: 'task',
identifier: branchName,
// ...
});
Worktree Adoption
The provider adopts existing worktrees before creating new ones:
- Path match: If worktree exists at expected path -> adopt
- Branch match: If PR's branch has existing worktree -> adopt (skill symbiosis)
// Inside create()
const existing = await this.findExisting(request, branchName, worktreePath);
if (existing) {
return existing; // metadata.adopted = true
}
// ... else create new
Database Fields
remote_agent_conversations
└── isolation_env_id ; Provider-assigned ID (worktree path)
remote_agent_isolation_environments
├── id ; Unique environment ID
├── codebase_id ; Link to codebases table
├── working_path ; Filesystem path to worktree
├── branch_name ; Git branch name
├── status ; 'active' | 'destroyed'
└── ...
Lookup pattern:
const envId = conversation.isolation_env_id;
Adding a New Isolation Provider
1. Create provider: packages/isolation/src/providers/your-provider.ts
export class ContainerProvider implements IIsolationProvider {
readonly providerType = 'container';
async create(request: IsolationRequest): Promise<IsolatedEnvironment> {
// Spin up Docker container with repo mounted
const containerId = await docker.createContainer({...});
return {
id: containerId,
provider: 'container',
workingPath: '/workspace',
status: 'active',
createdAt: new Date(),
metadata: { request },
};
}
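async destroy(envId: string): Promise<DestroyResult> {
await docker.removeContainer(envId);
// IIsolationProvider.destroy must return a DestroyResult; a container has no worktree or branch to clean up
return { worktreeRemoved: true, branchDeleted: true, directoryClean: true, warnings: [] };
}
// get(), list(), and healthCheck() are also required by the interface (omitted here)
}
2. Register in factory: packages/isolation/src/factory.ts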
export function getIsolationProvider(type?: string): IIsolationProvider {
switch (type) {
case 'container':
return new ContainerProvider();
default:
return new WorktreeProvider();
}
}
See also: .claude/rules/isolation-patterns.md documents the isolation architecture, including design patterns and safety rules.
Command System
The command system allows users to define custom workflows in Git-versioned markdown files.
Architecture
User: "Plan adding dark mode to project X"
|
Orchestrator: Route to workflow via AI router
|
Read command file: .zen/commands/plan.md
|
Variable substitution: $ARGUMENTS -> "Add dark mode"
|
Send to AI client: Injected prompt
|
Stream responses back to platform
Command Storage
Database schema (JSONB in remote_agent_codebases table):
{
"prime": {
"path": ".zen/commands/prime.md",
"description": "Research codebase"
},
"plan": {
"path": ".zen/commands/plan-feature.md",
"description": "Create implementation plan"
}
}
File-based: Commands are markdown files in the repository and are not stored in the database. Only paths and metadata are stored.
Command Registration
Manual registration (/command-set):
/command-set analyze .zen/commands/analyze.md
Bulk loading (/load-commands):
/load-commands .zen/commands
# Loads all .md files: prime.md -> prime, plan.md -> plan
Auto-detection (on /clone or GitHub webhook):
// Get command folders from config
const searchPaths = getCommandFolderSearchPaths(config?.commands?.folder);
// Returns: ['.zen/commands'] + configuredFolder if specified
for (const folder of searchPaths) {
if (await folderExists(join(repoPath, folder))) {
await autoLoadCommands(folder, codebaseId);
}
}
This registers repo-specific commands. Default commands are loaded at runtime from the app's bundled defaults, not copied to repos.
Reference: packages/paths/src/zen-paths.ts (@zen/paths)
Variable Substitution
Supported variables:
- $1, $2, $3, ... - Positional arguments
- $ARGUMENTS - All arguments as single string
- \$ - Escaped dollar sign (literal $)
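The three rules above can also be expressed as a single-pass replacement, sketched here under a hypothetical name (substituteVariablesOnce). It is an alternative formulation for illustration, not the project's implementation. Handling all tokens in one regular expression means that $1 cannot match the prefix of $10, an escaped \$1 is never treated as a positional argument, and argument text containing "$" is inserted literally.

```typescript
// Hypothetical single-pass variant of the substitution rules listed above.
function substituteVariablesOnce(text: string, args: string[]): string {
  // One alternation covers all three token kinds: "\$", "$ARGUMENTS", and "$<digits>".
  return text.replace(/\\\$|\$ARGUMENTS|\$(\d+)/g, (match: string, digits: string | undefined) => {
    if (match === '\\$') {
      return '$'; // escaped dollar sign becomes a literal "$"
    }
    if (match === '$ARGUMENTS') {
      return args.join(' ');
    }
    const value = args[Number(digits) - 1];
    // Positions with no matching argument are left untouched.
    return value === undefined ? match : value;
  });
}
```

Because the replacement is a function rather than a string, special patterns such as "$&" inside an argument are not interpreted by String.prototype.replace.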
Implementation (packages/core/src/utils/variable-substitution.ts):
export function substituteVariables(
text: string,
args: string[],
metadata: Record<string, unknown> = {}
): string {
let result = text;
// Replace $1, $2, $3, etc.
args.forEach((arg, index) => {
result = result.replace(new RegExp(`\\$${index + 1}`, 'g'), arg);
});
// Replace $ARGUMENTS
result = result.replace(/\$ARGUMENTS/g, args.join(' '));
// Replace escaped dollar signs
result = result.replace(/\\\$/g, '$');
return result;
}
Example:
<!-- .zen/commands/analyze.md -->
Analyze the following aspect of the codebase: $1
Focus on: $ARGUMENTS
Provide recommendations for improvement.
User asks: "Analyze the security of authentication and authorization"
# Orchestrator routes to the `analyze` command
# Variable substitution produces:
# Analyze the following aspect of the codebase: security
# Focus on: security authentication authorization
# Provide recommendations for improvement.
Slash Command Routing
Orchestrator logic (packages/core/src/orchestrator/):
All messages starting with / are routed to the Command Handler first. If the command is recognized (deterministic), it is handled directly. Non-slash messages go through the AI router, which discovers available workflows and commands, then routes the user's request to the appropriate one.
Command categories:
Deterministic (handled by Command Handler):
- /help, /status, /getcwd, /setcwd
- /clone, /repos, /repo, /repo-remove
- /command-set, /load-commands, /commands
- /worktree, /workflow
- /reset, /reset-context, /init
AI-routed (handled by Orchestrator):
- Natural language messages are routed to workflows and commands via AI
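The routing split described above can be sketched as a parser plus a lookup. The command names are taken from the deterministic list; parseSlashCommand and routeMessage are hypothetical names, and the separate outcome for an unrecognized slash command is an assumption, since the text does not say what happens in that case.

```typescript
// Command names from the deterministic list above.
const DETERMINISTIC_COMMANDS: string[] = [
  'help', 'status', 'getcwd', 'setcwd',
  'clone', 'repos', 'repo', 'repo-remove',
  'command-set', 'load-commands', 'commands',
  'worktree', 'workflow',
  'reset', 'reset-context', 'init',
];

interface ParsedCommand {
  command: string;
  args: string[];
}

/** Splits "/name arg1 arg2" into its parts; returns null for non-slash or empty commands. */
function parseSlashCommand(message: string): ParsedCommand | null {
  const trimmed = message.trim();
  if (trimmed.charAt(0) !== '/') {
    return null;
  }
  const parts = trimmed.slice(1).split(/\s+/);
  const command = parts[0];
  if (!command) {
    return null;
  }
  return { command, args: parts.slice(1) };
}

type Route = 'command-handler' | 'ai-router' | 'unrecognized-command';

function routeMessage(message: string): Route {
  if (message.trim().charAt(0) !== '/') {
    return 'ai-router'; // natural language goes through the AI router
  }
  const parsed = parseSlashCommand(message);
  if (parsed !== null && DETERMINISTIC_COMMANDS.indexOf(parsed.command) !== -1) {
    return 'command-handler';
  }
  return 'unrecognized-command'; // assumption: the caller decides how to respond
}
```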
Command Handler Implementation
Reference: packages/core/src/handlers/command-handler.ts
The handler is split into focused functions per command group:
- handleCommand() - Top-level dispatcher (switch on command name)
- handleRepoCommand() - /repo (switch repos, pull, auto-load commands)
- handleRepoRemoveCommand() - /repo-remove (delete repo + codebase record)
- handleWorktreeCommand() - /worktree subcommands (create, list, remove, cleanup, orphans)
- handleWorkflowCommand() - /workflow subcommands (list, reload, run, status, cancel, resume, abandon, approve, reject). The status/resume/abandon/approve/reject cases delegate to shared operations in packages/core/src/operations/workflow-operations.ts
- resolveRepoArg() - Shared helper for repo lookup by number or name
Important: The modified: true flag on CommandResult signals the orchestrator to reload conversation state.
Streaming Modes
Streaming modes control how AI responses are delivered to users: real-time (stream) or accumulated (batch).
Configuration
Environment variables (per-platform):
TELEGRAM_STREAMING_MODE=stream # Default: stream (real-time chat)
SLACK_STREAMING_MODE=batch # Default: batch
Mode Comparison
| Mode | Behavior | Pros | Cons | Best For |
|---|---|---|---|---|
| stream | Send each chunk immediately as AI generates | Real-time feedback, engaging, see progress | Many API calls, potential rate limits | Chat platforms (Telegram, Slack) |
| batch | Accumulate all chunks, send final summary | Single message, no spam, clean | No progress indication, longer wait | Issue trackers (GitHub, Jira) |
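Because the *_STREAMING_MODE variables accept only stream or batch, it can help to validate them instead of casting the raw string, as the adapter registration example earlier does. A minimal sketch follows; parseStreamingMode is a hypothetical helper, and falling back silently on unknown values is a design assumption (throwing would also be reasonable).

```typescript
type StreamingMode = 'stream' | 'batch';

/** Normalizes an environment value to a valid mode, using the fallback for missing or unknown input. */
function parseStreamingMode(raw: string | undefined, fallback: StreamingMode): StreamingMode {
  const value = (raw ?? '').trim().toLowerCase();
  if (value === 'stream' || value === 'batch') {
    return value;
  }
  return fallback;
}

// Example call site (not executed here):
//   const mode = parseStreamingMode(process.env.TELEGRAM_STREAMING_MODE, 'stream');
```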
Implementation
Orchestrator logic (packages/core/src/orchestrator/orchestrator.ts):
const mode = platform.getStreamingMode();
if (mode === 'stream') {
// Send each chunk immediately
for await (const msg of aiClient.sendQuery(...)) {
if (msg.type === 'assistant' && msg.content) {
await platform.sendMessage(conversationId, msg.content);
} else if (msg.type === 'tool' && msg.toolName) {
const toolMessage = formatToolCall(msg.toolName, msg.toolInput);
await platform.sendMessage(conversationId, toolMessage);
}
}
} else {
// Batch: Accumulate all chunks
const assistantMessages: string[] = [];
for await (const msg of aiClient.sendQuery(...)) {
if (msg.type === 'assistant' && msg.content) {
assistantMessages.push(msg.content);
}
// Tool calls logged but not sent to user
}
// Extract clean summary (filter out tool indicators)
const finalMessage = extractCleanSummary(assistantMessages);
await platform.sendMessage(conversationId, finalMessage);
}
Tool Call Formatting
Stream mode: Display tool calls in real-time
BASH
git status
READ
Reading: src/index.ts
EDIT
Editing: src/components/Header.tsx
Batch mode: Filter out tool indicators from final response
Reference: packages/core/src/orchestrator/orchestrator.ts
Tool Formatter Utility
Location: packages/core/src/utils/tool-formatter.ts
export function formatToolCall(toolName: string, toolInput?: Record<string, unknown>): string {
let message = `${toolName.toUpperCase()}`;
// Add context-specific info
if (toolName === 'Bash' && toolInput?.command) {
message += `\n${toolInput.command}`;
} else if (toolName === 'Read' && toolInput?.file_path) {
message += `\nReading: ${toolInput.file_path}`;
} else if (toolName === 'Edit' && toolInput?.file_path) {
message += `\nEditing: ${toolInput.file_path}`;
}
return message;
}
Database Schema
Z.E.N. uses a 7-table schema with remote_agent_ prefix. SQLite is the default (zero setup); PostgreSQL is optional for cloud/advanced deployments.
Schema Overview
remote_agent_codebases
├── id (UUID)
├── name (VARCHAR)
├── repository_url (VARCHAR)
├── default_cwd (VARCHAR)
├── ai_assistant_type (VARCHAR); 'claude' | 'codex'
└── commands (JSONB); {command_name: {path, description}}
remote_agent_conversations
├── id (UUID)
├── platform_type (VARCHAR); 'web' | 'telegram' | 'github' | 'slack'
├── platform_conversation_id (VARCHAR); Platform-specific ID
├── codebase_id (UUID -> remote_agent_codebases.id)
├── cwd (VARCHAR); Current working directory
├── ai_assistant_type (VARCHAR); LOCKED at creation
├── title (VARCHAR); User-friendly conversation title (Web UI)
├── deleted_at (TIMESTAMP); Soft-delete support
└── UNIQUE(platform_type, platform_conversation_id)
remote_agent_sessions
├── id (UUID)
├── conversation_id (UUID -> remote_agent_conversations.id)
├── codebase_id (UUID -> remote_agent_codebases.id)
├── ai_assistant_type (VARCHAR); Must match conversation
├── assistant_session_id (VARCHAR); SDK session ID for resume
├── active (BOOLEAN); Only one active per conversation
├── parent_session_id (UUID -> remote_agent_sessions.id)
├── transition_reason (TEXT); Why this session was created (TransitionTrigger)
└── metadata (JSONB); {lastCommand: "plan-feature", ...}
remote_agent_isolation_environments
├── id (UUID)
├── codebase_id (UUID -> remote_agent_codebases.id)
├── workflow_type (VARCHAR)
├── workflow_id (VARCHAR)
├── working_path (VARCHAR)
├── branch_name (VARCHAR)
├── status (VARCHAR); 'active' | 'destroyed'
└── metadata (JSONB)
remote_agent_workflow_runs
├── id (UUID)
├── conversation_id (UUID -> remote_agent_conversations.id)
├── codebase_id (UUID -> remote_agent_codebases.id)
├── workflow_name (VARCHAR)
├── status (VARCHAR); 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
├── parent_conversation_id (UUID); Parent chat that dispatched this run
└── metadata (JSONB)
remote_agent_workflow_events
├── id (UUID)
├── workflow_run_id (UUID -> remote_agent_workflow_runs.id)
├── event_type (VARCHAR); see WorkflowEventType
├── step_index (INTEGER)
├── step_name (VARCHAR)
├── data (JSONB); Event-specific data
└── created_at (TIMESTAMP)
remote_agent_messages
├── id (UUID)
├── conversation_id (UUID -> remote_agent_conversations.id)
├── role (VARCHAR); 'user' | 'assistant'
├── content (TEXT)
├── metadata (JSONB); {toolCalls: [{name, input, duration}], ...}
└── created_at (TIMESTAMP)
Database Operations
Location: packages/core/src/db/
Codebases (packages/core/src/db/codebases.ts):
- createCodebase() - Create codebase record
- getCodebase(id) - Get by ID
- findCodebaseByRepoUrl(url) - Find by repository URL
- registerCommand(id, name, def) - Add single command
- updateCodebaseCommands(id, commands) - Bulk update commands
- getCodebaseCommands(id) - Get all commands
Conversations (packages/core/src/db/conversations.ts):
- getOrCreateConversation(platform, id) - Idempotent get/create
- updateConversation(id, data) - Update fields (throws if conversation not found)
Sessions (packages/core/src/db/sessions.ts):
- createSession(data) - Create new session (supports parent_session_id and transition_reason)
- transitionSession(conversationId, reason, data) - Create new session linked to previous (immutable sessions)
- getActiveSession(conversationId) - Get active session for conversation
- getSessionHistory(conversationId) - Get all sessions for conversation (audit trail)
- getSessionChain(sessionId) - Walk session chain back to root
- updateSession(id, sessionId) - Update assistant_session_id
- updateSessionMetadata(id, metadata) - Update metadata JSONB
- deactivateSession(id) - Mark session inactive
Error Handling:
All UPDATE operations verify rowCount and throw errors if no rows were affected. This prevents silent failures when attempting to update non-existent records.
// Example: updateConversation throws if conversation not found
await updateConversation(id, { codebase_id: '...' });
// Throws: "updateConversation: Conversation not found for id=..."
Session Lifecycle
Normal flow:
1. User sends message
-> getOrCreateConversation()
-> getActiveSession() // null if first message
2. No session exists
-> transitionSession(conversationId, 'first-message', {...})
-> New session created with transition_reason='first-message'
3. Send to AI, get session ID
-> updateSession(session.id, aiSessionId)
4. User sends another message
-> getActiveSession() // returns existing
-> Resume with assistant_session_id
5. User sends /reset
-> deactivateSession(session.id) // Sets ended_at timestamp
-> Next message creates new session via transitionSession()
Plan-to-Execute transition (immutable sessions):
1. User: "Plan adding dark mode" -> routed to plan-feature workflow
-> transitionSession() or resumeSession()
-> updateSessionMetadata({ lastCommand: 'plan-feature' })
2. User: "Execute the plan" -> routed to execute workflow
-> detectPlanToExecuteTransition() // Returns 'plan-to-execute' trigger
-> transitionSession(conversationId, 'plan-to-execute', {...})
-> New session created, parent_session_id points to planning session
-> Fresh context for implementation with full audit trail
Reference: packages/core/src/orchestrator/orchestrator.ts, packages/core/src/state/session-transitions.ts
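The lifecycle above (one active session per conversation, each new session linked to its predecessor) can be modelled in memory to see how the audit trail forms. The sketch below is illustrative only: InMemorySessionStore is hypothetical, field names are camelCase stand-ins for the database columns, and the real functions in packages/core/src/db/sessions.ts operate on the database.

```typescript
interface SessionRecord {
  id: string;
  conversationId: string;
  active: boolean;
  parentSessionId: string | null; // parent_session_id column
  transitionReason: string;       // transition_reason column
}

// Hypothetical in-memory stand-in for the session table.
class InMemorySessionStore {
  private sessions: SessionRecord[] = [];

  getActiveSession(conversationId: string): SessionRecord | null {
    for (const session of this.sessions) {
      if (session.conversationId === conversationId && session.active) {
        return session;
      }
    }
    return null;
  }

  /** Deactivates the current session (if any) and creates a new one linked to it. */
  transitionSession(conversationId: string, reason: string): SessionRecord {
    const previous = this.getActiveSession(conversationId);
    if (previous !== null) {
      previous.active = false;
    }
    const next: SessionRecord = {
      id: `session-${this.sessions.length + 1}`,
      conversationId,
      active: true,
      parentSessionId: previous === null ? null : previous.id,
      transitionReason: reason,
    };
    this.sessions.push(next);
    return next;
  }

  /** Walks parent links from the given session back to the root, newest first. */
  getSessionChain(sessionId: string): SessionRecord[] {
    const chain: SessionRecord[] = [];
    let currentId: string | null = sessionId;
    while (currentId !== null) {
      const found = this.findById(currentId);
      if (found === null) {
        break;
      }
      chain.push(found);
      currentId = found.parentSessionId;
    }
    return chain;
  }

  private findById(id: string): SessionRecord | null {
    for (const session of this.sessions) {
      if (session.id === id) {
        return session;
      }
    }
    return null;
  }
}
```

In this model, a plan-to-execute transition leaves the planning session in place but inactive, and the execution session's chain leads back to it through parentSessionId.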
Message Flow Examples
Telegram Chat Flow
User types: /clone https://github.com/user/repo
|
TelegramAdapter receives update
|
Extract conversationId = chat_id
|
Orchestrator.handleMessage(adapter, chatId, "/clone ...")
|
Command Handler: /clone
- Execute git clone
- Create codebase record
- Update conversation.codebase_id
- Detect .zen/commands/
|
Send response: "Repository cloned! Found: .zen/commands/"
User types: "Prime the codebase"
|
Orchestrator: Route via AI router
|
Load command file: .zen/commands/prime.md
|
Variable substitution (no args in this case)
|
Get or create session
|
ClaudeClient.sendQuery(prompt, cwd, sessionId)
|
Stream mode: Send each chunk immediately
|
Save session ID for next message
GitHub Webhook Flow
User comments: @Z.E.N. prime the codebase
|
GitHub sends webhook to POST /webhooks/github
|
GitHubAdapter.handleWebhook(payload, signature)
- Verify HMAC signature
- Parse event: issue_comment.created
- Extract: owner/repo#42, comment text
- Check for @Z.E.N. mention
|
First mention on this issue?
- Yes -> Clone repo, create codebase, detect and register commands
- No -> Use existing codebase
|
Strip @Z.E.N. from comment
|
Orchestrator.handleMessage(adapter, "user/repo#42", "prime the codebase")
|
Load command file, substitute variables
|
Get or create session
|
CodexClient.sendQuery(prompt, cwd, sessionId)
|
Batch mode: Accumulate all chunks
|
Extract clean summary (filter tool indicators)
|
Post single comment on issue with summary
Extension Checklist
Adding a New Platform Adapter
- [ ] Create packages/adapters/src/chat/your-platform/adapter.ts
- [ ] Implement IPlatformAdapter interface
- [ ] Handle message length limits in sendMessage()
- [ ] Implement conversation ID extraction
- [ ] Set up polling or webhook handling
- [ ] Add to packages/server/src/index.ts with environment variable check
- [ ] Add environment variables to .env.example
- [ ] Test with both stream and batch modes
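For the message length item in the checklist above, one possible approach is to split long responses before sending them. The sketch below is an assumption about how an adapter might do this; `splitMessage` is a hypothetical helper, and the limit is supplied by the caller because it differs per platform.

```typescript
// Splits text into chunks no longer than maxLength, preferring newline boundaries.
function splitMessage(text: string, maxLength: number): string[] {
  if (maxLength <= 0) throw new Error("maxLength must be positive");
  const chunks: string[] = [];
  let remaining = text;
  while (remaining.length > maxLength) {
    // Break at the last newline inside the window; fall back to a hard cut.
    const breakAt = remaining.lastIndexOf("\n", maxLength);
    const cut = breakAt > 0 ? breakAt : maxLength;
    chunks.push(remaining.slice(0, cut));
    // Drop the newline we split on so the next chunk does not start with it.
    remaining = remaining.slice(breakAt > 0 ? cut + 1 : cut);
  }
  if (remaining.length > 0) chunks.push(remaining);
  return chunks;
}
```

An adapter's `sendMessage()` could then loop over the returned chunks and send each in order. Splitting on newlines keeps paragraphs together where the text allows it.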
Adding a New provider Client
- [ ] Create packages/core/src/clients/your-assistant.ts
- [ ] Implement IAssistantClient interface
- [ ] Map SDK events to MessageChunk types
- [ ] Handle session creation and resumption
- [ ] Implement error handling and recovery
- [ ] Add to packages/core/src/clients/factory.ts
- [ ] Add environment variables to .env.example
- [ ] Test session persistence across restarts
- [ ] Test plan-to-execute transition (new session)
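The event-mapping item in the checklist above can be sketched with an async generator. Both `SdkEventSketch` and `MessageChunkSketch` are hypothetical shapes: this guide only shows an `'assistant'` chunk with a `content` field, so the other variants are assumptions made for illustration.

```typescript
// Hypothetical chunk shape; only the 'assistant' variant is taken from this guide.
type MessageChunkSketch =
  | { type: "assistant"; content: string }
  | { type: "tool"; toolName: string }
  | { type: "result"; sessionId: string };

// Hypothetical SDK events, standing in for whatever a real SDK emits.
type SdkEventSketch =
  | { kind: "text"; text: string }
  | { kind: "tool_call"; name: string }
  | { kind: "done"; session: string }
  | { kind: "heartbeat" };

// Translates SDK events into chunks one at a time, so callers can stream them.
async function* mapSdkEvents(
  events: AsyncIterable<SdkEventSketch>
): AsyncGenerator<MessageChunkSketch> {
  for await (const event of events) {
    switch (event.kind) {
      case "text":
        yield { type: "assistant", content: event.text };
        break;
      case "tool_call":
        yield { type: "tool", toolName: event.name };
        break;
      case "done":
        // The final event carries the session ID needed to resume later.
        yield { type: "result", sessionId: event.session };
        break;
      default:
        // Events with no chunk equivalent (here, heartbeats) are dropped.
        break;
    }
  }
}
```

Because the mapping is itself a generator, nothing is buffered: each chunk reaches the caller as soon as the SDK produces the underlying event.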
Adding a New Isolation Provider
- [ ] Create packages/isolation/src/providers/your-provider.ts
- [ ] Implement IIsolationProvider interface
- [ ] Handle create(), destroy(), get(), list(), healthCheck()
- [ ] Optional: implement adopt() for existing environment discovery
- [ ] Register in packages/isolation/src/factory.ts
- [ ] Update database columns if needed (isolation_provider type)
- [ ] Test creation and cleanup lifecycle
- [ ] Test concurrent environments (multiple conversations)
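As a rough illustration of the lifecycle methods listed above, here is an in-memory stand-in for a provider. Only the method names come from the checklist; every signature, the `IsolationEnvSketch` type, and the path format are assumptions, and the real `IIsolationProvider` interface may differ.

```typescript
// Every signature here is an assumption; only the method names come from the checklist.
interface IsolationEnvSketch {
  id: string;
  workingPath: string;
}

interface IsolationProviderSketch {
  create(id: string): Promise<IsolationEnvSketch>;
  destroy(id: string): Promise<void>;
  get(id: string): Promise<IsolationEnvSketch | null>;
  list(): Promise<IsolationEnvSketch[]>;
  healthCheck(id: string): Promise<boolean>;
}

// In-memory stand-in for a real provider such as a git worktree manager.
class InMemoryProvider implements IsolationProviderSketch {
  private envs = new Map<string, IsolationEnvSketch>();

  async create(id: string): Promise<IsolationEnvSketch> {
    // Reuse an existing environment so a repeated create() stays idempotent.
    const existing = this.envs.get(id);
    if (existing) return existing;
    const env: IsolationEnvSketch = { id, workingPath: `/tmp/envs/${id}` };
    this.envs.set(id, env);
    return env;
  }

  async destroy(id: string): Promise<void> {
    this.envs.delete(id);
  }

  async get(id: string): Promise<IsolationEnvSketch | null> {
    return this.envs.get(id) ?? null;
  }

  async list(): Promise<IsolationEnvSketch[]> {
    return Array.from(this.envs.values());
  }

  async healthCheck(id: string): Promise<boolean> {
    return this.envs.has(id);
  }
}
```

A fake like this is also handy for the two test items in the checklist: it can exercise the creation and cleanup lifecycle, and several conversations at once, without touching the filesystem.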
Modifying Command System
- [ ] Update substituteVariables() for new variable types
- [ ] Add command to Command Handler for deterministic logic
- [ ] Update /help command output
- [ ] Add example command file to .zen/commands/
- [ ] Test variable substitution with edge cases
Common Patterns
Idempotent Operations
// Get or create: never fails on a missing record
const conversation = await db.getOrCreateConversation(platform, id);
// Find or create codebase (GitHub adapter pattern)
const existing = await codebaseDb.findCodebaseByRepoUrl(url);
if (existing) return existing;
return await codebaseDb.createCodebase({...});
Session Safety
// Always check for active session
const session = await sessionDb.getActiveSession(conversationId);
// Use transitionSession() for immutable session pattern
// Automatically deactivates old session and creates new one with audit trail
const newSession = await sessionDb.transitionSession(
conversationId,
'reset-requested', // TransitionTrigger
{ codebase_id, ai_assistant_type }
);
Streaming Error Handling
try {
for await (const msg of aiClient.sendQuery(...)) {
if (msg.type === 'assistant') {
await platform.sendMessage(conversationId, msg.content);
}
}
} catch (error) {
log.error({ err: error, conversationId }, 'orchestrator_error');
await platform.sendMessage(
conversationId,
'An error occurred. Try /reset.'
);
}
Context Injection
// GitHub: Pass issue/PR context as separate parameter
let contextToAppend: string | undefined;
if (eventType === 'issue' && issue) {
contextToAppend = `GitHub Issue #${String(issue.number)}: "${issue.title}"
Use 'gh issue view ${String(issue.number)}' for full details if needed.`;
} else if (eventType === 'pull_request' && pullRequest) {
contextToAppend = `GitHub Pull Request #${String(pullRequest.number)}: "${pullRequest.title}"
Use 'gh pr view ${String(pullRequest.number)}' for full details if needed.`;
}
await handleMessage(adapter, conversationId, finalMessage, contextToAppend);
Context is passed as a dedicated issueContext parameter to handleMessage(), keeping it separate from the user's message. For workflows, context is injected via $CONTEXT / $ISSUE_CONTEXT variable substitution in buildPromptWithContext().
Reference: packages/adapters/src/forge/github/adapter.ts, packages/core/src/orchestrator/orchestrator.ts
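A minimal sketch of the placeholder substitution described above. `injectContextSketch` is a hypothetical stand-in, not the real `buildPromptWithContext()`, whose signature and behaviour for a missing context are not shown in this guide; replacing an absent context with an empty string is an assumption.

```typescript
// Hypothetical stand-in for the substitution step inside buildPromptWithContext().
function injectContextSketch(template: string, context: string | undefined): string {
  // A replacer function is used so that "$" sequences inside the context
  // (for example "$1" in an issue title) are inserted literally.
  return template.replace(/\$(?:ISSUE_CONTEXT|CONTEXT)\b/g, () => context ?? "");
}

const prompt = injectContextSketch(
  "Investigate the report.\n$CONTEXT",
  'GitHub Issue #42: "Checkout total shows $1"'
);
console.log(prompt);
```

Passing the context as a separate argument, rather than concatenating it into the user's message up front, keeps the two apart until the template decides where the context belongs.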
Key Takeaways
- Interfaces enable extensibility: IPlatformAdapter, IAssistantClient, and IIsolationProvider allow adding platforms, providers, and isolation strategies without modifying core logic
- Async generators for streaming: All AI clients return AsyncGenerator<MessageChunk> for unified streaming across different SDKs
- Session persistence is critical: Store assistant_session_id in database to maintain context across restarts
- Platform-specific streaming: Each platform controls its own streaming mode via environment variables
- Commands are file-based: Store only paths in database, actual commands in Git-versioned files
- Plan-to-execute is special: Only transition requiring new session (prevents token bloat during implementation)
- Factory pattern: getAssistantClient() and getIsolationProvider() instantiate correct implementations based on configuration
- Error recovery: Always provide /reset escape hatch for users when sessions get stuck
- Isolation adoption: Providers check for existing environments before creating new ones (enables skill symbiosis)
For detailed implementation examples, see:
- Platform adapter: packages/adapters/src/chat/telegram/adapter.ts, packages/adapters/src/forge/github/adapter.ts
- AI client: packages/core/src/clients/claude.ts, packages/core/src/clients/codex.ts
- Isolation provider: packages/isolation/src/providers/worktree.ts
- Isolation resolver: packages/isolation/src/resolver.ts
- Isolation factory: packages/isolation/src/factory.ts
- Orchestrator: packages/core/src/orchestrator/orchestrator.ts
- Command handler: packages/core/src/handlers/command-handler.ts