Documentation Index
Fetch the complete documentation index at: https://docs.deploystack.io/llms.txt
Use this file to discover all available pages before exploring further.
Overview
DeployStack Backend includes @fastify/sse for Server-Sent Events support. SSE provides a simple, unidirectional communication channel from server to client over HTTP, making it ideal for live updates, notifications, and streaming data.
The plugin is globally registered with a 30-second heartbeat interval to keep connections alive.
Naming Convention Standard
All SSE endpoints MUST follow this URL pattern:
- REST endpoint: /api/{resource}/{action}
- SSE endpoint: /api/{resource}/{action}/stream
URL Pattern Examples
# Client Activity
GET /api/users/me/mcp/client-activity # REST API (polling)
GET /api/users/me/mcp/client-activity/stream # SSE stream
# Notifications
GET /api/teams/:teamId/notifications # REST API (polling)
GET /api/teams/:teamId/notifications/stream # SSE stream
# Metrics
GET /api/satellites/:satelliteId/metrics # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE stream
Paired Endpoints
Every SSE endpoint should have a corresponding REST endpoint:
- Same Query Parameters: Both endpoints accept identical query parameters
- Same Data Structure: Both return the same data format
- Consistent Behavior: Both apply the same filters and limits
- Fallback Support: REST endpoint serves as fallback for clients without SSE support
Why /stream?
- Industry Standard: Used by GitHub, Stripe, and Twitter APIs
- RESTful: Treats streaming as a sub-resource
- Technology Agnostic: Works for SSE, WebSockets, or any streaming protocol
- Clear Intent: Immediately indicates real-time streaming capability
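Because the convention is purely mechanical, a client can derive the stream URL from any REST URL. A minimal sketch (the streamUrl helper is illustrative, not part of the plugin):

```typescript
// Derive the SSE endpoint for a REST endpoint, per the /stream convention.
// Any query string is preserved after the /stream suffix.
function streamUrl(restUrl: string): string {
  const [path, query] = restUrl.split('?')
  const base = path.endsWith('/') ? path.slice(0, -1) : path
  return query ? `${base}/stream?${query}` : `${base}/stream`
}

console.log(streamUrl('/api/users/me/mcp/client-activity'))
// → /api/users/me/mcp/client-activity/stream
```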
Enabling SSE on a Route
Add the sse: true option to any route definition:
server.get('/events', { sse: true }, async (request, reply) => {
// SSE methods available on reply.sse
})
For route-specific configuration:
server.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: (data) => JSON.stringify(data) // Custom serializer
}
}, handler)
Sending Messages
Single Message
reply.sse.send({ data: 'Hello world' })
reply.sse.send({
id: '123',
event: 'user_update',
data: { userId: 'abc', status: 'online' },
retry: 5000 // Client retry interval in ms
})
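On the wire, each message is serialized as `field: value` lines terminated by a blank line, as defined by the SSE specification. A sketch of that serialization (illustrative only; the plugin performs this internally, and WireMessage is a stand-in type):

```typescript
interface WireMessage {
  id?: string
  event?: string
  data: unknown
  retry?: number
}

// Serialize one message to the text/event-stream wire format.
function toWire(msg: WireMessage): string {
  const lines: string[] = []
  if (msg.id !== undefined) lines.push(`id: ${msg.id}`)
  if (msg.event !== undefined) lines.push(`event: ${msg.event}`)
  if (msg.retry !== undefined) lines.push(`retry: ${msg.retry}`)
  const payload = typeof msg.data === 'string' ? msg.data : JSON.stringify(msg.data)
  // A multi-line payload becomes one `data:` line per line of text.
  for (const line of payload.split('\n')) lines.push(`data: ${line}`)
  return lines.join('\n') + '\n\n'
}

console.log(toWire({ id: '123', event: 'user_update', data: { status: 'online' }, retry: 5000 }))
// Prints the id, event, and retry lines, a data line with the JSON payload,
// then the blank line that terminates the message.
```

Heartbeats are typically sent as SSE comment lines (lines starting with `:`), which clients ignore, so keep-alive traffic never reaches your event handlers.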
Streaming with Async Generator
async function* generateUpdates() {
for (let i = 0; i < 10; i++) {
yield { data: { count: i } }
await new Promise(r => setTimeout(r, 1000))
}
}
reply.sse.send(generateUpdates())
Connection Management
Keep Connection Open
By default, the connection closes after the handler completes. Call keepAlive() to keep it open:
reply.sse.keepAlive()
Handle Disconnection
reply.sse.onClose(() => {
// Cleanup logic when client disconnects
server.log.info('Client disconnected')
})
Manual Close
Client Reconnection
Handle reconnecting clients using the Last-Event-ID header:
reply.sse.replay(async (lastEventId) => {
// Fetch and send missed events since lastEventId
const missedEvents = await getMissedEvents(lastEventId)
for (const event of missedEvents) {
reply.sse.send(event)
}
})
Access the last event ID directly:
const lastId = reply.sse.lastEventId
Connection State
if (reply.sse.isConnected) {
reply.sse.send({ data: 'still connected' })
}
Complete Route Example
import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'
export default async function sseRoute(server: FastifyInstance) {
server.get('/notifications/stream', {
sse: true,
preValidation: requirePermission('notifications.read'),
schema: {
tags: ['Notifications'],
summary: 'Stream notifications',
description: 'Real-time notification stream via SSE',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
// Handle client reconnection
reply.sse.replay(async (lastEventId) => {
const missed = await notificationService.getMissedNotifications(userId, lastEventId)
for (const notification of missed) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Keep connection open
reply.sse.keepAlive()
// Subscribe to new notifications
const unsubscribe = notificationService.subscribe(userId, (notification) => {
if (reply.sse.isConnected) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Cleanup on disconnect
reply.sse.onClose(() => {
unsubscribe()
server.log.debug({ userId }, 'SSE connection closed')
})
})
}
Polling Pattern with Async Operations
When using setInterval with async database queries or API calls, you must check connection state after the async operation completes to prevent crashes.
Critical: Timestamp Tracking with Array Mutations
⚠️ COMMON BUG: When polling for new records and reversing arrays for chronological order, you must capture the newest timestamp before reversing the array:
// ❌ WRONG: Captures oldest timestamp after reverse
const newItems = await db.query().orderBy(desc(created_at))
for (const item of newItems.reverse()) { // reverse() MUTATES the array
reply.sse.send({ event: 'item', data: item })
}
lastSentTimestamp = newItems[0].created_at // BUG: Now points to OLDEST item!
// ✅ CORRECT: Capture newest timestamp before reversing
const newItems = await db.query().orderBy(desc(created_at))
const newestTimestamp = newItems[0].created_at // Capture FIRST (newest)
for (const item of newItems.reverse()) {
reply.sse.send({ event: 'item', data: item })
}
lastSentTimestamp = newestTimestamp // Use captured newest timestamp
Why this matters:
- The query returns items in descending order (newest first): [newest, ..., oldest]
- array.reverse() mutates the array in place: [oldest, ..., newest]
- After reversal, array[0] points to the oldest item
- Using array[0].created_at after reversal sets lastSentTimestamp to the oldest timestamp
- The next poll finds the same items again → infinite duplicate stream
The Fix: Always capture the newest timestamp from array[0] before calling reverse().
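The mutation is easy to reproduce with plain arrays, independent of any database (illustrative values):

```typescript
// Items as the query returns them: newest first (descending created_at).
const items = [
  { id: 'c', created_at: 300 },
  { id: 'b', created_at: 200 },
  { id: 'a', created_at: 100 },
]

const newest = items[0].created_at  // capture BEFORE reverse: 300
items.reverse()                     // mutates in place: now oldest first

console.log(items[0].created_at)    // 100: index 0 now points at the OLDEST item
console.log(newest)                 // 300: the high-water mark we actually want
```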
Complete Polling Pattern Example
import { type FastifyInstance } from 'fastify'
import { getDb, getSchema } from '../../../db'
import { eq, and, desc, gt } from 'drizzle-orm'
export default async function pollingStreamRoute(server: FastifyInstance) {
server.get('/logs/stream', {
sse: true,
preValidation: requirePermission('logs.view'),
schema: {
tags: ['Logs'],
summary: 'Stream logs via SSE with polling',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
let pollInterval: NodeJS.Timeout | null = null
const db = getDb()
const { logs } = getSchema()
// Keep connection open
reply.sse.keepAlive()
// Send initial snapshot
const initialLogs = await db
.select()
.from(logs)
.where(eq(logs.user_id, userId))
.orderBy(desc(logs.created_at))
.limit(50)
reply.sse.send({
event: 'snapshot',
data: { logs: initialLogs }
})
// Track last sent timestamp for polling
let lastSentTimestamp = initialLogs[0]?.created_at || new Date(0)
// Poll for new logs every 3 seconds
pollInterval = setInterval(async () => {
// Check #1: Before starting async work
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
try {
// Query for new logs (newest first)
const newLogs = await db
.select()
.from(logs)
.where(and(
eq(logs.user_id, userId),
gt(logs.created_at, lastSentTimestamp)
))
.orderBy(desc(logs.created_at))
.limit(100)
// ⚠️ CRITICAL: Check #2 after async operation completes
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
if (newLogs.length > 0) {
// ⚠️ CRITICAL: Capture newest timestamp BEFORE reversing
const newestTimestamp = newLogs[0].created_at
// Send logs in chronological order (oldest first)
for (const log of newLogs.reverse()) {
// Check before each send
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
reply.sse.send({
id: log.id,
event: 'log',
data: log
})
}
// Update to newest timestamp (captured before reversal)
lastSentTimestamp = newestTimestamp
}
} catch (error) {
server.log.error(error, 'Failed to poll for new logs')
// Don't crash - just log the error
}
}, 3000)
// Cleanup on disconnect
reply.sse.onClose(() => {
if (pollInterval) {
clearInterval(pollInterval)
pollInterval = null
}
server.log.debug({ userId }, 'Log stream closed')
})
})
}
Why Multiple Checks Are Needed
Without the second check after async operations:
Time 0ms: setInterval fires → Check isConnected ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Call send() → CRASH! ❌
With proper checks:
Time 0ms: setInterval fires → Check #1 ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Check #2 ✅ (disconnected) → Return early ✅
The client can disconnect during async operations, so checking only at the start of the interval is insufficient.
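The race is straightforward to simulate without Fastify. In this sketch, isConnected and fakeQuery are stand-ins for the plugin's connection state and a real database call; the flag flips while the query is in flight, and only the post-await check prevents a send to a dead connection:

```typescript
let isConnected = true
let sent = 0

// Stand-in for a database query that takes 50ms.
const fakeQuery = (): Promise<string[]> =>
  new Promise(resolve => setTimeout(() => resolve(['row']), 50))

async function pollOnce(): Promise<void> {
  if (!isConnected) return          // Check #1: before async work
  const rows = await fakeQuery()    // the client may disconnect in here
  if (!isConnected) return          // Check #2: after async work
  sent += rows.length               // only reached while still connected
}

async function main(): Promise<void> {
  const poll = pollOnce()
  setTimeout(() => { isConnected = false }, 10)  // disconnect mid-query
  await poll
  console.log(sent)  // 0: check #2 caught the disconnect
}
main()
```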
Key Takeaways for Polling Patterns
When implementing SSE with polling:
- ✅ Always check reply.sse.isConnected after async operations - the client can disconnect during queries
- ✅ Capture timestamps before array mutations - array.reverse() mutates the array
- ✅ Use gt() (greater than) for timestamp filtering - prevents re-sending the same items
- ✅ Clear the interval on disconnect - prevents memory leaks and unnecessary queries
- ✅ Send in chronological order - oldest first for a natural reading experience
- ✅ Wrap polling logic in try-catch - don't crash on database errors
Common Pitfalls:
- ❌ Using lastSentTimestamp = array[0] after array.reverse()
- ❌ Only checking isConnected before async operations
- ❌ Forgetting to clear the interval in the onClose handler
- ❌ Not handling database errors gracefully
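The timestamp bookkeeping from the checklist (strict greater-than filtering plus capture-before-reverse) can be isolated into a pure helper, which also makes it unit-testable. A sketch (selectNew and Row are illustrative, not part of the plugin):

```typescript
interface Row { id: string; created_at: number }

// Given rows newest-first, return the rows newer than `since` in
// chronological order, plus the high-water mark for the next poll.
function selectNew(rowsNewestFirst: Row[], since: number) {
  const fresh = rowsNewestFirst.filter(r => r.created_at > since)  // strict gt: no re-sends
  if (fresh.length === 0) return { toSend: [] as Row[], since }
  const newest = fresh[0].created_at  // capture BEFORE reversing
  return { toSend: fresh.reverse(), since: newest }
}

const rows: Row[] = [
  { id: 'c', created_at: 300 },
  { id: 'b', created_at: 200 },
  { id: 'a', created_at: 100 },
]
const first = selectNew(rows, 100)
console.log(first.toSend.map(r => r.id), first.since)  // [ 'b', 'c' ] 300
// Polling again with the same rows sends nothing:
console.log(selectNew(rows, first.since).toSend.length)  // 0
```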
Frontend Client
const eventSource = new EventSource('/api/notifications/stream', {
withCredentials: true // Include cookies for authentication
})
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data)
console.log('New notification:', data)
})
eventSource.onerror = () => {
// Browser automatically reconnects
console.log('Connection lost, reconnecting...')
}
TypeScript Types
Import types from the package:
import type { SSEMessage } from '@fastify/sse'
const message: SSEMessage = {
id: '123',
event: 'update',
data: { status: 'active' }
}