Skip to content

Airflow Integration

Trigger dbt runs from your Airflow DAGs and get webhook callbacks when they complete. No bridge service or custom operator required.

Capability How
Trigger a dbt run POST /api/v1/projects/:id/jobs/:job_id/trigger
Run specific models Pass command_override in the trigger request body
Get notified on completion Configure a webhook URL on the job pointing at Airflow's dagRuns API
Authenticate dagctl API token (dctl_...) + Airflow Basic Auth via webhook headers
sequenceDiagram
    participant Airflow
    participant dagctl
    participant dbt

    Airflow->>dagctl: POST /trigger (command_override: "dbt build --select my_model+")
    dagctl-->>Airflow: 200 OK {job_run_id}
    dagctl->>dbt: Execute dbt build --select my_model+
    dbt-->>dagctl: Complete (success/failure)
    dagctl->>Airflow: POST /dags/{dag_id}/dagRuns {conf: {event, status, ...}}
    Airflow->>Airflow: Continue downstream pipeline

Prerequisites

  1. A deployed dbt project in dagctl
  2. A job configured on that project (created automatically on first deploy, or via the API)
  3. An API token generated from Settings > Authentication > API Tokens
  4. Your Airflow instance's REST API enabled with authentication configured

Environment variables

All examples assume the following are set:

export DAGCTL_TOKEN="dctl_your_token_here"
export ORG_ID="org-your_org_id"
export PROJECT_ID="your_project_id"
export JOB_ID="your_job_id"

Find your project and job IDs from the dagctl web UI or via GET /api/v1/projects.


Step 1: Trigger a dbt Run from Airflow

