Architecture · 13 min read

Event-Driven AI Agent Architecture

Request-response patterns break down when you have dozens of AI agents that need to coordinate. Here's how event-driven architecture creates truly reactive AI systems that scale — and why it's the only way to build agent orchestration that works in production.

🚀
Part of ChaozCode · Memory Spine is one of 8 apps in the ChaozCode DevOps AI Platform. 233 agents. 363+ tools. Start free

1. The Request-Response Wall

Three months ago, our multi-agent system hit a wall. We had 47 AI agents running in production — code reviewers, deployment bots, monitoring agents, security scanners. Each one specialized, each one powerful.

And each one completely isolated from the others.

The architecture was simple: agents received HTTP requests, processed them, returned responses. Clean. Predictable. Completely unable to coordinate at scale.

When a deployment failed, the deployment agent would detect it and... do nothing else. The monitoring agent might notice elevated error rates 5 minutes later. The security agent could spot suspicious traffic patterns. The code review agent might identify that the failed deployment contained a risky change.

But they couldn't talk to each other.

Request-response works beautifully for single agents. It breaks down catastrophically for agent systems.

The Coordination Problem

Request-response forces you to decide upfront which agent calls which other agent. But in complex systems, you can't predict all the coordination patterns. An event in one domain might trigger actions across completely unrelated domains.

Why Request-Response Fails for Agent Systems

1. Tight Coupling
Agent A needs to know about Agent B to call it directly. This creates brittle dependency chains that break when you add new agents or modify existing ones.

2. Synchronous Bottlenecks
If Agent A waits for Agent B to complete before continuing, the entire workflow stalls when Agent B is overloaded or fails.

3. No Broadcasting
When something important happens (deployment fails, security threat detected), there's no natural way to notify all interested agents simultaneously.

4. Coordination Complexity
Multi-step workflows require explicit orchestration logic. Someone has to write code that says "call Agent A, then Agent B, then Agent C if condition X is true." This doesn't scale.

Event-driven architecture solves all of these problems by decoupling agents through asynchronous events.

2. Event-Driven Agent Patterns

In event-driven architecture, agents don't call each other directly. Instead, they emit events when something happens and subscribe to events they care about.

Here's the mental model shift:

This changes everything. New agents can subscribe to existing events without modifying existing agents. Workflows emerge from simple event reactions instead of complex orchestration logic.

Core Event-Driven Patterns

1. Publisher-Subscriber
Agents publish events to topics. Other agents subscribe to topics they're interested in. Publishers don't know who's listening.

# Agent publishes event
await event_bus.publish("deployment.completed", {
    "service": "user-service",
    "version": "v2.1.4",
    "environment": "production",
    "timestamp": "2025-12-15T14:30:00Z"
})

# Multiple agents can subscribe
monitoring_agent.subscribe("deployment.completed", update_deployment_metrics)
notification_agent.subscribe("deployment.completed", send_slack_notification)
audit_agent.subscribe("deployment.completed", log_deployment_event)

2. Event Sourcing
Store the complete history of events that led to the current state. This provides a complete audit trail and enables time-travel debugging.

3. CQRS (Command Query Responsibility Segregation)
Separate read models from write models. Commands change state (and emit events). Queries read from optimized read models.

4. Saga Pattern
Coordinate long-running workflows across multiple agents using compensating actions if something fails.

Event-Driven Performance

After migrating to event-driven architecture, our agent coordination latency dropped from 2.3 seconds (sequential HTTP calls) to 320ms (parallel event processing). Agent development velocity increased 4x because new agents could leverage existing events.

3. Agent Event Types

Not all events are created equal. Here are the event categories that matter for AI agent systems:

Lifecycle Events

Track agent and task lifecycle states:

# Agent lifecycle
agent.started
agent.stopped  
agent.failed
agent.recovered

# Task lifecycle  
task.created
task.assigned
task.started
task.completed
task.failed
task.cancelled

Domain Events

