
Flows

Complete reference for flow definitions — tasks, dependencies, executors, overrides, and examples.

Flow Definition

A flow is a JSON object with a single required field: tasks. This is the top-level schema you submit to Tasked.

{
  "tasks": [
    // one or more task definitions
  ]
}

Field     Type          Required  Description
tasks     TaskDef[]     Yes       Array of task definitions forming the DAG
webhooks  FlowWebhooks  No        Optional webhook URLs for completion/failure notifications (see Webhooks)

The engine validates the DAG at submission: task IDs must be unique, depends_on references must point to tasks in the same flow, and the graph must be acyclic.
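
The three submission checks can be sketched in a few lines of Python. This is an illustration of the rules stated above, not the engine's actual code: the function name validate_flow and the error messages are hypothetical, and the sketch ignores spawn_output targets such as discover/complete, which only exist at runtime.

```python
def validate_flow(flow):
    """Return a list of validation errors; an empty list means the flow passed."""
    tasks = flow.get("tasks", [])
    errors = []

    # Rule 1: task IDs must be unique.
    seen = set()
    for task in tasks:
        if task["id"] in seen:
            errors.append(f"duplicate task id: {task['id']}")
        seen.add(task["id"])

    # Rule 2: every depends_on entry must name a task in the same flow.
    for task in tasks:
        for dep in task.get("depends_on", []):
            if dep not in seen:
                errors.append(f"{task['id']} depends on unknown task: {dep}")

    # Rule 3: the graph must be acyclic. Repeatedly mark tasks whose
    # dependencies are all resolved; anything left over is in, or behind, a cycle.
    deps = {t["id"]: {d for d in t.get("depends_on", []) if d in seen} for t in tasks}
    resolved = set()
    progressed = True
    while progressed:
        progressed = False
        for task_id, task_deps in deps.items():
            if task_id not in resolved and task_deps <= resolved:
                resolved.add(task_id)
                progressed = True
    if len(resolved) < len(deps):
        stuck = sorted(set(deps) - resolved)
        errors.append(f"cycle detected; unresolved tasks: {', '.join(stuck)}")
    return errors
```

Running a check like this locally before submitting can catch typos in depends_on early, although the engine's own validation remains authoritative.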

Task Definition

Each task in the tasks array is a JSON object with the following fields:

Field         Type             Required  Description
id            string           Yes       Unique identifier within the flow
executor      string           Yes       Executor type: shell, http, noop, callback, delay, approval, container, agent, spawn, or trigger
config        object           No        Executor-specific configuration (see Executors)
input         any              No        Arbitrary JSON passed to the executor
depends_on    string[]         No        IDs of tasks this task depends on
timeout_secs  integer          No        Override queue default timeout (seconds)
retries       integer          No        Override queue default max retries
backoff       BackoffStrategy  No        Override queue default backoff strategy
condition     string           No        Optional condition expression evaluated with Rhai. Task is skipped if the expression evaluates to false. See Conditions.
spawn_output  string[]         No        For spawn executor only. Declares which generated task IDs are available as dependency targets for downstream tasks.

Minimal example with the two required fields plus a shell command in config:

{
  "id": "greet",
  "executor": "shell",
  "config": { "command": "echo hello" }
}

Dependencies

The depends_on field declares that a task must wait for other tasks to succeed before it can run. The engine uses these edges to build the DAG and determine execution order.

  • Tasks with no dependencies start immediately (in parallel if there are several).
  • A task moves from pending to ready only when all its dependencies reach succeeded.
  • If a dependency fails, all downstream tasks are cancelled.
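
The rules above amount to a small state machine. The following Python sketch applies them to a state map; it is a simplified model, not the engine's scheduler. The state names pending, ready, and succeeded come from the text, while failed, cancelled, and the function name advance are assumptions made for illustration. Skipped tasks (see Conditions) are not modeled.

```python
def advance(tasks, states):
    """Apply the dependency rules and return an updated copy of `states`.

    `states` maps task id -> "pending", "ready", "succeeded", "failed",
    or "cancelled" (the last two names are assumed for this sketch).
    """
    deps = {t["id"]: t.get("depends_on", []) for t in tasks}
    new_states = dict(states)
    changed = True
    while changed:  # loop so that cancellation propagates transitively
        changed = False
        for task_id, task_deps in deps.items():
            if new_states[task_id] != "pending":
                continue
            dep_states = [new_states[d] for d in task_deps]
            if any(s in ("failed", "cancelled") for s in dep_states):
                # A failed dependency cancels everything downstream.
                new_states[task_id] = "cancelled"
                changed = True
            elif all(s == "succeeded" for s in dep_states):
                # No dependencies, or all of them succeeded: ready to run.
                new_states[task_id] = "ready"
                changed = True
    return new_states
```

Because tasks without dependencies satisfy the "all succeeded" test trivially, every root task becomes ready on the first call, which matches the parallel start described above.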

Diamond DAG example

A common pattern where two branches fan out from one task and converge on another:

{
  "tasks": [
    {
      "id": "setup",
      "executor": "shell",
      "config": { "command": "echo 'preparing environment'" }
    },
    {
      "id": "build-api",
      "executor": "shell",
      "config": { "command": "make build-api" },
      "depends_on": ["setup"]
    },
    {
      "id": "build-ui",
      "executor": "shell",
      "config": { "command": "make build-ui" },
      "depends_on": ["setup"]
    },
    {
      "id": "deploy",
      "executor": "shell",
      "config": { "command": "make deploy" },
      "depends_on": ["build-api", "build-ui"]
    }
  ]
}
           ┌── build-api ──┐
  setup ──┤                ├──→ deploy
           └── build-ui ───┘

build-api and build-ui run in parallel after setup completes. deploy waits for both builds to succeed.

Dynamic Task Generation

The spawn executor enables dynamic workflows where a task discovers work at runtime and injects new tasks into the running flow. This is useful when the set of tasks to run is not known ahead of time — for example, processing a variable number of files, partitions, or targets.

A spawn task delegates to an inner executor (shell, http, container, etc.) and parses its text output as a JSON array of task definitions. These generated tasks are injected into the flow and wired into the DAG automatically.

spawn_output

The spawn_output field declares which generated task IDs are available as dependency targets for downstream static tasks. Generated task IDs are automatically prefixed with the generator's ID to prevent collisions.

{
  "tasks": [
    {
      "id": "discover",
      "executor": "spawn",
      "config": { "command": "./list-targets.sh" },
      "spawn_output": ["complete"]
    },
    {
      "id": "aggregate",
      "executor": "shell",
      "config": { "command": "./aggregate.sh" },
      "depends_on": ["discover/complete"]
    }
  ]
}

Here, discover runs ./list-targets.sh, which outputs a JSON array of tasks including one with ID "complete". The aggregate task depends on discover/complete — the prefixed form of that generated task. Generated root tasks (those with no internal dependencies) automatically depend on the generator task.
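
To make the prefixing and wiring concrete, here is a rough Python model of what injection might look like. It is a sketch under assumptions: the "/" separator is inferred from discover/complete in the example, generated tasks are assumed to reference their siblings by unprefixed ID, and inject_generated is a hypothetical name rather than part of Tasked.

```python
def inject_generated(generator_id, generated):
    """Prefix generated task IDs and make root tasks depend on the generator."""
    local_ids = {t["id"] for t in generated}
    injected = []
    for task in generated:
        original_deps = task.get("depends_on", [])
        new_task = dict(task)
        new_task["id"] = f"{generator_id}/{task['id']}"
        # Rewrite dependencies that point at sibling generated tasks.
        deps = [f"{generator_id}/{d}" if d in local_ids else d for d in original_deps]
        # Root tasks (no internal dependencies) wait for the generator itself.
        if not any(d in local_ids for d in original_deps):
            deps.append(generator_id)
        new_task["depends_on"] = deps
        injected.append(new_task)
    return injected
```

With a generator named discover that emits tasks t1, t2, and complete (where complete depends on t1 and t2), this model yields discover/t1 and discover/t2 depending on discover, and discover/complete depending on both prefixed siblings.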

See the Spawn Executor documentation for full configuration details and behaviors.

Per-Task Overrides

By default, tasks inherit timeout, retry count, and backoff strategy from the queue they run on. You can override any of these per task.

{
  "id": "long-running-export",
  "executor": "shell",
  "config": { "command": "pg_dump -h db.example.com mydb" },
  "timeout_secs": 1800,
  "retries": 1,
  "backoff": { "fixed": { "delay_ms": 5000 } }
}

This task gets a 30-minute timeout (instead of the queue default of 300s), only 1 retry (instead of 3), and a fixed 5-second backoff between retries.

Available backoff strategies:

Strategy              JSON                                                Behavior
Fixed                 {"fixed": {"delay_ms": 2000}}                       Same delay every retry
Exponential           {"exponential": {"initial_delay_ms": 1000}}         Doubles each attempt: 1s, 2s, 4s, 8s...
Exponential + Jitter  {"exponential_jitter": {"initial_delay_ms": 1000}}  Exponential with 50-150% random jitter
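
The delay each strategy produces can be approximated with the following Python sketch. It assumes retry attempts are numbered from 0, that jitter is applied as a multiplicative factor between 0.5 and 1.5, and that there is no upper cap on the delay; none of these details are confirmed by this page, and backoff_delay_ms is a hypothetical helper.

```python
import random

def backoff_delay_ms(strategy, attempt, rng=random.random):
    """Delay in milliseconds before retry number `attempt` (0-based, assumed)."""
    kind, params = next(iter(strategy.items()))
    if kind == "fixed":
        return params["delay_ms"]
    # Both exponential variants double the initial delay on each attempt.
    base = params["initial_delay_ms"] * (2 ** attempt)
    if kind == "exponential":
        return base
    if kind == "exponential_jitter":
        # Scale by a random factor in [0.5, 1.5), i.e. 50-150% of the base delay.
        return int(base * (0.5 + rng()))
    raise ValueError(f"unknown backoff strategy: {kind}")
```

For {"exponential": {"initial_delay_ms": 1000}}, attempts 0 through 3 give 1000, 2000, 4000, and 8000 ms, matching the 1s, 2s, 4s, 8s progression in the table.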

Variable Substitution in Flows

Tasks can reference dependency outputs in their config fields using ${tasks.<id>.output...} syntax. The engine resolves these references just before dispatching each task.

{
  "tasks": [
    {
      "id": "fetch",
      "executor": "shell",
      "config": {
        "command": "curl -s https://api.example.com/status"
      }
    },
    {
      "id": "process",
      "executor": "shell",
      "config": {
        "command": "echo '${tasks.fetch.output.stdout}' | jq '.status'"
      },
      "depends_on": ["fetch"]
    },
    {
      "id": "notify",
      "executor": "http",
      "config": {
        "url": "https://hooks.slack.com/services/T00/B00/xxx",
        "method": "POST",
        "body": {
          "text": "Status check result: ${tasks.process.output.stdout}"
        }
      },
      "depends_on": ["process"]
    }
  ]
}

In this pipeline, fetch calls an API. process extracts a field from the response using ${tasks.fetch.output.stdout}. Finally, notify sends the result to Slack using ${tasks.process.output.stdout}.
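
As a mental model for how references are resolved, the Python sketch below walks a config value and replaces each ${tasks.<id>.output...} reference with the matching value from a map of task outputs. It is a simplification: it always produces strings, so it does not reproduce the type-preservation rules mentioned in Concepts → Variable Substitution, and the names REF and substitute are hypothetical.

```python
import json
import re

# Matches ${tasks.<id>.output} followed by an optional dotted path.
REF = re.compile(r"\$\{tasks\.([A-Za-z0-9_-]+)\.output((?:\.[A-Za-z0-9_]+)*)\}")

def substitute(value, outputs):
    """Recursively replace task output references inside a config value."""
    if isinstance(value, dict):
        return {k: substitute(v, outputs) for k, v in value.items()}
    if isinstance(value, list):
        return [substitute(v, outputs) for v in value]
    if not isinstance(value, str):
        return value

    def lookup(match):
        task_id, path = match.group(1), match.group(2)
        node = outputs[task_id]  # raises KeyError for an unknown task
        for key in filter(None, path.split(".")):
            node = node[key]
        # Strings are inserted as-is; other JSON values are serialized.
        return node if isinstance(node, str) else json.dumps(node)

    return REF.sub(lookup, value)
```

Given outputs of {"fetch": {"stdout": "ok"}}, the command "echo '${tasks.fetch.output.stdout}'" becomes "echo 'ok'". Note that values are pasted into the command verbatim, so output containing quotes can change how a shell command parses.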

Secrets substitution

In addition to task output references, executor configs can reference queue-level secrets using ${secrets.<name>} syntax. Secrets are defined on the queue configuration and resolved at dispatch time.

{
  "id": "call-api",
  "executor": "container",
  "config": {
    "image": "curlimages/curl",
    "command": ["curl", "-H", "Authorization: Bearer ${secrets.API_TOKEN}", "https://api.example.com"]
  }
}

This keeps sensitive values out of flow definitions. The secret value is loaded from the environment variable or file configured in the queue's secrets map.

See Concepts → Variable Substitution for the full syntax reference and type-preservation rules.

Conditions

The condition field lets you skip a task based on a runtime expression. When a task becomes ready, the engine evaluates its condition. If the result is false, the task is skipped instead of executed. If the expression is invalid, the task fails.

Expression language

Conditions are Rhai expressions. Task outputs and secrets are injected as native scope variables — you reference them directly (e.g., tasks.check.output.status) rather than using the ${...} interpolation syntax used in executor configs.

Conditions vs. executor config interpolation: executor config fields (command strings, HTTP bodies, prompts) use ${tasks.<id>.output...} string interpolation, while conditions use native Rhai scope variables such as tasks.check.output.exit_code == 0. The two syntaxes are not interchangeable.

// Always run
"condition": "true"

// Run only if a previous task exited successfully
"condition": "tasks.check.output.exit_code == 0"

// Run only if output contains a keyword
"condition": "tasks.detect.output.stdout.contains(\"changed\")"

// Combine multiple conditions
"condition": "(tasks.a.output.ok || tasks.b.output.ok) && tasks.c.output.ready"

// Hyphenated task IDs use bracket notation
"condition": "tasks[\"build-api\"].output.exit_code == 0"

// Check a secret is present
"condition": "secrets.API_KEY != \"\""

Syntax reference

Feature               Syntax
Task output access    tasks.<id>.output.<path>
Hyphenated task IDs   tasks["my-task"].output.<path>
Secrets               secrets.<name>
Comparison operators  ==, !=, <, >, <=, >=
Boolean operators     &&, ||, !
String methods        .contains(), .starts_with(), .ends_with(), .len()

Behavior

  • A skipped task produces the output {"skipped": true, "condition": "..."}, where the condition string is the original expression.
  • Dependents of a skipped task proceed normally — skipping is not treated as a failure.
  • If the expression cannot be evaluated (syntax error, wrong type, etc.), the task fails.
  • A task with no condition field always runs when its dependencies are met.
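
The four behaviors above can be summarized as a single decision function. The Python sketch below is only a model: evaluate stands in for the Rhai evaluator (Python cannot run Rhai expressions directly), treating a non-boolean result as a failure is one reading of "wrong type", and both the function name decide and the shape of the error output are hypothetical. Only the skipped output shape comes from this page.

```python
def decide(task, evaluate):
    """Return (state, output) for a task whose dependencies are met.

    `evaluate` is a stand-in for the Rhai evaluator: it takes the expression
    string and returns a value, or raises on a syntax or type error.
    """
    expr = task.get("condition")
    if expr is None:
        return "run", None  # no condition: the task always runs
    try:
        result = evaluate(expr)
    except Exception as err:
        # An expression that cannot be evaluated fails the task.
        return "failed", {"error": str(err)}
    if not isinstance(result, bool):
        return "failed", {"error": "condition did not evaluate to a boolean"}
    if result:
        return "run", None
    # Skipped tasks record the original expression in their output.
    return "skipped", {"skipped": True, "condition": expr}
```

Because a skipped task is not a failure, a downstream task that reads its output should expect the {"skipped": true, ...} shape rather than the executor's usual fields.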

Example: conditional build steps

{
  "tasks": [
    {
      "id": "detect",
      "executor": "shell",
      "config": { "command": "git diff --name-only HEAD~1" }
    },
    {
      "id": "build-frontend",
      "executor": "shell",
      "config": { "command": "npm run build" },
      "depends_on": ["detect"],
      "condition": "tasks.detect.output.stdout.contains(\"frontend/\")"
    },
    {
      "id": "build-backend",
      "executor": "shell",
      "config": { "command": "cargo build" },
      "depends_on": ["detect"],
      "condition": "tasks.detect.output.stdout.contains(\"backend/\")"
    }
  ]
}

The detect task lists changed files. build-frontend only runs if any file under frontend/ changed; build-backend only runs if backend/ changed. Unchanged components are skipped entirely.

Webhooks

Flows can optionally include a webhooks object to receive HTTP notifications when the flow reaches a terminal state. Tasked sends a best-effort POST request to the configured URLs.

Field        Type    Required  Description
on_complete  string  No        URL to POST when the flow succeeds
on_failure   string  No        URL to POST when the flow fails

Example: flow with webhooks

{
  "tasks": [
    {
      "id": "build",
      "executor": "shell",
      "config": { "command": "make build" }
    }
  ],
  "webhooks": {
    "on_complete": "https://hooks.example.com/success",
    "on_failure": "https://hooks.example.com/failure"
  }
}

Webhook payload

The POST body is a JSON object with the following fields:

{
  "event": "flow_completed",
  "flow_id": "f_7k2m3n4p",
  "queue_id": "ci",
  "state": "succeeded",
  "task_count": 3,
  "tasks_succeeded": 3,
  "tasks_failed": 0
}

Webhooks are best-effort: if the target URL is unreachable or returns an error, the engine logs the failure but does not retry the webhook. The flow state is unaffected.
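
On the receiving side, a handler only needs to parse the JSON body. The Python sketch below turns a payload like the one above into a one-line summary; summarize_webhook is a hypothetical helper, and it relies only on the fields shown in the sample payload.

```python
import json

def summarize_webhook(body):
    """Turn a webhook POST body (bytes or str) into a one-line summary."""
    payload = json.loads(body)
    return (
        f"flow {payload['flow_id']} on queue {payload['queue_id']} "
        f"finished as {payload['state']}: "
        f"{payload['tasks_succeeded']}/{payload['task_count']} succeeded, "
        f"{payload['tasks_failed']} failed"
    )
```

Since delivery is not retried, a receiver should respond quickly and do any slow processing after acknowledging the request.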

Artifacts

Artifacts are shared files that tasks within a flow can use to pass data to each other. Unlike variable substitution (which passes structured output), artifacts are for transferring actual files — build outputs, generated reports, data exports, or anything that doesn't fit in task output JSON.

Every running flow gets a temporary artifact directory. Tasks access it through two mechanisms:

Access Method         Use When         Description
$TASKED_ARTIFACTS     Shell tasks      Local directory path. Read and write files directly.
$TASKED_ARTIFACT_URL  Container tasks  HTTP API base URL. Upload and download via curl.

Shell tasks can read and write files directly in the $TASKED_ARTIFACTS directory, just like any local path. Container tasks use the HTTP API at $TASKED_ARTIFACT_URL: PUT to upload and GET to download (see the API Reference for endpoint details).

Artifacts are automatically cleaned up when the flow completes. They exist only for the lifetime of the flow.

Example: build and deploy with artifacts

The build task compiles a binary and copies it to the artifacts directory. The deploy task then reads it from there and copies it to a server.

{
  "tasks": [
    {
      "id": "build",
      "executor": "shell",
      "config": { "command": "cargo build --release && cp target/release/myapp $TASKED_ARTIFACTS/myapp" }
    },
    {
      "id": "deploy",
      "executor": "shell",
      "config": { "command": "scp $TASKED_ARTIFACTS/myapp server:/opt/myapp" },
      "depends_on": ["build"]
    }
  ]
}

Container tasks with the HTTP API

Container tasks cannot access the host's local filesystem, including the $TASKED_ARTIFACTS directory, so they use the HTTP API instead:

{
  "tasks": [
    {
      "id": "generate-report",
      "executor": "container",
      "config": {
        "image": "python:3.12-slim",
        "command": ["sh", "-c", "python3 report.py > /tmp/report.csv && curl -X PUT --data-binary @/tmp/report.csv $TASKED_ARTIFACT_URL/report.csv"]
      }
    },
    {
      "id": "upload-report",
      "executor": "container",
      "config": {
        "image": "curlimages/curl",
        "command": ["sh", "-c", "curl -s $TASKED_ARTIFACT_URL/report.csv | curl -X POST --data-binary @- https://storage.example.com/reports/"]
      },
      "depends_on": ["generate-report"]
    }
  ]
}
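
A container task written in Python can make the same PUT and GET calls without curl. The sketch below uses only the standard library and assumes the URL layout implied by the curl commands above, namely $TASKED_ARTIFACT_URL/<name>. The helper names are hypothetical, and authentication or other headers the API might require are not covered here (see the API Reference).

```python
import os
import urllib.request

def artifact_request(name, data=None, base_url=None):
    """Build a PUT (upload) request if `data` is given, otherwise a GET (download)."""
    base = (base_url or os.environ["TASKED_ARTIFACT_URL"]).rstrip("/")
    method = "GET" if data is None else "PUT"
    return urllib.request.Request(f"{base}/{name}", data=data, method=method)

def upload(name, data, base_url=None):
    """Upload bytes as an artifact and return the HTTP status code."""
    with urllib.request.urlopen(artifact_request(name, data, base_url)) as resp:
        return resp.status

def download(name, base_url=None):
    """Download an artifact and return its contents as bytes."""
    with urllib.request.urlopen(artifact_request(name, base_url=base_url)) as resp:
        return resp.read()
```

Inside a task, upload("report.csv", open("/tmp/report.csv", "rb").read()) would mirror the curl -X PUT call in generate-report, and download("report.csv") would mirror the GET in upload-report.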

Examples

Simple two-task pipeline

{
  "tasks": [
    {
      "id": "generate",
      "executor": "shell",
      "config": { "command": "echo '{\"count\": 42}'" }
    },
    {
      "id": "consume",
      "executor": "shell",
      "config": {
        "command": "echo 'Received count: ${tasks.generate.output.stdout}'"
      },
      "depends_on": ["generate"]
    }
  ]
}

Build, test, deploy with HTTP executor

{
  "tasks": [
    {
      "id": "build",
      "executor": "shell",
      "config": { "command": "cargo build --release" },
      "timeout_secs": 600
    },
    {
      "id": "test",
      "executor": "shell",
      "config": { "command": "cargo test" },
      "depends_on": ["build"],
      "timeout_secs": 300
    },
    {
      "id": "deploy",
      "executor": "http",
      "config": {
        "url": "https://deploy.example.com/api/releases",
        "method": "POST",
        "headers": {
          "Authorization": "Bearer deploy-token",
          "Content-Type": "application/json"
        },
        "body": { "version": "latest", "env": "production" }
      },
      "depends_on": ["test"],
      "retries": 2,
      "backoff": { "exponential": { "initial_delay_ms": 3000 } }
    }
  ]
}

Fan-out / fan-in pattern

Run multiple independent tasks in parallel, then join on a single summary task:

{
  "tasks": [
    {
      "id": "test-unit",
      "executor": "shell",
      "config": { "command": "make test-unit" }
    },
    {
      "id": "test-integration",
      "executor": "shell",
      "config": { "command": "make test-integration" }
    },
    {
      "id": "test-e2e",
      "executor": "shell",
      "config": { "command": "make test-e2e" }
    },
    {
      "id": "lint",
      "executor": "shell",
      "config": { "command": "make lint" }
    },
    {
      "id": "all-checks-passed",
      "executor": "noop",
      "depends_on": ["test-unit", "test-integration", "test-e2e", "lint"]
    },
    {
      "id": "deploy",
      "executor": "shell",
      "config": { "command": "make deploy" },
      "depends_on": ["all-checks-passed"]
    }
  ]
}
  test-unit ─────────┐
  test-integration ──┤
                     ├──→ all-checks-passed ──→ deploy
  test-e2e ──────────┤
  lint ──────────────┘

The four check tasks run in parallel with no dependencies between them. The noop task all-checks-passed acts as a synchronization barrier — it succeeds instantly once all its dependencies succeed. deploy only runs after every check has passed.
