Multi-Step Workflows
Patterns for chaining skills into complex, multi-step workflows with state management, branching logic, and recovery strategies.
Most real-world tasks need more than a single skill invocation. Deploying an application might mean running tests, building a container image, pushing to a registry, and updating a deployment manifest. Reviewing a pull request might require reading changed files, checking test coverage, running linters, and posting a summary. These are multi-step workflows: sequences of skill invocations that work together toward a bigger goal.
This article covers the core patterns for building reliable multi-step workflows, including how to chain skills, manage state between steps, handle branching logic, and recover when things go wrong.
Anatomy of a workflow
A workflow is a directed sequence of steps where each step invokes one or more skills and passes its results forward. The simplest version is a linear chain:
Step 1: Read file --> Step 2: Transform content --> Step 3: Write file
But real workflows branch, loop, and sometimes need to backtrack. Before getting into the advanced patterns, let’s cover the building blocks.
The workflow context object
Every multi-step workflow needs shared context: a place to accumulate results, track progress, and store decisions. Think of it as the workflow’s working memory.
interface WorkflowContext {
// Input parameters
input: Record<string, unknown>;
// Results from each completed step
stepResults: Map<string, StepResult>;
// Current workflow state
status: "running" | "paused" | "completed" | "failed";
// Metadata for recovery
currentStep: string;
startedAt: Date;
completedSteps: string[];
}
interface StepResult {
stepName: string;
output: unknown;
duration: number;
timestamp: Date;
}
In Python, the equivalent pattern uses a dataclass or typed dictionary:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class WorkflowContext:
input: dict[str, Any]
step_results: dict[str, Any] = field(default_factory=dict)
status: str = "running"
current_step: str = ""
completed_steps: list[str] = field(default_factory=list)
started_at: datetime = field(default_factory=datetime.now)
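To make the bookkeeping concrete, here is a minimal sketch of a step runner that updates this context as it goes. The `run_step` helper and `fake_tests` step are illustrative names, and the dataclass is repeated so the snippet is self-contained:

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class WorkflowContext:
    input: dict[str, Any]
    step_results: dict[str, Any] = field(default_factory=dict)
    status: str = "running"
    current_step: str = ""
    completed_steps: list[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.now)

async def run_step(ctx: WorkflowContext, name: str, fn) -> WorkflowContext:
    """Record progress around a single step invocation."""
    ctx.current_step = name
    ctx.step_results[name] = await fn(ctx)
    ctx.completed_steps.append(name)
    return ctx

# Hypothetical step standing in for a real skill invocation
async def fake_tests(ctx: WorkflowContext) -> dict[str, Any]:
    return {"passed": True, "fail_count": 0}

ctx = WorkflowContext(input={"project_path": "/tmp/app"})
ctx = asyncio.run(run_step(ctx, "tests", fake_tests))
```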
Chaining skills for complex tasks
The most common workflow pattern is a linear chain where each step depends on the output of the previous one. The key principle here: each step should produce a well-defined output that the next step can consume without ambiguity.
Pattern: sequential pipeline
async function deployWorkflow(ctx: WorkflowContext): Promise<WorkflowContext> {
// Step 1: Run tests
const testResult = await invoke("run_tests", {
path: ctx.input.projectPath,
});
ctx.stepResults.set("tests", testResult);
if (!testResult.success) {
ctx.status = "failed";
return ctx;
}
// Step 2: Build container image
const buildResult = await invoke("build_image", {
path: ctx.input.projectPath,
tag: ctx.input.version,
});
ctx.stepResults.set("build", buildResult);
// Step 3: Push to registry
const pushResult = await invoke("push_image", {
image: buildResult.imageName,
registry: ctx.input.registry,
});
ctx.stepResults.set("push", pushResult);
// Step 4: Update deployment
const deployResult = await invoke("update_deployment", {
manifest: ctx.input.manifestPath,
image: pushResult.fullImageRef,
});
ctx.stepResults.set("deploy", deployResult);
ctx.status = "completed";
return ctx;
}
Notice how each step extracts exactly what it needs from the previous step’s result. This is deliberate: passing whole result objects downstream ties every step to the full shape of its predecessor’s output, and that tight coupling makes workflows brittle.
Pattern: accumulating pipeline
Sometimes each step adds to a growing result rather than transforming the previous output. This comes up often in research and analysis workflows.
async def code_review_workflow(ctx: WorkflowContext) -> WorkflowContext:
"""Review a pull request by accumulating findings from multiple analyses."""
findings = []
# Step 1: Check for style issues
lint_result = await invoke("run_linter", path=ctx.input["pr_path"])
findings.extend(lint_result.issues)
ctx.step_results["lint"] = lint_result
# Step 2: Check test coverage
coverage_result = await invoke("check_coverage", path=ctx.input["pr_path"])
if coverage_result.coverage_percent < 80:
findings.append({
"type": "coverage",
"message": f"Coverage is {coverage_result.coverage_percent}%, below 80% threshold"
})
ctx.step_results["coverage"] = coverage_result
# Step 3: Analyze complexity
complexity_result = await invoke("analyze_complexity", path=ctx.input["pr_path"])
findings.extend(complexity_result.warnings)
ctx.step_results["complexity"] = complexity_result
# Step 4: Generate summary from accumulated findings
summary = await invoke("generate_review_summary", findings=findings)
ctx.step_results["summary"] = summary
ctx.status = "completed"
return ctx
State management between steps
As workflows grow, managing state becomes the central challenge. Here are some proven strategies.
Keep state minimal and explicit
Only store what downstream steps actually need. A common mistake is dumping the entire result of every step into context, which bloats memory and makes it harder for the agent to figure out what matters.
| Approach | Pros | Cons |
|---|---|---|
| Full result storage | Nothing is lost | Context bloat, harder to reason about |
| Selective extraction | Clean, focused context | You need to know what downstream steps want |
| Summarized results | Compact, easy to scan | Loses detail you might need later |
The best approach in practice is selective extraction with fallback access:
// After running tests, extract only what downstream steps need
const testResult = await invoke("run_tests", { path: projectPath });
ctx.stepResults.set("tests", {
passed: testResult.passed,
failCount: testResult.failures.length,
// Keep full details available but separate from the main flow
_raw: testResult,
});
Checkpointing for long workflows
For workflows that take minutes or longer, save checkpoints so you can resume after an interruption. This matters especially for agent skills that might hit context window limits mid-workflow. See Context Management for more on working within those constraints.
async function checkpointedWorkflow(
ctx: WorkflowContext,
): Promise<WorkflowContext> {
const steps = [
{ name: "fetch_data", fn: fetchData },
{ name: "transform", fn: transformData },
{ name: "validate", fn: validateResults },
{ name: "publish", fn: publishResults },
];
for (const step of steps) {
// Skip already-completed steps (supports resume)
if (ctx.completedSteps.includes(step.name)) {
continue;
}
ctx.currentStep = step.name;
await saveCheckpoint(ctx);
const result = await step.fn(ctx);
ctx.stepResults.set(step.name, result);
ctx.completedSteps.push(step.name);
}
ctx.status = "completed";
await saveCheckpoint(ctx);
return ctx;
}
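The `saveCheckpoint` call above is left abstract. One minimal implementation, sketched here in Python with a JSON file as an assumed checkpoint store, persists just the resume metadata rather than full step outputs:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical checkpoint location; in practice, key it by workflow run ID
CHECKPOINT_PATH = Path(tempfile.gettempdir()) / "workflow_checkpoint.json"

def save_checkpoint(ctx: dict) -> None:
    """Persist only the fields needed to resume a run."""
    snapshot = {
        "status": ctx["status"],
        "current_step": ctx["current_step"],
        "completed_steps": ctx["completed_steps"],
    }
    CHECKPOINT_PATH.write_text(json.dumps(snapshot))

def load_checkpoint(ctx: dict) -> dict:
    """Merge a saved snapshot back into a fresh context, if one exists."""
    if CHECKPOINT_PATH.exists():
        ctx.update(json.loads(CHECKPOINT_PATH.read_text()))
    return ctx

save_checkpoint({"status": "running", "current_step": "transform",
                 "completed_steps": ["fetch_data"]})
resumed = load_checkpoint({"status": "running", "current_step": "",
                           "completed_steps": []})
```

Keeping the snapshot small is the point: step outputs can be re-derived or stored separately, while the resume metadata is what lets the loop skip completed steps.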
Branching and conditional logic
Real workflows rarely follow a single straight path. The agent needs to make decisions based on intermediate results.
Pattern: conditional branching
async def migration_workflow(ctx: WorkflowContext) -> WorkflowContext:
# Step 1: Analyze the current schema
analysis = await invoke("analyze_schema", db=ctx.input["database"])
ctx.step_results["analysis"] = analysis
# Branch based on migration complexity
if analysis.breaking_changes:
# Complex path: requires backup and staged rollout
await invoke("create_backup", db=ctx.input["database"])
await invoke("apply_migration_staged", migration=analysis.migration_plan)
await invoke("verify_migration", db=ctx.input["database"])
elif analysis.has_changes:
# Simple path: direct migration
await invoke("apply_migration", migration=analysis.migration_plan)
    else:
        # No changes needed
        ctx.step_results["migration"] = {"status": "no_changes"}

    ctx.status = "completed"
    return ctx
Pattern: fan-out and fan-in
When multiple independent tasks can run in parallel, use fan-out/fan-in. Skill Composition covers this in more depth, but the basic pattern works for workflows too.
async function analyzeRepository(
ctx: WorkflowContext,
): Promise<WorkflowContext> {
// Fan-out: run independent analyses in parallel
const [dependencies, security, codeQuality, testCoverage] = await Promise.all(
[
invoke("analyze_dependencies", { path: ctx.input.repoPath }),
invoke("security_scan", { path: ctx.input.repoPath }),
invoke("code_quality_check", { path: ctx.input.repoPath }),
invoke("coverage_report", { path: ctx.input.repoPath }),
],
);
// Fan-in: combine results into a unified report
ctx.stepResults.set("analyses", {
dependencies,
security,
codeQuality,
testCoverage,
});
const report = await invoke("generate_report", {
analyses: ctx.stepResults.get("analyses"),
});
ctx.stepResults.set("report", report);
ctx.status = "completed";
return ctx;
}
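For Python workflows, the same fan-out/fan-in shape is a direct fit for `asyncio.gather`. The two analysis functions below are stand-ins for real skill invocations:

```python
import asyncio

# Hypothetical analyses standing in for real skill invocations
async def analyze_dependencies(path: str) -> dict:
    return {"outdated": 2}

async def security_scan(path: str) -> dict:
    return {"findings": 0}

async def analyze_repository(repo_path: str) -> dict:
    # Fan-out: both analyses run concurrently
    deps, security = await asyncio.gather(
        analyze_dependencies(repo_path),
        security_scan(repo_path),
    )
    # Fan-in: combine the results into one report
    return {"dependencies": deps, "security": security}

report = asyncio.run(analyze_repository("/tmp/repo"))
```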
Rollback and recovery strategies
Things will fail. Network calls time out. APIs return errors. Files get locked. A solid workflow needs a plan for when that happens. For the foundational error handling patterns behind these recovery strategies, see Error Handling Patterns.
Pattern: compensating actions
For each step that changes state (writes a file, updates a database, deploys a service), define a compensating action that undoes the change.
interface WorkflowStep {
name: string;
execute: (ctx: WorkflowContext) => Promise<StepResult>;
compensate?: (ctx: WorkflowContext) => Promise<void>;
}
async function executeWithRollback(
steps: WorkflowStep[],
ctx: WorkflowContext,
): Promise<WorkflowContext> {
const completedSteps: WorkflowStep[] = [];
for (const step of steps) {
try {
const result = await step.execute(ctx);
ctx.stepResults.set(step.name, result);
completedSteps.push(step);
    } catch (error) {
      // Roll back completed steps in reverse order
      // (copy the array so we don't mutate the completion record)
      for (const completed of [...completedSteps].reverse()) {
        if (completed.compensate) {
          await completed.compensate(ctx);
        }
      }
      ctx.status = "failed";
      return ctx;
    }
}
ctx.status = "completed";
return ctx;
}
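The same compensating-action loop in Python, with a deliberately failing second step so the rollback path is exercised. Every name here is illustrative:

```python
import asyncio

rollback_log: list[str] = []

async def execute_with_rollback(steps, ctx: dict) -> dict:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed = []
    for name, execute, compensate in steps:
        try:
            ctx[name] = await execute(ctx)
            completed.append((name, compensate))
        except Exception:
            for done_name, comp in reversed(completed):
                if comp is not None:
                    await comp(ctx)
            ctx["status"] = "failed"
            return ctx
    ctx["status"] = "completed"
    return ctx

async def write_file(ctx):
    return "written"

async def undo_write(ctx):
    rollback_log.append("undo_write")

async def deploy(ctx):
    raise RuntimeError("deploy failed")

ctx = asyncio.run(execute_with_rollback(
    [("write", write_file, undo_write), ("deploy", deploy, None)],
    {},
))
```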
Pattern: partial completion with resume
Sometimes full rollback isn’t what you want. You’ve done useful work and want to keep it. Instead, mark the workflow as partially complete and allow resumption.
async def resilient_workflow(ctx: WorkflowContext) -> WorkflowContext:
steps = [
("download", download_dataset),
("clean", clean_data),
("analyze", run_analysis),
("report", generate_report),
]
for step_name, step_fn in steps:
if step_name in ctx.completed_steps:
continue
try:
result = await step_fn(ctx)
ctx.step_results[step_name] = result
ctx.completed_steps.append(step_name)
except Exception as e:
ctx.status = "paused"
ctx.step_results[f"{step_name}_error"] = str(e)
# Return partial progress — can be resumed later
return ctx
ctx.status = "completed"
return ctx
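A quick sketch of what resumption looks like from the caller’s side, using a step that fails on its first attempt. The step names and the `attempts` counter are illustrative:

```python
import asyncio

attempts = {"analyze": 0}

async def download(ctx):
    return "dataset"

async def analyze(ctx):
    # Fails once, then succeeds — simulates a transient error
    attempts["analyze"] += 1
    if attempts["analyze"] == 1:
        raise RuntimeError("transient failure")
    return "analysis"

async def resilient(ctx: dict) -> dict:
    for name, fn in [("download", download), ("analyze", analyze)]:
        if name in ctx["completed_steps"]:
            continue
        try:
            ctx["results"][name] = await fn(ctx)
            ctx["completed_steps"].append(name)
        except Exception as e:
            ctx["status"] = "paused"
            ctx["results"][f"{name}_error"] = str(e)
            return ctx
    ctx["status"] = "completed"
    return ctx

ctx = {"completed_steps": [], "results": {}, "status": "running"}
ctx = asyncio.run(resilient(ctx))  # pauses at "analyze"
ctx = asyncio.run(resilient(ctx))  # resumes; "download" is skipped
```

The second call skips `download` entirely because it appears in `completed_steps`, which is exactly why idempotent steps and an accurate completion record matter.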
Guidelines for reliable workflows
Building multi-step workflows that hold up in production takes some discipline. Here are the principles that matter most:
- Make steps idempotent. If a step runs twice with the same input, it should produce the same result. This is what makes retry and resume logic work.
- Validate between steps. Don’t assume the output of step N is valid input for step N+1. Add assertions or validation checks at step boundaries.
- Keep steps focused. Each step should do one thing. If a step is doing three things, it should probably be three steps. This relates closely to the God Skill anti-pattern.
- Log step transitions. Record when each step starts, completes, or fails. This is invaluable when debugging workflows that fail intermittently.
- Set timeouts on every step. A single hung step can block an entire workflow. Always define a maximum duration and treat the timeout as an error.
- Design for human oversight. Long or high-impact workflows should include approval gates at critical decision points, especially before irreversible actions like deployments or data deletions.
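The timeout guideline is straightforward to enforce with `asyncio.wait_for`, treating expiry like any other step failure. The 0.01-second limit and `slow_step` below are illustrative:

```python
import asyncio

async def slow_step():
    await asyncio.sleep(10)  # simulates a hung step

async def run_with_timeout(fn, seconds: float) -> dict:
    """Run a step with a hard deadline; a timeout becomes a normal failure."""
    try:
        return {"ok": True, "output": await asyncio.wait_for(fn(), seconds)}
    except asyncio.TimeoutError:
        return {"ok": False, "error": f"step exceeded {seconds}s limit"}

result = asyncio.run(run_with_timeout(slow_step, 0.01))
```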
These patterns form the backbone of complex agent behavior. Start with simple linear chains, add branching as your workflows demand it, and always have a recovery strategy for when things go sideways.