Business events that agents care about:

# Code events
code.commit.pushed
code.pull_request.opened
code.build.completed
code.build.failed
code.deployment.started
code.deployment.completed

# Infrastructure events
service.health.degraded
service.scaled_up
service.scaled_down
alert.triggered
incident.created
incident.resolved

# Security events
vulnerability.detected
access.denied
anomaly.detected
threat.mitigated

Memory Events

Memory system events for agent coordination:

# Memory operations
memory.stored
memory.retrieved  
memory.updated
memory.deleted

# Knowledge events
knowledge.graph.updated
insight.discovered
pattern.detected
correlation.found

Coordination Events

Events for multi-agent workflows:

# Workflow coordination
workflow.started
workflow.step.completed
workflow.failed  
workflow.completed

# Resource coordination
resource.requested
resource.allocated
resource.released
resource.exhausted

4. Event Bus Architecture

The event bus is the nervous system of your agent system. At ChaozCode, we use Redis Streams for the event bus, but you could also use NATS, Apache Kafka, or AWS EventBridge.

Redis Streams for Agent Events

Redis Streams provides exactly what AI agents need: high throughput, message ordering, consumer groups, and built-in persistence.

import redis
import asyncio
import json
from typing import Dict, Callable

class AgentEventBus:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.Redis.from_url(redis_url, decode_responses=True)
        self.subscribers = {}
        
    async def publish(self, event_type: str, payload: Dict) -> str:
        """Publish event to stream"""
        event_data = {
            "type": event_type,
            "payload": json.dumps(payload),
            "timestamp": time.time(),
            "agent_id": self.agent_id
        }
        
        # Add to stream with auto-generated ID
        message_id = self.redis.xadd(f"events:{event_type}", event_data)
        
        # Also add to global event stream for monitoring
        self.redis.xadd("events:all", event_data)
        
        return message_id
    
    async def subscribe(self, event_type: str, handler: Callable, consumer_group: str = None):
        """Subscribe to event stream"""
        stream_key = f"events:{event_type}"
        
        if consumer_group:
            # Create consumer group (idempotent)
            try:
                self.redis.xgroup_create(stream_key, consumer_group, id='0', mkstream=True)
            except redis.ResponseError as e:
                if "BUSYGROUP" not in str(e):
                    raise
        
        # Start consuming in background
        asyncio.create_task(self._consume_stream(stream_key, handler, consumer_group))
    
    async def _consume_stream(self, stream_key: str, handler: Callable, consumer_group: str = None):
        """Consume events from stream"""
        while True:
            try:
                if consumer_group:
                    # Consumer group mode - load balancing
                    messages = self.redis.xreadgroup(
                        consumer_group,
                        f"consumer-{self.agent_id}",
                        {stream_key: '>'},
                        count=10,
                        block=1000
                    )
                else:
                    # Simple mode - all messages  
                    messages = self.redis.xread({stream_key: '$'}, block=1000)
                
                for stream, msgs in messages:
                    for msg_id, fields in msgs:
                        await self._handle_message(msg_id, fields, handler, consumer_group, stream_key)
                        
            except Exception as e:
                print(f"Error consuming stream {stream_key}: {e}")
                await asyncio.sleep(1)
    
    async def _handle_message(self, msg_id: str, fields: Dict, handler: Callable, consumer_group: str, stream_key: str):
        """Handle individual message"""
        try:
            payload = json.loads(fields['payload'])
            await handler(fields['type'], payload)
            
            # ACK message if using consumer group
            if consumer_group:
                self.redis.xack(stream_key, consumer_group, msg_id)
                
        except Exception as e:
            print(f"Error handling message {msg_id}: {e}")
            # TODO: Send to dead letter queue

Event Schema and Versioning

Consistent event schemas are critical for agent interoperability:

from pydantic import BaseModel
from datetime import datetime
from typing import Optional, Dict, Any

