Skip to main content

Event Emission

The satellite communicates with the backend through a centralized EventBus that emits typed events. These events enable real-time status updates, log streaming, and tool metadata synchronization without polling.

Overview

The satellite emits events for:
  • Status Changes: Real-time installation status updates
  • Server Logs: Batched stderr output from MCP servers
  • Request Logs: Batched tool execution logs with request/response data
  • Tool Metadata: Tool discovery results with token counts
  • Process Lifecycle: Server start, crash, restart, permanent failure events
All events are processed by the backend’s event handler system and trigger database updates, SSE broadcasts to frontend, and health monitoring actions.

Event System Architecture

Satellite Component (ProcessManager, McpServerWrapper, DiscoveryManager)

EventBus.emit(eventType, eventData)

Backend Polling Service (30-second interval)

Backend Event Handlers (process events, update database)

Frontend SSE Streams (real-time updates to users)

Event Types Reference

mcp.server.status_changed

Purpose: Update installation status in real-time Emitted by:
  • ProcessManager (connecting, online, crashed, permanently_failed)
  • McpServerWrapper (offline, error, requires_reauth on tool execution failures)
  • RemoteToolDiscoveryManager (connecting, online, offline, error, requires_reauth)
For complete status transition triggers and lifecycle flows, see Status Tracking. Payload:
{
  installation_id: string;
  team_id: string;
  status: 'provisioning' | 'command_received' | 'connecting' | 'discovering_tools'
    | 'syncing_tools' | 'online' | 'restarting' | 'offline' | 'error'
    | 'requires_reauth' | 'permanently_failed';
  status_message?: string;
  timestamp: string; // ISO 8601
}
Example:
eventBus.emit('mcp.server.status_changed', {
  installation_id: 'inst_abc123',
  team_id: 'team_xyz',
  status: 'online',
  status_message: 'Server connected successfully',
  timestamp: '2025-01-15T10:30:00.000Z'
});
Backend Action: Updates mcpServerInstallations.status and broadcasts via SSE

mcp.server.logs

Purpose: Stream server logs (stderr, connection errors, startup messages) to backend Emitted by:
  • ProcessManager (batched stderr output from stdio MCP servers)
Batching Strategy:
  • Interval: 3 seconds after first log entry
  • Max Size: 20 logs per batch (forces immediate flush)
  • Grouping: By installation_id + team_id
Payload:
{
  installation_id: string;
  team_id: string;
  logs: Array<{
    level: 'info' | 'warn' | 'error' | 'debug';
    message: string;
    metadata?: Record<string, unknown>;
    timestamp: string; // ISO 8601
  }>;
}
Example:
eventBus.emit('mcp.server.logs', {
  installation_id: 'inst_abc123',
  team_id: 'team_xyz',
  logs: [
    {
      level: 'error',
      message: 'Connection refused to http://localhost:3568/sse',
      metadata: { error_code: 'ECONNREFUSED' },
      timestamp: '2025-01-15T10:30:00.000Z'
    },
    {
      level: 'info',
      message: 'Retrying connection in 2 seconds...',
      timestamp: '2025-01-15T10:30:02.000Z'
    }
  ]
});
Backend Action: Inserts logs into mcpServerLogs table, enforces 100-line limit per installation

mcp.request.logs

Purpose: Stream tool execution logs with full request/response data Emitted by:
  • McpServerWrapper (batched tool call logs)
Batching Strategy:
  • Interval: 3 seconds after first request
  • Max Size: 20 requests per batch
  • Grouping: By installation_id + team_id
Payload:
{
  installation_id: string;
  team_id: string;
  requests: Array<{
    user_id?: string;
    tool_name: string;
    tool_params: Record<string, unknown>;
    tool_response?: unknown; // Full MCP server response
    response_time_ms: number;
    success: boolean;
    error_message?: string;
    timestamp: string; // ISO 8601
  }>;
}
Example:
eventBus.emit('mcp.request.logs', {
  installation_id: 'inst_abc123',
  team_id: 'team_xyz',
  requests: [
    {
      user_id: 'user_xyz',
      tool_name: 'github:list-repos',
      tool_params: { owner: 'deploystackio' },
      tool_response: { repos: ['deploystack', 'mcp-server'], total: 2 },
      response_time_ms: 234,
      success: true,
      timestamp: '2025-01-15T10:30:00.000Z'
    }
  ]
});
Backend Action: Inserts requests into mcpRequestLogs table, enforces 100-line limit Privacy Note: Only emitted if settings.request_logging_enabled !== false

mcp.tools.discovered

Purpose: Synchronize discovered tools and metadata to backend Emitted by:
  • UnifiedToolDiscoveryManager (after tool discovery completes)
