Workflows & Integrations
DAG-based workflow automation with 18 node types, NATS-driven step dispatch, SSE monitoring, plus 500+ third-party integrations via Nango OAuth, templated actions, webhooks, and circuit breaker protection.
Overview
AtlasDB provides two deeply integrated systems for automation and external connectivity:
- Workflows — a visual, DAG-based workflow engine. Define multi-step automations as JSON node-and-edge graphs (compatible with React Flow). Execute via API, webhook, CDC event, or cron schedule. Monitor in real time via SSE.
- Integrations — connect to 500+ external services (Slack, GitHub, Stripe, HubSpot, etc.) via OAuth through Nango. Execute templated actions, proxy raw API calls, and manage bidirectional webhooks with HMAC verification.
Workflow steps can invoke integrations directly via action_integration nodes.
Key Concepts
DAG-Based Workflow DSL
Workflows are JSON documents with nodes and edges. The React Flow visual builder generates this structure; the engine executes it as an event-driven state machine over NATS JetStream.
{
  "version": 1,
  "nodes": [
    {
      "id": "trigger_1", "type": "trigger_record",
      "config": { "object": "orders", "event": "created",
        "conditions": [{ "field": "amount", "op": "gt", "value": 100 }] },
      "on_error": "stop"
    },
    {
      "id": "action_1", "type": "action_integration",
      "config": { "connection_id": "conn_slack_123", "action_key": "send_message",
        "params": { "channel": "#sales", "text": "New order: {{trigger.record.name}}" } },
      "retry": { "max_retries": 3, "backoff": "exponential" },
      "on_error": "continue", "timeout_seconds": 30
    }
  ],
  "edges": [
    { "id": "edge_1", "source": "trigger_1", "target": "action_1" }
  ]
}
Per-node options: on_error ("stop", "continue", "goto:<node_id>"), retry (exponential backoff, configurable initial_delay_ms / max_delay_ms), timeout_seconds. The {{...}} template syntax resolves dot-paths like trigger.record.name and steps.code_1.output.total.
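To make the dot-path resolution concrete, here is a minimal Python sketch of how a {{...}} placeholder could be resolved against a run context. The function names, the regular expression, and the choice to render unresolved paths as an empty string are assumptions for illustration, not the engine's actual behavior.

```python
import re
from typing import Any

# Matches {{ some.dot.path }} with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def resolve_path(context: dict, path: str) -> Any:
    """Walk a dot-path such as 'steps.code_1.output.total' through nested dicts."""
    current: Any = context
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # assumption: unresolved paths yield None
        current = current[key]
    return current


def render(template: str, context: dict) -> str:
    """Replace every {{path}} placeholder with the value found in the context."""
    def substitute(match: re.Match) -> str:
        value = resolve_path(context, match.group(1))
        return "" if value is None else str(value)

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical run context shaped like the paths mentioned above.
context = {
    "trigger": {"record": {"name": "Order #42"}},
    "steps": {"code_1": {"output": {"total": 129.5}}},
}

print(render("New order: {{trigger.record.name}}", context))    # New order: Order #42
print(render("Total: {{steps.code_1.output.total}}", context))  # Total: 129.5
```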
Step Types (18 Total)
| Category | Type | Description |
|---|---|---|
| Trigger | trigger_record | CDC events (created/updated/deleted) with field conditions |
| Trigger | trigger_schedule | Cron expression with timezone |
| Trigger | trigger_webhook | Inbound webhook with optional signature verification |
| Trigger | trigger_manual | REST API with optional input schema |
| Trigger | trigger_integration | Integration webhook or polling event |
| Action | action_integration | Templated action via integration connection |
| Action | action_record | Create, update, or delete a database record |
| Action | action_email | Send email using template with merge fields |
| Action | action_http | HTTP request with SSRF validation |
| Action | action_notification | Notification via realtime WebSocket |
| Logic | logic_condition | If/else routing based on expressions |
| Logic | logic_switch | Multi-way branch based on value |
| Logic | logic_loop | Iterate over collection (parallel or sequential) |
| Logic | logic_delay | Wait for duration, datetime, or field-change |
| Logic | logic_parallel | Dispatch all branches, join on all/any/n-of-m |
| Logic | logic_approval | Human approval gate with timeout |
| Code | code_snippet | Inline JavaScript (V8 isolate, sub-ms startup) |
| Code | code_function | Invoke edge function by slug |
| Meta | meta_subworkflow | Start child workflow (sync or async) |
| Meta | meta_error | Error handler target for on_error: "goto:<id>" |
NATS-Based Execution Engine
The engine is a stateless, event-driven state machine with three NATS consumers (trigger evaluator, step dispatcher, completion handler). The database is the source of truth: there are no long-lived processes and no in-memory state.
Run idempotency uses sha256(workflow_id + trigger_event_id) to prevent duplicates from at-least-once NATS delivery. Version immutability ensures mid-execution definition edits never affect in-flight runs.
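The idempotency key can be sketched in a few lines of Python. The plain string concatenation mirrors the formula above; the in-memory set stands in for whatever uniqueness check the database performs, and both helper names are hypothetical.

```python
import hashlib


def run_idempotency_key(workflow_id: str, trigger_event_id: str) -> str:
    """Hex SHA-256 of workflow_id + trigger_event_id, as in the formula above."""
    payload = (workflow_id + trigger_event_id).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def should_start_run(seen_keys: set, workflow_id: str, trigger_event_id: str) -> bool:
    """Return True only the first time a (workflow, event) pair is seen.

    Assumption: `seen_keys` stands in for a unique constraint in the database.
    """
    key = run_idempotency_key(workflow_id, trigger_event_id)
    if key in seen_keys:
        return False  # redelivered event: skip creating a second run
    seen_keys.add(key)
    return True


seen = set()
print(should_start_run(seen, "wf_1", "evt_1"))  # True
print(should_start_run(seen, "wf_1", "evt_1"))  # False (duplicate delivery)
```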
Third-Party Integrations via Nango
Nango handles OAuth2 authorization flows, token refresh, and secure storage for 500+ providers. AtlasDB delegates OAuth to Nango and proxies authenticated API calls through it. Connector templates define available actions per provider with endpoint patterns, rate limits, and auth type.
Webhooks, Circuit Breaker, and Audit Logging
Inbound webhooks receive events from external services via unique URL tokens with HMAC signature verification. Outbound webhooks dispatch events to external URLs with retry logic and tracked delivery history. A circuit breaker prevents cascading failures when Nango or a provider is unresponsive (closed/open/half-open states with auto-recovery). Every integration API call is audit logged with connection ID, action, status, latency, and timestamp.
API Reference
Workflow Endpoints
| Method | Path | Description | Auth |
|---|---|---|---|
| POST | /workflows/v1/definitions | Create a workflow definition | API Key |
| GET | /workflows/v1/definitions | List definitions | API Key |
| GET | /workflows/v1/definitions/{id} | Get definition details | API Key |
| PUT | /workflows/v1/definitions/{id} | Update a definition | API Key |
| DELETE | /workflows/v1/definitions/{id} | Soft-delete a definition | API Key |
| PATCH | /workflows/v1/definitions/{id}/status | Change status (draft/active/paused/archived) | API Key |
| POST | /workflows/v1/runs | Start a workflow run | API Key |
| GET | /workflows/v1/runs | List runs | API Key |
| GET | /workflows/v1/runs/{id} | Get run details | API Key |
| GET | /workflows/v1/runs/{id}/steps | List step runs | API Key |
| POST | /workflows/v1/runs/{id}/cancel | Cancel a running workflow | API Key |
| GET | /workflows/v1/runs/{id}/stream | SSE stream for live monitoring | API Key |
| POST | /workflows/v1/approvals/{id}/respond | Approve or reject | API Key |
| GET | /workflows/v1/approvals/pending | List pending approvals | API Key |
| POST | /webhooks/workflow/{url_token} | Receive webhook trigger (public) | None |
Integration Endpoints
| Method | Path | Description | Auth |
|---|---|---|---|
| POST | /integrations/connections | Create connection (starts OAuth) | API Key |
| GET | /integrations/connections | List connections | API Key |
| GET | /integrations/connections/{id} | Get connection details | API Key |
| PATCH | /integrations/connections/{id} | Update connection | API Key |
| DELETE | /integrations/connections/{id} | Delete connection (revoke in Nango) | API Key |
| GET | /integrations/connections/{id}/callback | OAuth callback handler | None |
| POST | /integrations/connections/{id}/reconnect | Reconnect expired connection | API Key |
| GET | /integrations/connections/{id}/logs | Audit logs for connection | API Key |
| POST | /integrations/execute | Execute a templated action | API Key |
| POST | /integrations/proxy | Raw proxy request to provider | API Key |
| GET | /integrations/templates | List connector templates | API Key |
| POST | /integrations/webhooks/inbound | Register inbound webhook | API Key |
| POST | /integrations/webhooks/outbound | Create outbound webhook | API Key |
| GET | /integrations/webhooks/outbound/{id}/deliveries | View delivery history | API Key |
| POST | /webhooks/inbound/{url_token} | Receive inbound webhook (public) | None |
| GET | /integrations/health | Nango health + circuit breaker status | API Key |
Code Examples
Create and Run a Workflow
# Create a workflow definition
curl -X POST http://localhost:3000/workflows/v1/definitions \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "New Order Processing",
    "trigger_type": "cdc",
    "definition": {
      "nodes": [
        {"id": "trigger", "type": "trigger_record",
         "config": {"event": "created", "object": "orders"}},
        {"id": "validate", "type": "code_function",
         "config": {"function_slug": "validate-order"}},
        {"id": "notify", "type": "action_http",
         "config": {"url": "https://hooks.slack.com/...", "method": "POST",
                    "body": {"text": "New order: {{trigger.record.id}}"}}}
      ],
      "edges": [
        {"id": "e1", "source": "trigger", "target": "validate"},
        {"id": "e2", "source": "validate", "target": "notify"}
      ]
    }
  }'

# Activate the workflow
curl -X PATCH "http://localhost:3000/workflows/v1/definitions/$WORKFLOW_ID/status" \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{"status": "active"}'

# Manually start a run (the variable is quoted so the shell does not word-split it)
curl -X POST http://localhost:3000/workflows/v1/runs \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{"workflow_id": "'"$WORKFLOW_ID"'", "trigger_data": {"order_id": "ord-123"}}'

# Monitor run via SSE (-N disables output buffering)
curl -N "http://localhost:3000/workflows/v1/runs/$RUN_ID/stream" \
  -H "Authorization: Bearer $SERVICE_KEY"
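For programmatic monitoring, the stream can be consumed with any SSE client. The sketch below parses the standard SSE wire format (event: and data: fields, blank line as event terminator) from an iterable of lines. The sample payloads and the step_completed event name are assumptions for illustration; only run_completed is named elsewhere on this page.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per SSE event from an iterable of text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield {"event": event, "data": "\n".join(data)}
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon.
            data.append(value[1:] if value.startswith(" ") else value)


# Hypothetical stream contents; real payload shapes may differ.
sample = [
    ": keep-alive",
    "event: step_completed",
    'data: {"node_id": "validate"}',
    "",
    "event: run_completed",
    'data: {"status": "completed"}',
    "",
]

for evt in parse_sse(sample):
    print(evt["event"], evt["data"])
    if evt["event"] == "run_completed":
        break  # the stream ends once the run finishes
```

In practice the lines would come from a streaming HTTP response rather than a list.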
Connect and Use an Integration
# Create a Slack connection (returns OAuth authorization URL)
curl -X POST http://localhost:3000/integrations/connections \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{"provider": "slack", "redirect_url": "https://myapp.com/callback"}'

# Execute a templated Slack action
curl -X POST http://localhost:3000/integrations/execute \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "connection_id": "'"$CONNECTION_ID"'",
    "action_key": "send_message",
    "params": {"channel": "#general", "text": "Hello from AtlasDB!"}
  }'

# Raw proxy request to any provider API
curl -X POST http://localhost:3000/integrations/proxy \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "connection_id": "'"$CONNECTION_ID"'",
    "method": "GET",
    "endpoint": "/api/v1/users"
  }'
Register an Inbound Webhook with HMAC
curl -X POST http://localhost:3000/integrations/webhooks/inbound \
  -H "Authorization: Bearer $SERVICE_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "connection_id": "'"$CONNECTION_ID"'",
    "target_event": "push",
    "verification_type": "hmac",
    "verification_config": {"secret": "whsec_...", "header": "X-Hub-Signature-256"}
  }'
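How AtlasDB computes the signature internally is not specified here. As a rough illustration, the following Python sketch assumes the GitHub-style convention that the X-Hub-Signature-256 header name suggests: a hex-encoded HMAC-SHA256 of the raw request body, prefixed with sha256=. The function names and the sample secret are hypothetical.

```python
import hashlib
import hmac


def sign_body(secret: str, body: bytes) -> str:
    """Compute a GitHub-style signature header value for a raw request body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return "sha256=" + digest


def verify_signature(secret: str, body: bytes, header_value: str) -> bool:
    """Check the received header against the expected signature in constant time."""
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected, header_value)


# Hypothetical secret and payload for demonstration only.
secret = "example-webhook-secret"
body = b'{"ref": "refs/heads/main"}'
header = sign_body(secret, body)

print(verify_signature(secret, body, header))         # True
print(verify_signature(secret, body + b" ", header))  # False (body was altered)
```

The comparison uses hmac.compare_digest rather than == so that verification time does not leak how many characters matched.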
Configuration
| Variable | Default | Description |
|---|---|---|
| ATLAS_NATS_URL | nats://localhost:4222 | NATS for workflow step dispatch |
| ATLAS_NANGO_URL | http://localhost:3003 | Nango server URL |
| ATLAS_NANGO_SECRET_KEY | (required) | Nango secret key for API calls |
How It Works
Workflow Execution Flow
- A workflow is triggered (API call, webhook POST, CDC event, or cron tick).
- The trigger evaluator (NATS pull consumer) deserializes the event, queries matching workflow_trigger rows, checks rate limits via Redis, and creates a workflow_run with status running.
- Entry nodes are identified. Child action nodes are dispatched via NATS on atlasdb.workflows.dispatch.{project_id}.{env_id}.
- The step dispatcher executes each step type: action_http makes HTTP requests, code_function invokes edge functions, action_record runs SQL, logic_delay uses NATS scheduled delivery, logic_approval creates an approval gate, logic_condition/logic_switch evaluate expressions and dispatch matching branches, logic_loop expands to N parallel dispatches sharing a loop_group_id, and action_integration proxies through Nango.
- The completion handler merges step output into the run's context JSONB at steps.<node_id>.output and dispatches next nodes based on edges.
- For parallel joins, the handler scans all incoming step_runs to check the join condition (all/any/n-of-m).
- When all nodes complete, the run status is set to completed.
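The join check in the parallel step can be sketched as a small pure function. This is an illustration only: the mode strings, the status values, and the idea of passing the incoming step-run statuses as a list are assumptions rather than the engine's actual schema.

```python
from typing import List, Optional


def join_satisfied(mode: str, statuses: List[str], n: Optional[int] = None) -> bool:
    """Decide whether a parallel join may proceed.

    mode:     "all", "any", or "n_of_m" (hypothetical names for all/any/n-of-m)
    statuses: status of every incoming branch's step run
    n:        required number of completed branches for "n_of_m"
    """
    completed = sum(1 for status in statuses if status == "completed")
    if mode == "all":
        return completed == len(statuses)
    if mode == "any":
        return completed >= 1
    if mode == "n_of_m":
        if n is None:
            raise ValueError("n is required for n_of_m joins")
        return completed >= n
    raise ValueError(f"unknown join mode: {mode}")


branches = ["completed", "running", "completed"]
print(join_satisfied("all", branches))          # False
print(join_satisfied("any", branches))          # True
print(join_satisfied("n_of_m", branches, n=2))  # True
```

Because the function reads only the step-run statuses, it fits the stateless model described above: the handler can re-evaluate it from the database each time a branch completes.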
Integration Connection and Execution Flow
- Client creates a connection with a provider slug. AtlasDB calls Nango to start OAuth.
- User authorizes in the provider's UI. Callback completes the handshake. Status becomes active.
- To execute an action: client sends connection_id, action_key, and params. Template is loaded, params substituted, rate limit checked, and the request is proxied through Nango with the OAuth token injected.
- Response is returned, audit log written, last_used updated. If the provider returns 401/403, status becomes auth_error.
Circuit Breaker States
- Closed (normal): requests pass through, failures counted.
- Open (tripped): after consecutive failure threshold, all requests short-circuit with 503.
- Half-open (probing): after cooldown, one request is allowed. Success closes the breaker; failure re-opens it.
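The three states can be modeled with a small class. This is a minimal sketch, not AtlasDB's implementation: the threshold and cooldown defaults, the method names, and the injectable clock are all assumptions chosen to keep the example testable.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Closed -> open after N consecutive failures -> half-open after a cooldown."""

    def __init__(self, failure_threshold: int = 5, cooldown_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        """Return False when the caller should short-circuit (e.g. respond 503)."""
        if self.state == "open":
            if self.clock() - self.opened_at >= self.cooldown_seconds:
                self.state = "half_open"  # let exactly one probe through
                return True
            return False
        if self.state == "half_open":
            return False  # a probe is already in flight
        return True

    def record_success(self) -> None:
        self.state = "closed"
        self.failures = 0

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._trip()  # failed probe re-opens the breaker
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self.opened_at = self.clock()
        self.failures = 0


breaker = CircuitBreaker(failure_threshold=2, cooldown_seconds=30.0)
breaker.record_failure()
breaker.record_failure()
print(breaker.state)            # open
print(breaker.allow_request())  # False until the cooldown elapses
```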
Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| Workflow not found or not active (404) | Definition is draft or deleted | Set status to active via PATCH |
| Run stuck in running | Step failed or NATS disconnected | Check step runs for errors; verify NATS |
| Webhook trigger not firing | Wrong url_token or workflow not active | Verify URL and workflow status |
| SSE stream stops | Run completed or cancelled | Check for run_completed event |
| OAuth redirect fails | Nango misconfigured | Check ATLAS_NANGO_URL and Nango logs |
| auth_error on connection | Token revoked or expired | Call the reconnect endpoint |
| Circuit breaker open | Nango or provider failing | Check Nango health; breaker auto-resets |
| Action not found on execute | Wrong action_key | List templates to see available actions |
| Inbound webhook 404 | Wrong url_token | Verify webhook was registered |
| Rate limit hit | Too many provider calls | Wait for rate limit window to reset |