class AgentEvent(BaseModel):
    """Base event schema"""
    id: str
    type: str
    version: str = "1.0"
    timestamp: datetime
    source_agent: str
    correlation_id: Optional[str] = None
    payload: Dict[str, Any]
    metadata: Optional[Dict[str, Any]] = None

# Domain-specific events inherit from base
class DeploymentCompleted(AgentEvent):
    type: str = "deployment.completed"
    
    class PayloadModel(BaseModel):
        service: str
        version: str
        environment: str
        duration_seconds: int
        status: str
        
    payload: PayloadModel

5. Event Sourcing for Agent Audit

Event sourcing stores every event that happened, creating an immutable audit trail. This is incredibly valuable for AI agents because it provides complete transparency into what agents did and why.

Event Store Implementation

class AgentEventStore:
    def __init__(self, db_connection):
        self.db = db_connection
        
    async def append_event(self, stream_id: str, event: AgentEvent) -> None:
        """Append event to stream"""
        await self.db.execute("""
            INSERT INTO events (
                stream_id, event_id, event_type, event_version,
                timestamp, source_agent, payload, metadata
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        """, 
        stream_id, event.id, event.type, event.version,
        event.timestamp, event.source_agent, 
        json.dumps(event.payload), json.dumps(event.metadata))
    
    async def read_stream(self, stream_id: str, from_version: int = 0) -> List[AgentEvent]:
        """Read events from stream"""
        rows = await self.db.fetch("""
            SELECT event_id, event_type, event_version, timestamp,
                   source_agent, payload, metadata
            FROM events 
            WHERE stream_id = $1 AND version > $2
            ORDER BY version ASC
        """, stream_id, from_version)
        
        return [AgentEvent.parse_obj(row) for row in rows]
    
    async def get_agent_timeline(self, agent_id: str) -> List[AgentEvent]:
        """Get all events for specific agent"""
        rows = await self.db.fetch("""
            SELECT * FROM events 
            WHERE source_agent = $1 
            ORDER BY timestamp ASC
        """, agent_id)
        
        return [AgentEvent.parse_obj(row) for row in rows]

Reconstructing Agent State

Event sourcing lets you reconstruct any agent's state at any point in time:

async def reconstruct_agent_state(agent_id: str, at_timestamp: datetime = None):
    """Rebuild agent state from events"""
    events = await event_store.get_agent_timeline(agent_id)
    
    if at_timestamp:
        events = [e for e in events if e.timestamp <= at_timestamp]
    
    # Start with empty state
    agent_state = AgentState()
    
    # Apply each event
    for event in events:
        agent_state = apply_event(agent_state, event)
    
    return agent_state

def apply_event(state: AgentState, event: AgentEvent) -> AgentState:
    """Apply event to state (pure function)"""
    if event.type == "task.assigned":
        state.current_tasks.append(event.payload["task_id"])
    elif event.type == "task.completed":
        state.current_tasks.remove(event.payload["task_id"])
        state.completed_tasks += 1
    elif event.type == "memory.stored":
        state.memory_count += 1
    
    return state

6. CQRS for Agent State Management

CQRS separates commands (write operations that change state) from queries (read operations). This is perfect for AI agents because:

Command Side: Agent Actions

class AgentCommandHandler:
    def __init__(self, event_store: AgentEventStore, event_bus: AgentEventBus):
        self.event_store = event_store
        self.event_bus = event_bus
    
    async def assign_task(self, agent_id: str, task: Task) -> None:
        """Command: Assign task to agent"""
        # Validate command
        if not task.is_valid():
            raise InvalidTaskError("Task validation failed")
        
        # Create event
        event = AgentEvent(
            id=generate_uuid(),
            type="task.assigned",
            timestamp=datetime.utcnow(),
            source_agent="task-manager",
            payload={
                "agent_id": agent_id,
                "task_id": task.id,
                "task_type": task.type,
                "priority": task.priority
            }
        )
        
        # Store event
        await self.event_store.append_event(f"agent:{agent_id}", event)
        
        # Publish to event bus
        await self.event_bus.publish(event.type, event.payload)
    
    async def complete_task(self, agent_id: str, task_id: str, result: Dict) -> None:
        """Command: Mark task as completed"""
        event = AgentEvent(
            id=generate_uuid(),
            type="task.completed",
            timestamp=datetime.utcnow(),
            source_agent=agent_id,
            payload={
                "task_id": task_id,
                "result": result,
                "duration_seconds": result.get("duration", 0)
            }
        )
        
        await self.event_store.append_event(f"agent:{agent_id}", event)
        await self.event_bus.publish(event.type, event.payload)

