# Events & Jobs
NATS-backed event publishing, durable JetStream streams, pull-based consumers, and background job processing with SSRF protection.
## Overview
AtlasDB provides two complementary systems for asynchronous messaging and task processing:
- Events — a NATS-backed pub/sub and durable streaming API. Publish events for real-time fan-out, create persistent streams with retention policies, and consume messages with pull-based consumers.
- Background Jobs — a NATS JetStream work queue for asynchronous task processing with webhook callbacks, status tracking, and SSRF protection on outbound requests.
Both systems live in the `atlas-events` and `atlas-jobs` crates, respectively, and share the same NATS infrastructure.
## Key Concepts

### Fire-and-Forget Pub/Sub
The pub/sub API publishes events to NATS Core with no persistence. If no subscriber is online when the event is published, the message is dropped. This is designed for ephemeral notifications, real-time UI updates, and fan-out to active consumers.
Subject namespacing: `atlasdb.{project_id}.{environment_id}.{subject}`

The `{environment_id}` segment isolates dev, staging, and prod events within the same project.
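As a sketch, the namespacing rule can be expressed as a small helper. The function name and the empty-token check are illustrative, not part of the AtlasDB API:

```javascript
// Hypothetical helper mirroring the namespacing scheme described above.
function namespacedSubject(projectId, environmentId, subject) {
  // NATS subjects are dot-separated tokens; an empty token is invalid.
  if (!subject || subject.split(".").some((token) => token.length === 0)) {
    throw new Error(`invalid subject: ${subject}`);
  }
  return `atlasdb.${projectId}.${environmentId}.${subject}`;
}

console.log(namespacedSubject("proj-1", "prod", "user.signup"));
// atlasdb.proj-1.prod.user.signup
```

The jobs and streams subjects follow the same shape under their own prefixes (`atlasdb.jobs.` and `atlasdb.streams.`).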
### Durable JetStream Streams
For events that must not be lost (payment processing, audit logs, order tracking), you create a durable stream. NATS JetStream persists all messages matching the stream's subject patterns, with configurable retention by size or age.
Consumers are created with durable names for at-least-once delivery. Pull-based consumption gives clients control over their processing rate, preventing message buildup when a consumer is slow.
Subject namespacing for streams: `atlasdb.streams.{project_id}.{environment_id}.{stream_name}`
### Subject Filtering

Both pub/sub and streams support NATS wildcard subjects:

- `*` matches a single token: `payment.*` matches `payment.completed` but not `payment.us.completed`
- `>` matches one or more tokens: `payment.>` matches `payment.completed`, `payment.us.completed`, etc.
Consumers can apply a `filter_subject` to receive only a subset of messages from a stream.
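The wildcard rules can be illustrated with a toy matcher. The NATS server implements this natively; this sketch only mirrors the token semantics:

```javascript
// Illustrative NATS-style subject matching: "*" matches exactly one token,
// ">" matches one or more remaining tokens (and is only valid at the end).
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") return i < s.length; // ">" needs at least one more token
    if (i >= s.length) return false;
    if (p[i] !== "*" && p[i] !== s[i]) return false;
  }
  return p.length === s.length;
}

subjectMatches("payment.*", "payment.completed");    // true
subjectMatches("payment.*", "payment.us.completed"); // false
subjectMatches("payment.>", "payment.us.completed"); // true
```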
### Message Replay
Durable streams retain messages according to the configured `max_bytes` and `max_age_secs`. New consumers start from the beginning of the stream by default, replaying all retained messages. This enables late-joining services to catch up on historical events.
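Retention can be pictured as oldest-first eviction by age and size, the policy that `max_bytes` and `max_age_secs` configure. A toy sketch (field and function names are assumptions, not JetStream internals):

```javascript
// Illustrative retention: drop messages older than maxAgeSecs, then evict
// oldest-first until the stream fits under maxBytes.
function prune(messages, { maxBytes, maxAgeSecs }, nowSecs) {
  let kept = messages.filter(
    (m) => maxAgeSecs === undefined || nowSecs - m.publishedAtSecs <= maxAgeSecs
  );
  if (maxBytes !== undefined) {
    let total = kept.reduce((n, m) => n + m.bytes, 0);
    while (total > maxBytes) {
      total -= kept.shift().bytes; // evict the oldest message
    }
  }
  return kept;
}

const msgs = [
  { seq: 1, bytes: 100, publishedAtSecs: 0 },
  { seq: 2, bytes: 100, publishedAtSecs: 50 },
  { seq: 3, bytes: 100, publishedAtSecs: 90 },
];
prune(msgs, { maxAgeSecs: 60 }, 100).map((m) => m.seq); // [2, 3]
prune(msgs, { maxBytes: 150 }, 100).map((m) => m.seq);  // [3]
```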
### Background Jobs
The jobs system provides a queue abstraction over NATS JetStream. Jobs are enqueued with a type, JSON payload, optional delay, and optional webhook callback URL. Workers dequeue from the NATS stream, execute the job, and update status in PostgreSQL.
Job subject namespacing: `atlasdb.jobs.{project_id}.{environment_id}.{queue}`
### SSRF Protection
Jobs can include webhook callback URLs for completion notifications. The SSRF protection module validates that callback URLs do not target internal services:
- Blocks `localhost`, `127.0.0.1`, `::1`, and all private IP ranges (`10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16`)
- Blocks cloud metadata endpoints (`169.254.169.254`)
- Blocks internal Docker/Kubernetes DNS names
- Only allows `http` and `https` schemes
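A minimal sketch of these checks, assuming a blocklist-style validator. The real module in atlas-jobs may also resolve DNS and validate the resulting IPs; the function name and the internal-suffix list here are illustrative:

```javascript
// Illustrative SSRF check for webhook callback URLs (not the atlas-jobs code).
function isSafeWebhookUrl(raw) {
  let url;
  try {
    url = new URL(raw);
  } catch (e) {
    return false; // not a parseable URL
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") return false;

  const host = url.hostname.toLowerCase();
  if (host === "localhost" || host === "::1" || host === "[::1]") return false;
  // Example internal DNS suffixes; a real deployment would tune this list.
  if (host.endsWith(".internal") || host.endsWith(".local") || host.endsWith(".svc.cluster.local")) {
    return false;
  }

  const m = host.match(/^(\d+)\.(\d+)\.(\d+)\.(\d+)$/);
  if (m) {
    const a = Number(m[1]);
    const b = Number(m[2]);
    if (a === 127) return false;                       // loopback
    if (a === 10) return false;                        // 10.0.0.0/8
    if (a === 172 && b >= 16 && b <= 31) return false; // 172.16.0.0/12
    if (a === 192 && b === 168) return false;          // 192.168.0.0/16
    if (a === 169 && b === 254) return false;          // link-local, incl. 169.254.169.254
  }
  return true;
}

isSafeWebhookUrl("https://myapp.com/webhooks/done"); // true
isSafeWebhookUrl("http://169.254.169.254/latest/");  // false
```

Note that hostname checks alone do not stop a public DNS name that resolves to a private IP; that is why resolving and re-checking the IP matters in a production validator.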
## API Reference

### Events Endpoints

| Method | Path | Description | Auth |
|---|---|---|---|
| POST | /events/v1/publish | Publish a fire-and-forget event | API Key |
| POST | /events/v1/streams | Create a durable stream | API Key |
| POST | /events/v1/streams/{stream_name}/publish | Publish to a stream | API Key |
| POST | /events/v1/streams/{stream_name}/consumers | Create a consumer on a stream | API Key |
| POST | /events/v1/streams/{stream_name}/consumers/{consumer_name}/pull | Pull messages from a consumer | API Key |
### Jobs Endpoints

| Method | Path | Description | Auth |
|---|---|---|---|
| POST | /jobs/v1/enqueue | Enqueue a background job | API Key |
| GET | /jobs/v1/status/{job_id} | Get job status | API Key |
### PublishRequest

| Field | Type | Required | Description |
|---|---|---|---|
| subject | String | Yes | Event subject/topic |
| payload | JSON | Yes | Event data |
### CreateStreamRequest

| Field | Type | Required | Description |
|---|---|---|---|
| name | String | Yes | Stream name |
| subjects | String[] | Yes | Subject patterns to capture |
| max_bytes | Integer | No | Maximum stream size in bytes |
| max_age_secs | Integer | No | Maximum message retention in seconds |
### CreateConsumerRequest

| Field | Type | Required | Description |
|---|---|---|---|
| name | String | Yes | Consumer name (durable) |
| filter_subject | String | No | Subject filter pattern |
### EnqueueRequest

| Field | Type | Required | Description |
|---|---|---|---|
| type | String | Yes | Job type identifier |
| payload | JSON | Yes | Job data |
| webhook_url | String | No | Callback URL on completion (SSRF-validated) |
| delay_seconds | Integer | No | Delay before processing |
### JobStatus Response

| Field | Type | Description |
|---|---|---|
| id | UUID | Job ID |
| status | String | pending, running, completed, or failed |
| result | JSON | Job result (on completion) |
| error | String | Error message (on failure) |
| created_at | DateTime | When the job was enqueued |
| completed_at | DateTime | When the job finished |
## Code Examples

### Publish a Fire-and-Forget Event

```shell
curl -X POST http://localhost:3000/events/v1/publish \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "user.signup",
    "payload": {
      "user_id": "abc-123",
      "email": "user@example.com"
    }
  }'
```
### Create a Durable Stream

```shell
curl -X POST http://localhost:3000/events/v1/streams \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "payments",
    "subjects": ["payment.>"],
    "max_age_secs": 86400
  }'
```
### Publish to a Stream

```shell
curl -X POST http://localhost:3000/events/v1/streams/payments/publish \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "payment.completed",
    "payload": {
      "order_id": "ord-456",
      "amount": 99.99
    }
  }'
```
### Create a Consumer and Pull Messages

```shell
# Create a durable consumer
curl -X POST http://localhost:3000/events/v1/streams/payments/consumers \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "invoice-generator"}'

# Pull messages (default batch size: 10)
curl -X POST http://localhost:3000/events/v1/streams/payments/consumers/invoice-generator/pull \
  -H "Authorization: Bearer $SERVICE_KEY"
```
### Enqueue a Background Job

```shell
curl -X POST http://localhost:3000/jobs/v1/enqueue \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "send_email",
    "payload": {
      "to": "user@example.com",
      "template": "welcome"
    },
    "webhook_url": "https://myapp.com/webhooks/job-complete"
  }'
```
### Check Job Status

```shell
curl http://localhost:3000/jobs/v1/status/$JOB_ID \
  -H "Authorization: Bearer $SERVICE_KEY"

# Response:
# {
#   "id": "...",
#   "status": "completed",
#   "result": {"sent": true},
#   "completed_at": "2026-03-23T12:00:00Z"
# }
```
### JavaScript SDK Usage

```javascript
import { createClient } from '@altbasedb/sdk'

const client = createClient('http://localhost:3000', SERVICE_KEY)

// Publish an event
await client.events.publish('user.signup', {
  user_id: 'abc-123',
  email: 'user@example.com'
})

// Enqueue a job
const { id } = await client.jobs.enqueue({
  type: 'send_email',
  payload: { to: 'user@example.com', template: 'welcome' },
  webhookUrl: 'https://myapp.com/webhooks/job-complete'
})

// Check status
const status = await client.jobs.status(id)
console.log(status.status) // "completed"
```
## Configuration

| Variable | Default | Description |
|---|---|---|
| ATLAS_NATS_URL | nats://localhost:4222 | NATS server URL |
### NATS JetStream Streams (created at server startup)

| Stream | Subjects | Purpose |
|---|---|---|
| CDC | atlas.cdc.> | CDC events (shared with realtime) |
| JOBS | atlasdb.jobs.> | Background job queue |
| EVENTS | atlasdb.streams.> | User-created durable streams |
### Rate Limits by Tier
| Tier | Job Enqueues/min | Events Messages/sec |
|---|---|---|
| Free | 20 | Rate-limited by throughput |
| Pro | 200 | Rate-limited by throughput |
| Enterprise | 2,000 | Rate-limited by throughput |
## How It Works

### Pub/Sub Flow

- Client publishes to `/events/v1/publish` with a subject and payload.
- The subject is namespaced to `atlasdb.{project_id}.{environment_id}.{subject}`.
- NATS delivers the message to all active subscribers on that subject.
- No persistence. If no subscriber is online, the message is dropped.
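The drop-if-no-subscriber behavior can be made concrete with a toy in-memory bus (illustrative only, not how NATS is implemented):

```javascript
// Toy model of fire-and-forget pub/sub: delivery only to subscribers that
// exist at publish time; undeliverable messages are silently dropped.
class CorePubSub {
  constructor() {
    this.subscribers = new Map(); // subject -> array of callbacks
  }
  subscribe(subject, callback) {
    const list = this.subscribers.get(subject) || [];
    list.push(callback);
    this.subscribers.set(subject, list);
  }
  publish(subject, payload) {
    const list = this.subscribers.get(subject) || [];
    for (const cb of list) cb(payload); // no subscribers => message dropped
    return list.length; // how many subscribers received it
  }
}

const bus = new CorePubSub();
bus.publish("user.signup", { user_id: "abc-123" }); // returns 0: dropped
bus.subscribe("user.signup", (p) => console.log("got", p.user_id));
bus.publish("user.signup", { user_id: "abc-123" }); // returns 1: delivered
```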
### Durable Stream Flow

- Client creates a stream with subject patterns and retention config via `/events/v1/streams`.
- NATS JetStream persists all messages matching the configured subject patterns.
- Consumers are created with durable names for at-least-once delivery.
- Pull requests return batches of messages (default: 10 per pull).
- Messages are acknowledged after processing. Unacknowledged messages are redelivered.
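The ack/redelivery behavior can be sketched with a toy pull consumer: anything not acknowledged is simply handed out again on the next pull. This mirrors the flow above but is not the JetStream implementation:

```javascript
// Toy model of pull-based, at-least-once consumption.
class PullConsumer {
  constructor(messages) {
    this.pending = messages.map((m, i) => ({ seq: i + 1, data: m }));
  }
  pull(batch = 10) {
    return this.pending.slice(0, batch); // unacked messages stay pending
  }
  ack(seq) {
    this.pending = this.pending.filter((m) => m.seq !== seq);
  }
}

const c = new PullConsumer(["a", "b", "c"]);
const first = c.pull(2);     // seq 1 and 2
c.ack(first[0].seq);         // ack seq 1 only
c.pull(2).map((m) => m.seq); // [2, 3]: seq 2 is redelivered
```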
### Job Processing Flow

- Client enqueues a job via `POST /jobs/v1/enqueue`.
- Job metadata is persisted to the control plane database with status `pending`.
- The job message is published to NATS JetStream on subject `atlasdb.jobs.{project_id}.{environment_id}.{type}`.
- A worker dequeues the message and updates status to `running`.
- The worker executes the job based on its `type`.
- On success, status is updated to `completed` with result data.
- If a `webhook_url` was provided, the worker sends a POST request to the callback URL (after SSRF validation).
- On failure, status is updated to `failed` with error details.
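The status lifecycle forms a small state machine. A sketch follows; the transition table is inferred from the flow above, not taken from the AtlasDB source:

```javascript
// Inferred job status transitions: pending -> running -> completed | failed.
const TRANSITIONS = {
  pending: ["running"],
  running: ["completed", "failed"],
  completed: [], // terminal
  failed: [],    // terminal
};

function advance(job, next, extra = {}) {
  if (!TRANSITIONS[job.status].includes(next)) {
    throw new Error(`illegal transition ${job.status} -> ${next}`);
  }
  return { ...job, status: next, ...extra };
}

let job = { id: "job-1", status: "pending" };
job = advance(job, "running");
job = advance(job, "completed", { result: { sent: true } });
// advance(job, "running") would now throw: completed is terminal
```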
### NATS Tenant Isolation

NATS is strictly internal and never exposed to tenant applications. All tenant isolation is enforced at the API gateway layer. Defense-in-depth measures include:

- NATS listens only on the internal Docker/Kubernetes network (no public port binding).
- NATS authentication uses a shared internal token (`NATS_AUTH_TOKEN`); only AtlasDB services can connect.
- All publish/subscribe operations scope subjects by `project_id` extracted from the validated `TenantContext`.
- In Kubernetes, NATS is deployed in the same namespace with a `NetworkPolicy` restricting ingress to AtlasDB pods only.
## Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| Published events not received | No active subscriber online | For durability, use streams instead of pub/sub |
| Pull returns empty | No new messages since last pull | Wait for new messages to be published |
| Stream creation fails | NATS not connected | Verify ATLAS_NATS_URL and NATS server status |
| Consumer lost messages | Consumer deleted and recreated | Use durable names; messages replay from start |
| Job stuck in pending | Worker not running or NATS disconnected | Check NATS connectivity and worker logs |
| Webhook not called | URL blocked by SSRF filter | Use a public URL (not localhost or private IP) |
| Job failed | Runtime error during execution | Check the error field in status response |
| Job not found (404) | Wrong job_id or different project | Verify job_id and use the same project's API key |