1. Why Orchestration Matters
Running a single AI agent is straightforward. You give it a task, it executes, you get a result. But real-world systems don't run on a single agent — they require dozens of specialized agents working in concert.
Consider a code review pipeline: a security scanner agent inspects for vulnerabilities, a style checker agent enforces conventions, a test coverage agent identifies untested paths, and a documentation agent verifies that public APIs are documented. Each agent is a specialist. None of them alone provides the full picture.
The question isn't whether you need multiple agents — it's how you coordinate them without building a tangled mess of point-to-point connections that collapses the moment you add agent number 15.
The ChaozCode platform runs 233 specialized agents in production. Without formalized orchestration patterns, coordination overhead would grow quadratically — O(n²) connections between n agents. Patterns reduce this to O(n).
Orchestration patterns give you three things: predictable execution order when you need it, parallelism when you can exploit it, and graceful degradation when individual agents fail.
2. Orchestration vs Choreography
The first architectural decision in any multi-agent system is whether to use centralized orchestration or decentralized choreography. Each has distinct tradeoffs.
Centralized Orchestration
A single orchestrator agent (or service) owns the workflow. It decides which agents to invoke, in what order, and how to handle results. Think of it as a conductor directing an orchestra.
```python
import asyncio

class AgentOrchestrator:
    """Central coordinator for multi-agent workflows"""

    def __init__(self, agent_registry: AgentRegistry):
        self.registry = agent_registry

    async def execute_code_review(self, pull_request: PullRequest) -> ReviewResult:
        """Orchestrate a multi-agent code review"""
        # Step 1: Fan out to parallel agents
        security_task = self.registry.get("security-scanner").scan(pull_request)
        style_task = self.registry.get("style-checker").check(pull_request)
        coverage_task = self.registry.get("coverage-analyzer").analyze(pull_request)

        # Step 2: Await all results
        security, style, coverage = await asyncio.gather(
            security_task, style_task, coverage_task,
            return_exceptions=True
        )

        # Step 3: Aggregate and decide
        return self.aggregate_results(security, style, coverage)
```
Advantages: Full visibility into workflow state. Easy to add logging, retries, and timeouts in one place. Simple to reason about execution order.
Disadvantages: Single point of failure. The orchestrator becomes a bottleneck. Adding new agents requires modifying the orchestrator.
Decentralized Choreography
No central coordinator. Agents react to events and produce new events. The workflow emerges from the interaction of independent agents, like dancers responding to the music and each other.
```python
# Each agent independently reacts to events
class SecurityAgent(EventDrivenAgent):
    @on_event("pull_request.opened")
    async def scan_pr(self, event):
        result = await self.run_security_scan(event.payload)
        await self.emit("security.scan.completed", result)

class StyleAgent(EventDrivenAgent):
    @on_event("pull_request.opened")
    async def check_style(self, event):
        result = await self.run_style_check(event.payload)
        await self.emit("style.check.completed", result)

class ReviewAggregator(EventDrivenAgent):
    @on_event("security.scan.completed", "style.check.completed")
    async def aggregate_when_ready(self, event):
        if self.all_checks_complete(event.correlation_id):
            await self.emit("review.completed", self.build_summary())
```
Advantages: No single point of failure. Agents are fully decoupled — add or remove agents without touching existing ones. Natural parallelism.
Disadvantages: Harder to debug (no single place to see the full workflow). Complex failure handling. Potential for event storms.
| Aspect | Orchestration | Choreography |
|---|---|---|
| Control flow | Explicit, centralized | Implicit, emergent |
| Coupling | Agents coupled to orchestrator | Agents coupled to event schema |
| Failure handling | Centralized try/catch | Distributed compensation |
| Observability | Single workflow view | Distributed tracing required |
| Scalability | Orchestrator bottleneck | Scales horizontally |
| Best for | Sequential pipelines | Reactive, event-heavy systems |
In practice, most production systems use a hybrid approach: orchestration for well-defined sequential workflows and choreography for reactive, event-driven coordination.
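One common shape of that hybrid can be sketched as follows. The class, step, and topic names here are illustrative, not a specific framework API: an orchestrator owns the sequential steps, but publishes a lifecycle event after each one so that reactive agents can subscribe without the pipeline ever knowing they exist.

```python
import asyncio

class StubBroker:
    """Minimal stand-in for any pub/sub broker with an async publish()."""
    def __init__(self):
        self.published = []
    async def publish(self, topic, payload):
        self.published.append(topic)

class HybridPipeline:
    """Orchestrated steps with choreographed side effects."""
    def __init__(self, broker, steps):
        self.broker = broker
        self.steps = steps  # ordered list of (name, async callable)

    async def run(self, payload):
        # Orchestration: explicit, sequential control flow
        for name, step in self.steps:
            payload = await step(payload)
            # Choreography: reactive agents subscribe to these events;
            # the pipeline never knows who is listening
            await self.broker.publish(f"pipeline.{name}.completed", payload)
        return payload

# Illustrative step implementations
async def build(pr):
    return pr + ["built"]

async def run_tests(pr):
    return pr + ["tested"]

broker = StubBroker()
pipeline = HybridPipeline(broker, [("build", build), ("test", run_tests)])
result = asyncio.run(pipeline.run(["pr-42"]))
```

The pipeline stays easy to debug (one place owns the order), while monitoring or notification agents attach themselves purely through the event topics.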
3. Pub/Sub Patterns for Agent Communication
Publish-subscribe is the backbone of agent choreography. An agent publishes an event to a topic; any number of agents can subscribe to that topic and react independently.
Topic-Based Routing
The simplest pattern. Agents subscribe to specific event topics and receive all messages on those topics.
```python
import asyncio
from collections import defaultdict
from typing import Callable, Dict, List

class AgentMessageBroker:
    """Topic-based pub/sub for agent communication"""

    def __init__(self):
        self.subscriptions: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable):
        self.subscriptions[topic].append(handler)

    async def publish(self, topic: str, message: AgentMessage):
        handlers = self.subscriptions.get(topic, [])
        # Fan out to all subscribers in parallel
        await asyncio.gather(
            *[handler(message) for handler in handlers],
            return_exceptions=True
        )

# Usage
broker = AgentMessageBroker()
broker.subscribe("deployment.completed", monitoring_agent.handle)
broker.subscribe("deployment.completed", notification_agent.handle)
broker.subscribe("deployment.completed", audit_agent.handle)
```
Content-Based Routing
More sophisticated: messages are routed based on their content, not just their topic. This allows fine-grained filtering without creating thousands of topics.
```python
import asyncio
from typing import Callable, List, Tuple

class ContentRouter:
    """Route messages based on content predicates"""

    def __init__(self):
        self.rules: List[Tuple[Callable, Callable]] = []

    def route(self, predicate: Callable, handler: Callable):
        self.rules.append((predicate, handler))

    async def dispatch(self, message: AgentMessage):
        matched = [
            handler(message)
            for predicate, handler in self.rules
            if predicate(message)
        ]
        await asyncio.gather(*matched, return_exceptions=True)

# Route high-severity security events to the incident agent
router = ContentRouter()
router.route(
    predicate=lambda m: m.topic == "security.alert" and m.payload["severity"] == "critical",
    handler=incident_agent.escalate
)
```
Beware of feedback loops: when Agent A's output triggers Agent B, which triggers Agent C, which triggers Agent A again, you have an infinite loop. Always include a `correlation_id` and `causation_id` in events, and implement cycle detection at the broker level.
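A minimal sketch of broker-level cycle detection, assuming each event carries its lineage metadata. The `Event` envelope and helper names here are hypothetical illustrations, not a specific framework API:

```python
import uuid

MAX_HOPS = 25  # hard ceiling on causation-chain length

class Event:
    """Hypothetical event envelope carrying lineage metadata."""
    def __init__(self, topic, payload, correlation_id=None, ancestors=None):
        self.id = str(uuid.uuid4())
        self.topic = topic
        self.payload = payload
        self.correlation_id = correlation_id or self.id
        self.ancestors = ancestors or []  # topics that led to this event

def caused_by(parent: Event, topic: str, payload) -> Event:
    """Derive a child event, inheriting correlation id and lineage."""
    return Event(topic, payload,
                 correlation_id=parent.correlation_id,
                 ancestors=parent.ancestors + [parent.topic])

def check_for_cycle(event: Event):
    """Reject events whose lineage revisits a topic or grows unboundedly."""
    if event.topic in event.ancestors:
        raise RuntimeError(f"Cycle detected: {event.topic} already in chain")
    if len(event.ancestors) >= MAX_HOPS:
        raise RuntimeError("Causation chain exceeded MAX_HOPS")

# A legitimate chain passes; a loop back to the root topic is rejected
root = Event("pull_request.opened", {"pr": 42})
child = caused_by(root, "security.scan.completed", {"ok": True})
check_for_cycle(child)
looped = caused_by(child, "pull_request.opened", {"pr": 42})
try:
    check_for_cycle(looped)
    cycle_caught = False
except RuntimeError:
    cycle_caught = True
```

The hop ceiling is a backstop for loops that mutate the topic on each pass; the ancestry check catches exact revisits cheaply.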
4. Shared Memory Coordination
Sometimes agents need more than messages — they need shared state. A code review workflow might need every agent to see the same pull request data, contribute findings to a shared report, and read each other's results before making final decisions.
This is where Memory Spine excels as a coordination layer. Instead of passing complete data through messages, agents read and write to a shared memory space.
```python
import json
from typing import List

class SharedMemoryCoordinator:
    """Coordinate agents through shared Memory Spine state"""

    def __init__(self, memory_client: MemorySpineClient):
        self.memory = memory_client

    async def create_workspace(self, workflow_id: str, context: dict) -> str:
        """Create a shared workspace for a workflow"""
        await self.memory.store(
            content=json.dumps(context),
            tags=["workspace", f"workflow:{workflow_id}"],
            metadata={"workflow_id": workflow_id, "status": "active"}
        )
        return workflow_id

    async def contribute_result(self, workflow_id: str, agent_id: str, result: dict):
        """Agent contributes its result to shared workspace"""
        await self.memory.store(
            content=json.dumps(result),
            tags=["result", f"workflow:{workflow_id}", f"agent:{agent_id}"],
            metadata={"workflow_id": workflow_id, "agent_id": agent_id}
        )

    async def get_all_results(self, workflow_id: str) -> List[dict]:
        """Retrieve all agent contributions for a workflow"""
        results = await self.memory.search(
            query=f"workflow:{workflow_id}",
            tags=["result", f"workflow:{workflow_id}"]
        )
        return [json.loads(r.content) for r in results]
```
Shared memory coordination offers three key advantages over pure message passing:
- Late-binding decisions: An agent can wait until all peers have contributed before reading the combined state and making a decision.
- Idempotent writes: If an agent crashes and restarts, it can re-read shared state instead of replaying a message log.
- Cross-workflow context: Agents can reference results from previous workflows, enabling learning across runs.
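The late-binding case can be sketched directly against a coordinator interface like the one above. The stub coordinator, which surfaces one more agent result per poll, and the timing values are illustrative:

```python
import asyncio

class StubCoordinator:
    """Stand-in for a shared-memory coordinator: one more agent
    contribution becomes visible on each poll."""
    def __init__(self):
        self._results = []
    async def get_all_results(self, workflow_id):
        self._results.append({"agent": f"agent-{len(self._results)}"})
        return list(self._results)

async def wait_for_contributions(coordinator, workflow_id: str,
                                 expected: int, timeout: float = 30.0,
                                 poll_interval: float = 0.01) -> list:
    """Block until `expected` agents have written to shared state.

    Late binding: the deciding agent waits on the workspace, not on
    individual messages, so it tolerates agent restarts and
    out-of-order completion.
    """
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        results = await coordinator.get_all_results(workflow_id)
        if len(results) >= expected:
            return results
        if asyncio.get_running_loop().time() > deadline:
            raise TimeoutError(
                f"Only {len(results)}/{expected} contributions before timeout")
        await asyncio.sleep(poll_interval)

contributions = asyncio.run(
    wait_for_contributions(StubCoordinator(), "wf-1", expected=3))
```

Polling is the simplest form; a production coordinator would likely expose change notifications instead, but the decision logic stays the same.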
5. Fan-Out/Fan-In: Parallel Agent Execution
Fan-out/fan-in is the workhorse pattern for parallel agent execution. A coordinator fans out a task to multiple agents, waits for all results, then fans in to aggregate.
```python
import asyncio
from typing import List, Optional

class FanOutFanIn:
    """Parallel agent execution with result aggregation"""

    def __init__(self, agents: List[Agent], timeout: float = 30.0,
                 quorum_threshold: Optional[int] = None):
        self.agents = agents
        self.timeout = timeout
        # Default: require every agent to succeed
        self.quorum_threshold = quorum_threshold or len(agents)

    async def execute(self, task: Task) -> AggregatedResult:
        """Fan out task to all agents, fan in results"""
        # Fan-out: dispatch to all agents concurrently
        tasks = [
            asyncio.wait_for(agent.execute(task), timeout=self.timeout)
            for agent in self.agents
        ]

        # Collect results, capturing both successes and failures
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Fan-in: aggregate
        successes = []
        failures = []
        for agent, result in zip(self.agents, results):
            if isinstance(result, Exception):
                failures.append(AgentFailure(agent.id, result))
            else:
                successes.append(AgentSuccess(agent.id, result))

        return AggregatedResult(
            successes=successes,
            failures=failures,
            quorum_met=len(successes) >= self.quorum_threshold
        )

# Execute code review with all review agents in parallel
reviewer = FanOutFanIn(
    agents=[security_agent, style_agent, coverage_agent, docs_agent],
    timeout=60.0,
    quorum_threshold=3  # 3-of-4 quorum
)
result = await reviewer.execute(pull_request_task)
```
Quorum-Based Decisions
Not every agent needs to succeed for a workflow to complete. Quorum-based decisions let you proceed when enough agents have responded, even if some fail or time out.
```python
from typing import Dict

class QuorumPolicy:
    """Decide when enough agents have responded"""

    @staticmethod
    def majority(total: int) -> int:
        return (total // 2) + 1

    @staticmethod
    def all_minus_one(total: int) -> int:
        return max(total - 1, 1)

    @staticmethod
    def weighted(weights: Dict[str, float], threshold: float = 0.7):
        """Weighted quorum — critical agents count more"""
        def check(results: Dict[str, bool]) -> bool:
            score = sum(
                weights.get(agent_id, 1.0)
                for agent_id, success in results.items()
                if success
            )
            total = sum(weights.values())
            return (score / total) >= threshold
        return check
```
Fan-out/fan-in with a 3-of-4 quorum reduces our average code review latency from 45 seconds (sequential) to 14 seconds (parallel with early completion). The slowest agent no longer dictates total workflow time.
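The early-completion behavior is not something `asyncio.gather` gives you, since it waits for every task. One way to sketch it is with `asyncio.as_completed`, returning as soon as the quorum is met and cancelling the stragglers. The stub agents here are illustrative:

```python
import asyncio

class StubAgent:
    """Illustrative agent that succeeds after a fixed delay."""
    def __init__(self, agent_id, delay):
        self.id, self.delay = agent_id, delay
    async def execute(self, task):
        await asyncio.sleep(self.delay)
        return self.id

async def execute_with_early_quorum(agents, task, quorum, timeout=60.0):
    """Return as soon as `quorum` agents succeed, cancelling the rest."""
    futures = [
        asyncio.ensure_future(asyncio.wait_for(agent.execute(task), timeout))
        for agent in agents
    ]
    successes = []
    try:
        for fut in asyncio.as_completed(futures):
            try:
                successes.append(await fut)
            except Exception:
                pass  # a failed agent simply doesn't count toward quorum
            if len(successes) >= quorum:
                return successes
        raise RuntimeError(f"Quorum not met: {len(successes)}/{quorum}")
    finally:
        for fut in futures:
            fut.cancel()  # the slowest agent no longer gates completion

# Three fast agents satisfy a 3-of-4 quorum; the slow one is cancelled
agents = [StubAgent("security", 0.01), StubAgent("style", 0.01),
          StubAgent("coverage", 0.02), StubAgent("docs", 5.0)]
quorum_results = asyncio.run(execute_with_early_quorum(agents, None, quorum=3))
```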
6. Error Propagation in Agent Swarms
In a single-agent system, error handling is straightforward: try/catch/retry. In a swarm of 20 agents, a failure in one agent can cascade through the entire system if you're not careful.
Circuit Breaker Pattern
Prevent cascading failures by wrapping agent calls in circuit breakers. If an agent fails repeatedly, the circuit opens and requests fail fast instead of waiting for timeouts.
```python
import time

class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without invoking the agent."""

class AgentCircuitBreaker:
    """Prevent cascading failures across agent swarms"""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    async def call(self, agent: Agent, task: Task):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError(f"Circuit open for agent {agent.id}")
        try:
            result = await agent.execute(task)
            # Any success closes the circuit and clears the failure streak
            self.state = "closed"
            self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
```
Compensating Actions
When a multi-step workflow fails midway, you need compensating actions to undo partial work. This is the Saga pattern applied to agent swarms.
```python
from dataclasses import dataclass
from typing import Callable, List

class SagaFailedError(Exception):
    pass

@dataclass
class SagaStep:
    execute: Callable
    compensate: Callable

class AgentSaga:
    """Multi-agent workflow with compensating actions"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed: List[SagaStep] = []

    def add_step(self, execute: Callable, compensate: Callable):
        self.steps.append(SagaStep(execute, compensate))

    async def run(self):
        for step in self.steps:
            try:
                await step.execute()
                self.completed.append(step)
            except Exception as e:
                # Compensate in reverse order
                for completed_step in reversed(self.completed):
                    await completed_step.compensate()
                raise SagaFailedError(f"Saga failed: {e}") from e

# Define a deployment saga
saga = AgentSaga()
saga.add_step(
    execute=lambda: deploy_agent.deploy(service),
    compensate=lambda: deploy_agent.rollback(service)
)
saga.add_step(
    execute=lambda: dns_agent.update_routing(service),
    compensate=lambda: dns_agent.revert_routing(service)
)
await saga.run()
```
7. Putting It All Together
Production agent systems rarely use a single pattern. Here's how we combine them at ChaozCode:
- Orchestration for well-defined pipelines (CI/CD, deployment, onboarding flows)
- Choreography for reactive monitoring and incident response
- Pub/Sub for all inter-agent communication
- Shared Memory (via Memory Spine) for cross-agent context and state
- Fan-Out/Fan-In for parallel analysis tasks (code review, security scans)
- Circuit Breakers on every external agent call
- Sagas for multi-step workflows that need rollback
The key insight: patterns compose. An orchestrator can fan out to parallel agents via pub/sub, with each agent reading shared memory and protected by circuit breakers. You pick the right pattern for each layer of your system.
Start with orchestration for your first multi-agent workflow. It's simpler to debug and reason about. Migrate to choreography only when you hit the orchestrator bottleneck — and by then you'll have enough operational experience to handle the added complexity.
The patterns described here are battle-tested across 233 agents running in production at ChaozCode. They're not theoretical — they're the hard-won result of building and operating agent swarms at scale.
Ready to Orchestrate Agent Swarms?
ChaozCode provides built-in orchestration primitives, Memory Spine for shared state, and ML Router for intelligent agent selection. Start building multi-agent systems that scale.
Start Building →