Query Side: Read Models

class AgentReadModel:
    """Optimized read model for agent queries"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def get_agent_status(self, agent_id: str) -> AgentStatus:
        """Get current agent status"""
        row = await self.db.fetchrow("""
            SELECT agent_id, status, current_task_count, 
                   completed_tasks, failed_tasks, last_active
            FROM agent_status 
            WHERE agent_id = $1
        """, agent_id)
        
        return AgentStatus.parse_obj(row) if row else None
    
    async def get_active_tasks(self, agent_id: str) -> List[TaskSummary]:
        """Get agent's current tasks"""
        rows = await self.db.fetch("""
            SELECT task_id, task_type, assigned_at, priority
            FROM active_tasks 
            WHERE agent_id = $1
            ORDER BY priority DESC, assigned_at ASC
        """, agent_id)
        
        return [TaskSummary.parse_obj(row) for row in rows]
    
    async def get_system_health(self) -> SystemHealth:
        """Get overall system health metrics"""
        stats = await self.db.fetchrow("""
            SELECT 
                COUNT(*) FILTER (WHERE status = 'active') as active_agents,
                COUNT(*) FILTER (WHERE status = 'idle') as idle_agents,
                COUNT(*) FILTER (WHERE status = 'failed') as failed_agents,
                AVG(current_task_count) as avg_task_load
            FROM agent_status
        """)
        
        return SystemHealth.parse_obj(stats)

Event Handlers: Update Read Models

class AgentEventHandler:
    """Update read models when events occur"""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def handle_task_assigned(self, event_type: str, payload: Dict):
        """Update read model when task is assigned"""
        agent_id = payload["agent_id"]
        task_id = payload["task_id"]
        
        # Update agent status
        await self.db.execute("""
            UPDATE agent_status 
            SET current_task_count = current_task_count + 1,
                last_active = NOW()
            WHERE agent_id = $1
        """, agent_id)
        
        # Add to active tasks
        await self.db.execute("""
            INSERT INTO active_tasks (agent_id, task_id, task_type, assigned_at, priority)
            VALUES ($1, $2, $3, NOW(), $4)
        """, agent_id, task_id, payload["task_type"], payload["priority"])
    
    async def handle_task_completed(self, event_type: str, payload: Dict):
        """Update read model when task completes"""
        task_id = payload["task_id"]
        
        # Remove from active tasks  
        await self.db.execute("DELETE FROM active_tasks WHERE task_id = $1", task_id)
        
        # Update agent stats
        await self.db.execute("""
            UPDATE agent_status 
            SET current_task_count = current_task_count - 1,
                completed_tasks = completed_tasks + 1,
                last_active = NOW()
            WHERE agent_id = (SELECT agent_id FROM active_tasks WHERE task_id = $1)
        """, task_id)

7. Implementation Guide

Here's how to implement event-driven architecture for your AI agents:

Step 1: Set Up Event Infrastructure

# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: agent_events
      POSTGRES_USER: agent_user
      POSTGRES_PASSWORD: agent_pass
    ports:
      - "5432:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

