
Multi-Agent Orchestration Patterns: Coordinating AI Agent Swarms

A single AI agent is powerful. A swarm of coordinated agents is transformative. But orchestrating dozens of specialized agents without descending into chaos requires battle-tested patterns — here are the ones that work in production.


1. Why Orchestration Matters

Running a single AI agent is straightforward. You give it a task, it executes, you get a result. But real-world systems don't run on a single agent — they require dozens of specialized agents working in concert.

Consider a code review pipeline: a security scanner agent inspects for vulnerabilities, a style checker agent enforces conventions, a test coverage agent identifies untested paths, and a documentation agent verifies that public APIs are documented. Each agent is a specialist. None of them alone provides the full picture.

The question isn't whether you need multiple agents — it's how you coordinate them without building a tangled mess of point-to-point connections that collapses the moment you add agent number 15.

Scale in Practice

The ChaozCode platform runs 233 specialized agents in production. Without formalized orchestration patterns, coordination overhead would grow quadratically — O(n²) connections between n agents. Patterns reduce this to O(n).
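The arithmetic behind that claim is easy to check: point-to-point integration needs a connection per agent pair, while a shared broker needs one connection per agent.

```python
def point_to_point_links(n: int) -> int:
    """Dedicated connections between every pair of n agents: n(n-1)/2."""
    return n * (n - 1) // 2

def broker_links(n: int) -> int:
    """One connection per agent when all traffic flows through a broker."""
    return n

# At 15 agents the gap is already large; at 233 it is unmanageable
print(point_to_point_links(15), broker_links(15))    # 105 vs 15
print(point_to_point_links(233), broker_links(233))  # 27028 vs 233
```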

Orchestration patterns give you three things: predictable execution order when you need it, parallelism when you can exploit it, and graceful degradation when individual agents fail.

2. Orchestration vs Choreography

The first architectural decision in any multi-agent system is whether to use centralized orchestration or decentralized choreography. Each has distinct tradeoffs.

Centralized Orchestration

A single orchestrator agent (or service) owns the workflow. It decides which agents to invoke, in what order, and how to handle results. Think of it as a conductor directing an orchestra.

import asyncio

class AgentOrchestrator:
    """Central coordinator for multi-agent workflows"""

    def __init__(self, agent_registry: AgentRegistry):
        self.registry = agent_registry

    async def execute_code_review(self, pull_request: PullRequest) -> ReviewResult:
        """Orchestrate a multi-agent code review"""

        # Step 1: Fan out to parallel agents
        security_task = self.registry.get("security-scanner").scan(pull_request)
        style_task = self.registry.get("style-checker").check(pull_request)
        coverage_task = self.registry.get("coverage-analyzer").analyze(pull_request)

        # Step 2: Await all results
        security, style, coverage = await asyncio.gather(
            security_task, style_task, coverage_task,
            return_exceptions=True
        )

        # Step 3: Aggregate and decide (results may include Exception instances)
        return self.aggregate_results(security, style, coverage)

Advantages: Full visibility into workflow state. Easy to add logging, retries, and timeouts in one place. Simple to reason about execution order.

Disadvantages: Single point of failure. The orchestrator becomes a bottleneck. Adding new agents requires modifying the orchestrator.

Decentralized Choreography

No central coordinator. Agents react to events and produce new events. The workflow emerges from the interaction of independent agents, like dancers responding to the music and each other.

# Each agent independently reacts to events
class SecurityAgent(EventDrivenAgent):
    @on_event("pull_request.opened")
    async def scan_pr(self, event):
        result = await self.run_security_scan(event.payload)
        await self.emit("security.scan.completed", result)

class StyleAgent(EventDrivenAgent):
    @on_event("pull_request.opened")
    async def check_style(self, event):
        result = await self.run_style_check(event.payload)
        await self.emit("style.check.completed", result)

class ReviewAggregator(EventDrivenAgent):
    @on_event("security.scan.completed", "style.check.completed")
    async def aggregate_when_ready(self, event):
        if self.all_checks_complete(event.correlation_id):
            await self.emit("review.completed", self.build_summary())

Advantages: No single point of failure. Agents are fully decoupled — add or remove agents without touching existing ones. Natural parallelism.

Disadvantages: Harder to debug (no single place to see the full workflow). Complex failure handling. Potential for event storms.
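The `EventDrivenAgent` base class and `@on_event` decorator above are assumed framework primitives. One minimal way such registration could be implemented (a sketch with hypothetical names, not a specific framework's API):

```python
import asyncio
from collections import defaultdict
from typing import Callable, Dict, List

def on_event(*topics: str):
    """Mark a method as a handler for one or more event topics."""
    def decorator(fn: Callable):
        fn._topics = topics
        return fn
    return decorator

class EventBus:
    """Minimal topic-based bus the agents register against."""

    def __init__(self):
        self._subs: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable):
        self._subs[topic].append(handler)

    async def publish(self, topic: str, payload):
        await asyncio.gather(*(h(payload) for h in self._subs[topic]))

class EventDrivenAgent:
    """Scans its own methods for @on_event markers and subscribes them."""

    def __init__(self, bus: EventBus):
        self.bus = bus
        for attr in dir(self):
            handler = getattr(self, attr)
            for topic in getattr(handler, "_topics", ()):
                bus.subscribe(topic, handler)

    async def emit(self, topic: str, payload):
        await self.bus.publish(topic, payload)
```

Subclasses then only declare handlers; wiring happens automatically at construction time.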

| Aspect | Orchestration | Choreography |
| --- | --- | --- |
| Control flow | Explicit, centralized | Implicit, emergent |
| Coupling | Agents coupled to orchestrator | Agents coupled to event schema |
| Failure handling | Centralized try/catch | Distributed compensation |
| Observability | Single workflow view | Distributed tracing required |
| Scalability | Orchestrator bottleneck | Scales horizontally |
| Best for | Sequential pipelines | Reactive, event-heavy systems |

In practice, most production systems use a hybrid approach: orchestration for well-defined sequential workflows and choreography for reactive, event-driven coordination.

3. Pub/Sub Patterns for Agent Communication

Publish-subscribe is the backbone of agent choreography. An agent publishes an event to a topic; any number of agents can subscribe to that topic and react independently.

Topic-Based Routing

The simplest pattern. Agents subscribe to specific event topics and receive all messages on those topics.

import asyncio
from collections import defaultdict
from typing import Callable, Dict, List

class AgentMessageBroker:
    """Topic-based pub/sub for agent communication"""

    def __init__(self):
        self.subscriptions: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable):
        self.subscriptions[topic].append(handler)

    async def publish(self, topic: str, message: AgentMessage):
        handlers = self.subscriptions.get(topic, [])
        # Fan out to all subscribers in parallel
        await asyncio.gather(
            *[handler(message) for handler in handlers],
            return_exceptions=True
        )

# Usage
broker = AgentMessageBroker()
broker.subscribe("deployment.completed", monitoring_agent.handle)
broker.subscribe("deployment.completed", notification_agent.handle)
broker.subscribe("deployment.completed", audit_agent.handle)

Content-Based Routing

More sophisticated: messages are routed based on their content, not just their topic. This allows fine-grained filtering without creating thousands of topics.

import asyncio
from typing import Callable, List, Tuple

class ContentRouter:
    """Route messages based on content predicates"""

    def __init__(self):
        self.rules: List[Tuple[Callable, Callable]] = []

    def route(self, predicate: Callable, handler: Callable):
        self.rules.append((predicate, handler))

    async def dispatch(self, message: AgentMessage):
        matched = [
            handler(message)
            for predicate, handler in self.rules
            if predicate(message)
        ]
        await asyncio.gather(*matched, return_exceptions=True)

# Route high-severity security events to the incident agent
router = ContentRouter()
router.route(
    predicate=lambda m: m.topic == "security.alert" and m.payload["severity"] == "critical",
    handler=incident_agent.escalate
)

Avoiding Event Storms

When Agent A's output triggers Agent B, which triggers Agent C, which triggers Agent A again, you get an infinite loop. Always include a correlation_id and causation_id in events, and implement cycle detection at the broker level.
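A broker-level sketch of that detection (the event shape here is hypothetical, not a specific broker's API): each event carries its causal history, and the broker drops any event whose topic already appears in that history or whose chain runs too deep.

```python
from dataclasses import dataclass, field
from typing import List

MAX_DEPTH = 20  # hard ceiling even when no exact cycle is detected

@dataclass
class Event:
    topic: str
    payload: dict
    correlation_id: str  # ties all events in one workflow together
    causation_chain: List[str] = field(default_factory=list)  # topics that caused this event

def derive(parent: Event, topic: str, payload: dict) -> Event:
    """Create a child event that records its full causal history."""
    return Event(topic, payload, parent.correlation_id,
                 parent.causation_chain + [parent.topic])

def should_dispatch(event: Event) -> bool:
    """Refuse events that revisit a topic in their own chain or run too deep."""
    if event.topic in event.causation_chain:
        return False  # A triggered B triggered ... triggered A again: a cycle
    return len(event.causation_chain) < MAX_DEPTH
```

Every agent that reacts to an event emits its output via `derive`, so the chain grows automatically and the loop A → B → C → A is cut at the broker before it starts.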

4. Shared Memory Coordination

Sometimes agents need more than messages — they need shared state. A code review workflow might need every agent to see the same pull request data, contribute findings to a shared report, and read each other's results before making final decisions.

This is where Memory Spine excels as a coordination layer. Instead of passing complete data through messages, agents read and write to a shared memory space.

import json
from typing import List

class SharedMemoryCoordinator:
    """Coordinate agents through shared Memory Spine state"""

    def __init__(self, memory_client: MemorySpineClient):
        self.memory = memory_client

    async def create_workspace(self, workflow_id: str, context: dict) -> str:
        """Create a shared workspace for a workflow"""
        await self.memory.store(
            content=json.dumps(context),
            tags=["workspace", f"workflow:{workflow_id}"],
            metadata={"workflow_id": workflow_id, "status": "active"}
        )
        return workflow_id

    async def contribute_result(self, workflow_id: str, agent_id: str, result: dict):
        """Agent contributes its result to shared workspace"""
        await self.memory.store(
            content=json.dumps(result),
            tags=["result", f"workflow:{workflow_id}", f"agent:{agent_id}"],
            metadata={"workflow_id": workflow_id, "agent_id": agent_id}
        )

    async def get_all_results(self, workflow_id: str) -> List[dict]:
        """Retrieve all agent contributions for a workflow"""
        results = await self.memory.search(
            query=f"workflow:{workflow_id}",
            tags=["result", f"workflow:{workflow_id}"]
        )
        return [json.loads(r.content) for r in results]

Shared memory coordination offers three key advantages over pure message passing: large payloads stay out of the event stream (agents exchange references rather than copies), agents that join a workflow late can read the full context instead of replaying a message history, and every contribution is persisted, which gives you an audit trail for free.

5. Fan-Out/Fan-In: Parallel Agent Execution

Fan-out/fan-in is the workhorse pattern for parallel agent execution. A coordinator fans out a task to multiple agents, waits for all results, then fans in to aggregate.

import asyncio
from typing import List

class FanOutFanIn:
    """Parallel agent execution with result aggregation"""

    def __init__(self, agents: List[Agent], timeout: float = 30.0, quorum_threshold: int = 1):
        self.agents = agents
        self.timeout = timeout
        self.quorum_threshold = quorum_threshold

    async def execute(self, task: Task) -> AggregatedResult:
        """Fan out task to all agents, fan in results"""

        # Fan-out: dispatch to all agents concurrently
        tasks = [
            asyncio.wait_for(agent.execute(task), timeout=self.timeout)
            for agent in self.agents
        ]

        # Collect results, capturing both successes and failures
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Fan-in: aggregate
        successes = []
        failures = []
        for agent, result in zip(self.agents, results):
            if isinstance(result, Exception):
                failures.append(AgentFailure(agent.id, result))
            else:
                successes.append(AgentSuccess(agent.id, result))

        return AggregatedResult(
            successes=successes,
            failures=failures,
            quorum_met=len(successes) >= self.quorum_threshold
        )

# Execute code review with all review agents in parallel
reviewer = FanOutFanIn(
    agents=[security_agent, style_agent, coverage_agent, docs_agent],
    timeout=60.0,
    quorum_threshold=3  # proceed once 3 of the 4 reviewers respond
)
result = await reviewer.execute(pull_request_task)

Quorum-Based Decisions

Not every agent needs to succeed for a workflow to complete. Quorum-based decisions let you proceed when enough agents have responded, even if some fail or time out.

from typing import Dict

class QuorumPolicy:
    """Decide when enough agents have responded"""

    @staticmethod
    def majority(total: int) -> int:
        return (total // 2) + 1

    @staticmethod
    def all_minus_one(total: int) -> int:
        return max(total - 1, 1)

    @staticmethod
    def weighted(weights: Dict[str, float], threshold: float = 0.7):
        """Weighted quorum — critical agents count more"""
        def check(results: Dict[str, bool]) -> bool:
            score = sum(
                weights.get(agent_id, 1.0)
                for agent_id, success in results.items()
                if success
            )
            total = sum(weights.values())
            return (score / total) >= threshold
        return check

Performance Gain

Fan-out/fan-in with a 3-of-4 quorum reduces our average code review latency from 45 seconds (sequential) to 14 seconds (parallel with early completion). The slowest agent no longer dictates total workflow time.
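Early completion can be implemented with `asyncio.as_completed`: collect results as agents finish, stop once the quorum is met, and cancel whatever is still running. A minimal sketch (the function name and shape are illustrative, not ChaozCode's actual API):

```python
import asyncio
from typing import Awaitable, List

async def gather_until_quorum(coros: List[Awaitable], quorum: int) -> list:
    """Return as soon as `quorum` coroutines succeed; cancel the stragglers."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    successes: list = []
    try:
        for finished in asyncio.as_completed(tasks):
            try:
                successes.append(await finished)
            except Exception:
                continue  # a failed agent simply doesn't count toward quorum
            if len(successes) >= quorum:
                break  # quorum met: stop waiting for slower agents
    finally:
        for t in tasks:
            t.cancel()  # cancelling an already-finished task is a no-op
        await asyncio.gather(*tasks, return_exceptions=True)
    return successes
```

With this shape the slowest agent only matters when it is actually needed to reach quorum.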

6. Error Propagation in Agent Swarms

In a single-agent system, error handling is straightforward: try/catch/retry. In a swarm of 20 agents, a failure in one agent can cascade through the entire system if you're not careful.

Circuit Breaker Pattern

Prevent cascading failures by wrapping agent calls in circuit breakers. If an agent fails repeatedly, the circuit opens and requests fail fast instead of waiting for timeouts.

import time

class AgentCircuitBreaker:
    """Prevent cascading failures across agent swarms"""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    async def call(self, agent: Agent, task: Task):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError(f"Circuit open for agent {agent.id}")

        try:
            result = await agent.execute(task)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failure while half-open reopens the circuit immediately
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

Compensating Actions

When a multi-step workflow fails midway, you need compensating actions to undo partial work. This is the Saga pattern applied to agent swarms.

from collections import namedtuple
from typing import Callable, List

# Each step pairs a forward action with its compensating undo
SagaStep = namedtuple("SagaStep", ["execute", "compensate"])

class AgentSaga:
    """Multi-agent workflow with compensating actions"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed: List[SagaStep] = []

    def add_step(self, execute: Callable, compensate: Callable):
        self.steps.append(SagaStep(execute, compensate))

    async def run(self):
        for step in self.steps:
            try:
                await step.execute()
                self.completed.append(step)
            except Exception as e:
                # Compensate in reverse order
                for completed_step in reversed(self.completed):
                    await completed_step.compensate()
                raise SagaFailedError(f"Saga failed: {e}") from e

# Define a deployment saga
saga = AgentSaga()
saga.add_step(
    execute=lambda: deploy_agent.deploy(service),
    compensate=lambda: deploy_agent.rollback(service)
)
saga.add_step(
    execute=lambda: dns_agent.update_routing(service),
    compensate=lambda: dns_agent.revert_routing(service)
)
await saga.run()

7. Putting It All Together

Production agent systems rarely use a single pattern, and ChaozCode is no exception: every production workflow combines several of the patterns above.

The key insight: patterns compose. An orchestrator can fan out to parallel agents via pub/sub, with each agent reading shared memory and protected by circuit breakers. You pick the right pattern for each layer of your system.

Start with orchestration for your first multi-agent workflow. It's simpler to debug and reason about. Migrate to choreography only when you hit the orchestrator bottleneck — and by then you'll have enough operational experience to handle the added complexity.

The patterns described here are battle-tested across 233 agents running in production at ChaozCode. They're not theoretical — they're the hard-won result of building and operating agent swarms at scale.

Ready to Orchestrate Agent Swarms?

ChaozCode provides built-in orchestration primitives, Memory Spine for shared state, and ML Router for intelligent agent selection. Start building multi-agent systems that scale.


🔧 Related ChaozCode Tools

AgentZ

Agent orchestration platform with built-in fan-out/fan-in, circuit breakers, and saga support

Memory Spine

Shared memory coordination layer for multi-agent workflows and cross-agent context

ML Router

Intelligent agent selection and routing with cost-aware load balancing
