Realtime
WebSocket-based CDC streaming, ephemeral broadcast messaging, presence tracking, and cron job scheduling for AtlasDB tenants.
Overview
The Realtime Engine delivers three capabilities over a single WebSocket connection per client:
- Postgres Changes — stream INSERT, UPDATE, and DELETE events from tenant tables, filtered by RLS policies, with sub-100ms p95 latency.
- Broadcast — ephemeral, low-latency client-to-client messaging with no persistence (at-most-once delivery via NATS Core).
- Cron Jobs — scheduled HTTP endpoint invocation using pg_cron and pg_net inside PostgreSQL.
The engine lives in the atlas-realtime crate and mounts under /realtime/*. CDC events flow through a pipeline of PostgreSQL WAL, NATS JetStream, and in-memory RLS evaluation before reaching the client.
Performance Targets
| Metric | Target |
|---|---|
| CDC event delivery (DB write to client) | < 100ms p95 |
| Broadcast delivery (client to client) | < 20ms p95 |
| WebSocket connection setup | < 50ms |
| Max concurrent connections per replica | 10,000 |
| CDC replication lag alert threshold | 100MB WAL |
Key Concepts
CDC Pipeline
The Change Data Capture pipeline captures every row-level change and delivers it to subscribed clients:
    PostgreSQL WAL → CDC Listener (pgwire-replication) → NATS JetStream (ATLAS_CDC stream)
                                                                         │
                                                                         ▼
                                                                 WebSocket Manager
                                                       (subscription matching + RLS filter)
                                                                         │
                                                                         ▼
                                                                 Client WebSocket
The CDC listener connects to PostgreSQL using the logical replication protocol (pgoutput plugin). It decodes WAL messages (Begin, Relation, Insert, Update, Delete, Commit), maps the internal tenant schema name (proj_*) to a project_id, and publishes structured RealtimeEvent payloads to NATS JetStream.
NATS subject format: atlas.cdc.{project_id}.{schema}.{table}.{event_type}
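The subject layout can be illustrated with a minimal sketch. The helper names (`cdcSubject`, `subjectMatches`) and the sample values are illustrative assumptions, not part of the atlas-realtime API. The matcher follows standard NATS wildcard rules, where `*` matches exactly one token and `>` matches one or more trailing tokens.

```typescript
type CdcEventType = "INSERT" | "UPDATE" | "DELETE";

// Hypothetical helper: composes the documented subject layout.
function cdcSubject(
  projectId: string,
  schema: string,
  table: string,
  eventType: CdcEventType
): string {
  return ["atlas", "cdc", projectId, schema, table, eventType].join(".");
}

// NATS-style matching: "*" = exactly one token, ">" = one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") return i === p.length - 1 && s.length > i;
    if (i >= s.length) return false;
    if (p[i] !== "*" && p[i] !== s[i]) return false;
  }
  return p.length === s.length;
}

// Sample values are assumptions; the real project_id format may differ.
const sampleSubject = cdcSubject("abc123", "public", "todos", "INSERT");
console.log(sampleSubject);
console.log(subjectMatches("atlas.cdc.abc123.>", sampleSubject));
console.log(subjectMatches("atlas.cdc.*.public.todos.DELETE", sampleSubject));
```

Putting the project ID directly after the fixed prefix is what lets a single `atlas.cdc.{project_id}.>` subscription cover every table and event type of one project.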
The ATLAS_CDC stream retains events for up to 1 hour or 100MB (limits-based retention), allowing late-joining consumers to catch up.
RLS Enforcement on CDC Events
PostgreSQL Row Level Security only applies to SQL queries, not to CDC events. The Realtime fan-out layer evaluates the same RLS predicates against each event payload before delivering it over WebSocket. Predicates are cached in memory (invalidated on policy change) so there is no database round-trip per event.
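As a rough illustration of cached, in-memory evaluation, the sketch below models a policy as a list of "row column equals JWT claim" checks. This model, the `PredicateCache` class, and `rowVisible` are hypothetical simplifications; real RLS policies are arbitrary SQL expressions and the engine's actual predicate representation is not described here.

```typescript
// Hypothetical, simplified policy model: each predicate requires a row column
// to equal a JWT claim.
interface ClaimEqualsPredicate {
  column: string;
  claim: string;
}

type RowData = Record<string, unknown>;
type JwtClaims = Record<string, unknown>;

class PredicateCache {
  private byTable: Record<string, ClaimEqualsPredicate[]> = {};

  set(table: string, predicates: ClaimEqualsPredicate[]): void {
    this.byTable[table] = predicates;
  }

  // Called on policy change so stale predicates are never evaluated.
  invalidate(table: string): void {
    delete this.byTable[table];
  }

  get(table: string): ClaimEqualsPredicate[] {
    // In this sketch a missing entry means "no predicates"; a real
    // implementation would reload policies on a cache miss instead.
    return Object.prototype.hasOwnProperty.call(this.byTable, table)
      ? this.byTable[table]
      : [];
  }
}

// An event passes when every cached predicate holds; no predicates means pass.
function rowVisible(
  cache: PredicateCache,
  table: string,
  row: RowData,
  claims: JwtClaims
): boolean {
  return cache
    .get(table)
    .every((p) => row[p.column] !== undefined && row[p.column] === claims[p.claim]);
}

const policyCache = new PredicateCache();
policyCache.set("todos", [{ column: "user_id", claim: "sub" }]);
console.log(rowVisible(policyCache, "todos", { id: 1, user_id: "u1" }, { sub: "u1" }));
console.log(rowVisible(policyCache, "todos", { id: 2, user_id: "u2" }, { sub: "u1" }));
```

Because evaluation only touches the event payload and the connection's claims, no query is issued per event.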
Broadcast Channels
Broadcast uses NATS Core (plain pub/sub) for at-most-once delivery with minimal latency. Messages are published to atlas.broadcast.{project_id}.{topic} and delivered to every server replica subscribed to that project's wildcard subject, atlas.broadcast.{project_id}.>.
Use cases include cursor positions, typing indicators, collaborative editing signals, game state, and live polls.
Clients must join a channel before sending or receiving. Authorization modes:
| Mode | Behavior |
|---|---|
| open (default) | Any authenticated client can join any channel |
| token | Channel name must appear in the JWT allowed_channels claim |
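The two modes reduce to a small check at join time. The following is a minimal sketch; the function name `canJoinChannel` and the assumption that `allowed_channels` is a plain array of channel names are illustrative.

```typescript
type ChannelAuthMode = "open" | "token";

// Assumes the allowed_channels claim is an array of exact channel names.
interface ChannelClaims {
  allowed_channels?: string[];
}

function canJoinChannel(
  mode: ChannelAuthMode,
  channel: string,
  claims: ChannelClaims
): boolean {
  // open: any authenticated client may join any channel.
  if (mode === "open") return true;
  // token: the channel must be listed in the JWT claim.
  return (claims.allowed_channels ?? []).indexOf(channel) !== -1;
}

console.log(canJoinChannel("open", "room:lobby", {}));
console.log(canJoinChannel("token", "room:lobby", { allowed_channels: ["room:lobby"] }));
console.log(canJoinChannel("token", "room:admin", { allowed_channels: ["room:lobby"] }));
```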
Presence Tracking
Presence is backed by Redis. Clients call track() to register their state (e.g., user ID, online timestamp) on a channel. The server maintains a per-channel presence map and broadcasts presence_state (full sync) and presence_diff (incremental joins/leaves) events.
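The relationship between presence_state and presence_diff can be sketched as a comparison of two snapshots of the per-channel map. The map shape (presence key to tracked state) and the `diffPresence` helper are assumptions for illustration; the server's actual wire format may differ.

```typescript
// Hypothetical shape: presence key (for example a user ID) -> tracked state.
type PresenceMap = Record<string, Record<string, unknown>>;

interface PresenceDiff {
  joins: PresenceMap;
  leaves: PresenceMap;
}

function hasKey(map: PresenceMap, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(map, key);
}

// Keys only in `current` are joins; keys only in `previous` are leaves.
function diffPresence(previous: PresenceMap, current: PresenceMap): PresenceDiff {
  const result: PresenceDiff = { joins: {}, leaves: {} };
  for (const key of Object.keys(current)) {
    const tracked = current[key];
    if (tracked !== undefined && !hasKey(previous, key)) result.joins[key] = tracked;
  }
  for (const key of Object.keys(previous)) {
    const tracked = previous[key];
    if (tracked !== undefined && !hasKey(current, key)) result.leaves[key] = tracked;
  }
  return result;
}

const before: PresenceMap = {
  "user-1": { online_at: "2024-01-01T00:00:00Z" },
  "user-2": { online_at: "2024-01-01T00:01:00Z" },
};
const after: PresenceMap = {
  "user-2": { online_at: "2024-01-01T00:01:00Z" },
  "user-3": { online_at: "2024-01-01T00:02:00Z" },
};
const presenceDiff = diffPresence(before, after);
console.log(Object.keys(presenceDiff.joins), Object.keys(presenceDiff.leaves));
```

A client that applies each diff to its last full state stays in sync without receiving the whole map on every change.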
Cron Jobs
The cron subsystem provisions pg_cron and pg_net extensions on tenant schemas. Jobs are managed via REST API and execute inside PostgreSQL, invoking HTTP endpoints (including edge functions) on a cron schedule. The spawn_cron_scheduler background task evaluates cron triggers every minute.
Connection Limits by Tier
| Tier | Max WebSocket Connections |
|---|---|
| Free | 200 |
| Pro | 5,000 |
| Enterprise | 50,000 |
Exceeding the limit returns a 503 on WebSocket upgrade.
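A minimal sketch of the admission check, assuming the limit is compared against the project's current connection count at upgrade time. The names `MAX_CONNECTIONS` and `upgradeStatus` are illustrative; 101 is the standard HTTP status for a successful WebSocket upgrade.

```typescript
type Tier = "free" | "pro" | "enterprise";

// Defaults from the tier table above.
const MAX_CONNECTIONS: Record<Tier, number> = {
  free: 200,
  pro: 5000,
  enterprise: 50000,
};

// Returns the HTTP status for an upgrade attempt, given the number of
// connections the project already holds.
function upgradeStatus(tier: Tier, activeConnections: number): 101 | 503 {
  return activeConnections < MAX_CONNECTIONS[tier] ? 101 : 503;
}

console.log(upgradeStatus("free", 199)); // accepted: this becomes connection 200
console.log(upgradeStatus("free", 200)); // rejected: limit already reached
```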
Replication Slot Management
| Scenario | Behavior |
|---|---|
| Shared tenant DB (free tier) | Single slot atlas_cdc_shared, always active |
| Dedicated DB (paid tier) | Slot atlas_cdc_{project_id}, created on first subscription, dropped after 30 min idle |
| Server restart | Re-attach to existing slots, resume from last confirmed LSN |
| Orphan detection | On startup, drop any atlas_cdc_* slots with no active connection |
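The naming and idle-drop rules in the table can be expressed as two small functions. This is a sketch under assumptions: the function names and parameters are hypothetical, and it presumes the project ID is already valid as part of a PostgreSQL slot name (slot names may contain only lowercase letters, digits, and underscores).

```typescript
const SHARED_SLOT = "atlas_cdc_shared";
const IDLE_DROP_MS = 30 * 60 * 1000; // 30 minutes

// Shared tenant DBs use one fixed slot; dedicated DBs get a per-project slot.
function slotName(dedicated: boolean, projectId: string): string {
  return dedicated ? `atlas_cdc_${projectId}` : SHARED_SLOT;
}

// A dedicated slot is dropped once it has had no subscriptions for 30 minutes.
// The shared slot is always kept.
function shouldDropSlot(
  slot: string,
  activeSubscriptions: number,
  idleSinceMs: number,
  nowMs: number
): boolean {
  if (slot === SHARED_SLOT) return false;
  return activeSubscriptions === 0 && nowMs - idleSinceMs >= IDLE_DROP_MS;
}

console.log(slotName(true, "abc123"));
console.log(shouldDropSlot("atlas_cdc_abc123", 0, 0, IDLE_DROP_MS));
```

Dropping idle slots matters because an unconsumed replication slot forces PostgreSQL to retain WAL.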
API Reference
HTTP Endpoints
| Method | Path | Description | Auth |
|---|---|---|---|
| GET | /realtime/socket | WebSocket upgrade for realtime connection | API Key |
Client-to-Server Message Types
| Type | Description |
|---|---|
| join | Subscribe to a channel (postgres_changes and/or broadcast) |
| leave | Unsubscribe from a channel |
| broadcast | Send a message to all channel subscribers |
| heartbeat | Keep-alive response to server ping |
| access_token | Send or refresh JWT for RLS evaluation |
Server-to-Client Message Types
| Type | Description |
|---|---|
| system | Connection events (connected, error) |
| join_reply | Acknowledgement of a join request (ok or error) |
| postgres_changes | CDC event with event, schema, table, old_record, new_record, commit_timestamp |
| broadcast | Forwarded broadcast message with event and payload |
| presence_state | Full presence state for a channel |
| presence_diff | Presence joins and leaves since last sync |
Join Message Schema
    interface JoinMessage {
      type: "join"
      topic: string   // e.g. "realtime:public:orders" or "room:lobby"
      ref: string     // client-generated ref for correlation
      config: {
        broadcast?: {
          self: boolean   // receive own broadcast messages (default: false)
          ack: boolean    // server acknowledges broadcast sends (default: false)
        }
        postgres_changes?: Array<{
          event: "INSERT" | "UPDATE" | "DELETE" | "*"
          schema: string  // default: "public" (mapped to tenant schema internally)
          table: string
          filter?: string // PostgREST filter syntax, e.g. "user_id=eq.123"
        }>
      }
    }
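To show how a filter string such as "user_id=eq.123" might be interpreted, here is a minimal parsing sketch. It assumes the `column=operator.value` form and implements only a handful of operators; which operators the server actually supports is not stated in this document, and the helper names are hypothetical.

```typescript
interface ParsedFilter {
  column: string;
  operator: string;
  value: string;
}

// Splits "column=operator.value" into its three parts.
function parseFilter(filter: string): ParsedFilter {
  const eq = filter.indexOf("=");
  const dot = filter.indexOf(".", eq + 1);
  if (eq <= 0 || dot === -1) throw new Error(`Invalid filter: ${filter}`);
  return {
    column: filter.slice(0, eq),
    operator: filter.slice(eq + 1, dot),
    value: filter.slice(dot + 1),
  };
}

// Illustrative subset of operators; values arrive as strings, so equality
// compares string forms and ordering compares numeric forms.
function rowMatchesFilter(row: Record<string, unknown>, f: ParsedFilter): boolean {
  const actual = row[f.column];
  switch (f.operator) {
    case "eq":
      return String(actual) === f.value;
    case "neq":
      return String(actual) !== f.value;
    case "gt":
      return Number(actual) > Number(f.value);
    case "lt":
      return Number(actual) < Number(f.value);
    default:
      throw new Error(`Unsupported operator: ${f.operator}`);
  }
}

const userFilter = parseFilter("user_id=eq.123");
console.log(userFilter);
console.log(rowMatchesFilter({ id: 1, user_id: 123 }, userFilter));
```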
Channel Topic Format
realtime:{schema}:{table}
Example: realtime:public:todos subscribes to all changes on the todos table. The internal tenant schema (proj_*) is mapped to public in the user-facing API.
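A small sketch of parsing this topic format, useful for telling table topics apart from broadcast-only topics such as room:lobby. The `parseTableTopic` helper is hypothetical and not part of the SDK.

```typescript
interface TableTopic {
  schema: string;
  table: string;
}

// Returns the schema and table for "realtime:{schema}:{table}" topics,
// or null for any other topic (for example a broadcast room).
function parseTableTopic(topic: string): TableTopic | null {
  const parts = topic.split(":");
  if (parts.length !== 3) return null;
  const [prefix, schema, table] = parts;
  if (prefix !== "realtime" || !schema || !table) return null;
  return { schema, table };
}

console.log(parseTableTopic("realtime:public:todos"));
console.log(parseTableTopic("room:lobby"));
```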
Code Examples
Subscribe to Table Changes (JavaScript SDK)
    import { createClient } from '@altbasedb/sdk'

    const client = createClient('http://localhost:3000', ANON_KEY)

    const channel = client.realtime
      .channel('realtime:public:todos')
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'todos'
      }, (payload) => {
        console.log('New todo:', payload.new_record)
      })
      .subscribe()
Track Presence
    channel
      .on('presence', { event: 'sync' }, () => {
        const state = channel.presenceState()
        console.log('Online users:', Object.keys(state).length)
      })
      .track({ user_id: 'user-123', online_at: new Date() })
Broadcast (Client-to-Client)
    // Send a broadcast message
    channel.send({
      type: 'broadcast',
      event: 'cursor_move',
      payload: { x: 100, y: 200 }
    })

    // Receive broadcast messages
    channel.on('broadcast', { event: 'cursor_move' }, (msg) => {
      console.log('Cursor:', msg.payload)
    })
Raw WebSocket via wscat
    wscat -c "ws://localhost:3000/realtime/socket" \
      -H "Authorization: Bearer $ANON_KEY"

    # Join a channel with postgres_changes:
    {"type":"join","topic":"realtime:public:todos","ref":"1","config":{"postgres_changes":[{"event":"*","schema":"public","table":"todos"}]}}

    # Join the broadcast channel before sending to it:
    {"type":"join","topic":"room:lobby","ref":"2","config":{"broadcast":{"self":false,"ack":false}}}

    # Send a broadcast:
    {"type":"broadcast","topic":"room:lobby","event":"typing","payload":{"user":"alice"}}
Configuration
| Variable | Default | Description |
|---|---|---|
| ATLAS_REALTIME_MAX_CONNS_FREE | 200 | WebSocket limit for free tier |
| ATLAS_REALTIME_MAX_CONNS_PRO | 5000 | WebSocket limit for pro tier |
| ATLAS_REALTIME_MAX_CONNS_ENTERPRISE | 50000 | WebSocket limit for enterprise tier |
| ATLAS_REALTIME_HEARTBEAT_INTERVAL | 30 | Heartbeat ping interval in seconds |
| ATLAS_REALTIME_HEARTBEAT_TIMEOUT | 10 | Seconds to wait for heartbeat pong before disconnect |
| ATLAS_NATS_URL | nats://localhost:4222 | NATS server URL |
Broadcast Limits
| Limit | Value |
|---|---|
| Max payload size | 32 KB |
| Max broadcasts per second per connection | 20 |
| Max channels per connection | 100 |
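A client can check the payload and rate limits locally before sending. The sketch below is illustrative only: it assumes 32 KB means 32 × 1024 bytes measured on the JSON-serialized payload, and that the rate limit is a sliding one-second window; the server's exact accounting is not specified here. `BroadcastLimiter` is a hypothetical name.

```typescript
const MAX_PAYLOAD_BYTES = 32 * 1024; // assumption: 32 KB = 32768 bytes
const MAX_BROADCASTS_PER_SECOND = 20;

class BroadcastLimiter {
  // Timestamps (ms) of broadcasts accepted within the last second.
  private sentAt: number[] = [];

  // Returns true and records the send if the payload fits both limits.
  allow(payload: unknown, nowMs: number): boolean {
    const bytes = new TextEncoder().encode(JSON.stringify(payload)).length;
    if (bytes > MAX_PAYLOAD_BYTES) return false;

    // Keep only sends that are still inside the one-second window.
    this.sentAt = this.sentAt.filter((t) => nowMs - t < 1000);
    if (this.sentAt.length >= MAX_BROADCASTS_PER_SECOND) return false;

    this.sentAt.push(nowMs);
    return true;
  }
}

const limiter = new BroadcastLimiter();
let accepted = 0;
for (let i = 0; i < 25; i++) {
  if (limiter.allow({ x: i, y: i }, 0)) accepted++;
}
console.log(accepted); // 20 of 25 sends fit in the same one-second window
```

Passing the clock in as `nowMs` keeps the limiter deterministic; in application code the caller would typically pass `Date.now()`.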
PostgreSQL Replication Settings
max_replication_slots = 10
max_slot_wal_keep_size = '4GB'
wal_level = logical
To receive old row values on UPDATE and DELETE, enable full replica identity on tables that need realtime:
ALTER TABLE {schema}.{table} REPLICA IDENTITY FULL;
This increases WAL volume. Only enable on tables with active realtime subscriptions.
How It Works
CDC Event Delivery Flow
- A row is inserted, updated, or deleted in a tenant table.
- PostgreSQL WAL captures the change via the pgoutput logical replication plugin.
- The CDC listener (a background Tokio task using pgwire-replication) decodes the binary WAL stream into RealtimeEvent structs containing project_id, schema, table, event_type, old_record, new_record, and commit_timestamp.
- The event is published to NATS JetStream on subject atlas.cdc.{project_id}.{schema}.{table}.{event_type}.
- The WebSocket Manager receives the event via a push consumer subscribed to atlas.cdc.{project_id}.>.
- It matches the event against active subscriptions (by table and event filter).
- For each matched subscription, the cached RLS predicates are evaluated against the row data and the connection's JWT claims.
- If evaluation passes (or no predicates exist), the event is serialized and delivered as a postgres_changes WebSocket frame.
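The subscription-matching and serialization steps above can be sketched as follows. The type and function names are hypothetical, and the flat frame layout is an assumption based on the field list in the message-type table; the real wire format may nest these fields differently. RLS evaluation is omitted here.

```typescript
type ChangeType = "INSERT" | "UPDATE" | "DELETE";

// Mirrors the RealtimeEvent fields listed above (shape is an assumption).
interface RealtimeEventLike {
  project_id: string;
  schema: string;
  table: string;
  event_type: ChangeType;
  old_record: Record<string, unknown> | null;
  new_record: Record<string, unknown> | null;
  commit_timestamp: string;
}

interface ChangeSubscription {
  connectionId: string;
  event: ChangeType | "*";
  schema: string;
  table: string;
}

// Match by schema, table, and event filter ("*" accepts every event type).
function matchSubscriptions(
  ev: RealtimeEventLike,
  subs: ChangeSubscription[]
): ChangeSubscription[] {
  return subs.filter(
    (s) =>
      s.schema === ev.schema &&
      s.table === ev.table &&
      (s.event === "*" || s.event === ev.event_type)
  );
}

// Serialize the event as a postgres_changes frame.
function toFrame(ev: RealtimeEventLike): string {
  return JSON.stringify({
    type: "postgres_changes",
    event: ev.event_type,
    schema: ev.schema,
    table: ev.table,
    old_record: ev.old_record,
    new_record: ev.new_record,
    commit_timestamp: ev.commit_timestamp,
  });
}

const sampleEvent: RealtimeEventLike = {
  project_id: "abc123",
  schema: "public",
  table: "todos",
  event_type: "INSERT",
  old_record: null,
  new_record: { id: 1, title: "write docs" },
  commit_timestamp: "2024-01-01T00:00:00Z",
};
const activeSubs: ChangeSubscription[] = [
  { connectionId: "c1", event: "*", schema: "public", table: "todos" },
  { connectionId: "c2", event: "DELETE", schema: "public", table: "todos" },
  { connectionId: "c3", event: "INSERT", schema: "public", table: "todos" },
];
console.log(matchSubscriptions(sampleEvent, activeSubs).map((s) => s.connectionId));
console.log(toFrame(sampleEvent));
```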
Broadcast Flow
- Client A sends a broadcast message on topic room:123.
- Server validates that Client A has joined room:123.
- Message is published to NATS Core subject atlas.broadcast.{project_id}.room:123.
- All server replicas subscribed to atlas.broadcast.{project_id}.> receive the message.
- Each replica finds local WebSocket connections joined to room:123 and forwards the message.
- If the sender configured self: false, the originating connection is excluded.
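The per-replica fan-out in the last two steps might look like the sketch below. The `LocalConnection` shape and `localRecipients` function are hypothetical; the only behavior taken from the text is that recipients must have joined the topic and that the sender is skipped unless it joined with self: true.

```typescript
interface ChannelMembership {
  topic: string;
  self: boolean; // broadcast.self flag chosen at join time
}

interface LocalConnection {
  id: string;
  memberships: ChannelMembership[];
}

// Returns the IDs of local connections that should receive the message.
function localRecipients(
  connections: LocalConnection[],
  topic: string,
  senderId: string
): string[] {
  return connections
    .filter((conn) => {
      const membership = conn.memberships.filter((m) => m.topic === topic)[0];
      if (membership === undefined) return false; // not joined to this topic
      return conn.id !== senderId || membership.self; // skip sender unless self: true
    })
    .map((conn) => conn.id);
}

const replicaConnections: LocalConnection[] = [
  { id: "A", memberships: [{ topic: "room:123", self: false }] },
  { id: "B", memberships: [{ topic: "room:123", self: false }] },
  { id: "C", memberships: [{ topic: "room:999", self: false }] },
];
console.log(localRecipients(replicaConnections, "room:123", "A"));
```

On replicas that do not host the sender, `senderId` simply matches no local connection and every joined connection receives the message.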
Heartbeat and Auto-Reconnect
The server sends a heartbeat ping every 30 seconds (configurable). If the client does not respond with a heartbeat pong within 10 seconds, the connection is closed. Client SDKs implement automatic reconnection with exponential backoff and re-subscribe to all previously joined channels on reconnect.
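A minimal sketch of the two timing rules involved. The 10-second timeout default comes from the configuration above; the backoff base (1 s) and cap (30 s) are illustrative assumptions, since the SDK's actual reconnect parameters are not documented here. Function names are hypothetical.

```typescript
// Delay before reconnect attempt N (0-based): baseMs * 2^N, capped at maxMs.
function reconnectDelayMs(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000
): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// True when a ping has gone unanswered for longer than the timeout.
function heartbeatTimedOut(
  pingSentAtMs: number,
  lastPongAtMs: number | null,
  nowMs: number,
  timeoutMs: number = 10000
): boolean {
  const answered = lastPongAtMs !== null && lastPongAtMs >= pingSentAtMs;
  return !answered && nowMs - pingSentAtMs > timeoutMs;
}

console.log([0, 1, 2, 3, 4, 5].map((n) => reconnectDelayMs(n)));
// → [1000, 2000, 4000, 8000, 16000, 30000]
console.log(heartbeatTimedOut(0, null, 10001)); // → true
```

Production clients commonly add random jitter to each delay so that many clients do not reconnect at the same instant after a server restart.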
Connection Lifecycle
- Client upgrades HTTP to WebSocket at /realtime/socket with an API key (via query param or Authorization header).
- Server validates the API key, extracts TenantContext (project_id, schema, tier), and checks the connection count limit for the project's tier.
- Server sends a system event with connected status.
- Client joins zero or more channels via join messages.
- Event loop: server delivers CDC events and broadcast messages; client sends broadcasts and heartbeat pongs.
- On disconnect (client close, heartbeat timeout, or error), subscriptions are cleaned up and the connection is removed from the registry.
Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| 503 on WebSocket upgrade | Connection limit reached for tier | Upgrade tier or reduce active connections |
| No events after subscribe | Schema mismatch (proj_* vs public) | Use public as the schema in subscription config |
| Events stop after token expires | JWT expiry check blocks delivery | Send an access_token message with a fresh JWT |
| Heartbeat timeout disconnection | Client not responding to pings | Ensure client sends heartbeat responses within 10s |
| Presence not syncing | Redis not connected | Verify Redis connectivity and configuration |
| Broadcast messages not delivered | Client has not joined the channel | Call join on the topic before sending or receiving |
| High WAL accumulation | Replication slot not consuming | Check CDC listener health and NATS connectivity |