Background Jobs System
DeployStack Satellite implements a centralized job management system for recurring background tasks. The system provides a consistent pattern for cron-like operations with automatic error handling, execution metrics, and lifecycle management.
Architecture Overview
The job system consists of three core components:
┌─────────────────────────────────────────────────────────────────┐
│                     Job System Architecture                     │
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │ BaseJob      │    │ JobManager   │    │ Concrete Job │       │
│  │              │    │              │    │              │       │
│  │ • Interval   │◄───│ • Registry   │◄───│ HeartbeatJob │       │
│  │ • Execute    │    │ • Lifecycle  │    │ CleanupJob   │       │
│  │ • Metrics    │    │ • Monitoring │    │ CustomJob    │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
└─────────────────────────────────────────────────────────────────┘
BaseJob Abstract Class
All jobs extend BaseJob, which provides:
- Automatic Interval Execution: Jobs run on configured intervals
- Immediate First Run: Execute immediately on start, then follow interval
- Error Handling: Automatic error catching with structured logging
- Execution Metrics: Track execution count, timing, and errors
- Lifecycle Management: Start/stop methods with state tracking
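The BaseJob source itself is not reproduced on this page. As a rough sketch of how the five behaviours above could fit together, the stand-alone class below runs once on start, repeats on a timer, and counts failures instead of letting them escape. The names SketchBaseJob and SketchLogger are hypothetical stand-ins (the real class takes a FastifyBaseLogger), and the internals are assumptions rather than the actual implementation.

```typescript
// Simplified logger shape for this sketch; the real jobs receive a FastifyBaseLogger.
interface SketchLogger {
  info(obj: object, msg: string): void;
  error(obj: object, msg: string): void;
}

// Hypothetical stand-in for BaseJob. NOT the actual DeployStack implementation.
abstract class SketchBaseJob {
  private timer: ReturnType<typeof setInterval> | null = null;
  private executionCount = 0;
  private errorCount = 0;
  private totalTimeMs = 0;

  constructor(
    protected readonly name: string,
    protected readonly intervalMs: number,
    protected readonly logger: SketchLogger
  ) {}

  // Subclasses put their job logic here.
  protected abstract execute(): Promise<void>;

  // Schedule the repeating timer, then run the first execution immediately.
  async start(): Promise<void> {
    if (this.timer !== null) return; // already started
    this.timer = setInterval(() => void this.runOnce(), this.intervalMs);
    await this.runOnce();
  }

  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  getStats() {
    return {
      executionCount: this.executionCount,
      errorCount: this.errorCount,
      averageExecutionTime:
        this.executionCount === 0 ? 0 : this.totalTimeMs / this.executionCount,
      isRunning: this.timer !== null,
    };
  }

  // Wraps execute() so a thrown error is logged and counted, never propagated.
  private async runOnce(): Promise<void> {
    const startedAt = Date.now();
    this.executionCount += 1;
    try {
      await this.execute();
      this.logger.info(
        { operation: 'job_execute_success', job_name: this.name },
        'Job succeeded'
      );
    } catch (err) {
      this.errorCount += 1;
      this.logger.error(
        { operation: 'job_execute_error', job_name: this.name, error: String(err) },
        'Job failed'
      );
    } finally {
      this.totalTimeMs += Date.now() - startedAt;
    }
  }
}
```

Because the error is caught inside runOnce(), a failing execution leaves the timer in place and the next interval still fires.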
JobManager
The JobManager provides centralized control:
- Job Registry: Register and track all jobs
- Lifecycle Control: Start/stop all jobs or individual jobs
- Status Monitoring: Query job statistics and execution state
- Graceful Shutdown: Stop all jobs cleanly on satellite shutdown
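To make the registry and lifecycle responsibilities concrete, here is a minimal sketch of a manager with the same broad shape. SketchJobManager and ManagedJob are hypothetical names, and stopAll() is an assumed method name for the graceful-shutdown step. Only register(), startAll(), getStats(), getAllStats(), and getRegisteredJobs() appear elsewhere on this page, and statistics are left out of the sketch for brevity.

```typescript
// What a job must expose for this sketch; the field names are assumptions.
interface ManagedJob {
  readonly name: string;
  start(): Promise<void>;
  stop(): void;
}

// Hypothetical registry, not the actual JobManager source.
class SketchJobManager {
  private readonly jobs = new Map<string, ManagedJob>();

  // Job names act as registry keys, so duplicates are rejected.
  register(job: ManagedJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job already registered: ${job.name}`);
    }
    this.jobs.set(job.name, job);
  }

  getRegisteredJobs(): string[] {
    return Array.from(this.jobs.keys());
  }

  // Start jobs one after another, in registration order.
  async startAll(): Promise<void> {
    for (const job of Array.from(this.jobs.values())) {
      await job.start();
    }
  }

  // Assumed shutdown hook: stop every job so no timers keep the process alive.
  stopAll(): void {
    for (const job of Array.from(this.jobs.values())) {
      job.stop();
    }
  }
}
```

In a real server, a shutdown handler (for example a SIGTERM listener or a Fastify onClose hook) would be the natural place to call the stop method.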
Current Jobs
| Job Name | Interval | Purpose | Status |
|---|---|---|---|
| heartbeat | 30 seconds | Send status updates to backend | ✅ Active |
| cleanup | 5 minutes | Template for new jobs | 📝 Example |
Creating a New Job
Add a new background job in three steps:
Step 1: Create Job File
Create src/jobs/process-health-job.ts:
import { BaseJob } from './base-job';
import { FastifyBaseLogger } from 'fastify';

export class ProcessHealthJob extends BaseJob {
  constructor(logger: FastifyBaseLogger) {
    super('process-health', 120000, logger); // 2 minutes
  }

  protected async execute(): Promise<void> {
    this.logger.info({
      operation: 'process_health_check'
    }, 'Checking process health...');

    // Your job logic here

    this.logger.info({
      operation: 'process_health_complete'
    }, 'Health check completed');
  }
}
Step 2: Export from Index
Add to src/jobs/index.ts:
export { ProcessHealthJob } from './process-health-job';
Step 3: Register in Server
Add to src/server.ts:
import { JobManager, HeartbeatJob, CleanupJob, ProcessHealthJob } from './jobs';
const jobManager = new JobManager(server.log);
jobManager.register(new HeartbeatJob(heartbeatService));
jobManager.register(new CleanupJob(server.log));
jobManager.register(new ProcessHealthJob(server.log));
await jobManager.startAll();
That’s it! Your job will start running immediately and then execute every 2 minutes automatically.
Job Intervals
Common interval values in milliseconds:
// Seconds
30 * 1000 // 30 seconds
60 * 1000 // 1 minute
// Minutes
2 * 60 * 1000 // 2 minutes
5 * 60 * 1000 // 5 minutes
10 * 60 * 1000 // 10 minutes
15 * 60 * 1000 // 15 minutes
30 * 60 * 1000 // 30 minutes
// Hours
60 * 60 * 1000 // 1 hour
6 * 60 * 60 * 1000 // 6 hours
24 * 60 * 60 * 1000 // 24 hours
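If the raw arithmetic becomes hard to read, small conversion helpers are one option. The functions below are hypothetical conveniences for illustration, not part of the job system's API.

```typescript
// Hypothetical helpers that convert human-friendly units to milliseconds.
const seconds = (n: number): number => n * 1000;
const minutes = (n: number): number => n * seconds(60);
const hours = (n: number): number => n * minutes(60);

// The same values as the table above, expressed with the helpers.
const heartbeatIntervalMs = seconds(30); // 30000
const healthCheckIntervalMs = minutes(2); // 120000
const dailyIntervalMs = hours(24); // 86400000
```

A constructor could then call `super('process-health', minutes(2), logger)` instead of passing a bare 120000.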
Environment-Configurable Intervals
Make job intervals configurable:
export class MyJob extends BaseJob {
  constructor(logger: FastifyBaseLogger) {
    const interval = parseInt(
      process.env.MY_JOB_INTERVAL || '300000',
      10
    );
    super('my-job', interval, logger);
  }
}
Add to .env.example:
# My Job interval in milliseconds (default: 5 minutes)
MY_JOB_INTERVAL=300000
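One caveat with reading intervals from the environment: parseInt returns NaN for a value such as `abc`, and accepts the leading digits of a value such as `5m`, so a typo could produce a very short or invalid interval. A stricter parser along the following lines is one way to guard against that. parseIntervalMs is a hypothetical helper, not something the job system provides.

```typescript
// Hypothetical helper: accept only whole, positive millisecond values,
// otherwise fall back to the supplied default.
function parseIntervalMs(raw: string | undefined, fallbackMs: number): number {
  if (raw === undefined) return fallbackMs;
  const trimmed = raw.trim();
  // Digits only: rejects '', 'abc', '5m', '-10', and '1.5'.
  if (!/^\d+$/.test(trimmed)) return fallbackMs;
  const parsed = parseInt(trimmed, 10);
  // Zero would make the timer fire as fast as possible, so treat it as invalid.
  return parsed > 0 ? parsed : fallbackMs;
}
```

In the constructor above, this would replace the inline parseInt call, for example `parseIntervalMs(process.env.MY_JOB_INTERVAL, 300000)`.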
Jobs with Dependencies
If your job needs access to services, inject them via constructor:
export class ProcessHealthJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private processManager: ProcessManager,
    private runtimeState: RuntimeState
  ) {
    super('process-health', 120000, logger);
  }

  protected async execute(): Promise<void> {
    const processes = this.processManager.getAllProcesses();

    for (const proc of processes) {
      if (proc.errorCount > 10) {
        this.logger.warn({
          process_id: proc.config.installation_id,
          error_count: proc.errorCount
        }, 'Process has high error count');
      }
    }
  }
}
Register with dependencies:
jobManager.register(
  new ProcessHealthJob(server.log, processManager, runtimeState)
);
Job Lifecycle
Initialization Flow
Satellite Startup
│
├── Register Satellite with Backend
│
├── Initialize Services
│
├── Create JobManager
│
├── Register Jobs
│   ├── new HeartbeatJob(heartbeatService)
│   ├── new CleanupJob(logger)
│   └── new CustomJob(logger)
│
├── jobManager.startAll()
│   ├── Start Job 1 → Execute immediately → Set interval
│   ├── Start Job 2 → Execute immediately → Set interval
│   └── Start Job 3 → Execute immediately → Set interval
│
└── Satellite Ready
Job Execution Flow
Job Start
│
├── Execute Immediately (first run)
│   ├── Log: job_execute_start
│   ├── Run execute() method
│   ├── Track execution time
│   ├── Log: job_execute_success
│   └── Update metrics
│
├── Wait for Interval
│
└── Execute on Interval (repeating)
    ├── Log: job_execute_start
    ├── Run execute() method
    ├── Handle errors (if any)
    ├── Log: job_execute_success or job_execute_error
    └── Update metrics → Repeat
Monitoring and Observability
Structured Logging
All job events are logged with structured data:
// Job started
{
  "operation": "job_start",
  "job_name": "process-health",
  "interval_ms": 120000,
  "interval_seconds": 120
}

// Job executing
{
  "operation": "job_execute_start",
  "job_name": "process-health",
  "execution_number": 5
}

// Job completed
{
  "operation": "job_execute_success",
  "job_name": "process-health",
  "execution_number": 5,
  "execution_time_ms": 234
}

// Job error
{
  "operation": "job_execute_error",
  "job_name": "process-health",
  "execution_number": 5,
  "error_count": 2,
  "error": "Connection timeout"
}
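Because every event carries `operation` and `job_name` fields, log output can be filtered programmatically as well as with text search. The sketch below assumes the log is written as one JSON object per line, which is an assumption about the deployment's logger configuration. filterJobLogs and JobLogEntry are hypothetical names.

```typescript
// Fields used for filtering; any other fields are kept as-is.
interface JobLogEntry {
  operation?: string;
  job_name?: string;
  [key: string]: unknown;
}

// Return the parsed entries that match an operation and job name.
// Lines that are not valid JSON objects are skipped rather than treated as errors.
function filterJobLogs(
  lines: string[],
  operation: string,
  jobName: string
): JobLogEntry[] {
  const matches: JobLogEntry[] = [];
  for (const line of lines) {
    try {
      const entry = JSON.parse(line) as JobLogEntry | null;
      if (
        entry !== null &&
        typeof entry === 'object' &&
        entry.operation === operation &&
        entry.job_name === jobName
      ) {
        matches.push(entry);
      }
    } catch (err) {
      // Not a JSON line (for example a stack trace fragment); ignore it.
    }
  }
  return matches;
}
```

Calling `filterJobLogs(lines, 'job_execute_error', 'process-health')` would return only the error events for that job.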
Job Statistics
Query job statistics via JobManager:
const stats = jobManager.getStats('process-health');
// Returns:
{
  executionCount: 42,
  errorCount: 2,
  averageExecutionTime: 234,
  isRunning: true
}
Get all job statistics:
const allStats = jobManager.getAllStats();
// Returns array of all job statistics
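The raw counters become more useful once turned into a rate. As an illustration, the helpers below derive an error rate from a stats object with the four fields shown above and flag jobs that look unhealthy. The JobStats interface mirrors that example output. errorRate, needsAttention, and the 10% default threshold are hypothetical choices, not part of JobManager.

```typescript
// Mirrors the fields in the getStats() example output above.
interface JobStats {
  executionCount: number;
  errorCount: number;
  averageExecutionTime: number;
  isRunning: boolean;
}

// Fraction of executions that failed; 0 when the job has not run yet.
function errorRate(stats: JobStats): number {
  return stats.executionCount === 0 ? 0 : stats.errorCount / stats.executionCount;
}

// Flag a job that is stopped or failing more often than the allowed rate.
function needsAttention(stats: JobStats, maxErrorRate = 0.1): boolean {
  return !stats.isRunning || errorRate(stats) > maxErrorRate;
}
```

With the example values (42 executions, 2 errors, running), the error rate is about 4.8%, so the job would not be flagged at the default threshold.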
Error Handling
Automatic Error Recovery
The BaseJob class automatically handles errors:
protected async execute(): Promise<void> {
  // If this throws, BaseJob catches it
  await someOperationThatMightFail();
  // Job continues running on next interval
}
Custom Error Handling
Add custom error handling for specific scenarios:
protected async execute(): Promise<void> {
  try {
    await this.criticalOperation();
  } catch (error) {
    this.logger.error({ error }, 'Critical operation failed');
    // Swallowing the error here keeps it away from BaseJob, so this run
    // will not count as a failure; rethrow if it should be tracked.
  }
}
Timeout Protection
Add timeouts for long-running operations:
protected async execute(): Promise<void> {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 60000);

  try {
    await this.longOperation({ signal: controller.signal });
  } finally {
    clearTimeout(timeout);
  }
}
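The same pattern can be pulled into a reusable helper. The sketch below is one possible shape, with hypothetical names (withTimeout, abortableDelay). Note that aborting only has an effect if the operation actually observes the signal, as the example operation does. An operation that ignores the signal will keep running past the timeout.

```typescript
// Hypothetical helper: run an abortable operation and abort it after timeoutMs.
async function withTimeout<T>(
  operation: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await operation(controller.signal);
  } finally {
    // Always clear the timer so it cannot fire after the operation settles.
    clearTimeout(timer);
  }
}

// Example cancellable operation: resolves after delayMs unless aborted first.
function abortableDelay(delayMs: number, signal: AbortSignal): Promise<string> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => resolve('done'), delayMs);
    signal.addEventListener(
      'abort',
      () => {
        clearTimeout(timer);
        reject(new Error('Operation timed out'));
      },
      { once: true }
    );
  });
}
```

Inside execute(), a call such as `await withTimeout((signal) => this.longOperation({ signal }), 60000)` would then replace the manual controller and timer handling.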
Best Practices
1. Keep Jobs Focused
Each job should have a single responsibility:
Good:
export class SessionCleanupJob extends BaseJob {
  protected async execute(): Promise<void> {
    await this.cleanupExpiredSessions();
  }
}
Bad:
export class MaintenanceJob extends BaseJob {
  protected async execute(): Promise<void> {
    await this.cleanupSessions();
    await this.checkProcessHealth();
    await this.rotateLogs();
    await this.updateMetrics();
  }
}
2. Choose Appropriate Intervals
- High-frequency (30s-1m): Health checks, critical monitoring
- Medium (5m-15m): Cleanup tasks, periodic updates
- Low (1h+): Reports, analytics, maintenance
3. Document Job Purpose
Add clear comments explaining what the job does:
/**
 * Process Health Check Job
 *
 * Monitors all running MCP server processes and restarts unhealthy ones.
 * Runs every 2 minutes to ensure quick failure detection.
 *
 * Checks:
 * - Process still running
 * - Error count within limits
 * - Response time acceptable
 * - Memory usage not excessive
 */
export class ProcessHealthJob extends BaseJob {
  // ...
}
4. Use Structured Logging
Always log with operation context:
protected async execute(): Promise<void> {
  // `sessions` and `removed` stand in for values produced by your cleanup logic
  this.logger.info({
    operation: 'cleanup_start',
    session_count: sessions.length
  }, 'Starting session cleanup...');

  // ... cleanup logic ...

  this.logger.info({
    operation: 'cleanup_complete',
    removed_count: removed
  }, 'Session cleanup completed');
}
Common Job Patterns
Health Check Pattern
export class HealthCheckJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private service: ServiceToMonitor
  ) {
    super('health-check', 120000, logger);
  }

  protected async execute(): Promise<void> {
    const isHealthy = await this.service.checkHealth();

    if (!isHealthy) {
      this.logger.warn({
        operation: 'health_check_failed',
        service: 'my-service'
      }, 'Service health check failed');

      await this.service.restart();
    }
  }
}
Cleanup Pattern
export class CleanupJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private manager: ResourceManager
  ) {
    super('cleanup', 900000, logger); // 15 minutes
  }

  protected async execute(): Promise<void> {
    const expired = await this.manager.findExpired();

    for (const resource of expired) {
      await this.manager.cleanup(resource);
    }

    this.logger.info({
      operation: 'cleanup_complete',
      count: expired.length
    }, 'Cleanup completed');
  }
}
Metrics Collection Pattern
export class MetricsJob extends BaseJob {
  constructor(
    logger: FastifyBaseLogger,
    private collector: MetricsCollector
  ) {
    super('metrics', 300000, logger); // 5 minutes
  }

  protected async execute(): Promise<void> {
    const metrics = await this.collector.collect();
    await this.collector.report(metrics);
  }
}
Troubleshooting
Job Not Starting
Check if the job is registered:
# Look for job_start logs
grep "job_start" satellite.log | grep "my-job"
Verify registration in code:
const jobs = jobManager.getRegisteredJobs();
console.log(jobs); // Should include your job name
Job Failing Repeatedly
Check error logs:
# Find job errors
grep "job_execute_error" satellite.log | grep "my-job"
Review error count in statistics:
const stats = jobManager.getStats('my-job');
console.log(`Error count: ${stats.errorCount}`);
Monitor execution time:
# Check execution times
grep "job_execute_success" satellite.log | grep "my-job"
If the execution time approaches the job's interval:
- Increase the interval
- Optimize job logic
- Consider breaking into smaller jobs
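When a run can outlast its interval, the next tick may start while the previous one is still in progress. This page does not say whether BaseJob already guards against such overlap, so the wrapper below is a defensive sketch that could be applied inside a job's own logic. skipIfRunning is a hypothetical helper name.

```typescript
// Wrap an async task so a new call is skipped while the previous call is still running.
// The returned function resolves to true if the task ran, false if the call was skipped.
function skipIfRunning(task: () => Promise<void>): () => Promise<boolean> {
  let inFlight = false;
  return async () => {
    if (inFlight) return false; // previous run still active: skip this tick
    inFlight = true;
    try {
      await task();
      return true;
    } finally {
      // Reset even when the task throws, so later ticks are not blocked forever.
      inFlight = false;
    }
  };
}
```

A job could create the guarded function once (for example in its constructor) and call it from execute(), logging a warning whenever it resolves to false.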
Job Not Executing on Time
Verify interval configuration:
// Log interval on job creation
this.logger.info({
  job_name: 'my-job',
  interval_ms: this.intervalMs,
  interval_seconds: this.intervalMs / 1000
}, 'Job interval configured');
Check system clock drift if timing is critical.
Future Enhancements
Planned improvements to the job system:
- Job dependencies (Job B waits for Job A completion)
- Conditional execution (skip job if condition not met)
- Job state persistence (resume after satellite restart)
- Distributed coordination (multi-satellite job scheduling)
- Retry logic with exponential backoff
- Dynamic interval adjustment based on load
- Prometheus metrics export
- Web UI for job management
Implementation Status
Current Features:
- ✅ BaseJob abstract class with interval management
- ✅ JobManager for centralized control
- ✅ Automatic error handling and logging
- ✅ Execution metrics tracking
- ✅ HeartbeatJob integration
- ✅ Template job for reference
In Development:
- 🚧 Job priority levels
- 🚧 Job status API endpoint
- 🚧 Advanced monitoring features
The job system is production-ready and actively used for the heartbeat service. The pattern has proven stable and is ready for additional jobs.