
Overview

DeployStack Backend includes @fastify/sse for Server-Sent Events (SSE) support. SSE provides a simple, one-way communication channel from server to client over HTTP, which makes it a good fit for live updates, notifications, and streaming data. The plugin is registered globally with a 30-second heartbeat interval to keep connections alive.

Naming Convention Standard

All SSE endpoints MUST follow this URL pattern:
  • REST endpoint: /api/{resource}/{action}
  • SSE endpoint: /api/{resource}/{action}/stream

URL Pattern Examples

# Client Activity
GET /api/users/me/mcp/client-activity          # REST API (polling)
GET /api/users/me/mcp/client-activity/stream   # SSE stream

# Notifications
GET /api/teams/:teamId/notifications            # REST API (polling)
GET /api/teams/:teamId/notifications/stream     # SSE stream

# Metrics
GET /api/satellites/:satelliteId/metrics        # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE stream
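
Because the SSE path is always the REST path plus /stream, the pairing can be derived rather than typed twice. The following is a minimal sketch; toStreamPath is a hypothetical helper for illustration and is not part of DeployStack or @fastify/sse.

```typescript
// Hypothetical helper: derive the SSE path from its paired REST path.
function toStreamPath(restPath: string): string {
  // Drop trailing slashes so '/api/x/' and '/api/x' map to the same stream path.
  const trimmed = restPath.replace(/\/+$/, '')
  // Leave paths that already end in /stream untouched.
  return /\/stream$/.test(trimmed) ? trimmed : trimmed + '/stream'
}

console.log(toStreamPath('/api/teams/:teamId/notifications'))
// prints "/api/teams/:teamId/notifications/stream"
```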

Paired Endpoints

Every SSE endpoint should have a corresponding REST endpoint:
  1. Same Query Parameters: Both endpoints accept identical query parameters
  2. Same Data Structure: Both return the same data format
  3. Consistent Behavior: Both apply the same filters and limits
  4. Fallback Support: REST endpoint serves as fallback for clients without SSE support
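
One way to guarantee the first three rules is to parse the query string in a single shared function that both handlers call. The sketch below assumes hypothetical parameter names (limit, active_within_minutes) and hypothetical bounds; substitute the parameters your resource actually accepts.

```typescript
// Hypothetical query shape shared by the REST and SSE variants of one resource.
interface ActivityQuery {
  limit: number
  activeWithinMinutes: number
}

// Clamp a numeric value into [min, max]; fall back when it is not a finite number.
function clamp(value: number, min: number, max: number, fallback: number): number {
  if (!isFinite(value)) return fallback
  return Math.min(max, Math.max(min, Math.floor(value)))
}

// Parse raw query-string values once so both endpoints apply identical defaults and limits.
function parseActivityQuery(raw: { [key: string]: string | undefined }): ActivityQuery {
  return {
    limit: clamp(Number(raw['limit']), 1, 100, 20),
    activeWithinMinutes: clamp(Number(raw['active_within_minutes']), 1, 1440, 30)
  }
}

// Both the REST handler and the /stream handler would call parseActivityQuery
// with their request query and pass the result to the same data-loading function.
console.log(parseActivityQuery({ limit: '500' }))
```

Keeping the parsing in one place means a change to a default or a limit cannot drift between the polling and streaming variants.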

Why /stream?

  • Common Convention: A /stream suffix is a familiar pattern in public streaming APIs
  • RESTful: Treats streaming as a sub-resource
  • Technology Agnostic: Works for SSE, WebSockets, or any streaming protocol
  • Clear Intent: Immediately indicates real-time streaming capability

Enabling SSE on a Route

Add the sse: true option to any route definition:
server.get('/events', { sse: true }, async (request, reply) => {
  // SSE methods available on reply.sse
})

For route-specific configuration:
server.get('/events', {
  sse: {
    heartbeat: false,  // Disable heartbeat for this route
    serializer: (data) => JSON.stringify(data)  // Custom serializer
  }
}, handler)

Sending Messages

Single Message

reply.sse.send({ data: 'Hello world' })

Full SSE Format

reply.sse.send({
  id: '123',
  event: 'user_update',
  data: { userId: 'abc', status: 'online' },
  retry: 5000  // Client retry interval in ms
})
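
To make the fields above concrete, the sketch below shows how such a message maps onto the SSE wire format (field lines followed by a blank line). It illustrates the standard text/event-stream format only; toWireFormat and WireMessage are hypothetical names, and the plugin's own serializer may order or encode fields differently.

```typescript
// Hypothetical local type mirroring the fields used in the example above.
interface WireMessage {
  id?: string
  event?: string
  data: unknown
  retry?: number
}

// Illustrative only: render one message as text/event-stream lines.
function toWireFormat(message: WireMessage): string {
  const lines: string[] = []
  if (message.id !== undefined) lines.push('id: ' + message.id)
  if (message.event !== undefined) lines.push('event: ' + message.event)
  if (message.retry !== undefined) lines.push('retry: ' + message.retry)
  // Strings are sent as-is; other values are JSON-encoded.
  const payload = typeof message.data === 'string'
    ? message.data
    : (JSON.stringify(message.data) ?? 'null')
  // Each line of a multi-line payload needs its own "data:" field.
  const parts = payload.split('\n')
  for (let i = 0; i < parts.length; i++) lines.push('data: ' + parts[i])
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n'
}

const wireText = toWireFormat({
  id: '123',
  event: 'user_update',
  data: { userId: 'abc', status: 'online' },
  retry: 5000
})
console.log(wireText)
```

On the client, the event field selects which addEventListener handler fires, id becomes the value sent back in Last-Event-ID on reconnect, and retry sets the reconnection delay in milliseconds.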

Streaming with Async Generator

async function* generateUpdates() {
  for (let i = 0; i < 10; i++) {
    yield { data: { count: i } }
    await new Promise(r => setTimeout(r, 1000))
  }
}

reply.sse.send(generateUpdates())

Connection Management

Keep Connection Open

By default, the connection closes after the handler completes. To keep it open:
reply.sse.keepAlive()

Handle Disconnection

reply.sse.onClose(() => {
  // Cleanup logic when client disconnects
  server.log.info('Client disconnected')
})

Manual Close

reply.sse.close()

Client Reconnection

Handle reconnecting clients using the Last-Event-ID header:
reply.sse.replay(async (lastEventId) => {
  // Fetch and send missed events since lastEventId
  const missedEvents = await getMissedEvents(lastEventId)
  for (const event of missedEvents) {
    reply.sse.send(event)
  }
})

Access the last event ID directly:
const lastId = reply.sse.lastEventId
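
The getMissedEvents call above is left to the application. As a rough sketch of what could sit behind it, the following keeps a bounded in-memory buffer of recent events. ReplayBuffer and StoredEvent are hypothetical names, and a production setup would more likely read from a database or message log so that replay survives restarts.

```typescript
// Hypothetical stored event shape; ids are assumed to be assigned in send order.
interface StoredEvent {
  id: string
  event: string
  data: unknown
}

// Hypothetical bounded in-memory buffer of the most recent events.
class ReplayBuffer {
  private events: StoredEvent[] = []
  private capacity: number

  constructor(capacity: number) {
    this.capacity = capacity
  }

  add(stored: StoredEvent): void {
    this.events.push(stored)
    // Drop the oldest entry once the buffer exceeds its capacity.
    if (this.events.length > this.capacity) this.events.shift()
  }

  // Return every event recorded after lastEventId.
  // If the id is unknown (for example, already evicted), return the whole buffer.
  getMissedEvents(lastEventId: string): StoredEvent[] {
    const index = this.events.map(e => e.id).indexOf(lastEventId)
    return this.events.slice(index + 1)
  }
}

const replayBuffer = new ReplayBuffer(3)
for (let i = 1; i <= 4; i++) {
  replayBuffer.add({ id: String(i), event: 'notification', data: { count: i } })
}
console.log(replayBuffer.getMissedEvents('2').map(e => e.id))
```

Returning the whole buffer for an unknown id is one possible policy; an alternative is to send a dedicated event telling the client to refetch via the paired REST endpoint.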

Connection State

if (reply.sse.isConnected) {
  reply.sse.send({ data: 'still connected' })
}

Complete Route Example

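import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'
// notificationService is assumed to come from the application's own service layer (import omitted here)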

export default async function sseRoute(server: FastifyInstance) {
  server.get('/notifications/stream', {
    sse: true,
    preValidation: requirePermission('notifications.read'),
    schema: {
      tags: ['Notifications'],
      summary: 'Stream notifications',
      description: 'Real-time notification stream via SSE',
      security: [{ cookieAuth: [] }]
    }
  }, async (request, reply) => {
    const userId = request.user!.id

    // Handle client reconnection
    reply.sse.replay(async (lastEventId) => {
      const missed = await notificationService.getMissedNotifications(userId, lastEventId)
      for (const notification of missed) {
        reply.sse.send({ id: notification.id, event: 'notification', data: notification })
      }
    })

    // Keep connection open
    reply.sse.keepAlive()

    // Subscribe to new notifications
    const unsubscribe = notificationService.subscribe(userId, (notification) => {
      if (reply.sse.isConnected) {
        reply.sse.send({ id: notification.id, event: 'notification', data: notification })
      }
    })

    // Cleanup on disconnect
    reply.sse.onClose(() => {
      unsubscribe()
      server.log.debug({ userId }, 'SSE connection closed')
    })
  })
}
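
The route above relies on notificationService.subscribe returning an unsubscribe function that is safe to call from onClose. A minimal in-memory sketch of that contract follows; SubscriptionHub is a hypothetical stand-in, and the real notificationService may be implemented quite differently (for example, on top of a database or message broker).

```typescript
type Listener<T> = (payload: T) => void

// Hypothetical per-user publish/subscribe hub.
class SubscriptionHub<T> {
  private listeners: { [userId: string]: Listener<T>[] } = {}

  // Register a listener and return a function that removes exactly that listener.
  subscribe(userId: string, listener: Listener<T>): () => void {
    const list = this.listeners[userId] || (this.listeners[userId] = [])
    list.push(listener)
    return () => {
      const current = this.listeners[userId] || []
      const remaining = current.filter(l => l !== listener)
      // Remove the user's entry entirely once the last listener is gone.
      if (remaining.length > 0) this.listeners[userId] = remaining
      else delete this.listeners[userId]
    }
  }

  // Deliver a payload to every listener for the user; returns how many were notified.
  publish(userId: string, payload: T): number {
    const list = this.listeners[userId] || []
    list.forEach(l => l(payload))
    return list.length
  }
}

const hub = new SubscriptionHub<{ id: string }>()
const received: string[] = []
const unsubscribe = hub.subscribe('user-1', n => received.push(n.id))
hub.publish('user-1', { id: 'n1' })
unsubscribe()
hub.publish('user-1', { id: 'n2' })
console.log(received)
```

Calling the returned function in reply.sse.onClose is what prevents listeners from accumulating as clients connect and disconnect.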

Frontend Client

const eventSource = new EventSource('/api/notifications/stream', {
  withCredentials: true  // Include cookies for authentication
})

eventSource.addEventListener('notification', (event) => {
  const data = JSON.parse(event.data)
  console.log('New notification:', data)
})

eventSource.onerror = () => {
  // Browser automatically reconnects
  console.log('Connection lost, reconnecting...')
}
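
Since every stream has a paired REST endpoint, a client can fall back to polling where EventSource is unavailable. The sketch below only decides which transport to use; pickTransport and the 5000 ms default interval are assumptions for illustration, not part of any DeployStack client.

```typescript
type Transport =
  | { kind: 'sse'; url: string }
  | { kind: 'polling'; url: string; intervalMs: number }

// Hypothetical helper: choose the SSE stream when supported, otherwise poll the paired REST endpoint.
function pickTransport(restUrl: string, sseSupported: boolean, intervalMs: number = 5000): Transport {
  return sseSupported
    ? { kind: 'sse', url: restUrl + '/stream' }
    : { kind: 'polling', url: restUrl, intervalMs: intervalMs }
}

// In a browser, pass `typeof EventSource !== 'undefined'` as the second argument.
const chosen = pickTransport('/api/notifications', false)
console.log(chosen)
```

With kind 'sse' the client opens an EventSource on the returned URL as shown above; with kind 'polling' it would call fetch on the REST URL every intervalMs milliseconds using the same query parameters.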

TypeScript Types

Import types from the package:
import type { SSEMessage } from '@fastify/sse'

const message: SSEMessage = {
  id: '123',
  event: 'update',
  data: { status: 'active' }
}