Payload:
{
  installation_id: string;
  team_id: string;
  tools: Array<{
    tool_path: string; // e.g., "github:list-repos"
    name: string;
    description?: string;
    inputSchema: unknown;
    token_count: number; // Estimated token usage
  }>;
  timestamp: string; // ISO 8601
}
Example:
eventBus.emit('mcp.tools.discovered', {
  installation_id: 'inst_abc123',
  team_id: 'team_xyz',
  tools: [
    {
      tool_path: 'github:list-repos',
      name: 'list-repos',
      description: 'List all repositories for an owner',
      inputSchema: { type: 'object', properties: { owner: { type: 'string' } } },
      token_count: 42
    }
  ],
  timestamp: '2025-01-15T10:30:00.000Z'
});
Backend Action: Updates mcpTools table with discovered tools and metadata

Process Lifecycle Events

These events track stdio MCP server process state:

mcp.server.started

Emitted when: Stdio process successfully spawned Payload:
{
  installation_id: string;
  team_id: string;
  process_id: string;
  timestamp: string;
}

mcp.server.crashed

Emitted when: Stdio process terminates unexpectedly Payload:
{
  installation_id: string;
  team_id: string;
  process_id: string;
  exit_code: number | null;
  signal: string | null;
  crash_count: number; // Crashes within 5-minute window
  timestamp: string;
}

mcp.server.restarted

Emitted when: Stdio process automatically restarted after crash Payload:
{
  installation_id: string;
  team_id: string;
  process_id: string;
  restart_count: number;
  timestamp: string;
}

mcp.server.permanently_failed

Emitted when: Stdio process crashes 3 times within 5 minutes Payload:
{
  installation_id: string;
  team_id: string;
  process_id: string;
  crash_count: number; // Always 3
  message: string; // "Process crashed 3 times in 5 minutes"
  timestamp: string;
}
Backend Action: Sets installation status to permanently_failed, requires manual restart

Event Batching Strategy

Why Batching?

Batching reduces:
  • Backend API calls (20 logs = 1 API call instead of 20)
  • Database transactions (bulk insert instead of individual inserts)
  • Network overhead (fewer HTTP requests)
  • Backend processing load (batch operations are more efficient)

Batching Configuration

ParameterValueReason
Batch Interval3 secondsBalance between real-time feel and efficiency
Max Batch Size20 entriesPrevent large payloads, force timely emission
Grouping Keyinstallation_id + team_idSeparate batches per installation

Batching Implementation

Log batching implementation details are in Log Capture - Buffering Implementation for both server logs and request logs.

EventBus Usage

Emitting Events

import { EventBus } from './events/event-bus';

// EventBus is a singleton
const eventBus = EventBus.getInstance();

// Emit with type safety
eventBus.emit('mcp.server.status_changed', {
  installation_id: 'inst_123',
  team_id: 'team_456',
  status: 'online',
  timestamp: new Date().toISOString()
});

Event Registry

All event types are defined in the event registry:
// services/satellite/src/events/registry.ts

export type EventType =
  | 'mcp.server.status_changed'
  | 'mcp.server.logs'
  | 'mcp.request.logs'
  | 'mcp.tools.discovered'
  | 'mcp.server.started'
  | 'mcp.server.crashed'
  | 'mcp.server.restarted'
  | 'mcp.server.permanently_failed'
  // ... 13 total event types
  ;

export interface EventDataMap {
  'mcp.server.status_changed': { /* payload */ };
  'mcp.server.logs': { /* payload */ };
  // ... type definitions for all events
}

Backend Event Handlers

Each event type has a dedicated backend handler: Status Changed:
// services/backend/src/events/satellite/mcp-server-status-changed.ts
// Updates mcpServerInstallations.status
Server Logs:
// services/backend/src/events/satellite/mcp-server-logs.ts
// Inserts into mcpServerLogs table
Request Logs:
// services/backend/src/events/satellite/mcp-request-logs.ts
// Inserts into mcpRequestLogs table (if logging enabled)
Tools Discovered:
// services/backend/src/events/satellite/mcp-tools-discovered.ts
// Updates mcpTools table with metadata

Integration Points

Process Manager:
  • Emits server logs (stderr batching)
  • Emits lifecycle events (started, crashed, restarted, permanently_failed)
  • Emits status changes (connecting, online, permanently_failed)
MCP Server Wrapper:
  • Emits request logs (tool execution batching)
  • Emits status changes (offline, error, requires_reauth on failures)
  • Emits status changes (connecting, online on recovery)
Tool Discovery Managers:
  • Emit status changes (connecting, discovering_tools, online, offline, error)
  • Trigger tool metadata emission via UnifiedToolDiscoveryManager
Unified Tool Discovery Manager:
  • Emits mcp.tools.discovered after successful discovery
  • Coordinates status callbacks from discovery managers

Implementation Components

The event emission system consists of several integrated components:
  • Backend event handler system
  • Satellite status event emission
  • Server and request log batching
  • Tool metadata event emission
  • Stdio permanently_failed event
  • Tool execution failure status events