Basic trigger (runs the job's default command)

curl -X POST "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}/trigger" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json"

Response:

{
  "message": "DBT job run triggered successfully",
  "job_run_id": "abc-123",
  "job_id": "def-456",
  "job_name": "scheduled-run-manual-20260402-120000"
}

Trigger with a model selector

Pass command_override to run a subset of models instead of the job's full command:

curl -X POST "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}/trigger" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json" \
  -d '{"command_override": "dbt build --select my_model+"}'

The override must be a valid dbt command (dbt build, dbt run, dbt test, etc.) with any selectors your project supports. The job's stored command is not modified; the override applies only to this run.

Command validation

The override is validated server-side against the same rules as job commands. Shell metacharacters (;, &&, |, etc.) are rejected. Only recognized dbt subcommands are allowed.

Airflow DAG example

import json
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

# Create an Airflow HTTP connection named "dagctl_api" with:
#   Host: https://api.dagctl.com
#   Header: {"Authorization": "Bearer dctl_...", "X-Organization-ID": "org-..."}

PROJECT_ID = "your_project_id"
JOB_ID = "your_job_id"

with DAG("dagctl_dbt_run", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:

    trigger = SimpleHttpOperator(
        task_id="trigger_dbt_run",
        http_conn_id="dagctl_api",
        endpoint=f"/api/v1/projects/{PROJECT_ID}/jobs/{JOB_ID}/trigger",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json.dumps({"command_override": "dbt build --select my_model+"}),
        response_filter=lambda resp: resp.json()["job_run_id"],
        log_response=True,
    )

    poll_status = HttpSensor(
        task_id="wait_for_completion",
        http_conn_id="dagctl_api",
        endpoint=f"/api/v1/runs/{{{{ ti.xcom_pull(task_ids='trigger_dbt_run') }}}}",
        method="GET",
        response_check=lambda resp: resp.json()["status"] in (
            "success", "partial", "failed", "error"
        ),
        poke_interval=30,
        timeout=3600,
    )

    trigger >> poll_status

Step 2: Configure Webhook Callbacks

Instead of polling for completion, configure a webhook on the dagctl job that POSTs directly to Airflow's dagRuns API when the run finishes.

Configure via the dagctl API

curl -X PATCH "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/jobs/${JOB_ID}" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" \
  -H "Content-Type: application/json" \
  -d '{
    "webhook_enabled": true,
    "webhook_url": "https://your-airflow.example.com/api/v1/dags/downstream_pipeline/dagRuns",
    "webhook_headers": {
      "Authorization": "Basic dXNlcjpwYXNzd29yZA=="
    },
    "webhook_events": ["completed", "failed"]
  }'
Field Type Description
webhook_enabled boolean Enable or disable webhook delivery
webhook_url string The URL to POST to on job completion. For Airflow, use /api/v1/dags/{dag_id}/dagRuns
webhook_headers object Custom HTTP headers merged into the webhook request. Use this for Airflow authentication
webhook_events array Which events trigger the webhook: completed, failed, partial. Empty array fires on all events

Webhook secret

A webhook secret is generated server-side when you configure a URL. dagctl signs every webhook payload with this secret using HMAC-SHA256 and includes the signature in the X-Dagctl-Signature header. The secret is not returned in API responses. To verify signatures, retrieve the secret from the dagctl web UI (coming soon) or regenerate it by updating the webhook URL.

Webhook payload

dagctl wraps the payload in a conf key so it maps directly to Airflow's dagRuns API format. Airflow stores it as the DAG run configuration, accessible via dag_run.conf in your tasks.

{
  "conf": {
    "event": "job.completed",
    "timestamp": "2026-04-02T12:00:00Z",
    "job": {
      "id": "def-456",
      "name": "scheduled-run",
      "command": "dbt build"
    },
    "job_run": {
      "id": "abc-123",
      "status": "success",
      "triggered_by": "manual",
      "start_time": "2026-04-02T11:58:00Z",
      "end_time": "2026-04-02T12:00:00Z",
      "duration_seconds": 120.5,
      "command_override": "dbt build --select my_model+",
      "error_message": null
    },
    "project": {
      "id": "project-uuid",
      "name": "dw-main",
      "type": "dbt"
    },
    "organization": {
      "id": "org-uuid",
      "name": "acme"
    }
  }
}

Reading the webhook in an Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def process_dagctl_result(**context):
    conf = context["dag_run"].conf
    event = conf["event"]         # "job.completed" or "job.failed"
    status = conf["job_run"]["status"]
    project = conf["project"]["name"]
    duration = conf["job_run"]["duration_seconds"]

    if status != "success":
        raise Exception(f"dagctl run failed for {project}: {conf['job_run'].get('error_message')}")

    print(f"dagctl run for {project} completed in {duration}s")


with DAG("downstream_pipeline", start_date=datetime(2026, 1, 1), schedule=None) as dag:

    check_result = PythonOperator(
        task_id="check_dagctl_result",
        python_callable=process_dagctl_result,
    )

    # Add downstream tasks here
    # check_result >> refresh_dashboards >> notify_stakeholders

Putting It Together: Full Bidirectional Flow

The complete integration without polling:

  1. Airflow triggers dagctl via POST /trigger with an optional command_override
  2. dagctl runs the dbt command in an isolated container with your project's configuration
  3. dagctl webhooks Airflow on completion, triggering a downstream DAG
  4. Airflow reads dag_run.conf to check the result and continues the pipeline
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
import json

PROJECT_ID = "your_project_id"
JOB_ID = "your_job_id"

# This DAG triggers dagctl. The webhook fires a SEPARATE DAG on completion.
with DAG("trigger_dagctl", start_date=datetime(2026, 1, 1), schedule="@daily") as dag:

    trigger = SimpleHttpOperator(
        task_id="trigger_dbt_run",
        http_conn_id="dagctl_api",
        endpoint=f"/api/v1/projects/{PROJECT_ID}/jobs/{JOB_ID}/trigger",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json.dumps({"command_override": "dbt build"}),
        log_response=True,
    )

The webhook is configured on the dagctl job to point at a separate Airflow DAG (e.g., downstream_pipeline) which handles the completion event. This decouples the trigger from the callback and eliminates polling entirely.


Webhook Delivery Details

Behavior Detail
HTTP method POST
Content type application/json
Signature header X-Dagctl-Signature: sha256=<HMAC-SHA256 hex digest>
Event header X-Dagctl-Event: job.completed
Timeout 10 seconds
Retries 1 retry on 5xx or timeout, with 2-second backoff
Deduplication Guaranteed at-most-once delivery per job run
Custom headers Merged from webhook_headers configuration

Delivery guarantees

Webhook delivery is at-most-once. If the first attempt and single retry both fail, the webhook is not retried further. Monitor your Airflow endpoint's availability if you depend on webhook callbacks for pipeline orchestration. For critical pipelines, combine webhooks with periodic polling as a fallback.


Troubleshooting

Webhook not firing

  • Verify webhook_enabled is true on the job
  • Check that webhook_events includes the event type (e.g., completed). An empty array fires on all events.
  • Webhooks only fire for dbt projects, not SQLMesh

Airflow returns 401/403

  • Verify the Authorization header in webhook_headers matches your Airflow API authentication method (Basic Auth, Kerberos token, etc.)
  • Ensure the Airflow API is enabled in your airflow.cfg (auth_backends must be configured)

Airflow returns 404

  • Verify the DAG ID in the webhook URL matches an existing, unpaused DAG in Airflow
  • The Airflow REST API endpoint format is /api/v1/dags/{dag_id}/dagRuns

Command override rejected

  • The override must start with dbt followed by an allowed subcommand (build, run, test, seed, snapshot, etc.)
  • Shell metacharacters (;, &&, |, `, $(, >, <) are blocked
  • Command override is only supported for dbt projects

Next Steps