Airflow Integration
Trigger dbt runs from your Airflow DAGs and get webhook callbacks when they complete. No bridge service or custom operator required.
| Capability | How |
|---|---|
| Trigger a dbt run | POST /api/v1/projects/:project_id/jobs/:job_id/trigger |
| Run specific models | Pass command_override in the trigger request body |
| Get notified on completion | Configure a webhook URL on the job pointing at Airflow's dagRuns API |
| Authenticate | dagctl API token (dctl_...) + Airflow Basic Auth via webhook headers |
```mermaid
sequenceDiagram
    participant Airflow
    participant dagctl
    participant dbt
    Airflow->>dagctl: POST /trigger (command_override: "dbt build --select my_model+")
    dagctl-->>Airflow: 200 OK {job_run_id}
    dagctl->>dbt: Execute dbt build --select my_model+
    dbt-->>dagctl: Complete (success/failure)
    dagctl->>Airflow: POST /dags/{dag_id}/dagRuns {conf: {event, status, ...}}
    Airflow->>Airflow: Continue downstream pipeline
```
Prerequisites
- A deployed dbt project in dagctl
- A job configured on that project (created automatically on first deploy, or via the API)
- An API token generated from Settings > Authentication > API Tokens
- Your Airflow instance's REST API enabled with authentication configured
Environment variables
All examples assume the following are set:
```shell
export DAGCTL_TOKEN="dctl_your_token_here"
export ORG_ID="org-your_org_id"
export PROJECT_ID="your_project_id"
export JOB_ID="your_job_id"
```
Find your project and job IDs from the dagctl web UI or via GET /api/v1/projects.
Step 1: Trigger a dbt Run from Airflow
Basic trigger (runs the job's default command)
```shell
curl -X POST "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}/trigger" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json"
```
Response:
```json
{
  "message": "DBT job run triggered successfully",
  "job_run_id": "abc-123",
  "job_id": "def-456",
  "job_name": "scheduled-run-manual-20260402-120000"
}
```
Trigger with a model selector
Pass command_override to run a subset of models instead of the job's full command:
```shell
curl -X POST "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}/trigger" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json" \
  -d '{"command_override": "dbt build --select my_model+"}'
```
The override must be a valid dbt command (dbt build, dbt run, dbt test, etc.) with any selectors your project supports. The job's stored command is not modified; the override applies only to this run.
Command validation
The override is validated server-side against the same rules as job commands. Shell metacharacters (;, &&, |, etc.) are rejected. Only recognized dbt subcommands are allowed.
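The server-side allowlist is authoritative, but if you want to fail fast before making the API call, a client-side pre-check that mirrors the rules documented here might look like this (the function name and exact subcommand set are illustrative; the server may allow more):

```python
import re

# Subcommands listed in this page's troubleshooting section; the
# server-side allowlist is authoritative and may differ.
ALLOWED_SUBCOMMANDS = {"build", "run", "test", "seed", "snapshot"}

# Shell metacharacters documented as rejected: ; & | ` $( > <
BLOCKED_METACHARS = re.compile(r"[;&|`$><]")

def precheck_override(command: str) -> bool:
    """Best-effort local check before sending command_override."""
    if BLOCKED_METACHARS.search(command):
        return False
    parts = command.split()
    return len(parts) >= 2 and parts[0] == "dbt" and parts[1] in ALLOWED_SUBCOMMANDS

precheck_override("dbt build --select my_model+")  # True
precheck_override("dbt run; rm -rf /")             # False
```

A local pre-check only saves a round trip; a passing result does not guarantee the server will accept the command.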
Airflow DAG example
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

# Create an Airflow HTTP connection named "dagctl_api" with:
#   Host: https://api.dagctl.com
#   Headers: {"Authorization": "Bearer dctl_...", "X-Organization-ID": "org-..."}

PROJECT_ID = "your_project_id"
JOB_ID = "your_job_id"

with DAG("dagctl_dbt_run", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:
    trigger = SimpleHttpOperator(
        task_id="trigger_dbt_run",
        http_conn_id="dagctl_api",
        endpoint=f"/api/v1/projects/{PROJECT_ID}/jobs/{JOB_ID}/trigger",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json.dumps({"command_override": "dbt build --select my_model+"}),
        response_filter=lambda resp: resp.json()["job_run_id"],
        log_response=True,
    )

    poll_status = HttpSensor(
        task_id="wait_for_completion",
        http_conn_id="dagctl_api",
        endpoint="/api/v1/runs/{{ ti.xcom_pull(task_ids='trigger_dbt_run') }}",
        method="GET",
        response_check=lambda resp: resp.json()["status"] in (
            "success", "partial", "failed", "error"
        ),
        poke_interval=30,
        timeout=3600,
    )

    trigger >> poll_status
```
Step 2: Configure Webhook Callbacks
Instead of polling for completion, configure a webhook on the dagctl job that POSTs directly to Airflow's dagRuns API when the run finishes.
Configure via the dagctl API
```shell
curl -X PATCH "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json" \
  -d '{
    "webhook_enabled": true,
    "webhook_url": "https://your-airflow.example.com/api/v1/dags/downstream_pipeline/dagRuns",
    "webhook_headers": {
      "Authorization": "Basic dXNlcjpwYXNzd29yZA=="
    },
    "webhook_events": ["completed", "failed"]
  }'
```
| Field | Type | Description |
|---|---|---|
| webhook_enabled | boolean | Enable or disable webhook delivery |
| webhook_url | string | The URL to POST to on job completion. For Airflow, use /api/v1/dags/{dag_id}/dagRuns |
| webhook_headers | object | Custom HTTP headers merged into the webhook request. Use this for Airflow authentication |
| webhook_events | array | Which events trigger the webhook: completed, failed, partial. An empty array fires on all events |
Webhook secret
A webhook secret is generated server-side when you configure a URL. dagctl signs every webhook payload with this secret using HMAC-SHA256 and includes the signature in the X-Dagctl-Signature header. The secret is not returned in API responses. To verify signatures, retrieve the secret from the dagctl web UI (coming soon) or regenerate it by updating the webhook URL.
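Once you have the secret, verification is standard HMAC-SHA256 over the raw request body. A minimal sketch, assuming the sha256=&lt;hex digest&gt; header format shown in the delivery table below (the function name is illustrative):

```python
import hashlib
import hmac

def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Recompute HMAC-SHA256 over the raw request body and compare it to
    the X-Dagctl-Signature header value (e.g. "sha256=ab12...") in
    constant time to avoid timing attacks."""
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)
```

Verify against the raw request bytes before JSON parsing: re-serializing the payload can change key order or whitespace and invalidate the digest.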
Webhook payload
dagctl wraps the payload in a conf key so it maps directly to Airflow's dagRuns API format. Airflow stores it as the DAG run configuration, accessible via dag_run.conf in your tasks.
```json
{
  "conf": {
    "event": "job.completed",
    "timestamp": "2026-04-02T12:00:00Z",
    "job": {
      "id": "def-456",
      "name": "scheduled-run",
      "command": "dbt build"
    },
    "job_run": {
      "id": "abc-123",
      "status": "success",
      "triggered_by": "manual",
      "start_time": "2026-04-02T11:58:00Z",
      "end_time": "2026-04-02T12:00:00Z",
      "duration_seconds": 120.5,
      "command_override": "dbt build --select my_model+",
      "error_message": null
    },
    "project": {
      "id": "project-uuid",
      "name": "dw-main",
      "type": "dbt"
    },
    "organization": {
      "id": "org-uuid",
      "name": "acme"
    }
  }
}
```
Reading the webhook in an Airflow DAG
```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

def process_dagctl_result(**context):
    conf = context["dag_run"].conf
    event = conf["event"]  # "job.completed" or "job.failed"
    status = conf["job_run"]["status"]
    project = conf["project"]["name"]
    duration = conf["job_run"]["duration_seconds"]
    if status != "success":
        raise AirflowException(
            f"dagctl run failed for {project}: {conf['job_run'].get('error_message')}"
        )
    print(f"dagctl run for {project} completed in {duration}s")

with DAG("downstream_pipeline", start_date=datetime(2026, 1, 1), schedule=None) as dag:
    check_result = PythonOperator(
        task_id="check_dagctl_result",
        python_callable=process_dagctl_result,
    )

    # Add downstream tasks here:
    # check_result >> refresh_dashboards >> notify_stakeholders
```
Putting It Together: Full Bidirectional Flow
The complete integration without polling:
1. Airflow triggers dagctl via POST /trigger with an optional command_override
2. dagctl runs the dbt command in an isolated container with your project's configuration
3. dagctl webhooks Airflow on completion, triggering a downstream DAG
4. Airflow reads dag_run.conf to check the result and continues the pipeline
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

PROJECT_ID = "your_project_id"
JOB_ID = "your_job_id"

# This DAG triggers dagctl. The webhook fires a SEPARATE DAG on completion.
with DAG("trigger_dagctl", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:
    trigger = SimpleHttpOperator(
        task_id="trigger_dbt_run",
        http_conn_id="dagctl_api",
        endpoint=f"/api/v1/projects/{PROJECT_ID}/jobs/{JOB_ID}/trigger",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json.dumps({"command_override": "dbt build"}),
        log_response=True,
    )
```
The webhook is configured on the dagctl job to point at a separate Airflow DAG (e.g., downstream_pipeline) which handles the completion event. This decouples the trigger from the callback and eliminates polling entirely.
Webhook Delivery Details
| Behavior | Detail |
|---|---|
| HTTP method | POST |
| Content type | application/json |
| Signature header | X-Dagctl-Signature: sha256=<HMAC-SHA256 hex digest> |
| Event header | X-Dagctl-Event: job.completed |
| Timeout | 10 seconds |
| Retries | 1 retry on 5xx or timeout, with 2-second backoff |
| Deduplication | Guaranteed at-most-once delivery per job run |
| Custom headers | Merged from webhook_headers configuration |
Delivery guarantees
Webhook delivery is at-most-once. If the first attempt and single retry both fail, the webhook is not retried further. Monitor your Airflow endpoint's availability if you depend on webhook callbacks for pipeline orchestration. For critical pipelines, combine webhooks with periodic polling as a fallback.
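One way to sketch that fallback outside Airflow is a small stdlib-only poller against the run-status endpoint used by the HttpSensor example above (the function names and the interval/timeout defaults here are illustrative):

```python
import json
import time
import urllib.request

# Terminal states from the HttpSensor example earlier on this page.
TERMINAL_STATUSES = {"success", "partial", "failed", "error"}

def is_terminal(status: str) -> bool:
    return status in TERMINAL_STATUSES

def poll_run(run_id, token, org_id, base="https://api.dagctl.com",
             interval=30, timeout=3600):
    """Fallback poller: GET the run until it reaches a terminal status
    or the timeout elapses. Use only as a safety net behind webhooks."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        req = urllib.request.Request(
            f"{base}/api/v1/runs/{run_id}",
            headers={"Authorization": f"Bearer {token}",
                     "X-Organization-ID": org_id},
        )
        with urllib.request.urlopen(req) as resp:
            status = json.load(resp)["status"]
        if is_terminal(status):
            return status
        time.sleep(interval)
    raise TimeoutError(f"run {run_id} did not finish within {timeout}s")
```

In practice the webhook handles the happy path with zero latency, and a poller like this only runs when the callback never arrives.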
Troubleshooting
Webhook not firing
- Verify webhook_enabled is true on the job
- Check that webhook_events includes the event type (e.g., completed). An empty array fires on all events.
- Webhooks only fire for dbt projects, not SQLMesh
Airflow returns 401/403
- Verify the Authorization header in webhook_headers matches your Airflow API authentication method (Basic Auth, Kerberos token, etc.)
- Ensure the Airflow REST API is enabled in your airflow.cfg (auth_backends must be configured)
Airflow returns 404
- Verify the DAG ID in the webhook URL matches an existing, unpaused DAG in Airflow
- The Airflow REST API endpoint format is /api/v1/dags/{dag_id}/dagRuns
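To rule out dagctl entirely, you can hand-build a POST with the same shape as the webhook and fire it at your Airflow instance yourself. A sketch using only the standard library (the base URL, DAG ID, and credentials are placeholders you must replace):

```python
import base64
import json
import urllib.request

def build_dagrun_request(airflow_base: str, dag_id: str,
                         username: str, password: str) -> urllib.request.Request:
    """Build a POST matching the shape of dagctl's webhook so the
    Airflow dagRuns endpoint can be tested independently."""
    creds = base64.b64encode(f"{username}:{password}".encode()).decode()
    body = json.dumps({"conf": {"event": "job.completed", "test": True}}).encode()
    return urllib.request.Request(
        f"{airflow_base}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {creds}"},
        method="POST",
    )

# req = build_dagrun_request("https://your-airflow.example.com",
#                            "downstream_pipeline", "user", "password")
# urllib.request.urlopen(req)  # a success response means auth and DAG ID are correct
```

If this request succeeds but the webhook still fails, compare the webhook_url and webhook_headers on the job against the values you just tested.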
Command override rejected
- The override must start with dbt followed by an allowed subcommand (build, run, test, seed, snapshot, etc.)
- Shell metacharacters (;, &&, |, `, $(, >, <) are blocked
- Command override is only supported for dbt projects
Next Steps
- API Reference - Authentication, pagination, and conventions
- Execution Metrics - Query job run history and model performance
- dbt Jobs - Job configuration and scheduling