API Examples

Ready-to-adapt patterns for three common integration use cases: incremental data export, SLA threshold monitoring, and model performance regression detection.

| Use Case | Endpoints Used | Language |
| --- | --- | --- |
| Incremental polling | /runs, /model-executions | cURL, Python |
| SLA threshold check | /runs/daily-stats | Python |
| Model performance comparison | /model-executions/stats | Python |

Prerequisites

All examples assume you have set the following environment variables. Generate an API token from Settings > Authentication > API Tokens in the dagctl web UI.

export DAGCTL_TOKEN="dctl_your_token_here"
export ORG_ID="org-your_org_id"
export PROJECT_ID="your_project_id"

Incremental Polling

Fetch only records that changed since your last poll. Uses a 60-second overlap and client-side deduplication to guard against concurrent writes. See the Polling Guide for details on why this is necessary.

cURL

# First poll — get all runs from the last 24 hours
# Linux:
SINCE=$(date -u -d '24 hours ago' +"%Y-%m-%dT%H:%M:%SZ")
# macOS:
# SINCE=$(date -u -v-24H +"%Y-%m-%dT%H:%M:%SZ")

curl -s "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/runs?updated_after=${SINCE}&include_stats=false&limit=500" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" | jq '.runs'
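For repeated polls you need to carry the cursor forward between invocations. One way to do that is a small state file; a minimal sketch, assuming a state-file location of your choosing (the path below is arbitrary, not anything dagctl requires):

```shell
# Persist the poll cursor so each run picks up where the last one left off.
STATE_FILE="${TMPDIR:-/tmp}/dagctl_poll_cursor"

# Record the start time *before* polling; writing it out only afterwards
# avoids missing records updated while the request was in flight.
NOW=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

if [ -f "$STATE_FILE" ]; then
  SINCE=$(cat "$STATE_FILE")
else
  # First run: fall back to the last 24 hours (GNU date, then BSD date).
  SINCE=$(date -u -d '24 hours ago' +"%Y-%m-%dT%H:%M:%SZ" 2>/dev/null \
    || date -u -v-24H +"%Y-%m-%dT%H:%M:%SZ")
fi

# ... issue the curl request above with updated_after=${SINCE} ...

echo "$NOW" > "$STATE_FILE"
```

Only update the state file after the poll succeeds; combined with the 60-second overlap on the API side, this keeps the poll loop safe to re-run after a failure.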

Python

Both /runs and /model-executions use the same pagination and deduplication approach — only the URL path and response key differ.

import os
import requests
from datetime import datetime, timedelta, timezone

API_BASE = "https://api.dagctl.com/api/v1"
HEADERS = {
    "Authorization": f"Bearer {os.environ['DAGCTL_TOKEN']}",
    "X-Organization-ID": os.environ["ORG_ID"],
}
OVERLAP = timedelta(seconds=60)


def fetch_runs_since(project_id: str, since: datetime) -> list:
    """Fetch all job runs updated after the given timestamp."""
    cursor = since - OVERLAP
    runs = []
    page = 1
    while True:
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/runs",
            headers=HEADERS,
            params={
                "updated_after": cursor.isoformat(),
                "include_stats": "false",
                "limit": 500,
                "page": page,
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        runs.extend(data["runs"])
        if page >= data["total_pages"]:
            break
        page += 1
    # Deduplicate by id, keeping the first occurrence — the overlap window
    # can re-fetch records already seen on the previous poll.
    seen = set()
    unique = []
    for r in runs:
        if r["id"] not in seen:
            seen.add(r["id"])
            unique.append(r)
    return unique


def fetch_model_executions_since(project_id: str, since: datetime) -> list:
    """Fetch all model executions updated after the given timestamp."""
    cursor = since - OVERLAP
    executions = []
    page = 1
    while True:
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/model-executions",
            headers=HEADERS,
            params={
                "updated_after": cursor.isoformat(),
                "limit": 500,
                "page": page,
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        executions.extend(data["executions"])
        if page >= data["total_pages"]:
            break
        page += 1
    # Deduplicate by id, keeping the first occurrence — the overlap window
    # can re-fetch records already seen on the previous poll.
    seen = set()
    unique = []
    for e in executions:
        if e["id"] not in seen:
            seen.add(e["id"])
            unique.append(e)
    return unique

Error handling

These examples use raise_for_status() for brevity. Production implementations should handle HTTP errors (rate limits, transient failures) with retries and backoff.
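One way to add that hardening is a generic retry wrapper around the request. A minimal sketch — the `with_retries` name, retry counts, and backoff base are illustrative choices, not dagctl recommendations:

```python
import time


def with_retries(fn, *, max_retries=3, backoff=1.0, retry_on=(Exception,)):
    """Call fn(), retrying failed attempts with exponential backoff.

    Waits backoff * 2**attempt seconds between attempts and re-raises
    the last exception once max_retries is exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries:
                raise
            time.sleep(backoff * 2 ** attempt)
```

To apply it to the polling functions, wrap the request in a callable, e.g. `with_retries(lambda: requests.get(url, headers=HEADERS, params=params, timeout=30), retry_on=(requests.ConnectionError, requests.Timeout))`. For status-code-based retries (429 and 5xx, but not other 4xx), inspect `resp.status_code` inside the callable and raise only for the retryable cases.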


SLA Threshold Check

Check daily stats for runs where the p95 duration exceeded your SLA threshold. This and the following example reuse the API_BASE and HEADERS setup from the polling script above.

def check_sla_breaches(project_id: str, threshold_seconds: float, days: int = 7) -> list:
    """Return days where p95 duration exceeded the SLA threshold."""
    resp = requests.get(
        f"{API_BASE}/projects/{project_id}/runs/daily-stats",
        headers=HEADERS,
        params={"days": days},
        timeout=30,
    )
    resp.raise_for_status()
    breaches = []
    for day in resp.json()["daily_stats"]:
        p95 = day.get("p95_duration_seconds")
        if p95 is not None and p95 > threshold_seconds:
            breaches.append({
                "date": day["date"],
                "p95_duration": p95,
                "max_duration": day.get("max_duration_seconds"),
                "total_runs": day["total_runs"],
            })
    return breaches
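The filtering logic can be exercised offline against a hand-built payload; the dates and durations below are made up purely for illustration:

```python
sample_daily_stats = [
    {"date": "2024-06-01", "p95_duration_seconds": 310.0,
     "max_duration_seconds": 420.0, "total_runs": 57},
    {"date": "2024-06-02", "p95_duration_seconds": 180.5,
     "max_duration_seconds": 240.0, "total_runs": 61},
    {"date": "2024-06-03", "p95_duration_seconds": None,
     "max_duration_seconds": None, "total_runs": 0},
]

THRESHOLD = 300.0  # SLA: p95 run duration under 5 minutes

breaches = [
    {"date": d["date"], "p95_duration": d["p95_duration_seconds"]}
    for d in sample_daily_stats
    if d["p95_duration_seconds"] is not None
    and d["p95_duration_seconds"] > THRESHOLD
]
# Only 2024-06-01 breaches; the day with no runs (p95 of None) is
# skipped rather than treated as zero.
```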

Model Performance Comparison

Compare model execution stats across two time periods to detect regressions. Returns models where the p95 duration increased by more than 20%.

def compare_model_performance(
    project_id: str,
    current_start: str,
    current_end: str,
    previous_start: str,
    previous_end: str,
) -> list:
    """Compare model p95 durations between two date ranges."""
    def get_stats(start_date, end_date):
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/model-executions/stats",
            headers=HEADERS,
            params={"start_date": start_date, "end_date": end_date},
            timeout=30,
        )
        resp.raise_for_status()
        return {s["model_name"]: s for s in resp.json()["stats"]}

    current = get_stats(current_start, current_end)
    previous = get_stats(previous_start, previous_end)

    regressions = []
    for model, curr in current.items():
        prev = previous.get(model)
        if prev and curr.get("p95_duration_seconds") and prev.get("p95_duration_seconds"):
            change = (curr["p95_duration_seconds"] - prev["p95_duration_seconds"]) / prev["p95_duration_seconds"]
            if change > 0.2:
                regressions.append({
                    "model": model,
                    "current_p95": curr["p95_duration_seconds"],
                    "previous_p95": prev["p95_duration_seconds"],
                    "change_pct": round(change * 100, 1),
                })
    return sorted(regressions, key=lambda r: r["change_pct"], reverse=True)
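The comparison logic can likewise be checked offline; the model names and durations below are invented for illustration:

```python
current = {"orders": {"p95_duration_seconds": 52.0},
           "users":  {"p95_duration_seconds": 10.0}}
previous = {"orders": {"p95_duration_seconds": 40.0},
            "users":  {"p95_duration_seconds": 9.8}}

regressions = []
for model, curr in current.items():
    prev = previous.get(model)
    if not (prev and curr.get("p95_duration_seconds")
            and prev.get("p95_duration_seconds")):
        continue
    change = (curr["p95_duration_seconds"] - prev["p95_duration_seconds"]) \
        / prev["p95_duration_seconds"]
    if change > 0.2:
        regressions.append((model, round(change * 100, 1)))
# "orders" regressed 30%; "users" changed ~2%, below the 20% cutoff.
```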

Scheduling

Run these scripts on a schedule using cron, Airflow, or a cloud function. For incremental polling, every 5-15 minutes is typical. For SLA checks and regression comparisons, daily is usually sufficient.
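As one example, crontab entries for those cadences might look like this (script names and paths are placeholders):

```
# Illustrative crontab entries — adjust paths and times to your setup
*/10 * * * *  /opt/dagctl-scripts/poll_incremental.py      # every 10 minutes
0 7 * * *     /opt/dagctl-scripts/check_sla.py             # daily at 07:00
0 8 * * 1     /opt/dagctl-scripts/compare_performance.py   # weekly on Mondays
```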


Next Steps