# init.sql
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    stream_id VARCHAR(255) NOT NULL,
    event_id UUID NOT NULL UNIQUE,
    event_type VARCHAR(100) NOT NULL,
    event_version VARCHAR(10) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    source_agent VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_stream_id ON events(stream_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_timestamp ON events(timestamp);

Step 2: Create Event-Driven Agent Base Class

class EventDrivenAgent:
    """Base class for event-driven AI agents"""
    
    def __init__(self, agent_id: str, event_bus: AgentEventBus):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.event_handlers = {}
        
    async def start(self):
        """Start agent and subscribe to events"""
        # Emit agent started event
        await self.emit_event("agent.started", {
            "agent_id": self.agent_id,
            "capabilities": self.get_capabilities()
        })
        
        # Subscribe to events we handle
        for event_type, handler in self.event_handlers.items():
            await self.event_bus.subscribe(
                event_type, 
                handler, 
                consumer_group=f"agent-{self.agent_id}"
            )
    
    async def emit_event(self, event_type: str, payload: Dict):
        """Emit event to the system"""
        event = AgentEvent(
            id=generate_uuid(),
            type=event_type,
            timestamp=datetime.utcnow(),
            source_agent=self.agent_id,
            payload=payload
        )
        
        await self.event_bus.publish(event_type, event.dict())
    
    def handles_event(self, event_type: str):
        """Decorator to register event handlers"""
        def decorator(func):
            self.event_handlers[event_type] = func
            return func
        return decorator
    
    def get_capabilities(self) -> List[str]:
        """Return list of capabilities this agent provides"""
        raise NotImplementedError

Step 3: Implement Specific Agents

class CodeReviewAgent(EventDrivenAgent):
    """Agent that reviews code changes"""
    
    def __init__(self, agent_id: str, event_bus: AgentEventBus):
        super().__init__(agent_id, event_bus)
        
    def get_capabilities(self) -> List[str]:
        return ["code_review", "security_scan", "best_practices_check"]
    
    @handles_event("code.pull_request.opened")
    async def review_pull_request(self, event_type: str, payload: Dict):
        """Review new pull requests"""
        pr_id = payload["pr_id"]
        repository = payload["repository"]
        
        # Perform code review (simplified)
        review_result = await self.analyze_code_changes(pr_id, repository)
        
        # Emit review completed event
        await self.emit_event("code.review.completed", {
            "pr_id": pr_id,
            "repository": repository,
            "score": review_result.score,
            "issues": review_result.issues,
            "recommendation": review_result.recommendation
        })

class DeploymentAgent(EventDrivenAgent):
    """Agent that handles deployments"""
    
    def get_capabilities(self) -> List[str]:
        return ["deployment", "rollback", "health_check"]
    
    @handles_event("code.review.completed")
    async def consider_deployment(self, event_type: str, payload: Dict):
        """Consider deployment after code review"""
        if payload["recommendation"] == "APPROVE":
            await self.emit_event("deployment.triggered", {
                "pr_id": payload["pr_id"],
                "repository": payload["repository"],
                "trigger_reason": "code_review_approved"
            })
    
    @handles_event("deployment.triggered")  
    async def deploy_service(self, event_type: str, payload: Dict):
        """Execute deployment"""
        try:
            result = await self.perform_deployment(payload["repository"])
            
            await self.emit_event("deployment.completed", {
                "repository": payload["repository"],
                "status": "success",
                "duration_seconds": result.duration
            })
            
        except DeploymentError as e:
            await self.emit_event("deployment.failed", {
                "repository": payload["repository"],
                "error": str(e),
                "rollback_required": True
            })

Ready to Build Event-Driven Agents?

The ChaozCode platform includes built-in event infrastructure with Redis Streams, event sourcing, and CQRS patterns. Start building reactive agent systems that scale.

Start Building →
Share this article:

🔧 Related ChaozCode Tools

AgentZ

Agent orchestration platform with built-in event-driven architecture and Redis Streams

Memory Spine

Event-driven memory system for agent coordination and shared context

ML Router

Event-driven model routing with feedback loops and cost optimization

Explore all 8 ChaozCode apps >