
Overview

DeployStack Backend includes @fastify/sse for Server-Sent Events (SSE) support. SSE provides a simple, unidirectional communication channel from server to client over HTTP, which makes it well suited to live updates, notifications, and streaming data. The plugin is registered globally with a 30-second heartbeat interval to keep connections alive.

Naming Convention Standard

All SSE endpoints MUST follow this URL pattern:
  • REST endpoint: /api/{resource}/{action}
  • SSE endpoint: /api/{resource}/{action}/stream

URL Pattern Examples

# Client Activity
GET /api/users/me/mcp/client-activity          # REST API (polling)
GET /api/users/me/mcp/client-activity/stream   # SSE stream

# Notifications
GET /api/teams/:teamId/notifications            # REST API (polling)
GET /api/teams/:teamId/notifications/stream     # SSE stream

# Metrics
GET /api/satellites/:satelliteId/metrics        # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE stream
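
Because the SSE URL is always the REST URL plus a /stream suffix, a client can derive one from the other instead of hard-coding both. The sketch below shows one way to do that. toStreamUrl is a hypothetical helper name, not part of DeployStack or @fastify/sse, and it assumes the suffix belongs on the path, before any query string.

```typescript
// Hypothetical helper: derive the SSE URL from its paired REST URL
// by appending the /stream suffix described above.
function toStreamUrl(restUrl: string): string {
  // Split off any query string so the suffix lands on the path
  const queryIndex = restUrl.indexOf('?')
  const path = queryIndex === -1 ? restUrl : restUrl.slice(0, queryIndex)
  const query = queryIndex === -1 ? '' : restUrl.slice(queryIndex)

  // Drop trailing slashes so the result never contains "//stream"
  const trimmed = path.replace(/\/+$/, '')

  // Leave URLs that already point at a stream untouched
  if (trimmed.endsWith('/stream')) return trimmed + query
  return `${trimmed}/stream${query}`
}
```

For example, toStreamUrl('/api/teams/42/notifications?limit=20') yields '/api/teams/42/notifications/stream?limit=20', so the same query parameters reach both endpoints.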

Paired Endpoints

Every SSE endpoint should have a corresponding REST endpoint:
  1. Same Query Parameters: Both endpoints accept identical query parameters
  2. Same Data Structure: Both return the same data format
  3. Consistent Behavior: Both apply the same filters and limits
  4. Fallback Support: REST endpoint serves as fallback for clients without SSE support
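
One way to keep a pair of endpoints consistent is to route both through the same query parser and payload builder, so defaults and limits cannot drift apart. The following is a minimal sketch under assumptions: the parameter names (limit, active_within_minutes), the defaults, and the bounds are illustrative only and are not taken from the DeployStack API.

```typescript
// Hypothetical query shape shared by a REST endpoint and its /stream twin
interface ActivityQuery {
  limit: number
  activeWithinMinutes: number
}

// One parser for both routes, so both apply identical defaults and limits
function parseActivityQuery(raw: Record<string, string | undefined>): ActivityQuery {
  const toInt = (value: string | undefined, fallback: number): number => {
    const parsed = Number.parseInt(value ?? '', 10)
    return Number.isNaN(parsed) ? fallback : parsed
  }
  return {
    // Clamp to 1..100 (illustrative bounds)
    limit: Math.min(Math.max(toInt(raw.limit, 20), 1), 100),
    activeWithinMinutes: Math.max(toInt(raw.active_within_minutes, 30), 1)
  }
}

// One payload builder, so both endpoints return the same data structure
function buildActivityPayload<T>(items: T[], query: ActivityQuery): { items: T[]; count: number } {
  const limited = items.slice(0, query.limit)
  return { items: limited, count: limited.length }
}
```

The REST handler would return the payload as its response body, while the SSE handler would pass the same payload as the data field of a message.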

Why /stream?

  • Familiar Convention: A /stream suffix is a widely recognized way to mark a streaming endpoint
  • RESTful: Treats streaming as a sub-resource
  • Technology Agnostic: Works for SSE, WebSockets, or any streaming protocol
  • Clear Intent: Immediately indicates real-time streaming capability

Enabling SSE on a Route

Add the sse: true option to any route definition:
server.get('/events', { sse: true }, async (request, reply) => {
  // SSE methods available on reply.sse
})
For route-specific configuration:
server.get('/events', {
  sse: {
    heartbeat: false,  // Disable heartbeat for this route
    serializer: (data) => JSON.stringify(data)  // Custom serializer
  }
}, handler)

Sending Messages

Single Message

reply.sse.send({ data: 'Hello world' })

Full SSE Format

reply.sse.send({
  id: '123',
  event: 'user_update',
  data: { userId: 'abc', status: 'online' },
  retry: 5000  // Client retry interval in ms
})
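
To make the four fields concrete, the sketch below shows how such a message is laid out on the wire according to the SSE text format: one "field: value" line per field, one data line per line of payload, and a blank line to end the message. formatSseFrame is a hypothetical function written for illustration. It is not how @fastify/sse is implemented, and the choice to send strings unquoted while JSON-encoding everything else is an assumption.

```typescript
// Shape of a message as used in the examples above
interface SseFrame {
  id?: string
  event?: string
  data: unknown
  retry?: number
}

// Illustrative only: render a message in the SSE text format
function formatSseFrame(frame: SseFrame): string {
  const lines: string[] = []
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`)
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`)
  if (frame.retry !== undefined) lines.push(`retry: ${frame.retry}`)

  // Assumption: strings go out as-is, other values are JSON-encoded
  const payload = typeof frame.data === 'string'
    ? frame.data
    : JSON.stringify(frame.data) ?? ''

  // Every line of the payload needs its own "data:" prefix
  for (const line of payload.split('\n')) lines.push(`data: ${line}`)

  // A blank line terminates the message
  return lines.join('\n') + '\n\n'
}
```

On the client, the event field decides which addEventListener callback fires, the id field becomes the value sent back in Last-Event-ID on reconnect, and retry sets the reconnection delay.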

Streaming with Async Generator

async function* generateUpdates() {
  for (let i = 0; i < 10; i++) {
    yield { data: { count: i } }
    await new Promise(r => setTimeout(r, 1000))
  }
}

reply.sse.send(generateUpdates())

Connection Management

Keep Connection Open

By default, the connection closes after the handler completes. To keep it open:
reply.sse.keepAlive()

Handle Disconnection

reply.sse.onClose(() => {
  // Cleanup logic when client disconnects
  server.log.info('Client disconnected')
})

Manual Close

reply.sse.close()

Client Reconnection

Handle reconnecting clients using the Last-Event-ID header:
reply.sse.replay(async (lastEventId) => {
  // Fetch and send missed events since lastEventId
  const missedEvents = await getMissedEvents(lastEventId)
  for (const event of missedEvents) {
    reply.sse.send(event)
  }
})
Access the last event ID directly:
const lastId = reply.sse.lastEventId
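
The replay callback needs some store of recent events to look up what a client missed. The getMissedEvents function above is left undefined, so here is a minimal in-memory sketch of what could back it. createReplayBuffer and its methods are hypothetical names, and the policy of returning the whole buffer when the ID is unknown is an assumption. A production setup would more likely query a database.

```typescript
interface StoredEvent<T> {
  id: string
  event: string
  data: T
}

// Hypothetical bounded buffer of recent events, oldest first
function createReplayBuffer<T>(capacity: number) {
  const events: StoredEvent<T>[] = []
  return {
    // Record an event, evicting the oldest once the buffer is full
    push(event: StoredEvent<T>): void {
      events.push(event)
      if (events.length > capacity) events.shift()
    },
    // Return every event recorded after lastEventId.
    // If the ID is unknown (for example, already evicted), return
    // everything still buffered so the client misses as little as possible.
    getMissedEvents(lastEventId: string): StoredEvent<T>[] {
      const index = events.findIndex((e) => e.id === lastEventId)
      return index === -1 ? [...events] : events.slice(index + 1)
    }
  }
}
```

Each event would be pushed into the buffer when it is first sent, with the same id that is passed to reply.sse.send, so the ID a browser reports on reconnect can be found again.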

Connection State

if (reply.sse.isConnected) {
  reply.sse.send({ data: 'still connected' })
}

Complete Route Example

import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'

export default async function sseRoute(server: FastifyInstance) {
  server.get('/notifications/stream', {
    sse: true,
    preValidation: requirePermission('notifications.read'),
    schema: {
      tags: ['Notifications'],
      summary: 'Stream notifications',
      description: 'Real-time notification stream via SSE',
      security: [{ cookieAuth: [] }]
    }
  }, async (request, reply) => {
    const userId = request.user!.id

    // Handle client reconnection
    reply.sse.replay(async (lastEventId) => {
      const missed = await notificationService.getMissedNotifications(userId, lastEventId)
      for (const notification of missed) {
        reply.sse.send({ id: notification.id, event: 'notification', data: notification })
      }
    })

    // Keep connection open
    reply.sse.keepAlive()

    // Subscribe to new notifications
    const unsubscribe = notificationService.subscribe(userId, (notification) => {
      if (reply.sse.isConnected) {
        reply.sse.send({ id: notification.id, event: 'notification', data: notification })
      }
    })

    // Cleanup on disconnect
    reply.sse.onClose(() => {
      unsubscribe()
      server.log.debug({ userId }, 'SSE connection closed')
    })
  })
}

Polling Pattern with Async Operations

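When using setInterval with async database queries or API calls, you must check the connection state again after each async operation completes. The client may disconnect while the operation is in flight, and sending on a closed connection can crash the handler.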

Critical: Timestamp Tracking with Array Mutations

⚠️ COMMON BUG: When polling for new records and reversing arrays for chronological order, you must capture the newest timestamp before reversing the array:
// ❌ WRONG: Captures oldest timestamp after reverse
const newItems = await db.query().orderBy(desc(created_at))
for (const item of newItems.reverse()) {  // reverse() MUTATES the array
  reply.sse.send({ event: 'item', data: item })
}
lastSentTimestamp = newItems[0].created_at  // BUG: Now points to OLDEST item!

// ✅ CORRECT: Capture newest timestamp before reversing
const newItems = await db.query().orderBy(desc(created_at))
if (newItems.length > 0) {  // Skip empty polls: newItems[0] would be undefined
  const newestTimestamp = newItems[0].created_at  // Capture FIRST (newest)
  for (const item of newItems.reverse()) {
    reply.sse.send({ event: 'item', data: item })
  }
  lastSentTimestamp = newestTimestamp  // Use captured newest timestamp
}
Why this matters:
  • Query returns items in descending order (newest first): [newest, ..., oldest]
  • array.reverse() mutates the array: [oldest, ..., newest]
  • After reversal, array[0] points to the oldest item
  • Using array[0].created_at after reversal sets lastSentTimestamp to the oldest timestamp
  • Next poll finds the same items again → infinite duplicate stream
The Fix: Always capture the newest timestamp from array[0] before calling reverse().
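
An alternative that sidesteps the ordering problem is to never mutate the query result at all: reverse a copy and read the newest timestamp from the untouched original. The sketch below assumes rows arrive newest first, as in the query above. LogRow and prepareBatch are hypothetical names used only for illustration.

```typescript
// Hypothetical row shape for illustration
interface LogRow {
  id: number
  created_at: Date
}

// Returns the rows oldest-first plus the newest timestamp,
// leaving the input array in its original (newest-first) order.
function prepareBatch(rowsNewestFirst: readonly LogRow[]): {
  chronological: LogRow[]
  newestTimestamp: Date | null
} {
  // An empty poll has no newest row, so report null instead of reading index 0
  if (rowsNewestFirst.length === 0) {
    return { chronological: [], newestTimestamp: null }
  }
  return {
    // Spread into a new array first: reverse() then mutates only the copy
    chronological: [...rowsNewestFirst].reverse(),
    // Index 0 of the untouched input is still the newest row
    newestTimestamp: rowsNewestFirst[0].created_at
  }
}
```

With this shape, the order of the two reads no longer matters, because the original array keeps the newest row at index 0 regardless of when the copy is reversed.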

Complete Polling Pattern Example

import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'
import { getDb, getSchema } from '../../../db'
import { eq, and, desc, gt } from 'drizzle-orm'

export default async function pollingStreamRoute(server: FastifyInstance) {
  server.get('/logs/stream', {
    sse: true,
    preValidation: requirePermission('logs.view'),
    schema: {
      tags: ['Logs'],
      summary: 'Stream logs via SSE with polling',
      security: [{ cookieAuth: [] }]
    }
  }, async (request, reply) => {
    const userId = request.user!.id
    let pollInterval: NodeJS.Timeout | null = null

    const db = getDb()
    const { logs } = getSchema()

    // Keep connection open
    reply.sse.keepAlive()

    // Send initial snapshot
    const initialLogs = await db
      .select()
      .from(logs)
      .where(eq(logs.user_id, userId))
      .orderBy(desc(logs.created_at))
      .limit(50)

    reply.sse.send({
      event: 'snapshot',
      data: { logs: initialLogs }
    })

    // Track last sent timestamp for polling
    let lastSentTimestamp = initialLogs[0]?.created_at || new Date(0)

    // Poll for new logs every 3 seconds
    pollInterval = setInterval(async () => {
      // Check #1: Before starting async work
      if (!reply.sse.isConnected) {
        if (pollInterval) clearInterval(pollInterval)
        return
      }

      try {
        // Query for new logs (newest first)
        const newLogs = await db
          .select()
          .from(logs)
          .where(and(
            eq(logs.user_id, userId),
            gt(logs.created_at, lastSentTimestamp)
          ))
          .orderBy(desc(logs.created_at))
          .limit(100)

        // ⚠️ CRITICAL: Check #2 after async operation completes
        if (!reply.sse.isConnected) {
          if (pollInterval) clearInterval(pollInterval)
          return
        }

        if (newLogs.length > 0) {
          // ⚠️ CRITICAL: Capture newest timestamp BEFORE reversing
          const newestTimestamp = newLogs[0].created_at

          // Send logs in chronological order (oldest first)
          for (const log of newLogs.reverse()) {
            // Check before each send
            if (!reply.sse.isConnected) {
              if (pollInterval) clearInterval(pollInterval)
              return
            }
            reply.sse.send({
              id: log.id,
              event: 'log',
              data: log
            })
          }

          // Update to newest timestamp (captured before reversal)
          lastSentTimestamp = newestTimestamp
        }
      } catch (error) {
        server.log.error(error, 'Failed to poll for new logs')
        // Don't crash - just log the error
      }
    }, 3000)

    // Cleanup on disconnect
    reply.sse.onClose(() => {
      if (pollInterval) {
        clearInterval(pollInterval)
        pollInterval = null
      }
      server.log.debug({ userId }, 'Log stream closed')
    })
  })
}

Why Multiple Checks Are Needed

Without the second check after async operations:
Time 0ms:   setInterval fires → Check isConnected ✅ (connected)
Time 5ms:   Start database query (async)
Time 50ms:  Client disconnects → isConnected = false
Time 100ms: Query completes → Call send() → CRASH! ❌
With proper checks:
Time 0ms:   setInterval fires → Check #1 ✅ (connected)
Time 5ms:   Start database query (async)
Time 50ms:  Client disconnects → isConnected = false
Time 100ms: Query completes → Check #2 ✅ (disconnected) → Return early ✅
The client can disconnect during async operations, so checking only at the start of the interval is insufficient.
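
The timeline above can be reproduced without a server by isolating one polling tick into a function and handing it a stand-in connection. This is a sketch under assumptions: Connection is a hypothetical minimal interface that mimics only the isConnected flag and send method used in this guide, and pollOnce is an illustrative name.

```typescript
// Hypothetical stand-in for the parts of reply.sse used during polling
interface Connection {
  isConnected: boolean
  send(message: { data: unknown }): void
}

// One polling tick with a connection check on each side of the async work
async function pollOnce<T>(
  connection: Connection,
  query: () => Promise<T[]>
): Promise<'skipped' | 'aborted' | 'sent'> {
  // Check #1: do not start work for a client that is already gone
  if (!connection.isConnected) return 'skipped'

  const rows = await query()

  // Check #2: the client may have disconnected while the query was running
  if (!connection.isConnected) return 'aborted'

  for (const row of rows) connection.send({ data: row })
  return 'sent'
}
```

Passing a query that flips isConnected to false before resolving simulates the 50ms disconnect: the function returns 'aborted' and send is never called.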

Key Takeaways for Polling Patterns

When implementing SSE with polling:
  1. ✅ Always check reply.sse.isConnected after async operations - Client can disconnect during queries
  2. ✅ Capture timestamps before array mutations - array.reverse() mutates the array
  3. ✅ Use gt() (greater than) for timestamp filtering - Prevents re-sending same items
  4. ✅ Clear interval on disconnect - Prevent memory leaks and unnecessary queries
  5. ✅ Send in chronological order - Oldest first for natural reading experience
  6. ✅ Wrap polling logic in try-catch - Don’t crash on database errors
Common Pitfalls:
  • ❌ Using lastSentTimestamp = array[0].created_at after array.reverse()
  • ❌ Only checking isConnected before async operations
  • ❌ Forgetting to clear interval in onClose handler
  • ❌ Not handling database errors gracefully

Frontend Client

const eventSource = new EventSource('/api/notifications/stream', {
  withCredentials: true  // Include cookies for authentication
})

eventSource.addEventListener('notification', (event) => {
  const data = JSON.parse(event.data)
  console.log('New notification:', data)
})

eventSource.onerror = () => {
  // Browser automatically reconnects
  console.log('Connection lost, reconnecting...')
}

TypeScript Types

Import types from the package:
import type { SSEMessage } from '@fastify/sse'

const message: SSEMessage = {
  id: '123',
  event: 'update',
  data: { status: 'active' }
}