# API Examples
Production-ready patterns for the three most common integration use cases: incremental data export, SLA threshold monitoring, and model performance regression detection.
| Use Case | Endpoints Used | Language |
|---|---|---|
| Incremental polling | /runs, /model-executions | cURL, Python |
| SLA threshold check | /runs/daily-stats | Python |
| Model performance comparison | /model-executions/stats | Python |
## Prerequisites
All examples assume you have set the following environment variables: `DAGCTL_TOKEN` (an API token), `ORG_ID` (your organization ID), and `PROJECT_ID` (the project to query). Generate an API token from Settings > Authentication > API Tokens in the dagctl web UI.
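For example, in your shell (all three values below are placeholders, not real credentials):

```shell
export DAGCTL_TOKEN="dagctl_xxxxxxxx"   # from Settings > Authentication > API Tokens
export ORG_ID="org_123"
export PROJECT_ID="proj_456"
```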
## Incremental Polling
Fetch only records that changed since your last poll. Uses a 60-second overlap and client-side deduplication to guard against concurrent writes. See the Polling Guide for details on why this is necessary.
### cURL
```bash
# First poll — get all runs from the last 24 hours
# Linux:
SINCE=$(date -u -d '24 hours ago' +"%Y-%m-%dT%H:%M:%SZ")
# macOS:
# SINCE=$(date -u -v-24H +"%Y-%m-%dT%H:%M:%SZ")

curl -s "https://api.dagctl.com/api/v1/projects/${PROJECT_ID}/runs?updated_after=${SINCE}&include_stats=false&limit=500" \
  -H "Authorization: Bearer ${DAGCTL_TOKEN}" \
  -H "X-Organization-ID: ${ORG_ID}" | jq '.runs'
```
### Python
Both /runs and /model-executions use the same pagination and deduplication approach — only the URL path and response key differ.
```python
import os
from datetime import datetime, timedelta, timezone

import requests

API_BASE = "https://api.dagctl.com/api/v1"
HEADERS = {
    "Authorization": f"Bearer {os.environ['DAGCTL_TOKEN']}",
    "X-Organization-ID": os.environ["ORG_ID"],
}

# Re-fetch a 60-second window before the cursor to catch concurrent writes.
OVERLAP = timedelta(seconds=60)


def fetch_runs_since(project_id: str, since: datetime) -> list:
    """Fetch all job runs updated after the given timestamp."""
    cursor = since - OVERLAP
    runs = []
    page = 1
    while True:
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/runs",
            headers=HEADERS,
            params={
                "updated_after": cursor.isoformat(),
                "include_stats": "false",
                "limit": 500,
                "page": page,
            },
        )
        resp.raise_for_status()
        data = resp.json()
        runs.extend(data["runs"])
        if page >= data["total_pages"]:
            break
        page += 1
    # Deduplicate by id: the overlap window can return the same record twice.
    seen = set()
    return [r for r in runs if r["id"] not in seen and not seen.add(r["id"])]


def fetch_model_executions_since(project_id: str, since: datetime) -> list:
    """Fetch all model executions updated after the given timestamp."""
    cursor = since - OVERLAP
    executions = []
    page = 1
    while True:
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/model-executions",
            headers=HEADERS,
            params={
                "updated_after": cursor.isoformat(),
                "limit": 500,
                "page": page,
            },
        )
        resp.raise_for_status()
        data = resp.json()
        executions.extend(data["executions"])
        if page >= data["total_pages"]:
            break
        page += 1
    seen = set()
    return [e for e in executions if e["id"] not in seen and not seen.add(e["id"])]
```
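Between polls you need to persist and advance the cursor. One way to do that is to move it to the newest timestamp you actually saw, so an empty poll never skips past unseen data. The sketch below is generic over any fetcher like `fetch_runs_since` above, and it assumes each record carries an ISO 8601 `updated_at` field, which is an assumption about the response shape, not documented API behavior:

```python
from datetime import datetime

def advance_cursor(fetch, cursor: datetime) -> tuple[list, datetime]:
    """Run one poll and return (new_records, next_cursor).

    `fetch` is any callable taking a since-timestamp, e.g. a partial of
    fetch_runs_since above. The next cursor is the newest updated_at we
    actually saw; if nothing came back, the cursor stays put.
    """
    records = fetch(cursor)
    if records:
        # Assumes each record has an ISO 8601 "updated_at" field.
        cursor = max(
            datetime.fromisoformat(r["updated_at"].replace("Z", "+00:00"))
            for r in records
        )
    return records, cursor
```

Store the returned cursor (in a file, database row, or scheduler variable) and pass it back in on the next poll; the `OVERLAP` re-fetch inside the fetchers handles the boundary.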
### Error handling
These examples use raise_for_status() for brevity. Production implementations should handle HTTP errors (rate limits, transient failures) with retries and backoff.
## SLA Threshold Check
Check daily run stats for days where the p95 duration exceeded your SLA threshold.
```python
def check_sla_breaches(project_id: str, threshold_seconds: float, days: int = 7) -> list:
    """Return days where p95 duration exceeded the SLA threshold."""
    resp = requests.get(
        f"{API_BASE}/projects/{project_id}/runs/daily-stats",
        headers=HEADERS,
        params={"days": days},
    )
    resp.raise_for_status()
    breaches = []
    for day in resp.json()["daily_stats"]:
        p95 = day.get("p95_duration_seconds")
        if p95 is not None and p95 > threshold_seconds:
            breaches.append({
                "date": day["date"],
                "p95_duration": p95,
                "max_duration": day.get("max_duration_seconds"),
                "total_runs": day["total_runs"],
            })
    return breaches
```
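To see the filter in isolation, here is the same threshold test applied to a made-up `daily_stats` payload with no API call; the numbers are invented and the `None` entry stands in for a day with no completed runs:

```python
daily_stats = [
    {"date": "2024-05-01", "p95_duration_seconds": 48.0, "total_runs": 120},
    {"date": "2024-05-02", "p95_duration_seconds": 95.5, "total_runs": 118},
    {"date": "2024-05-03", "p95_duration_seconds": None, "total_runs": 0},
]
threshold = 60.0
breached_days = [d["date"] for d in daily_stats
                 if d["p95_duration_seconds"] is not None
                 and d["p95_duration_seconds"] > threshold]
print(breached_days)  # ['2024-05-02']
```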
## Model Performance Comparison
Compare model execution stats across two time periods to detect regressions. Returns models where the p95 duration increased by more than 20%.
```python
def compare_model_performance(
    project_id: str,
    current_start: str,
    current_end: str,
    previous_start: str,
    previous_end: str,
) -> list:
    """Compare model p95 durations between two date ranges."""
    def get_stats(start_date, end_date):
        resp = requests.get(
            f"{API_BASE}/projects/{project_id}/model-executions/stats",
            headers=HEADERS,
            params={"start_date": start_date, "end_date": end_date},
        )
        resp.raise_for_status()
        return {s["model_name"]: s for s in resp.json()["stats"]}

    current = get_stats(current_start, current_end)
    previous = get_stats(previous_start, previous_end)
    regressions = []
    for model, curr in current.items():
        prev = previous.get(model)
        if prev and curr.get("p95_duration_seconds") and prev.get("p95_duration_seconds"):
            change = (curr["p95_duration_seconds"] - prev["p95_duration_seconds"]) / prev["p95_duration_seconds"]
            if change > 0.2:
                regressions.append({
                    "model": model,
                    "current_p95": curr["p95_duration_seconds"],
                    "previous_p95": prev["p95_duration_seconds"],
                    "change_pct": round(change * 100, 1),
                })
    return sorted(regressions, key=lambda r: r["change_pct"], reverse=True)
```
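To make the threshold math concrete with invented numbers: a model whose p95 went from 10.0 s to 12.5 s changed by (12.5 - 10.0) / 10.0 = 0.25, which is above the 0.2 cutoff, so it would be flagged:

```python
prev_p95, curr_p95 = 10.0, 12.5
change = (curr_p95 - prev_p95) / prev_p95
print(round(change * 100, 1))  # 25.0 -> above the 20% cutoff, so flagged
```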
## Scheduling
Run these scripts on a schedule using cron, Airflow, or a cloud function. For incremental polling, every 5-15 minutes is typical. For SLA checks and regression comparisons, daily is usually sufficient.
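As a sketch, a crontab along these lines would cover both cadences; the script paths and times are placeholders for wrappers around the functions above, not files this project ships:

```
*/10 * * * * /usr/bin/python3 /opt/dagctl/poll_runs.py     # incremental poll every 10 min
0 6 * * *    /usr/bin/python3 /opt/dagctl/check_sla.py     # daily SLA + regression checks at 06:00
```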
## Next Steps
- Execution Metrics Reference — Full endpoint documentation with query parameters and response schemas
- Observability — Built-in monitoring dashboard and model health tracking