Background Jobs System

DeployStack Satellite implements a centralized job management system for recurring background tasks. The system provides a consistent pattern for cron-like operations with automatic error handling, execution metrics, and lifecycle management.

Architecture Overview

The job system consists of three core components:
┌─────────────────────────────────────────────────────────────────┐
│                    Job System Architecture                      │
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │   BaseJob    │    │  JobManager  │    │ Concrete Job │    │
│  │              │    │              │    │              │    │
│  │ • Interval   │◄───│ • Registry   │◄───│ HeartbeatJob │    │
│  │ • Execute    │    │ • Lifecycle  │    │ CleanupJob   │    │
│  │ • Metrics    │    │ • Monitoring │    │ CustomJob    │    │
│  └──────────────┘    └──────────────┘    └──────────────┘    │
└─────────────────────────────────────────────────────────────────┘

BaseJob Abstract Class

All jobs extend BaseJob, which provides:
  • Automatic Interval Execution: Jobs run on configured intervals
  • Immediate First Run: Execute immediately on start, then follow interval
  • Error Handling: Automatic error catching with structured logging
  • Execution Metrics: Track execution count, timing, and errors
  • Lifecycle Management: Start/stop methods with state tracking
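To make these responsibilities concrete, here is a minimal sketch of how a base class with this behavior could be written. It is not the actual BaseJob source: the names SketchBaseJob, MinimalLogger, runOnce, and getStats are assumptions for illustration, and the real class in src/jobs/base-job.ts may differ in detail.

```typescript
// Hypothetical sketch only; the real BaseJob may be implemented differently.
interface MinimalLogger {
  info(obj: object, msg: string): void;
  error(obj: object, msg: string): void;
}

abstract class SketchBaseJob {
  private timer: ReturnType<typeof setInterval> | null = null;
  private running = false;
  private executionCount = 0;
  private errorCount = 0;
  private totalExecutionTimeMs = 0;

  constructor(
    public readonly name: string,
    protected readonly intervalMs: number,
    protected readonly logger: MinimalLogger
  ) {}

  // Subclasses put their actual work here.
  protected abstract execute(): Promise<void>;

  // Immediate first run, then repeat on the configured interval.
  async start(): Promise<void> {
    if (this.running) return;
    this.running = true;
    await this.runOnce();
    if (this.running) {
      // stop() was not called during the first run, so schedule repeats
      this.timer = setInterval(() => void this.runOnce(), this.intervalMs);
    }
  }

  stop(): void {
    this.running = false;
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  getStats() {
    return {
      executionCount: this.executionCount,
      errorCount: this.errorCount,
      averageExecutionTime:
        this.executionCount === 0 ? 0 : this.totalExecutionTimeMs / this.executionCount,
      isRunning: this.running
    };
  }

  // Wraps execute() so a thrown error is counted and logged, never propagated.
  private async runOnce(): Promise<void> {
    const startedAt = Date.now();
    const executionNumber = ++this.executionCount;
    try {
      await this.execute();
      this.logger.info(
        { operation: 'job_execute_success', job_name: this.name, execution_number: executionNumber },
        'Job execution succeeded'
      );
    } catch (error) {
      this.errorCount += 1;
      this.logger.error(
        {
          operation: 'job_execute_error',
          job_name: this.name,
          execution_number: executionNumber,
          error: error instanceof Error ? error.message : String(error)
        },
        'Job execution failed'
      );
    } finally {
      this.totalExecutionTimeMs += Date.now() - startedAt;
    }
  }
}
```

In this sketch a failing execute() increments the error count but leaves the timer in place, which is one way to get the "job continues on the next interval" behavior.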

JobManager

The JobManager provides centralized control:
  • Job Registry: Register and track all jobs
  • Lifecycle Control: Start/stop all jobs or individual jobs
  • Status Monitoring: Query job statistics and execution state
  • Graceful Shutdown: Stop all jobs cleanly on satellite shutdown
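The registry and lifecycle roles can be pictured with a small sketch. This is an illustration under assumptions, not the real JobManager: the ManagedJob interface, the SketchJobManager class, the duplicate-name check, and the stopAll method name are all hypothetical.

```typescript
// Hypothetical sketch of a job registry; the real JobManager API may differ.
interface ManagedJob {
  readonly name: string;
  start(): Promise<void>;
  stop(): void;
}

class SketchJobManager {
  private readonly jobs = new Map<string, ManagedJob>();

  // Reject duplicate names so statistics lookups by name stay unambiguous.
  register(job: ManagedJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job already registered: ${job.name}`);
    }
    this.jobs.set(job.name, job);
  }

  getRegisteredJobs(): string[] {
    return Array.from(this.jobs.keys());
  }

  // Start every registered job; resolves once each job's start() has settled.
  async startAll(): Promise<void> {
    await Promise.all(Array.from(this.jobs.values()).map((job) => job.start()));
  }

  // Stop every registered job, for example during satellite shutdown.
  stopAll(): void {
    this.jobs.forEach((job) => job.stop());
  }
}
```

A Map keyed by job name keeps registration order and makes per-job lookups cheap, which is why it is used here.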

Current Jobs

Job Name     Interval      Purpose                           Status
heartbeat    30 seconds    Send status updates to backend    ✅ Active
cleanup      5 minutes     Template for new jobs             📝 Example

Creating a New Job

Add a new background job in three steps:

Step 1: Create Job File

Create src/jobs/process-health-job.ts:
import { BaseJob } from './base-job';
import { FastifyBaseLogger } from 'fastify';

export class ProcessHealthJob extends BaseJob {
  constructor(logger: FastifyBaseLogger) {
    super('process-health', 120000, logger); // 2 minutes
  }

  protected async execute(): Promise<void> {
    this.logger.info({
      operation: 'process_health_check'
    }, 'Checking process health...');

    // Your job logic here

    this.logger.info({
      operation: 'process_health_complete'
    }, 'Health check completed');
  }
}

Step 2: Export from Index

Add to src/jobs/index.ts:
export { ProcessHealthJob } from './process-health-job';

Step 3: Register in Server

Add to src/server.ts:
import { JobManager, HeartbeatJob, CleanupJob, ProcessHealthJob } from './jobs';

const jobManager = new JobManager(server.log);
jobManager.register(new HeartbeatJob(heartbeatService));
jobManager.register(new CleanupJob(server.log));
jobManager.register(new ProcessHealthJob(server.log));

await jobManager.startAll();
That’s it! Your job will start running immediately and then execute every 2 minutes automatically.

Job Intervals

Common interval values in milliseconds:
// Seconds
30 * 1000           // 30 seconds
60 * 1000           // 1 minute

// Minutes
2 * 60 * 1000       // 2 minutes
5 * 60 * 1000       // 5 minutes
10 * 60 * 1000      // 10 minutes
15 * 60 * 1000      // 15 minutes
30 * 60 * 1000      // 30 minutes

// Hours
60 * 60 * 1000      // 1 hour
6 * 60 * 60 * 1000  // 6 hours
24 * 60 * 60 * 1000 // 24 hours
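If the raw multiplications become hard to read, small helpers can express the same values. The helper names seconds, minutes, and hours below are assumptions for illustration and are not part of the job system.

```typescript
// Hypothetical helpers that convert a duration to milliseconds.
const seconds = (n: number): number => n * 1000;
const minutes = (n: number): number => n * seconds(60);
const hours = (n: number): number => n * minutes(60);

// The same values as the table above, written with the helpers.
const heartbeatIntervalMs = seconds(30); // 30000
const healthCheckIntervalMs = minutes(2); // 120000
const dailyIntervalMs = hours(24); // 86400000
```

A job constructor could then pass minutes(2) instead of the literal 120000.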

Environment-Configurable Intervals

Make job intervals configurable:
export class MyJob extends BaseJob {
  constructor(logger: FastifyBaseLogger) {
    const interval = parseInt(
      process.env.MY_JOB_INTERVAL || '300000',
      10
    );
    super('my-job', interval, logger);
  }
}
Add to .env.example:
# My Job interval in milliseconds (default: 5 minutes)
MY_JOB_INTERVAL=300000
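A non-numeric value in the environment variable makes parseInt return NaN, and timers typically treat an invalid delay as the minimum delay, so a typo could turn a 5-minute job into a tight loop. One way to guard against this is a small parsing helper. The function name parseIntervalMs and the 1-second minimum are assumptions for this sketch.

```typescript
// Hypothetical helper: parse an interval from an environment value,
// falling back to a default when the value is missing, invalid, or too small.
function parseIntervalMs(
  raw: string | undefined,
  fallbackMs: number,
  minMs: number = 1000
): number {
  if (raw === undefined || raw.trim() === '') return fallbackMs;
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < minMs) return fallbackMs;
  return parsed;
}
```

In the constructor above, this would replace the inline parseInt call, for example parseIntervalMs(process.env.MY_JOB_INTERVAL, 300000).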

Jobs with Dependencies

If your job needs access to services, inject them through the constructor:
export class ProcessHealthJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private processManager: ProcessManager,
    private runtimeState: RuntimeState
  ) {
    super('process-health', 120000, logger);
  }

  protected async execute(): Promise<void> {
    const processes = this.processManager.getAllProcesses();
    
    for (const proc of processes) {
      if (proc.errorCount > 10) {
        this.logger.warn({
          process_id: proc.config.installation_id,
          error_count: proc.errorCount
        }, 'Process has high error count');
      }
    }
  }
}
Register with dependencies:
jobManager.register(
  new ProcessHealthJob(server.log, processManager, runtimeState)
);

Job Lifecycle

Initialization Flow

Satellite Startup

    ├── Register Satellite with Backend

    ├── Initialize Services

    ├── Create JobManager

    ├── Register Jobs
    │   ├── new HeartbeatJob(heartbeatService)
    │   ├── new CleanupJob(logger)
    │   └── new CustomJob(logger)

    ├── jobManager.startAll()
    │   ├── Start Job 1 → Execute immediately → Set interval
    │   ├── Start Job 2 → Execute immediately → Set interval
    │   └── Start Job 3 → Execute immediately → Set interval

    └── Satellite Ready

Job Execution Flow

Job Start

    ├── Execute Immediately (first run)
    │   ├── Log: job_execute_start
    │   ├── Run execute() method
    │   ├── Track execution time
    │   ├── Log: job_execute_success
    │   └── Update metrics

    ├── Wait for Interval

    └── Execute on Interval (repeating)
        ├── Log: job_execute_start
        ├── Run execute() method
        ├── Handle errors (if any)
        ├── Log: job_execute_success or job_execute_error
        └── Update metrics → Repeat

Monitoring and Observability

Structured Logging

All job events are logged with structured data:
// Job started
{
  "operation": "job_start",
  "job_name": "process-health",
  "interval_ms": 120000,
  "interval_seconds": 120
}

// Job executing
{
  "operation": "job_execute_start",
  "job_name": "process-health",
  "execution_number": 5
}

// Job completed
{
  "operation": "job_execute_success",
  "job_name": "process-health",
  "execution_number": 5,
  "execution_time_ms": 234
}

// Job error
{
  "operation": "job_execute_error",
  "job_name": "process-health",
  "execution_number": 5,
  "error_count": 2,
  "error": "Connection timeout"
}

Job Statistics

Query job statistics via JobManager:
const stats = jobManager.getStats('process-health');
// Returns an object shaped like:
// {
//   executionCount: 42,
//   errorCount: 2,
//   averageExecutionTime: 234,
//   isRunning: true
// }
Get all job statistics:
const allStats = jobManager.getAllStats();
// Returns array of all job statistics
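The statistics can feed a simple health check. The sketch below assumes the stats object has exactly the four fields shown above. The JobStats interface name, the helper names, and the 10% threshold are hypothetical.

```typescript
// Shape taken from the getStats() example above; the interface name is assumed.
interface JobStats {
  executionCount: number;
  errorCount: number;
  averageExecutionTime: number;
  isRunning: boolean;
}

// Fraction of executions that ended in an error (0 when nothing has run yet).
function errorRate(stats: JobStats): number {
  return stats.executionCount === 0 ? 0 : stats.errorCount / stats.executionCount;
}

// Flags a job that is stopped or failing more often than the threshold allows.
function needsAttention(stats: JobStats, maxErrorRate: number = 0.1): boolean {
  return !stats.isRunning || errorRate(stats) > maxErrorRate;
}
```

With the example values above (42 executions, 2 errors, running), the error rate is about 4.8%, so needsAttention returns false at the default threshold.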

Error Handling

Automatic Error Recovery

The BaseJob class automatically handles errors:
protected async execute(): Promise<void> {
  // If this throws, BaseJob catches it
  await someOperationThatMightFail();
  
  // Job continues running on next interval
}

Custom Error Handling

Add custom error handling for specific scenarios:
protected async execute(): Promise<void> {
  try {
    await this.criticalOperation();
  } catch (error) {
    this.logger.error({ error }, 'Critical operation failed');
    // Not rethrowing means BaseJob never sees this failure; rethrow if it should count toward errorCount
  }
}

Timeout Protection

Add timeouts for long-running operations:
protected async execute(): Promise<void> {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 60000);

  try {
    await this.longOperation({ signal: controller.signal });
  } finally {
    clearTimeout(timeout);
  }
}
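Aborting the controller only helps if the operation itself watches the signal. As an illustration, here is a minimal sketch of a signal-aware operation. The function abortableDelay is hypothetical and stands in for whatever longOperation does internally.

```typescript
// Hypothetical stand-in for a long operation that honors an AbortSignal.
// Resolves after `ms` milliseconds, or rejects as soon as the signal aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('Operation aborted'));
      return;
    }

    const timer = setTimeout(() => {
      // Finished normally: stop listening for aborts
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    function onAbort(): void {
      // Aborted first: cancel the pending work and surface the failure
      clearTimeout(timer);
      reject(new Error('Operation aborted'));
    }

    signal.addEventListener('abort', onAbort, { once: true });
  });
}
```

If the operation ignores the signal, the timeout above fires but the work keeps running in the background, so check that any library call you pass the signal to actually supports it.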

Best Practices

1. Keep Jobs Focused

Each job should have a single responsibility.
Good:
export class SessionCleanupJob extends BaseJob {
  protected async execute(): Promise<void> {
    await this.cleanupExpiredSessions();
  }
}
Bad:
export class MaintenanceJob extends BaseJob {
  protected async execute(): Promise<void> {
    await this.cleanupSessions();
    await this.checkProcessHealth();
    await this.rotateLogs();
    await this.updateMetrics();
  }
}

2. Choose Appropriate Intervals

  • High-frequency (30s-1m): Health checks, critical monitoring
  • Medium (5m-15m): Cleanup tasks, periodic updates
  • Low (1h+): Reports, analytics, maintenance

3. Document Job Purpose

Add clear comments explaining what the job does:
/**
 * Process Health Check Job
 * 
 * Monitors all running MCP server processes and restarts unhealthy ones.
 * Runs every 2 minutes to ensure quick failure detection.
 * 
 * Checks:
 * - Process still running
 * - Error count within limits
 * - Response time acceptable
 * - Memory usage not excessive
 */
export class ProcessHealthJob extends BaseJob {
  // ...
}

4. Use Structured Logging

Always log with operation context:
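protected async execute(): Promise<void> {
  // findExpiredSessions() and removeSessions() are hypothetical helpers on this job
  const sessions = await this.findExpiredSessions();

  this.logger.info({
    operation: 'cleanup_start',
    session_count: sessions.length
  }, 'Starting session cleanup...');

  // ... cleanup logic ... (here assumed to return the number of sessions removed)
  const removed = await this.removeSessions(sessions);

  this.logger.info({
    operation: 'cleanup_complete',
    removed_count: removed
  }, 'Session cleanup completed');
}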

Common Job Patterns

Health Check Pattern

export class HealthCheckJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private service: ServiceToMonitor
  ) {
    super('health-check', 120000, logger);
  }

  protected async execute(): Promise<void> {
    const isHealthy = await this.service.checkHealth();
    
    if (!isHealthy) {
      this.logger.warn({
        operation: 'health_check_failed',
        service: 'my-service'
      }, 'Service health check failed');
      
      await this.service.restart();
    }
  }
}

Cleanup Pattern

export class CleanupJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private manager: ResourceManager
  ) {
    super('cleanup', 900000, logger); // 15 minutes
  }

  protected async execute(): Promise<void> {
    const expired = await this.manager.findExpired();
    
    for (const resource of expired) {
      await this.manager.cleanup(resource);
    }
    
    this.logger.info({
      operation: 'cleanup_complete',
      count: expired.length
    }, 'Cleanup completed');
  }
}

Metrics Collection Pattern

export class MetricsJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private collector: MetricsCollector
  ) {
    super('metrics', 300000, logger); // 5 minutes
  }

  protected async execute(): Promise<void> {
    const metrics = await this.collector.collect();
    await this.collector.report(metrics);
  }
}

Troubleshooting

Job Not Starting

Check if the job is registered:
# Look for job_start logs
grep "job_start" satellite.log | grep "my-job"
Verify registration in code:
const jobs = jobManager.getRegisteredJobs();
console.log(jobs); // Should include your job name

Job Failing Repeatedly

Check error logs:
# Find job errors
grep "job_execute_error" satellite.log | grep "my-job"
Review error count in statistics:
const stats = jobManager.getStats('my-job');
console.log(`Error count: ${stats.errorCount}`);

Performance Issues

Monitor execution time:
# Check execution times
grep "job_execute_success" satellite.log | grep "my-job"
If the execution time approaches the interval:
  • Increase the interval
  • Optimize job logic
  • Consider breaking into smaller jobs
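When a run can outlast the interval, overlapping executions are a risk. This page does not say whether BaseJob already prevents overlap, so the following is a sketch of one possible guard, not a description of existing behavior. The helper name skipIfBusy is hypothetical.

```typescript
// Hypothetical guard: wraps a task so a new run is skipped while the
// previous one is still in flight. The returned function resolves to
// true when the task ran and false when the tick was skipped.
function skipIfBusy(task: () => Promise<void>): () => Promise<boolean> {
  let inFlight = false;

  return async (): Promise<boolean> => {
    if (inFlight) return false; // previous run still active, skip this tick
    inFlight = true;
    try {
      await task();
      return true;
    } finally {
      // Always release the guard, even when the task throws
      inFlight = false;
    }
  };
}
```

Inside a job, the body of execute() could be wrapped this way so that a slow run delays the next one instead of stacking up behind it.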

Job Not Executing on Time

Verify interval configuration:
// Log interval on job creation
this.logger.info({
  job_name: 'my-job',
  interval_ms: this.intervalMs,
  interval_seconds: this.intervalMs / 1000
}, 'Job interval configured');
Check system clock drift if timing is critical.

Future Enhancements

Planned improvements to the job system:
  • Job dependencies (Job B waits for Job A completion)
  • Conditional execution (skip job if condition not met)
  • Job state persistence (resume after satellite restart)
  • Distributed coordination (multi-satellite job scheduling)
  • Retry logic with exponential backoff
  • Dynamic interval adjustment based on load
  • Prometheus metrics export
  • Web UI for job management

Implementation Status

Current Features:
  • ✅ BaseJob abstract class with interval management
  • ✅ JobManager for centralized control
  • ✅ Automatic error handling and logging
  • ✅ Execution metrics tracking
  • ✅ HeartbeatJob integration
  • ✅ Template job for reference
In Development:
  • 🚧 Job priority levels
  • 🚧 Job status API endpoint
  • 🚧 Advanced monitoring features
The job system is production-ready and actively used for the heartbeat service. The pattern has proven stable and is ready for additional jobs.