import { type FastifyInstance } from 'fastify'
import { getDb, getSchema } from '../../../db'
import { eq, and, desc, gt } from 'drizzle-orm'
export default async function pollingStreamRoute(server: FastifyInstance) {
server.get('/logs/stream', {
sse: true,
preValidation: requirePermission('logs.view'),
schema: {
tags: ['Logs'],
summary: 'Stream logs via SSE with polling',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
let pollInterval: NodeJS.Timeout | null = null
const db = getDb()
const { logs } = getSchema()
// Keep connection open
reply.sse.keepAlive()
// Send initial snapshot
const initialLogs = await db
.select()
.from(logs)
.where(eq(logs.user_id, userId))
.orderBy(desc(logs.created_at))
.limit(50)
reply.sse.send({
event: 'snapshot',
data: { logs: initialLogs }
})
// Track last sent timestamp for polling
let lastSentTimestamp = initialLogs[0]?.created_at || new Date(0)
// Poll for new logs every 3 seconds
pollInterval = setInterval(async () => {
// Check #1: Before starting async work
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
try {
// Query for new logs (newest first)
const newLogs = await db
.select()
.from(logs)
.where(and(
eq(logs.user_id, userId),
gt(logs.created_at, lastSentTimestamp)
))
.orderBy(desc(logs.created_at))
.limit(100)
// ⚠️ CRITICAL: Check #2 after async operation completes
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
if (newLogs.length > 0) {
// ⚠️ CRITICAL: Capture newest timestamp BEFORE reversing
const newestTimestamp = newLogs[0].created_at
// Send logs in chronological order (oldest first)
for (const log of newLogs.reverse()) {
// Check before each send
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
reply.sse.send({
id: log.id,
event: 'log',
data: log
})
}
// Update to newest timestamp (captured before reversal)
lastSentTimestamp = newestTimestamp
}
} catch (error) {
server.log.error(error, 'Failed to poll for new logs')
// Don't crash - just log the error
}
}, 3000)
// Cleanup on disconnect
reply.sse.onClose(() => {
if (pollInterval) {
clearInterval(pollInterval)
pollInterval = null
}
server.log.debug({ userId }, 'Log stream closed')
})
})
}