Warlock.js v4

Run workflow

ai.workflow({...}) is the second rung of the ladder: a named, ordered set of steps with a stable signature. Each step is exactly one of: an agent call, a run function, or a parallel group. To compose another workflow in, wrap it with workflow.asTool() and call it from a run step. Workflows are durable, observable, cancellable, and resumable.

import { ai } from "@warlock.js/ai";
import { MemoryCacheDriver } from "@warlock.js/cache";
import { v } from "@warlock.js/seal";

ai.config({ defaultStore: new MemoryCacheDriver() });

type CatalogInput = { url: string };
type CatalogOutput = { id: string };
type CatalogState = { html?: string; catalogId?: string };

// extractorAgent and itemSchema are assumed to be defined elsewhere.
const wf = ai.workflow<CatalogInput, CatalogOutput, CatalogState>({
  name: "catalog-item",
  output: {
    extract: (ctx) => ({ id: ctx.state.catalogId ?? "" }),
    schema: v.object({ id: v.string() }),
  },
  steps: [
    ai.step<CatalogInput, CatalogState>({
      name: "fetch",
      run: async (ctx) => {
        ctx.state.html = await fetch(ctx.input.url).then((r) => r.text());
      },
    }),
    ai.step<CatalogInput, CatalogState>({
      name: "extract",
      agent: extractorAgent,
      input: (ctx) => ({ prompt: `Extract from: ${ctx.state.html}` }),
      output: {
        extract: (ctx) => ctx.agentResult?.data,
        schema: itemSchema,
      },
      retry: { attempts: 3, backoff: "exponential" },
    }),
  ],
});

ai.workflow<TInput, TOutput, TState, TContext>(...)
ai.step<TInput, TState, TContext>(...)

Generic order: Input and Output come first because they describe the public contract. State comes before Context because step bodies touch state far more often. The defaults (unknown, Record<string, unknown>) let you supply only the leading type arguments and leave the rest untyped.

// canonical form, mirrors agent.execute
const result = await wf.execute(
  { url: "https://..." },
  { runId: "catalog-123", signal: AbortSignal.timeout(60_000) },
);

// single-object form, an ergonomic alternative to the call above
const sameRun = await wf.execute({
  input: { url: "https://..." },
  runId: "catalog-123",
});

WorkflowRunOptions carries runId, signal, on, context, and sessionId. The value of workflow.version is copied onto every report the workflow produces.

All failures funnel into result.error:

  • StepFailedError / STEP_FAILED
  • RoutingError / WORKFLOW_INVALID_GOTO
  • WorkflowDriftError / WORKFLOW_DRIFT
  • WorkflowCancelledError / WORKFLOW_CANCELLED
  • MaxStepsExceededError / WORKFLOW_MAX_STEPS

See Handle errors.
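
As a rough illustration of the funnel, the sketch below branches on the code carried by result.error. The FailedResult type and the describeFailure helper are hypothetical stand-ins written for this example, not library exports; only the code strings come from the list above. The presence of a code property on the error is assumed from the error.code check used in onFailure, and the suggested actions are illustrative, not prescribed.

```typescript
// Hypothetical local stand-ins: nothing here is imported from @warlock.js/ai.
type WorkflowErrorCode =
  | "STEP_FAILED"
  | "WORKFLOW_INVALID_GOTO"
  | "WORKFLOW_DRIFT"
  | "WORKFLOW_CANCELLED"
  | "WORKFLOW_MAX_STEPS";

type FailedResult = { error?: { code: string; message: string } };

// Map a workflow result to a coarse action the caller might take.
function describeFailure(result: FailedResult): string {
  if (!result.error) return "ok";
  switch (result.error.code as WorkflowErrorCode) {
    case "STEP_FAILED":
      return "alert"; // a step failed
    case "WORKFLOW_INVALID_GOTO":
    case "WORKFLOW_MAX_STEPS":
      return "fix-definition"; // routing bug or runaway loop
    case "WORKFLOW_DRIFT":
      return "restart-fresh"; // assumption: the snapshot no longer fits
    case "WORKFLOW_CANCELLED":
      return "resume-later";
    default:
      return "unknown"; // any code outside the list above
  }
}
```

Because every failure arrives on the result rather than as a thrown exception, a single switch like this can sit right after the execute() call.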

type WorkflowResult<TOutput> = {
  type: "workflow";
  data?: TOutput; // from workflow.output.extract
  report: WorkflowReport; // runId, signature, status, timings, per-step snapshots
  usage: Usage; // aggregated across all agent calls
  error?: AIError;
};

report.steps[name] holds a frozen StepSnapshot per step — output, status, attempts, attemptHistory, timings, nested children for parallel groups.

skip? → before? → (run | agent | parallel) → output.extract (+ schema) → after? → nextStep?
| Phase | Purpose |
| --- | --- |
| skip | Return true to skip the step. Output becomes undefined. |
| before | Pre-work: fetch, set state, validate. |
| run | Core non-agent work. |
| agent | Agent to execute. input(ctx) builds the prompt. |
| input | Required when agent is set. |
| output | { extract, schema? }: extracts the step’s output. |
| after | Post-work: save, notify. |
| nextStep | Step-level routing on success. |
| onFailure | Step-level recovery after retries exhaust. |
| onCancel | Cleanup if cancelled in flight. |

Errors in before / run / agent / after / output are retryable. Errors in nextStep / onFailure terminate the workflow with RoutingError — those are programmer errors, not transient failures.
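
One way to picture the split between retryable and routing errors is the simplified model below. It is a mental model only, not the framework's implementation: StepDef, Ctx, runStep and the local RoutingError class are hypothetical, the agent and output.extract phases are folded into run, and retries happen without any delay.

```typescript
// Hypothetical, simplified model of one step's lifecycle.
type Ctx = { state: Record<string, unknown> };
type Route = { goto: string } | { end: true } | undefined;

type StepDef = {
  name: string;
  attempts?: number;
  skip?: (ctx: Ctx) => boolean;
  before?: (ctx: Ctx) => void;
  run: (ctx: Ctx) => unknown;
  after?: (ctx: Ctx) => void;
  nextStep?: (ctx: Ctx) => Route;
};

class RoutingError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "RoutingError";
  }
}

function runStep(step: StepDef, ctx: Ctx): { output: unknown; route: Route; attempts: number } {
  // A skipped step produces no output and never enters the retry loop.
  if (step.skip?.(ctx)) return { output: undefined, route: undefined, attempts: 0 };

  const maxAttempts = step.attempts ?? 1;
  let output: unknown = undefined;
  let attempt = 0;
  for (;;) {
    attempt++;
    try {
      // Retryable phases: a throw here consumes one attempt.
      step.before?.(ctx);
      output = step.run(ctx);
      step.after?.(ctx);
      break;
    } catch (error) {
      if (attempt >= maxAttempts) throw error;
    }
  }

  // Routing runs outside the retry loop: a throw here is never retried.
  let route: Route;
  try {
    route = step.nextStep?.(ctx);
  } catch (error) {
    throw new RoutingError(`nextStep of "${step.name}" threw: ${String(error)}`);
  }
  return { output, route, attempts: attempt };
}
```

In this model a flaky run function gets up to attempts tries, while a throwing nextStep fails immediately with the routing error, which mirrors the "programmer error, not transient failure" distinction.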

type WorkflowContext<TInput, TState, TContext> = {
  readonly input: TInput; // frozen; durable cause
  readonly context: TContext; // frozen; per-execution
  readonly steps: Record<string, StepSnapshot>; // frozen snapshots of completed steps
  state: TState; // mutable shared state
  readonly agentResult?: AgentResult<unknown>; // set when current step has an agent
  readonly runId: string;
  readonly signal?: AbortSignal;
  readonly startedAt: Date;
};

input, context, steps are deep-frozen. state is mutable during a step and frozen into steps[name].state on completion.
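
To make "deep-frozen" and "frozen on completion" concrete, here is a minimal sketch using plain JavaScript freezing. The deepFreeze and commitStep helpers and the StepCommit type are hypothetical, and the JSON round-trip is a simplified stand-in for whatever cloning the framework actually performs (it would drop values such as Date or undefined).

```typescript
// Recursively freeze plain objects and arrays.
function deepFreeze<T>(value: T): T {
  if (value !== null && typeof value === "object" && !Object.isFrozen(value)) {
    Object.freeze(value);
    for (const child of Object.values(value)) deepFreeze(child);
  }
  return value;
}

// Hypothetical snapshot shape for this sketch.
type StepCommit = { output: unknown; state: Record<string, unknown> };

// On completion, copy the live state and freeze the copy into the snapshot.
function commitStep(liveState: Record<string, unknown>, output: unknown): StepCommit {
  const copy = JSON.parse(JSON.stringify(liveState)) as Record<string, unknown>;
  return deepFreeze({ output, state: copy });
}

const liveState: Record<string, unknown> = { counter: 1, flags: { seen: true } };
const committed = commitStep(liveState, { id: "a1" });

// Later steps keep mutating the live state; the snapshot does not change.
liveState.counter = 2;
```

The freeze is what makes a completed step's record trustworthy: a later step can read the snapshot but any attempt to write to it fails instead of silently rewriting history.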

  • input: what to process. Persisted in the snapshot, replayed verbatim on resume().
  • context: who’s running it. Tenancy, user, locale, traceId. Never persisted. Callers pass it fresh on every execute() and resume().

Resume invariant. Persistence-scoping fields in context (e.g. organizationId) MUST match across resume calls. The framework can’t fingerprint context — mixing scopes silently corrupts data.

  • Small control data (flags, counters) → ctx.state. Cheap.
  • Large artifacts (HTML blobs, embedding vectors) → producer step’s output.extract, read via ctx.steps[prev].output.

ctx.state is cloned on every retry attempt, which is why large artifacts do not belong there. ctx.steps is cloned only once, when a step commits.

ai.step({
  name: "generate",
  parallel: [
    ai.step({ name: "draft", agent: writerAgent, input, output }),
    ai.step({ name: "suggest-articles", agent: kbAgent, input, output }),
  ],
});

  • Children share ctx.state — last-write-wins on conflict.
  • Addressable by flat (ctx.steps.draft) AND nested (ctx.steps.generate.steps.draft) path.
  • Any child failing doesn’t cancel siblings; the parent records the first child’s error.
  • Checkpoint writes atomically after all children settle.
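
The "siblings keep running, parent records the first error" behaviour listed above can be sketched with Promise.allSettled. The runParallel helper and ChildResult type are hypothetical, and this sketch assumes "first" means first in declaration order; the framework may instead mean first in time.

```typescript
type ChildResult = {
  name: string;
  status: "completed" | "failed";
  output?: unknown;
  error?: unknown;
};

type Child = { name: string; run: () => Promise<unknown> };

async function runParallel(
  children: Child[],
): Promise<{ children: ChildResult[]; error?: unknown }> {
  // allSettled waits for every child, so one rejection never cancels siblings.
  const settled = await Promise.allSettled(children.map((child) => child.run()));

  const results = settled.map((outcome, index): ChildResult =>
    outcome.status === "fulfilled"
      ? { name: children[index].name, status: "completed", output: outcome.value }
      : { name: children[index].name, status: "failed", error: outcome.reason },
  );

  // Assumption: the recorded error is the first failure in declaration order.
  const firstFailure = results.find((result) => result.status === "failed");
  return { children: results, error: firstFailure?.error };
}
```

Only after every child has settled does the function return, which is the natural point for a single checkpoint write covering the whole group.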

ai.step({
  name: "qa",
  agent: qaReviewerAgent,
  input,
  output,
  nextStep: (ctx) => {
    if (!ctx.agentResult?.data.approved) {
      ctx.state.qaFeedback = ctx.agentResult?.data.feedback;
      return { goto: "draft" };
    }
  },
  onFailure: (ctx, error) => {
    if (error.code === "PROVIDER_RATE_LIMIT") {
      return { goto: "fallbackQa" };
    }
  },
});

nextStep and onFailure each return { goto: "stepName" }, { end: true }, or void (fall through / halt).

Guards: maxSteps (default 100) hard-fails with MaxStepsExceededError. loopWarnAfter (default 5 revisits) emits workflow.loop.warning.
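
The two guards can be pictured with a small routing loop. This is a sketch under assumptions, not the framework's scheduler: walk and RoutedStep are hypothetical names, the exact boundary at which maxSteps trips is a guess, and a "revisit" is taken to mean any visit after the first. Errors are thrown with the code as the message purely for brevity.

```typescript
type Route = { goto: string } | { end: true } | undefined;
type RoutedStep = { name: string; next?: (visit: number) => Route };

function walk(steps: RoutedStep[], maxSteps = 100, loopWarnAfter = 5) {
  const visits = new Map<string, number>();
  const trace: string[] = [];
  const warnings: string[] = [];
  let index = 0;
  let executed = 0;

  while (index < steps.length) {
    // Hard guard: refuse to execute more than maxSteps steps in one run.
    if (++executed > maxSteps) throw new Error("WORKFLOW_MAX_STEPS");

    const step = steps[index];
    const visit = (visits.get(step.name) ?? 0) + 1;
    visits.set(step.name, visit);
    trace.push(step.name);

    // Soft guard: warn once when the revisit count reaches the threshold.
    if (visit - 1 === loopWarnAfter) warnings.push(step.name);

    const decision = step.next?.(visit);
    if (decision === undefined) {
      index++; // fall through to the next step in order
      continue;
    }
    if ("end" in decision) break;

    const target = steps.findIndex((candidate) => candidate.name === decision.goto);
    if (target === -1) throw new Error("WORKFLOW_INVALID_GOTO");
    index = target;
  }
  return { trace, warnings };
}
```

A draft/qa pair where qa returns { goto: "draft" } twice and then falls through produces the trace draft, qa, draft, qa, draft, qa. If qa never approves, the hard guard ends the run instead of looping forever.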

retry: {
  attempts: 3,
  backoff: "exponential", // "none" | "linear" | "exponential" | (attempt) => ms
  retryOn: (error, attempt) => true,
  onRetry: (attempt, error) => {},
}

Exponential defaults: 500ms → 1s → 2s → 4s → 8s, capped at 30s. AbortError short-circuits retry — cancellation is final.
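
The exponential schedule is a doubling series with a ceiling, which the sketch below reproduces. backoffDelay is a hypothetical helper, not a library function. The 500 ms base and 30 s cap come from the numbers above; the linear formula is purely an assumption, since the text does not state it.

```typescript
type Backoff = "none" | "linear" | "exponential" | ((attempt: number) => number);

const BASE_MS = 500;
const CAP_MS = 30_000;

// Delay in milliseconds before retry number `attempt` (1-based).
function backoffDelay(backoff: Backoff, attempt: number): number {
  if (typeof backoff === "function") return backoff(attempt);
  if (backoff === "none") return 0;
  // Assumption: linear growth reuses the same base and cap.
  if (backoff === "linear") return Math.min(BASE_MS * attempt, CAP_MS);
  // Exponential: 500, 1000, 2000, ... doubling until the cap is reached.
  return Math.min(BASE_MS * 2 ** (attempt - 1), CAP_MS);
}

const schedule = [1, 2, 3, 4, 5].map((attempt) => backoffDelay("exponential", attempt));
// schedule is [500, 1000, 2000, 4000, 8000]
```

With these numbers the cap first applies on the seventh retry, where the uncapped value would be 32 000 ms.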

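const ctrl = new AbortController();
const pending = wf.execute({ input, signal: ctrl.signal }); // start the run, do not await yet
ctrl.abort("user cancelled");
const result = await pending; // resolves with status "cancelled"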

Between-step cancellation is guaranteed; mid-step cancellation is best-effort. A cancelled run returns status: "cancelled" with a partial report.steps, and the checkpoint is written before it returns, so resume() still works.
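
The between-step guarantee amounts to checking the signal at each step boundary. The sketch below is a minimal model under that assumption: runUntilCancelled, Task and the saveCheckpoint callback are hypothetical, and mid-step cancellation (which would need each step to observe the signal itself) is deliberately left out.

```typescript
type Task = { name: string; run: () => Promise<void> };

async function runUntilCancelled(
  tasks: Task[],
  signal: AbortSignal,
  saveCheckpoint: (completed: string[]) => void,
) {
  const completed: string[] = [];
  for (const task of tasks) {
    // Checked at every boundary: a step that has already started may finish.
    if (signal.aborted) {
      saveCheckpoint(completed); // persist progress before returning
      return { status: "cancelled" as const, completed };
    }
    await task.run();
    completed.push(task.name);
  }
  return { status: "completed" as const, completed };
}
```

If the signal fires while the first task is running, that task still completes, the second never starts, and the checkpoint lists only the first.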

await wf.execute({ input, runId: "ticket-123" }); // fresh run
await wf.resume("ticket-123"); // after crash

See Persist AI data for snapshot stores, drift detection, and recovery patterns.

Events emitted during a run: workflow.starting, workflow.step.{starting|streaming|completed|skipped|retrying|failed}, workflow.loop.warning, workflow.cancelled, workflow.completed, workflow.error.

Subscription order: definition → instance → per-call. All matching handlers fire.
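
The ordering rule can be modelled as three handler maps consulted in a fixed sequence. emitLayered and the layer shapes below are hypothetical and only illustrate the rule; how handlers are actually registered at each level is not shown here.

```typescript
type Handler = (eventName: string) => void;
type Handlers = { [eventName: string]: Handler[] | undefined };

type Layers = {
  definition?: Handlers; // handlers declared with the workflow
  instance?: Handlers; // handlers attached to the workflow object
  call?: Handlers; // handlers passed for a single execution
};

function emitLayered(eventName: string, layers: Layers): void {
  // Fixed layer order; every matching handler runs and none shadows another.
  for (const layer of [layers.definition, layers.instance, layers.call]) {
    for (const handler of layer?.[eventName] ?? []) handler(eventName);
  }
}
```

Since later layers add to earlier ones rather than replacing them, a per-call handler cannot accidentally silence logging that was wired into the definition.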

Every payload carries runId and rootRunId.

  • Unknown shape at author time: wait for ai.planner() (v3) or model as a supervisor.
  • Quality loop until goal met: ai.supervisor() with evaluate.
  • Multi-turn conversation with persistent session: orchestrator (v2).
  • Iterate a runtime list of items: wrap a workflow with the ai.batch() utility.