Background Job Queue System

The DeployStack backend includes a custom background job queue system for processing long-running tasks that cannot be completed within a typical HTTP request/response cycle. The system uses database-backed persistence with automatic retries and rate limiting.

Overview

The job queue system solves the challenge of processing tasks that require:
  • Extended execution time - Operations taking minutes to hours
  • External API dependencies - Calls to third-party services with rate limits
  • Large-scale batch operations - Processing hundreds or thousands of items
  • Failure resilience - Automatic retry logic for transient errors
  • Progress tracking - Monitor task completion status

Architecture

The system consists of four core components working together:

JobQueueService

CRUD operations for managing jobs in the database. Handles job creation, status updates, and retrieval of pending jobs.

JobProcessorService

Background worker loop that polls the database every second, processes jobs sequentially, respects rate limits via scheduled_for timestamps, and implements exponential backoff for retries.

Worker Registry

Plugin-style pattern where each worker implements the Worker interface. Workers are registered by type (e.g., send_email, process_csv, sync_registry) and execute specific job types.
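
The registry pattern described above could look roughly like the following. This is a minimal sketch, not DeployStack's actual implementation: the WorkerRegistry class and its getWorker method are hypothetical names, and the duplicate-registration check is an assumption. Only the Worker and WorkerResult shapes and the send_email job type come from this page.

```typescript
// Shapes taken from the Worker Interface section of this page.
interface WorkerResult {
  success: boolean;
  message?: string;
  data?: unknown;
}

interface Worker {
  execute(payload: unknown, jobId: string): Promise<WorkerResult>;
}

// Hypothetical registry: maps a job type string to the worker that handles it.
class WorkerRegistry {
  private readonly workers = new Map<string, Worker>();

  registerWorker(type: string, worker: Worker): void {
    // Assumption: registering the same type twice is treated as a bug.
    if (this.workers.has(type)) {
      throw new Error(`Worker already registered for type "${type}"`);
    }
    this.workers.set(type, worker);
  }

  getWorker(type: string): Worker | undefined {
    return this.workers.get(type);
  }
}

const registry = new WorkerRegistry();
registry.registerWorker('send_email', {
  async execute(_payload, _jobId) {
    return { success: true, message: 'noop' };
  },
});
```

Keying the map by job type is what lets a processor look up the handler for a job with a single call and report a clear error when no worker exists for a type.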

Database Tables

Two tables provide persistence: queueJobs (queue_jobs in SQL) stores individual jobs with payload and status, while queueJobBatches (queue_job_batches in SQL) tracks groups of related jobs for progress monitoring.

Core Concepts

Jobs

Individual units of work with a specific type, JSON payload, and status tracking. Jobs flow through states: pending → processing → completed or failed.
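
The state flow above can be expressed as a small transition table. This is an illustrative sketch only: the canTransition helper is hypothetical, and the processing to pending edge (a job being re-queued for a retry) is an assumption based on the retry behavior described elsewhere on this page.

```typescript
type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';

// Allowed next states for each status.
// Assumption: a retried job moves from processing back to pending.
const transitions: Record<JobStatus, readonly JobStatus[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed', 'pending'],
  completed: [],
  failed: [],
};

// Returns true when moving from `from` to `to` is a legal step.
function canTransition(from: JobStatus, to: JobStatus): boolean {
  return transitions[from].indexOf(to) !== -1;
}
```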

Workers

Execution handlers that implement the Worker interface. Each worker is responsible for one job type and receives payload data plus context (database, logger).

Batches

Logical grouping of related jobs that enables tracking progress across multiple jobs. Useful for operations like sending 1,000 emails or processing a large CSV file.

Rate Limiting

Built-in support through the scheduled_for field. Jobs scheduled for future execution remain in pending state until their scheduled time, preventing API rate limit violations.
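
To make the scheduled_for rule concrete, the check a processor might apply before picking up a job can be sketched as below. The ScheduledJob shape and the isDue helper are hypothetical, and treating a missing scheduled time as "run as soon as possible" is an assumption.

```typescript
interface ScheduledJob {
  id: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  // Assumption: null means the job may run immediately.
  scheduledFor: Date | null;
}

// A job is due when it is still pending and its scheduled time has passed.
function isDue(job: ScheduledJob, now: Date): boolean {
  if (job.status !== 'pending') return false;
  return job.scheduledFor === null || job.scheduledFor.getTime() <= now.getTime();
}
```

Because a future-dated job simply fails this check on each poll, it stays pending without any extra timer or scheduler.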

When to Use Job Queue vs Events

The job queue system complements rather than replaces the Global Event Bus.
Use Job Queue for:
  • Long-running operations (>30 seconds)
  • Tasks requiring retry logic
  • Operations that must survive server restarts
  • Batch processing with progress tracking
  • Rate-limited external API calls
Use Event Bus for:
  • Fire-and-forget notifications
  • Real-time event distribution
  • Triggering multiple actions from one event
  • Low-latency intra-application messaging
Combining Both: Event listeners can create jobs for heavy processing:
// Event listener creates job for heavy work
eventBus.on('user.registered', async (eventData, context) => {
  await jobQueueService.createJob('send_welcome_email', {
    userId: eventData.userId,
    email: eventData.data.email
  });
});

Creating Workers

Workers live in services/backend/src/workers/ and implement the Worker interface from workers/types.ts.

Worker Interface

interface Worker {
  execute(payload: unknown, jobId: string): Promise<WorkerResult>;
}

interface WorkerResult {
  success: boolean;
  message?: string;
  data?: any;
}

Basic Worker Pattern

import type { AnyDatabase } from '../db';
import type { FastifyBaseLogger } from 'fastify';
import type { Worker, WorkerResult } from './types';

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

export class EmailWorker implements Worker {
  constructor(
    private readonly db: AnyDatabase,
    private readonly logger: FastifyBaseLogger
  ) {}

  async execute(payload: unknown, jobId: string): Promise<WorkerResult> {
    if (!this.isValidPayload(payload)) {
      return {
        success: false,
        message: 'Invalid payload format'
      };
    }

    const emailPayload = payload as EmailPayload;

    try {
      await this.sendEmail(emailPayload);
      
      this.logger.info({ 
        jobId, 
        to: emailPayload.to,
        operation: 'send_email'
      }, 'Email sent successfully');

      return {
        success: true,
        message: 'Email sent successfully'
      };
    } catch (error) {
      this.logger.error({ 
        jobId, 
        error,
        operation: 'send_email'
      }, 'Failed to send email');
      
      throw error; // Triggers retry logic
    }
  }

  private isValidPayload(payload: unknown): payload is EmailPayload {
    if (typeof payload !== 'object' || payload === null) return false;
    const p = payload as any;
    return typeof p.to === 'string' && 
           typeof p.subject === 'string' && 
           typeof p.body === 'string';
  }

  private async sendEmail(payload: EmailPayload): Promise<void> {
    // Implementation
  }
}

Worker Registration

Register workers in workers/index.ts:
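import type { FastifyBaseLogger } from 'fastify';
import type { AnyDatabase } from '../db';
import type { JobProcessorService } from '../services/jobProcessorService';
import { EmailWorker } from './emailWorker';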

export function registerWorkers(
  processor: JobProcessorService, 
  db: AnyDatabase,
  logger: FastifyBaseLogger
) {
  processor.registerWorker('send_email', new EmailWorker(db, logger));
  
  logger.info('Workers registered successfully');
}

Error Handling Strategies

Retriable Errors - Throw errors for temporary failures that might succeed on retry:
async execute(payload: unknown): Promise<WorkerResult> {
  try {
    await this.callExternalApi(payload);
    return { success: true };
  } catch (error) {
    throw error; // JobProcessor handles exponential backoff
  }
}
Non-Retriable Errors - Return failure for permanent errors that won’t be fixed by retrying:
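async execute(payload: unknown): Promise<WorkerResult> {
  if (!this.isValid(payload)) {
    return {
      success: false,
      message: 'Invalid data format - will not retry'
    };
  }

  // Payload is valid: do the actual work, then report success.
  // this.process is a placeholder for the worker's real logic.
  await this.process(payload);
  return { success: true };
}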
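
The two strategies above imply a decision the processor has to make after each execution. The sketch below shows one way that decision could be written; it is not the actual JobProcessorService logic. The Outcome and NextStep types and the decideNextStep function are hypothetical, and it assumes attempts counts executions including the one that just ran.

```typescript
// What happened when the worker ran.
type Outcome =
  | { kind: 'returned'; success: boolean }
  | { kind: 'threw' };

type NextStep = 'complete' | 'fail' | 'retry';

// Assumption: `attempts` includes the execution that just finished.
function decideNextStep(outcome: Outcome, attempts: number, maxAttempts: number): NextStep {
  if (outcome.kind === 'returned') {
    // A returned failure is treated as permanent: no retry.
    return outcome.success ? 'complete' : 'fail';
  }
  // A thrown error is treated as transient until attempts run out.
  return attempts < maxAttempts ? 'retry' : 'fail';
}
```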

Worker Best Practices

  1. Keep Workers Stateless - No state between executions
  2. Inject Dependencies - Database, logger, and services through constructor
  3. Validate Payloads - Always validate before processing
  4. Use Structured Logging - Include context objects with operation, jobId, and relevant data
  5. Single Responsibility - One worker per job type
  6. Make Testable - Design for easy unit testing with mocked dependencies
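
As an illustration of practices 1, 2, and 6, the sketch below shows a stateless worker whose only dependency is injected, which makes it straightforward to exercise with a hand-written fake. NotifyWorker and SendFn are hypothetical names invented for this example; no test framework is assumed.

```typescript
interface WorkerResult {
  success: boolean;
  message?: string;
}

// The single dependency the worker needs, injected through the constructor.
type SendFn = (to: string) => Promise<void>;

class NotifyWorker {
  private readonly send: SendFn;

  constructor(send: SendFn) {
    this.send = send;
  }

  async execute(payload: unknown, jobId: string): Promise<WorkerResult> {
    // Validate before doing any work.
    const to = (payload as { to?: unknown } | null)?.to;
    if (typeof to !== 'string') {
      return { success: false, message: `Job ${jobId}: invalid payload` };
    }
    await this.send(to);
    return { success: true };
  }
}

// Test double that records calls instead of sending anything.
const sent: string[] = [];
const worker = new NotifyWorker(async (to) => {
  sent.push(to);
});

worker.execute({ to: 'a@example.com' }, 'job-1').then((result) => {
  console.log(result.success, sent);
});
```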

Creating Jobs

From API Routes

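server.post('/api/users/:id/send-welcome', async (request, reply) => {
  const { id } = request.params as { id: string };

  // Load the user before queuing the job.
  // userService.getUserById is a placeholder for your own lookup.
  const user = await userService.getUserById(id);
  if (!user) {
    return reply.code(404).send({ message: 'User not found' });
  }

  const job = await jobQueueService.createJob('send_welcome_email', {
    userId: id,
    email: user.email
  });

  return { jobId: job.id, message: 'Email queued' };
});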

With Rate Limiting

// Schedule jobs 1 second apart to respect API limits
for (let i = 0; i < users.length; i++) {
  await jobQueueService.createJob('sync_user_data', {
    userId: users[i].id
  }, {
    scheduledFor: new Date(Date.now() + (i * 1000))
  });
}

Batch Operations

const batch = await jobQueueService.createBatch(
  'process_users', 
  userIds.length,
  { source: 'admin_action', requestedBy: adminId }
);

for (const userId of userIds) {
  await jobQueueService.createJob('process_user', 
    { userId }, 
    { batchId: batch.id }
  );
}

Server Integration

The job queue integrates with the backend server lifecycle in server.ts within the initializeDatabaseDependentServices function.

Initialization

The job queue system is initialized automatically after the database is ready:
// In initializeDatabaseDependentServices function
try {
  server.log.debug('🔄 Initializing Job Queue System...');
  const { JobQueueService } = await import('./services/jobQueueService');
  const { JobProcessorService } = await import('./services/jobProcessorService');
  const { registerWorkers } = await import('./workers');
  
  // Initialize JobQueueService
  const jobQueueService = new JobQueueService(dbInstance, server.log);
  server.log.debug('✅ JobQueueService initialized');
  
  // Initialize JobProcessorService (pass db and logger)
  const jobProcessorService = new JobProcessorService(dbInstance, server.log);
  server.log.debug('✅ JobProcessorService initialized');
  
  // Register workers
  registerWorkers(jobProcessorService, dbInstance, server.log);
  server.log.debug('✅ Workers registered');
  
  // Start processing jobs
  await jobProcessorService.start();
  server.log.info('✅ Job Queue System started and processing jobs');
  
  // Decorate server with job services for use in routes
  if (!server.hasDecorator('jobQueueService')) {
    server.decorate('jobQueueService', jobQueueService);
  } else {
    (server as any).jobQueueService = jobQueueService;
  }
  
  if (!server.hasDecorator('jobProcessorService')) {
    server.decorate('jobProcessorService', jobProcessorService);
  } else {
    (server as any).jobProcessorService = jobProcessorService;
  }
  
} catch (jobQueueError) {
  server.log.error({
    error: jobQueueError,
    message: jobQueueError instanceof Error ? jobQueueError.message : 'Unknown error',
    stack: jobQueueError instanceof Error ? jobQueueError.stack : 'No stack trace'
  }, '❌ Job Queue System failed to initialize:');
  server.log.warn('⚠️ Continuing without Job Queue System due to error');
}

Graceful Shutdown

Graceful shutdown is handled in the onClose hook to ensure current jobs complete before server shutdown:
server.addHook('onClose', async () => {
  // Stop job processor first to gracefully finish current jobs
  if ((server as any).jobProcessorService) {
    server.log.info('Stopping job processor...');
    await (server as any).jobProcessorService.stop();
    server.log.info('Job processor stopped.');
  }
  
  await pluginManager.shutdownPlugins();
  const rawConn = server.rawDbConnection;
  if (rawConn) {
    const status = getDbStatus();
    if (status.dialect === 'sqlite' && 'close' in rawConn) {
      (rawConn as SqliteDriver.Database).close();
      server.log.info('SQLite connection closed.');
    }
  }
});
The job processor’s stop() method will:
  1. Stop accepting new jobs from the queue
  2. Wait for the current job to complete (with 30-second timeout)
  3. Clean up resources
This prevents jobs from being interrupted mid-execution during server shutdown, provided the current job finishes within the 30-second timeout.
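
Step 2 above (waiting for the current job with a timeout) is commonly implemented by racing the job's promise against a timer. The helper below is a sketch of that technique under stated assumptions: waitWithTimeout is a hypothetical name, and the real stop() method may be structured differently.

```typescript
// Resolves to true if `work` settles before the timeout, false otherwise.
// A rejected `work` still counts as "settled": the job is no longer running.
function waitWithTimeout(work: Promise<unknown>, timeoutMs: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs);
    const finish = (): void => {
      clearTimeout(timer);
      resolve(true);
    };
    work.then(finish, finish);
  });
}

// Usage sketch: wait up to 30 seconds for the in-flight job.
// const finished = await waitWithTimeout(currentJobPromise, 30_000);
```

Clearing the timer once the job settles avoids keeping the process alive for the remainder of the timeout.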

Job Monitoring

Job Status Lifecycle

Jobs transition through these states:
  1. pending - Job created, waiting to be processed
  2. processing - Currently executing
  3. completed - Executed successfully
  4. failed - Execution failed after all retries

Database Queries

Check job status:
SELECT id, type, status, created_at, started_at, completed_at, attempts
FROM queue_jobs 
WHERE id = ?;
Monitor batch progress:
SELECT 
  b.id,
  b.total_jobs,
  COUNT(CASE WHEN j.status = 'completed' THEN 1 END) as completed,
  COUNT(CASE WHEN j.status = 'failed' THEN 1 END) as failed,
  COUNT(CASE WHEN j.status = 'processing' THEN 1 END) as processing
FROM queue_job_batches b
LEFT JOIN queue_jobs j ON j.batch_id = b.id
WHERE b.id = ?
GROUP BY b.id;
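
The counts returned by the batch query can be turned into a progress summary in application code. The following is a rough sketch: BatchCounts mirrors the query's columns, while BatchProgress and summarizeBatch are hypothetical names, and counting failed jobs as "done" for the percentage is an assumption.

```typescript
// One row from the batch progress query above.
interface BatchCounts {
  totalJobs: number;
  completed: number;
  failed: number;
  processing: number;
}

interface BatchProgress {
  pending: number;
  percentDone: number;
  finished: boolean;
}

function summarizeBatch(c: BatchCounts): BatchProgress {
  // Assumption: both completed and failed jobs count as settled.
  const settled = c.completed + c.failed;
  const pending = Math.max(0, c.totalJobs - settled - c.processing);
  const percentDone = c.totalJobs === 0 ? 100 : Math.round((settled / c.totalJobs) * 100);
  return { pending, percentDone, finished: settled >= c.totalJobs };
}

const example = summarizeBatch({ totalJobs: 1000, completed: 240, failed: 10, processing: 1 });
console.log(example);
```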

Common Issues

Jobs Not Processing:
  • Verify JobProcessorService is started
  • Check worker is registered for job type
  • Review server logs for errors
Jobs Stuck in Processing:
  • Likely cause: the server crashed during job execution
  • Manual intervention: set the job's status back to pending
  • The system will then retry the job after exponential backoff
High Retry Count:
  • Worker throwing errors unnecessarily
  • External service temporarily unavailable
  • Review worker error handling logic
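
For the "Jobs Stuck in Processing" case, a simple way to spot candidates for manual intervention is to look for jobs that have been in processing longer than expected. The sketch below works on rows already loaded from queue_jobs; the JobRow shape, the findStaleJobs helper, and the age threshold are all hypothetical, and started_at is assumed to hold Unix seconds.

```typescript
interface JobRow {
  id: string;
  status: string;
  startedAt: number | null; // mirrors started_at; assumed to be Unix seconds
}

// Returns jobs that have been in `processing` for longer than maxAgeSeconds.
function findStaleJobs(jobs: JobRow[], nowSeconds: number, maxAgeSeconds: number): JobRow[] {
  return jobs.filter(
    (job) =>
      job.status === 'processing' &&
      job.startedAt !== null &&
      nowSeconds - job.startedAt > maxAgeSeconds
  );
}
```

Pick a threshold comfortably above your longest legitimate job so that slow but healthy jobs are not reset.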

Database Schema

For the complete database schema, see schema.sqlite.ts in the backend directory.

Jobs Table

CREATE TABLE queue_jobs (
  id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  payload TEXT,
  status TEXT DEFAULT 'pending',
  attempts INTEGER DEFAULT 0,
  max_attempts INTEGER DEFAULT 3,
  scheduled_for INTEGER,
  created_at INTEGER DEFAULT (unixepoch()),
  started_at INTEGER,
  completed_at INTEGER,
  last_error TEXT,
  batch_id TEXT,
  FOREIGN KEY (batch_id) REFERENCES queue_job_batches(id)
);
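
When reading rows from this table in application code, the TEXT and INTEGER columns usually need converting. The sketch below shows one possible mapping. QueueJobRow follows the column list above; ParsedJob and parseJobRow are hypothetical, and it assumes payload holds JSON text and that all timestamp columns hold Unix seconds, as unixepoch() produces.

```typescript
// Column names mirror the CREATE TABLE statement above.
interface QueueJobRow {
  id: string;
  type: string;
  payload: string | null; // assumed to be JSON text
  status: string;
  attempts: number;
  max_attempts: number;
  scheduled_for: number | null; // assumed Unix seconds
  created_at: number;
  started_at: number | null;
  completed_at: number | null;
  last_error: string | null;
  batch_id: string | null;
}

// Hypothetical in-memory shape, not taken from the DeployStack codebase.
interface ParsedJob {
  id: string;
  type: string;
  payload: unknown;
  status: string;
  attempts: number;
  maxAttempts: number;
  scheduledFor: Date | null;
  createdAt: Date;
}

const toDate = (seconds: number): Date => new Date(seconds * 1000);

function parseJobRow(row: QueueJobRow): ParsedJob {
  return {
    id: row.id,
    type: row.type,
    payload: row.payload === null ? null : JSON.parse(row.payload),
    status: row.status,
    attempts: row.attempts,
    maxAttempts: row.max_attempts,
    scheduledFor: row.scheduled_for === null ? null : toDate(row.scheduled_for),
    createdAt: toDate(row.created_at),
  };
}
```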

Batches Table

CREATE TABLE queue_job_batches (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  total_jobs INTEGER NOT NULL,
  created_at INTEGER DEFAULT (unixepoch()),
  metadata TEXT
);

System Behavior

Processing Loop

  1. Poll database every 1 second for pending jobs
  2. Check if job’s scheduled_for time has passed
  3. Lock job by setting status to processing
  4. Execute worker for job type
  5. Update status based on result
  6. Implement exponential backoff for retries (1s, 2s, 4s, etc.)
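
The backoff sequence in step 6 (1s, 2s, 4s, and so on) corresponds to doubling a base delay on each retry. A minimal sketch of that calculation follows; backoffDelayMs and nextRunAt are hypothetical helpers, and numbering retries from 1 is an assumption about how the processor counts attempts.

```typescript
// Delay before retry number `attempt` (1-based): 1s, 2s, 4s, ...
function backoffDelayMs(attempt: number, baseMs: number = 1000): number {
  return baseMs * Math.pow(2, attempt - 1);
}

// When a job that just failed should next become eligible to run.
function nextRunAt(now: Date, attempt: number): Date {
  return new Date(now.getTime() + backoffDelayMs(attempt));
}

console.log([1, 2, 3].map((n) => backoffDelayMs(n))); // [ 1000, 2000, 4000 ]
```

Writing the result of nextRunAt into scheduled_for would let the same polling check that enforces rate limits also enforce the retry delay.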

Resource Usage

  • CPU: <5% during normal operation (mostly waiting)
  • Memory: Minimal footprint, jobs processed sequentially
  • Database: Single query per second, additional writes during processing
  • Non-blocking: Async processing doesn’t block main event loop

Performance Characteristics

  • Throughput: 1-60 jobs per minute depending on job duration
  • Latency: Up to 1 second of polling delay before a job starts when the queue is idle; longer when earlier jobs are still running
  • Concurrency: Sequential processing prevents resource overload
  • Scalability: Suitable for small to medium deployments

Design Decisions

Why Database-Backed?

No additional infrastructure required (Redis, message queues). Uses existing SQLite/Turso database, and jobs persist across server restarts.

Why Sequential Processing?

Prevents server resource exhaustion from hundreds of concurrent operations. Simplifies rate limiting for external APIs. Adequate for most use cases (1,000 items = 15-30 minutes).
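
The 15-30 minute figure for 1,000 items works out to roughly 0.9 to 1.8 seconds per job when jobs run one after another. The small estimator below reproduces that arithmetic; estimateBatchMinutes is a hypothetical helper, and the per-job durations are back-calculated from the figure above rather than measured.

```typescript
// Sequential processing: total time is simply job count times time per job.
function estimateBatchMinutes(jobCount: number, secondsPerJob: number): number {
  return (jobCount * secondsPerJob) / 60;
}

const fast = estimateBatchMinutes(1000, 0.9); // about 15 minutes
const slow = estimateBatchMinutes(1000, 1.8); // about 30 minutes
console.log(fast, slow);
```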

Why 1-Second Polling?

Balance between responsiveness and database load. Adequate latency for background tasks. Can be adjusted if needed.

Limitations

  • Not suitable for sub-second latency requirements
  • Single-server deployment (no distributed workers)
  • No built-in job scheduling (cron-like patterns)
  • Sequential processing limits throughput

Migration Path

If scaling beyond single-server becomes necessary, clear upgrade paths exist:
  • Redis Backend: Migrate to BullMQ for distributed processing
  • PostgreSQL: Switch to pg-boss or Graphile Worker
  • Cloud Queues: Move to AWS SQS, Google Cloud Tasks, etc.
The Worker interface remains compatible, which simplifies migration.

Common Use Cases

Batch Email Sending

Send emails to hundreds of users with rate limiting to avoid SMTP throttling.

CSV File Processing

Process uploaded files row-by-row, validate data, and store results.

External API Synchronization

Fetch data from third-party APIs respecting rate limits (e.g., 1 request per second).

Database Backups

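Run database backups and upload them to cloud storage as background jobs. Because the queue has no built-in cron-like scheduling, periodic runs need an external trigger that creates the job.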

Report Generation

Generate complex reports from large datasets without blocking API requests.

Summary

The background job queue system provides a simple, reliable way to process long-running tasks in DeployStack. Built on familiar SQLite/Turso infrastructure, it requires no additional services while providing persistence, retry logic, and rate limiting. Workers follow a straightforward pattern making them easy to implement and test. For routine operations, the system handles thousands of jobs efficiently. For specialized needs requiring higher throughput or distributed processing, the architecture supports clear migration paths to more advanced solutions.