
Satellite Events System

The Satellite Events System provides real-time communication from satellites to the backend for operational visibility, audit trails, and user feedback. Events are processed through a convention-based dispatcher that routes them to handlers updating existing business tables.

Architecture Overview

Event Flow

Satellite → EventBus (3s batching) → POST /api/satellites/{id}/events → Backend Dispatcher → Handler → Business Table
Key Principle: Events are routing triggers that update existing business tables, not raw event storage. Each handler performs meaningful business logic rather than storing JSON blobs.
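The satellite-side batching step in the flow above could look roughly like the following. This is a minimal sketch, not the actual EventBus: the `EventBatcher` class, the `SatelliteEvent` type name, and the `send` callback are hypothetical, and the default cap of 100 simply mirrors the backend batch limit described later in this document.

```typescript
// Hypothetical sketch of satellite-side batching; not the real EventBus implementation.
interface SatelliteEvent {
  type: string;
  timestamp: string; // ISO 8601
  data: Record<string, unknown>;
}

type SendFn = (events: SatelliteEvent[]) => void;

class EventBatcher {
  private queue: SatelliteEvent[] = [];
  private send: SendFn;
  private maxBatchSize: number;

  constructor(send: SendFn, maxBatchSize = 100) {
    this.send = send;
    this.maxBatchSize = maxBatchSize;
  }

  // Queue an event with its emission time; nothing is sent until flush().
  emit(type: string, data: Record<string, unknown>): void {
    this.queue.push({ type, timestamp: new Date().toISOString(), data });
  }

  // Drain the queue in chunks of at most maxBatchSize and return the number sent.
  flush(): number {
    let sent = 0;
    while (this.queue.length > 0) {
      const batch = this.queue.splice(0, this.maxBatchSize);
      this.send(batch);
      sent += batch.length;
    }
    return sent;
  }
}
```

In this sketch a timer would call `flush()` every 3 seconds, and `send` would POST the batch to /api/satellites/{id}/events. Because each event records its timestamp when it is emitted, the batching delay does not blur when the event actually happened.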

Why Events vs Heartbeat?

DeployStack uses three distinct communication channels:
Heartbeat (every 30 seconds):
  • Aggregate metrics and system health
  • Resource monitoring and capacity planning
  • Process counts grouped by team
Events (Immediate with 3s batching):
  • Point-in-time occurrences with precise timestamps
  • Real-time UI updates and user notifications
  • Audit trails for compliance
Commands (Polling):
  • Backend-initiated tasks
  • Configuration updates and process management

Backend Implementation

Directory Structure

services/backend/src/events/satellite/
├── index.ts                    # Event dispatcher (auto-discovers handlers)
├── types.ts                    # Shared TypeScript interfaces
├── mcp-server-started.ts       # Updates satelliteProcesses status
├── mcp-server-crashed.ts       # Updates satelliteProcesses with error
├── mcp-tool-executed.ts        # Logs to satelliteUsageLogs
└── [future-event-types].ts     # Additional handlers as needed

Convention-Based Handler Discovery

The dispatcher loads and registers every handler listed in the handlerModules array in index.ts. Adding an entry to the array is the only wiring a new handler needs:
const handlerModules = [
  () => import('./mcp-server-started'),
  () => import('./mcp-tool-executed'),
  () => import('./mcp-server-crashed'),
  // Add new handlers here - they will be automatically registered
];
Each handler must export three components:
  1. EVENT_TYPE: String constant identifying the event
  2. SCHEMA: JSON Schema for AJV validation
  3. handle(): Async function that updates business tables
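The three-export convention can be checked when modules are loaded. The following is a sketch under assumptions rather than the dispatcher's actual code: `isHandlerModule` and `buildRegistry` are hypothetical names, and the input is a list of modules that have already been resolved from the dynamic imports.

```typescript
// Hypothetical sketch: verify the export convention and build a type -> handler map.
interface HandlerModule {
  EVENT_TYPE: string;
  SCHEMA: Record<string, unknown>;
  handle: (...args: any[]) => Promise<void>;
}

// Type guard: true only if all three required exports are present with the right kinds.
function isHandlerModule(mod: unknown): mod is HandlerModule {
  if (typeof mod !== 'object' || mod === null) return false;
  const m = mod as Record<string, unknown>;
  const eventType = m.EVENT_TYPE;
  const schema = m.SCHEMA;
  return (
    typeof eventType === 'string' &&
    eventType.length > 0 &&
    typeof schema === 'object' &&
    schema !== null &&
    typeof m.handle === 'function'
  );
}

function buildRegistry(modules: unknown[]): Map<string, HandlerModule> {
  const registry = new Map<string, HandlerModule>();
  for (const mod of modules) {
    if (!isHandlerModule(mod)) {
      throw new Error('Handler module must export EVENT_TYPE, SCHEMA, and handle()');
    }
    if (registry.has(mod.EVENT_TYPE)) {
      throw new Error(`Duplicate handler for event type: ${mod.EVENT_TYPE}`);
    }
    registry.set(mod.EVENT_TYPE, mod);
  }
  return registry;
}
```

Failing fast at startup on a malformed or duplicate handler is one possible design choice; it surfaces a missing export before any satellite sends an event of that type.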

Handler Interface

All event handlers must implement this interface:
export interface EventHandler {
  EVENT_TYPE: string;
  SCHEMA: Record<string, unknown>;
  handle: (
    satelliteId: string,
    eventData: Record<string, unknown>,
    db: LibSQLDatabase,
    eventTimestamp: Date
  ) => Promise<void>;
}

Event Processing

Batch Endpoint

Route: POST /api/satellites/{satelliteId}/events
Authentication: Satellite API key (Bearer token via requireSatelliteAuth() middleware)
Request Schema (uses snake_case for all fields):
{
  "events": [
    {
      "type": "mcp.server.started",
      "timestamp": "2025-01-10T10:30:45.123Z",
      "data": {
        "process_id": "proc-123",
        "server_id": "filesystem-team-xyz",
        "server_slug": "filesystem",
        "team_id": "team-xyz",
        "pid": 12345,
        "transport": "stdio",
        "tool_count": 0,
        "spawn_duration_ms": 234
      }
    }
  ]
}
Response Schema:
{
  "success": true,
  "processed": 45,
  "failed": 0,
  "event_ids": ["evt_1736512345_abc123", "evt_1736512346_def456"]
}

Batch Processing Strategy

The dispatcher processes batched events with isolated error handling:
  1. Validate request structure (events array present)
  2. Validate batch size (1-100 events)
  3. Process each event individually:
    • Check event type exists in registry
    • Validate event data against handler schema using AJV
    • Parse and validate timestamp
    • Call handler.handle() for valid events
    • Track successful and failed events
  4. Return aggregated results
Error Isolation: Invalid events are logged and skipped without failing the entire batch. Valid events in the same batch are still processed.
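The steps above can be sketched as a loop with a try/catch around each event. This is an illustration under assumptions, not the dispatcher itself: `processBatch` and the `RegisteredHandler` shape are hypothetical, `validate` stands in for an AJV-compiled schema check, and the database argument of the real handler interface is omitted to keep the sketch self-contained.

```typescript
// Hypothetical sketch of isolated per-event processing.
interface IncomingEvent {
  type: string;
  timestamp: string;
  data: Record<string, unknown>;
}

interface RegisteredHandler {
  // Stand-in for AJV validation: returns an error message, or null when the data is valid.
  validate: (data: Record<string, unknown>) => string | null;
  handle: (satelliteId: string, data: Record<string, unknown>, at: Date) => Promise<void>;
}

interface BatchFailure { index: number; type: string; error: string; }
interface BatchResult { processed: number; failed: number; failures: BatchFailure[]; }

async function processBatch(
  satelliteId: string,
  events: IncomingEvent[],
  registry: Map<string, RegisteredHandler>
): Promise<BatchResult> {
  // Step 2: reject the whole request only when the batch size is out of range.
  if (events.length < 1 || events.length > 100) {
    throw new Error('Batch must contain between 1 and 100 events');
  }
  const result: BatchResult = { processed: 0, failed: 0, failures: [] };

  for (let index = 0; index < events.length; index++) {
    const event = events[index];
    try {
      const handler = registry.get(event.type);
      if (!handler) throw new Error('Unknown event type');

      const validationError = handler.validate(event.data);
      if (validationError) throw new Error(validationError);

      const at = new Date(event.timestamp);
      if (Number.isNaN(at.getTime())) throw new Error('Invalid timestamp');

      await handler.handle(satelliteId, event.data, at);
      result.processed += 1;
    } catch (err) {
      // A failing event is recorded and skipped; later events are still processed.
      result.failed += 1;
      result.failures.push({
        index,
        type: event.type,
        error: err instanceof Error ? err.message : String(err),
      });
    }
  }
  return result;
}
```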

Partial Success Handling

When some events fail validation, the endpoint returns partial success:
{
  "success": true,
  "processed": 43,
  "failed": 2,
  "event_ids": ["evt_001", "evt_002", "..."],
  "failures": [
    {
      "index": 5,
      "type": "mcp.unknown.event",
      "error": "Unknown event type"
    },
    {
      "index": 12,
      "type": "mcp.tool.executed",
      "error": "Missing required field: tool_name"
    }
  ]
}
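A sender can use the failures array to find which of its submitted events were rejected. The helper below is a sketch: `BatchResponse` and `collectFailed` are hypothetical names modeled on the response example above, and it assumes that `index` is the zero-based position of the event in the submitted events array, which the example suggests but does not state.

```typescript
// Hypothetical client-side helper; shapes follow the partial-success example.
interface EventFailure { index: number; type: string; error: string; }

interface BatchResponse {
  success: boolean;
  processed: number;
  failed: number;
  event_ids: string[];
  failures?: EventFailure[]; // absent when every event succeeded
}

// Pair each reported failure with the event that was originally sent at that index.
function collectFailed<T>(sent: T[], response: BatchResponse): Array<{ event: T; error: string }> {
  const out: Array<{ event: T; error: string }> = [];
  for (const failure of response.failures ?? []) {
    const event = sent[failure.index];
    if (event !== undefined) out.push({ event, error: failure.error });
  }
  return out;
}
```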

Implemented Event Types

MCP Server Lifecycle

mcp.server.started

Updates satelliteProcesses table when server successfully spawns.
Business Logic: Sets status='running', records start time and process PID.
Required Fields (snake_case): process_id, server_id, server_slug, team_id, transport, tool_count, spawn_duration_ms
Optional Fields: pid (OS process ID)

mcp.server.crashed

Updates satelliteProcesses table when server exits unexpectedly.
Business Logic: Sets status='failed', logs error details and exit code.
Required Fields (snake_case): process_id, server_id, server_slug, team_id, exit_code, signal, uptime_seconds, crash_count, will_restart
Optional Fields: None (all fields required for proper crash tracking)

Tool Execution

mcp.tool.executed

Inserts record into satelliteUsageLogs for analytics and audit trails.
Business Logic: Logs tool execution with metrics, user context, and performance data.
Required Fields (snake_case): tool_name, server_id, team_id, duration_ms, success
Optional Fields: error_message (string, only present when success=false)
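As an illustration of how such a field list maps onto a handler schema, here is what a SCHEMA for mcp.tool.executed might look like. It is a sketch, not the shipped schema: the constant name `TOOL_EXECUTED_SCHEMA`, the descriptions, and the types of every field except error_message (documented as a string) are assumptions.

```typescript
// Sketch only: types other than error_message are assumed, not taken from the real handler.
const TOOL_EXECUTED_SCHEMA = {
  type: 'object',
  properties: {
    tool_name: { type: 'string', minLength: 1, description: 'Name of the executed tool' },
    server_id: { type: 'string', minLength: 1, description: 'MCP server that ran the tool' },
    team_id: { type: 'string', minLength: 1, description: 'Team that owns the server' },
    duration_ms: { type: 'number', minimum: 0, description: 'Execution time in milliseconds' },
    success: { type: 'boolean', description: 'Whether the tool call succeeded' },
    error_message: { type: 'string', description: 'Present only when success is false' }
  },
  // error_message is optional, so it is deliberately left out of the required list.
  required: ['tool_name', 'server_id', 'team_id', 'duration_ms', 'success'],
  additionalProperties: true
} as const;
```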

Creating New Event Handlers

Handler Template

CRITICAL: All event data fields MUST use snake_case naming convention to match satellite event emission and backend API standards. Create a new file in services/backend/src/events/satellite/:
import type { LibSQLDatabase } from 'drizzle-orm/libsql';
import { yourTable } from '../../db/schema.sqlite';
import { eq } from 'drizzle-orm';

export const EVENT_TYPE = 'your.event.type';

export const SCHEMA = {
  type: 'object',
  properties: {
    required_field: {
      type: 'string',
      minLength: 1,
      description: 'Description of this field'
    },
    optional_field: {
      type: 'number',
      description: 'Optional numeric field'
    }
  },
  required: ['required_field'],
  additionalProperties: true
} as const;

// Interface fields mirror the snake_case event payload
interface YourEventData {
  required_field: string;
  optional_field?: number;
}

export async function handle(
  satelliteId: string,
  eventData: Record<string, unknown>,
  db: LibSQLDatabase,
  eventTimestamp: Date
): Promise<void> {
  const data = eventData as unknown as YourEventData;
  
  // Update existing business table
  await db.update(yourTable)
    .set({ 
      status: 'updated',
      updated_at: eventTimestamp
    })
    .where(eq(yourTable.id, data.required_field));
}

Registration Steps

  1. Create handler file in services/backend/src/events/satellite/
  2. Export EVENT_TYPE, SCHEMA, and handle() function
  3. Add import to handlerModules array in index.ts:
const handlerModules = [
  () => import('./mcp-server-started'),
  () => import('./mcp-tool-executed'),
  () => import('./mcp-server-crashed'),
  () => import('./your-new-handler'), // Add here
];
  4. Handler is automatically registered and ready to process events

Schema Validation

AJV Configuration

The dispatcher uses AJV with specific configuration for compatibility:
const ajv = new Ajv({ 
  allErrors: true,        // Report all validation errors
  strict: false,          // Allow unknown keywords
  strictTypes: false      // Disable strict type checking
});
addFormats(ajv);          // Add format validators (email, date-time, etc.)

Validation Process

For each event:
  1. Compile handler SCHEMA with AJV
  2. Validate event.data against compiled schema
  3. Log validation errors with instance path details
  4. Skip invalid events (don’t fail entire batch)
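Step 3 can be sketched as a small formatter over the error list that an AJV validate function exposes after a failed check. The `ValidationIssue` interface below is a structural subset of AJV v8's error objects (only `instancePath` and `message`), and `formatValidationErrors` is a hypothetical helper name, not part of AJV or the dispatcher.

```typescript
// Structural subset of an AJV v8 error object: only the fields this sketch uses.
interface ValidationIssue {
  instancePath: string; // JSON Pointer to the failing value; empty string for the root object
  message?: string;
}

// Turn validation errors into log-friendly strings; a null or missing list yields no lines.
function formatValidationErrors(errors: ValidationIssue[] | null | undefined): string[] {
  if (!errors) return [];
  return errors.map((e) => `${e.instancePath || '(root)'}: ${e.message ?? 'validation failed'}`);
}
```

With allErrors enabled, a single invalid event can produce several entries, so logging the formatted list gives the full picture in one warn-level record.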

Schema Best Practices

  • Use minLength: 1 for required string fields
  • Include descriptive description fields for documentation
  • Set additionalProperties: true to allow future extensibility
  • Use required array for mandatory fields
  • Leverage AJV formats: email, date-time, uri, uuid

Database Integration

Event-to-Table Mapping

Events route to existing business tables based on their purpose:
| Event Type | Business Table | Action |
| --- | --- | --- |
| mcp.server.started | satelliteProcesses | Update status='running', set start time |
| mcp.server.crashed | satelliteProcesses | Update status='failed', log error details |
| mcp.tool.executed | satelliteUsageLogs | Insert usage record with metrics |

Transaction Strategy

Each event is processed in a separate database transaction:
  • Failed events don’t rollback other events
  • Maintains data consistency per event
  • Isolated error handling prevents cascade failures

Database Driver Compatibility

When updating records, use the driver-compatible pattern:
const result = await db.update(table).set(data).where(condition);

// Handle both SQLite (changes) and Turso (rowsAffected)
const updated = (result.changes || result.rowsAffected || 0) > 0;
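The same check can be wrapped in a helper that does not depend on either driver's result type. This is a sketch with a hypothetical name, `affectedRows`. It differs slightly from the one-liner above: it returns `changes` whenever that property is a number, even when it is 0, instead of falling through to `rowsAffected`.

```typescript
// Hypothetical helper: read whichever row-count property the driver result exposes.
function affectedRows(result: unknown): number {
  if (typeof result !== 'object' || result === null) return 0;
  const r = result as { changes?: unknown; rowsAffected?: unknown };
  if (typeof r.changes === 'number') return r.changes;           // SQLite-style result
  if (typeof r.rowsAffected === 'number') return r.rowsAffected; // Turso-style result
  return 0;
}
```

A handler could then treat `affectedRows(result) === 0` as "no matching row", for example when a crash event arrives for a process ID the backend has never seen.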

Performance Considerations

Batch Processing Efficiency

  • Target: < 100ms per 100-event batch
  • Isolation: Each event in separate transaction
  • Logging: Structured logging with batch metrics
  • Monitoring: Track processing duration and success rates

Database Performance

  • Updates use indexed lookups (processId, satelliteId)
  • Inserts optimized for high-volume logging
  • No generic JSON storage overhead
  • Leverages existing optimized table schemas

Memory Usage

  • Batch size limited to 100 events (backend validation)
  • Event processing is sequential (simple implementation)
  • No long-lived memory allocations
  • Typed access to parsed event data via TypeScript interfaces

Error Handling

Invalid Event Type

Response: Partial success with failure details
Logging: Warn level with event type
Action: Skip event, continue batch processing

Schema Validation Failure

Response: Partial success with validation errors
Logging: Warn level with instance path details
Action: Skip event, log validation errors

Handler Execution Error

Response: Partial success with error message
Logging: Error level with stack trace
Action: Catch error, track failure, continue batch

Database Transaction Failure

Response: Partial success with database error
Logging: Error level with query details
Action: Rollback transaction, track failure, continue batch

Testing

Unit Testing

Test individual event handlers in isolation:
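// Test handler execution with a complete mcp.server.started payload (snake_case fields)
const validData = {
  process_id: 'proc-123', server_id: 'filesystem-team-xyz', server_slug: 'filesystem',
  team_id: 'team-xyz', transport: 'stdio', tool_count: 0, spawn_duration_ms: 234
};
await handler.handle('satellite-id', validData, mockDb, new Date());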

// Test schema validation
const validate = ajv.compile(handler.SCHEMA);
expect(validate(validData)).toBe(true);

Integration Testing

Test full endpoint with satellite authentication:
curl -X POST http://localhost:3000/api/satellites/{satelliteId}/events \
  -H "Authorization: Bearer {satellite_api_key}" \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {
        "type": "mcp.server.started",
        "timestamp": "2025-01-10T10:30:45.123Z",
        "data": {
          "process_id": "proc-123",
          "server_id": "filesystem-test",
          "server_slug": "filesystem",
          "team_id": "test-team",
          "transport": "stdio",
          "tool_count": 0,
          "spawn_duration_ms": 234
        }
      }
    ]
  }'

Batch Processing Tests

  • Single event batch (1 event)
  • Normal batch (50 events)
  • Maximum batch (100 events)
  • Oversized batch (> 100 events, should reject)
  • Mixed success/failure batch
  • Unknown event type handling
  • Invalid timestamp handling
  • Schema validation failures
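The size-related cases in this list can be driven from a small generator. The helpers below are a sketch with hypothetical names (`makeBatch`, `isAcceptedBatchSize`) and placeholder payload values. The accepted range of 1 to 100 comes from the batch validation rule described earlier.

```typescript
// Hypothetical test helpers; payload values are placeholders.
interface TestEvent {
  type: string;
  timestamp: string;
  data: Record<string, unknown>;
}

// Build a batch of `size` mcp.tool.executed events with distinct timestamps.
function makeBatch(size: number): TestEvent[] {
  const base = Date.UTC(2025, 0, 10, 10, 30, 45); // 2025-01-10T10:30:45Z
  const events: TestEvent[] = [];
  for (let i = 0; i < size; i++) {
    events.push({
      type: 'mcp.tool.executed',
      timestamp: new Date(base + i).toISOString(),
      data: {
        tool_name: `tool-${i}`,
        server_id: 'filesystem-test',
        team_id: 'test-team',
        duration_ms: 10 + i,
        success: true,
      },
    });
  }
  return events;
}

// Mirrors the documented backend rule: 1 to 100 events per request.
function isAcceptedBatchSize(size: number): boolean {
  return Number.isInteger(size) && size >= 1 && size <= 100;
}

// Sizes covering the single, normal, maximum, and oversized cases.
const batchSizesUnderTest = [1, 50, 100, 101];
```

Each generated batch can be posted to the endpoint, with `isAcceptedBatchSize` giving the expected outcome: the first three sizes should be processed and the 101-event batch should be rejected.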

Monitoring and Debugging

Structured Logging

All event operations are logged with structured data:
# Event processing started
{"level":"info","satelliteId":"sat-123","batchSize":45}

# Successful processing
{"level":"info","satelliteId":"sat-123","eventType":"mcp.server.started","msg":"Event processed"}

# Validation failure
{"level":"warn","eventType":"unknown.type","msg":"Unknown event type"}

# Batch complete
{"level":"info","satelliteId":"sat-123","processed":43,"failed":2,"msg":"Batch complete"}

Debug Queries

Check registered event types:
import { getRegisteredEventTypes } from '../events/satellite';

const types = await getRegisteredEventTypes();
console.log('Registered event types:', types);
Verify database updates:
-- Check process status after mcp.server.started
SELECT status, started_at, process_pid 
FROM satelliteProcesses 
WHERE id = 'proc-123';

-- Check tool execution logs
SELECT tool_name, duration_ms, status_code, timestamp
FROM satelliteUsageLogs
WHERE satellite_id = 'sat-123'
ORDER BY timestamp DESC
LIMIT 10;

Best Practices

Event Handler Design

DO:
  • Update existing business tables with structured data
  • Use TypeScript interfaces for type safety
  • Include comprehensive field descriptions in schemas
  • Log important state changes
  • Handle optional fields gracefully
DON’T:
  • Store raw JSON in generic events tables
  • Assume all optional fields are present
  • Skip error handling in database operations
  • Use blocking operations (keep handlers async)
  • Duplicate business logic across handlers

Schema Design

DO:
  • Use descriptive field names matching domain concepts
  • Include description for documentation
  • Set appropriate minLength and format constraints
  • Use additionalProperties: true for extensibility
  • Mark truly required fields in required array
DON’T:
  • Over-constrain with excessive validation
  • Use generic field names like data or info
  • Forget to set as const on schema objects
  • Validate business logic in schemas (do that in handlers)
  • Create schemas with circular references

Database Operations

DO:
  • Use parameterized queries via Drizzle ORM
  • Handle both SQLite and Turso driver differences
  • Include timestamps for all state changes
  • Use transactions for multi-step operations
  • Index frequently queried fields
DON’T:
  • Concatenate SQL strings manually
  • Assume specific driver properties exist
  • Skip error handling for database operations
  • Create N+1 query patterns
  • Store large BLOBs in event data

Future Enhancements

Planned Event Types

  • Client Connections: mcp.client.connected, mcp.client.disconnected
  • Tool Discovery: mcp.tools.discovered, mcp.tools.updated
  • Configuration: config.refreshed, config.error
  • Satellite Lifecycle: satellite.registered, satellite.deregistered
  • Process Management: mcp.server.restarted, mcp.server.permanently_failed

Performance Optimizations

  • Batch database insertions for high-volume events
  • Async event processing with job queue
  • Event sampling for high-frequency events
  • Compression for large event payloads

Analytics Features

  • Real-time event aggregation
  • Custom alert rules based on events
  • Event replay for debugging
  • Historical event analysis dashboards