1. The Request-Response Wall
Three months ago, our multi-agent system hit a wall. We had 47 AI agents running in production — code reviewers, deployment bots, monitoring agents, security scanners. Each one specialized, each one powerful.
And each one completely isolated from the others.
The architecture was simple: agents received HTTP requests, processed them, returned responses. Clean. Predictable. Completely unable to coordinate at scale.
When a deployment failed, the deployment agent would detect it and... do nothing else. The monitoring agent might notice elevated error rates 5 minutes later. The security agent could spot suspicious traffic patterns. The code review agent might identify that the failed deployment contained a risky change.
But they couldn't talk to each other.
Request-response works beautifully for single agents. It breaks down catastrophically for agent systems.
Request-response forces you to decide upfront which agent calls which other agent. But in complex systems, you can't predict all the coordination patterns. An event in one domain might trigger actions across completely unrelated domains.
Why Request-Response Fails for Agent Systems
1. Tight Coupling
Agent A needs to know about Agent B to call it directly. This creates brittle dependency chains that break when you add new agents or modify existing ones.
2. Synchronous Bottlenecks
If Agent A waits for Agent B to complete before continuing, the entire workflow stalls when Agent B is overloaded or fails.
3. No Broadcasting
When something important happens (deployment fails, security threat detected), there's no natural way to notify all interested agents simultaneously.
4. Coordination Complexity
Multi-step workflows require explicit orchestration logic. Someone has to write code that says "call Agent A, then Agent B, then Agent C if condition X is true." This doesn't scale.
Event-driven architecture solves all of these problems by decoupling agents through asynchronous events.
2. Event-Driven Agent Patterns
In event-driven architecture, agents don't call each other directly. Instead, they emit events when something happens and subscribe to events they care about.
Here's the mental model shift:
- Traditional: "When deployment completes, call the notification agent and the monitoring agent."
- Event-driven: "When deployment completes, emit a deployment.completed event. Any agent that cares about deployments can react."
This changes everything. New agents can subscribe to existing events without modifying existing agents. Workflows emerge from simple event reactions instead of complex orchestration logic.
Core Event-Driven Patterns
1. Publisher-Subscriber
Agents publish events to topics. Other agents subscribe to topics they're interested in. Publishers don't know who's listening.
# Agent publishes event
await event_bus.publish("deployment.completed", {
    "service": "user-service",
    "version": "v2.1.4",
    "environment": "production",
    "timestamp": "2025-12-15T14:30:00Z"
})

# Multiple agents can subscribe
monitoring_agent.subscribe("deployment.completed", update_deployment_metrics)
notification_agent.subscribe("deployment.completed", send_slack_notification)
audit_agent.subscribe("deployment.completed", log_deployment_event)
2. Event Sourcing
Store the complete history of events that led to the current state. This provides a complete audit trail and enables time-travel debugging.
3. CQRS (Command Query Responsibility Segregation)
Separate read models from write models. Commands change state (and emit events). Queries read from optimized read models.
4. Saga Pattern
Coordinate long-running workflows across multiple agents using compensating actions if something fails.
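The saga pattern can be sketched as a generic runner that pairs each step with a compensating action and unwinds completed steps in reverse when one fails. The step names below (reserve/charge and their compensations) are illustrative, not part of our production system:

```python
import asyncio

async def run_saga(steps):
    """Execute (action, compensation) pairs; on failure, undo completed steps in reverse."""
    completed = []
    for action, compensate in steps:
        try:
            await action()
            completed.append(compensate)
        except Exception:
            # Roll back already-completed steps with their compensating actions
            for comp in reversed(completed):
                await comp()
            raise

log = []

async def reserve():   log.append("reserved")
async def unreserve(): log.append("unreserved")
async def charge():    raise RuntimeError("payment failed")
async def refund():    log.append("refunded")

try:
    asyncio.run(run_saga([(reserve, unreserve), (charge, refund)]))
except RuntimeError:
    pass

print(log)  # -> ['reserved', 'unreserved']
```

Because the charge step failed before completing, only the reserve step is compensated; the refund compensation never runs.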
After migrating to event-driven architecture, our agent coordination latency dropped from 2.3 seconds (sequential HTTP calls) to 320ms (parallel event processing). Agent development velocity increased 4x because new agents could leverage existing events.
3. Agent Event Types
Not all events are created equal. Here are the event categories that matter for AI agent systems:
Lifecycle Events
Track agent and task lifecycle states:
# Agent lifecycle
agent.started
agent.stopped
agent.failed
agent.recovered
# Task lifecycle
task.created
task.assigned
task.started
task.completed
task.failed
task.cancelled
Domain Events
Business events that agents care about:
# Code events
code.commit.pushed
code.pull_request.opened
code.build.completed
code.build.failed
code.deployment.started
code.deployment.completed
# Infrastructure events
service.health.degraded
service.scaled_up
service.scaled_down
alert.triggered
incident.created
incident.resolved
# Security events
vulnerability.detected
access.denied
anomaly.detected
threat.mitigated
Memory Events
Memory system events for agent coordination:
# Memory operations
memory.stored
memory.retrieved
memory.updated
memory.deleted
# Knowledge events
knowledge.graph.updated
insight.discovered
pattern.detected
correlation.found
Coordination Events
Events for multi-agent workflows:
# Workflow coordination
workflow.started
workflow.step.completed
workflow.failed
workflow.completed
# Resource coordination
resource.requested
resource.allocated
resource.released
resource.exhausted
4. Event Bus Architecture
The event bus is the nervous system of your agent system. At ChaozCode, we use Redis Streams for the event bus, but you could also use NATS, Apache Kafka, or AWS EventBridge.
Redis Streams for Agent Events
Redis Streams provides exactly what AI agents need: high throughput, message ordering, consumer groups, and built-in persistence.
import asyncio
import json
import time
from typing import Callable, Dict

import redis.asyncio as redis


class AgentEventBus:
    def __init__(self, agent_id: str, redis_url: str = "redis://localhost:6379"):
        self.agent_id = agent_id
        self.redis = redis.Redis.from_url(redis_url, decode_responses=True)
        self.subscribers = {}

    async def publish(self, event_type: str, payload: Dict) -> str:
        """Publish event to stream"""
        event_data = {
            "type": event_type,
            "payload": json.dumps(payload),
            "timestamp": time.time(),
            "agent_id": self.agent_id
        }
        # Add to stream with auto-generated ID
        message_id = await self.redis.xadd(f"events:{event_type}", event_data)
        # Also add to global event stream for monitoring
        await self.redis.xadd("events:all", event_data)
        return message_id

    async def subscribe(self, event_type: str, handler: Callable, consumer_group: str = None):
        """Subscribe to event stream"""
        stream_key = f"events:{event_type}"
        if consumer_group:
            # Create consumer group (idempotent)
            try:
                await self.redis.xgroup_create(stream_key, consumer_group, id='0', mkstream=True)
            except redis.ResponseError as e:
                if "BUSYGROUP" not in str(e):
                    raise
        # Start consuming in background
        asyncio.create_task(self._consume_stream(stream_key, handler, consumer_group))

    async def _consume_stream(self, stream_key: str, handler: Callable, consumer_group: str = None):
        """Consume events from stream"""
        last_id = '$'  # simple mode: start with new messages only
        while True:
            try:
                if consumer_group:
                    # Consumer group mode - load balancing across consumers
                    messages = await self.redis.xreadgroup(
                        consumer_group,
                        f"consumer-{self.agent_id}",
                        {stream_key: '>'},
                        count=10,
                        block=1000
                    )
                else:
                    # Simple mode - resume from the last ID we processed
                    messages = await self.redis.xread({stream_key: last_id}, block=1000)
                for stream, msgs in messages:
                    for msg_id, fields in msgs:
                        if not consumer_group:
                            last_id = msg_id
                        await self._handle_message(msg_id, fields, handler, consumer_group, stream_key)
            except Exception as e:
                print(f"Error consuming stream {stream_key}: {e}")
                await asyncio.sleep(1)

    async def _handle_message(self, msg_id: str, fields: Dict, handler: Callable, consumer_group: str, stream_key: str):
        """Handle individual message"""
        try:
            payload = json.loads(fields['payload'])
            await handler(fields['type'], payload)
            # ACK message if using consumer group
            if consumer_group:
                await self.redis.xack(stream_key, consumer_group, msg_id)
        except Exception as e:
            print(f"Error handling message {msg_id}: {e}")
            # TODO: Send to dead letter queue
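The dead-letter TODO above can be filled in by wrapping the failed message with failure context and re-publishing it to a dedicated stream (e.g. via xadd to an events:dead_letter stream). A minimal sketch of the entry builder; the field names and stream name are assumptions, not an established convention:

```python
import json
import time

def build_dead_letter_entry(stream_key: str, msg_id: str, fields: dict, error: Exception) -> dict:
    """Wrap a failed message with failure context so an operator (or a
    retry agent) can inspect and replay it later."""
    return {
        "original_stream": stream_key,
        "original_id": msg_id,
        "payload": fields.get("payload", "{}"),
        "error": repr(error),
        "failed_at": time.time(),
        # Track attempts so a retry agent can give up after N failures
        "retry_count": int(fields.get("retry_count", 0)) + 1,
    }

entry = build_dead_letter_entry(
    "events:task.failed",
    "1700000000-0",
    {"payload": json.dumps({"task_id": "t1"})},
    ValueError("bad payload"),
)
```

In the handler's except branch this entry would be passed to something like `self.redis.xadd("events:dead_letter", entry)`, keeping poison messages out of the main stream while preserving them for replay.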
Event Schema and Versioning
Consistent event schemas are critical for agent interoperability:
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class AgentEvent(BaseModel):
    """Base event schema"""
    id: str
    type: str
    version: str = "1.0"
    timestamp: datetime
    source_agent: str
    correlation_id: Optional[str] = None
    payload: Dict[str, Any]
    metadata: Optional[Dict[str, Any]] = None


# Domain-specific events inherit from base
class DeploymentCompleted(AgentEvent):
    type: str = "deployment.completed"

    class PayloadModel(BaseModel):
        service: str
        version: str
        environment: str
        duration_seconds: int
        status: str

    payload: PayloadModel
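When a schema evolves, old events already in the store still carry the old shape. One lightweight way to handle this is "upcasting": pure functions that rewrite an old payload into the current shape before validation. A sketch, assuming a hypothetical v2.0 that added an environment field:

```python
def upcast_1_0_to_2_0(payload: dict) -> dict:
    """v2.0 added an 'environment' field; default old events to 'production'."""
    return {**payload, "environment": payload.get("environment", "production")}

# Map each outdated version to the function that brings it forward
UPCASTERS = {"1.0": upcast_1_0_to_2_0}

def normalize(event_version: str, payload: dict) -> dict:
    """Bring a stored payload up to the current schema version."""
    if event_version in UPCASTERS:
        return UPCASTERS[event_version](payload)
    return payload

print(normalize("1.0", {"service": "user-service"}))
# old event gains the defaulted field; current events pass through unchanged
```

Upcasters keep the event store immutable: you never rewrite stored events, only reinterpret them on read.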
5. Event Sourcing for Agent Audit
Event sourcing stores every event that happened, creating an immutable audit trail. This is incredibly valuable for AI agents because it provides complete transparency into what agents did and why.
Event Store Implementation
import json
from typing import List


class AgentEventStore:
    def __init__(self, db_connection):
        self.db = db_connection

    async def append_event(self, stream_id: str, event: AgentEvent) -> None:
        """Append event to stream"""
        await self.db.execute("""
            INSERT INTO events (
                stream_id, event_id, event_type, event_version,
                timestamp, source_agent, payload, metadata
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        """,
        stream_id, event.id, event.type, event.version,
        event.timestamp, event.source_agent,
        json.dumps(event.payload), json.dumps(event.metadata))

    async def read_stream(self, stream_id: str, after_id: int = 0) -> List[AgentEvent]:
        """Read events from stream, starting after a given sequence id"""
        rows = await self.db.fetch("""
            SELECT event_id, event_type, event_version, timestamp,
                   source_agent, payload, metadata
            FROM events
            WHERE stream_id = $1 AND id > $2
            ORDER BY id ASC
        """, stream_id, after_id)
        return [self._row_to_event(row) for row in rows]

    async def get_agent_timeline(self, agent_id: str) -> List[AgentEvent]:
        """Get all events for specific agent"""
        rows = await self.db.fetch("""
            SELECT event_id, event_type, event_version, timestamp,
                   source_agent, payload, metadata
            FROM events
            WHERE source_agent = $1
            ORDER BY timestamp ASC
        """, agent_id)
        return [self._row_to_event(row) for row in rows]

    @staticmethod
    def _row_to_event(row) -> AgentEvent:
        """Map a database row (event_id, event_type, ...) onto the AgentEvent schema"""
        return AgentEvent(
            id=row["event_id"],
            type=row["event_type"],
            version=row["event_version"],
            timestamp=row["timestamp"],
            source_agent=row["source_agent"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]) if row["metadata"] else None,
        )
Reconstructing Agent State
Event sourcing lets you reconstruct any agent's state at any point in time:
async def reconstruct_agent_state(agent_id: str, at_timestamp: datetime = None):
    """Rebuild agent state from events"""
    events = await event_store.get_agent_timeline(agent_id)
    if at_timestamp:
        events = [e for e in events if e.timestamp <= at_timestamp]
    # Start with empty state
    agent_state = AgentState()
    # Apply each event
    for event in events:
        agent_state = apply_event(agent_state, event)
    return agent_state


def apply_event(state: AgentState, event: AgentEvent) -> AgentState:
    """Apply event to state (pure function)"""
    if event.type == "task.assigned":
        state.current_tasks.append(event.payload["task_id"])
    elif event.type == "task.completed":
        state.current_tasks.remove(event.payload["task_id"])
        state.completed_tasks += 1
    elif event.type == "memory.stored":
        state.memory_count += 1
    return state
6. CQRS for Agent State Management
CQRS separates commands (write operations that change state) from queries (read operations). This is perfect for AI agents because:
- Commands are agent actions that emit events
- Queries are read-optimized views for fast lookups
- Event handlers update read models when events occur
Command Side: Agent Actions
class AgentCommandHandler:
    def __init__(self, event_store: AgentEventStore, event_bus: AgentEventBus):
        self.event_store = event_store
        self.event_bus = event_bus

    async def assign_task(self, agent_id: str, task: Task) -> None:
        """Command: Assign task to agent"""
        # Validate command
        if not task.is_valid():
            raise InvalidTaskError("Task validation failed")
        # Create event
        event = AgentEvent(
            id=generate_uuid(),
            type="task.assigned",
            timestamp=datetime.utcnow(),
            source_agent="task-manager",
            payload={
                "agent_id": agent_id,
                "task_id": task.id,
                "task_type": task.type,
                "priority": task.priority
            }
        )
        # Store event
        await self.event_store.append_event(f"agent:{agent_id}", event)
        # Publish to event bus
        await self.event_bus.publish(event.type, event.payload)

    async def complete_task(self, agent_id: str, task_id: str, result: Dict) -> None:
        """Command: Mark task as completed"""
        event = AgentEvent(
            id=generate_uuid(),
            type="task.completed",
            timestamp=datetime.utcnow(),
            source_agent=agent_id,
            payload={
                "task_id": task_id,
                "result": result,
                "duration_seconds": result.get("duration", 0)
            }
        )
        await self.event_store.append_event(f"agent:{agent_id}", event)
        await self.event_bus.publish(event.type, event.payload)
Query Side: Read Models
class AgentReadModel:
    """Optimized read model for agent queries"""

    def __init__(self, db_connection):
        self.db = db_connection

    async def get_agent_status(self, agent_id: str) -> AgentStatus:
        """Get current agent status"""
        row = await self.db.fetchrow("""
            SELECT agent_id, status, current_task_count,
                   completed_tasks, failed_tasks, last_active
            FROM agent_status
            WHERE agent_id = $1
        """, agent_id)
        return AgentStatus.parse_obj(row) if row else None

    async def get_active_tasks(self, agent_id: str) -> List[TaskSummary]:
        """Get agent's current tasks"""
        rows = await self.db.fetch("""
            SELECT task_id, task_type, assigned_at, priority
            FROM active_tasks
            WHERE agent_id = $1
            ORDER BY priority DESC, assigned_at ASC
        """, agent_id)
        return [TaskSummary.parse_obj(row) for row in rows]

    async def get_system_health(self) -> SystemHealth:
        """Get overall system health metrics"""
        stats = await self.db.fetchrow("""
            SELECT
                COUNT(*) FILTER (WHERE status = 'active') as active_agents,
                COUNT(*) FILTER (WHERE status = 'idle') as idle_agents,
                COUNT(*) FILTER (WHERE status = 'failed') as failed_agents,
                AVG(current_task_count) as avg_task_load
            FROM agent_status
        """)
        return SystemHealth.parse_obj(stats)
Event Handlers: Update Read Models
class AgentEventHandler:
    """Update read models when events occur"""

    def __init__(self, db_connection):
        self.db = db_connection

    async def handle_task_assigned(self, event_type: str, payload: Dict):
        """Update read model when task is assigned"""
        agent_id = payload["agent_id"]
        task_id = payload["task_id"]
        # Update agent status
        await self.db.execute("""
            UPDATE agent_status
            SET current_task_count = current_task_count + 1,
                last_active = NOW()
            WHERE agent_id = $1
        """, agent_id)
        # Add to active tasks
        await self.db.execute("""
            INSERT INTO active_tasks (agent_id, task_id, task_type, assigned_at, priority)
            VALUES ($1, $2, $3, NOW(), $4)
        """, agent_id, task_id, payload["task_type"], payload["priority"])

    async def handle_task_completed(self, event_type: str, payload: Dict):
        """Update read model when task completes"""
        task_id = payload["task_id"]
        # Update agent stats first: the subquery needs the active_tasks row
        # that gets deleted in the next step
        await self.db.execute("""
            UPDATE agent_status
            SET current_task_count = current_task_count - 1,
                completed_tasks = completed_tasks + 1,
                last_active = NOW()
            WHERE agent_id = (SELECT agent_id FROM active_tasks WHERE task_id = $1)
        """, task_id)
        # Remove from active tasks
        await self.db.execute("DELETE FROM active_tasks WHERE task_id = $1", task_id)
7. Implementation Guide
Here's how to implement event-driven architecture for your AI agents:
Step 1: Set Up Event Infrastructure
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: agent_events
      POSTGRES_USER: agent_user
      POSTGRES_PASSWORD: agent_pass
    ports:
      - "5432:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
-- init.sql
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    stream_id VARCHAR(255) NOT NULL,
    event_id UUID NOT NULL UNIQUE,
    event_type VARCHAR(100) NOT NULL,
    event_version VARCHAR(10) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    source_agent VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_stream_id ON events(stream_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_timestamp ON events(timestamp);
-- Supports per-agent timeline queries (WHERE source_agent = ...)
CREATE INDEX idx_events_source_agent ON events(source_agent);
Step 2: Create Event-Driven Agent Base Class
from typing import Dict, List


def handles_event(event_type: str):
    """Decorator to mark a method as the handler for an event type"""
    def decorator(func):
        func._handles_event = event_type
        return func
    return decorator


class EventDrivenAgent:
    """Base class for event-driven AI agents"""

    def __init__(self, agent_id: str, event_bus: AgentEventBus):
        self.agent_id = agent_id
        self.event_bus = event_bus
        # Collect bound methods marked with @handles_event
        self.event_handlers = {
            method._handles_event: method
            for method in (getattr(self, name) for name in dir(self))
            if callable(method) and hasattr(method, "_handles_event")
        }

    async def start(self):
        """Start agent and subscribe to events"""
        # Emit agent started event
        await self.emit_event("agent.started", {
            "agent_id": self.agent_id,
            "capabilities": self.get_capabilities()
        })
        # Subscribe to events we handle
        for event_type, handler in self.event_handlers.items():
            await self.event_bus.subscribe(
                event_type,
                handler,
                consumer_group=f"agent-{self.agent_id}"
            )

    async def emit_event(self, event_type: str, payload: Dict):
        """Emit event to the system"""
        event = AgentEvent(
            id=generate_uuid(),
            type=event_type,
            timestamp=datetime.utcnow(),
            source_agent=self.agent_id,
            payload=payload
        )
        await self.event_bus.publish(event_type, event.dict())

    def get_capabilities(self) -> List[str]:
        """Return list of capabilities this agent provides"""
        raise NotImplementedError
Step 3: Implement Specific Agents
class CodeReviewAgent(EventDrivenAgent):
    """Agent that reviews code changes"""

    def get_capabilities(self) -> List[str]:
        return ["code_review", "security_scan", "best_practices_check"]

    @handles_event("code.pull_request.opened")
    async def review_pull_request(self, event_type: str, payload: Dict):
        """Review new pull requests"""
        pr_id = payload["pr_id"]
        repository = payload["repository"]
        # Perform code review (simplified)
        review_result = await self.analyze_code_changes(pr_id, repository)
        # Emit review completed event
        await self.emit_event("code.review.completed", {
            "pr_id": pr_id,
            "repository": repository,
            "score": review_result.score,
            "issues": review_result.issues,
            "recommendation": review_result.recommendation
        })


class DeploymentAgent(EventDrivenAgent):
    """Agent that handles deployments"""

    def get_capabilities(self) -> List[str]:
        return ["deployment", "rollback", "health_check"]

    @handles_event("code.review.completed")
    async def consider_deployment(self, event_type: str, payload: Dict):
        """Consider deployment after code review"""
        if payload["recommendation"] == "APPROVE":
            await self.emit_event("deployment.triggered", {
                "pr_id": payload["pr_id"],
                "repository": payload["repository"],
                "trigger_reason": "code_review_approved"
            })

    @handles_event("deployment.triggered")
    async def deploy_service(self, event_type: str, payload: Dict):
        """Execute deployment"""
        try:
            result = await self.perform_deployment(payload["repository"])
            await self.emit_event("deployment.completed", {
                "repository": payload["repository"],
                "status": "success",
                "duration_seconds": result.duration
            })
        except DeploymentError as e:
            await self.emit_event("deployment.failed", {
                "repository": payload["repository"],
                "error": str(e),
                "rollback_required": True
            })
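To see the review-to-deployment reaction chain end to end without standing up Redis, the handlers can be wired through a minimal in-memory bus. The InMemoryEventBus stub and the simplified handlers below are illustrative only, not the production classes:

```python
import asyncio
from collections import defaultdict

class InMemoryEventBus:
    """Minimal stand-in for the Redis-backed bus, for local testing."""
    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    async def publish(self, event_type, payload):
        # Deliver to every subscriber of this event type
        for handler in self.handlers[event_type]:
            await handler(event_type, payload)

bus = InMemoryEventBus()
audit = []

async def on_review_completed(event_type, payload):
    audit.append(event_type)
    if payload["recommendation"] == "APPROVE":
        # The deployment agent reacts to the review event, not to a direct call
        await bus.publish("deployment.triggered", {"repository": payload["repository"]})

async def on_deployment_triggered(event_type, payload):
    audit.append(event_type)

bus.subscribe("code.review.completed", on_review_completed)
bus.subscribe("deployment.triggered", on_deployment_triggered)

asyncio.run(bus.publish("code.review.completed",
                        {"repository": "user-service", "recommendation": "APPROVE"}))
print(audit)  # -> ['code.review.completed', 'deployment.triggered']
```

Note that neither handler knows the other exists: the deployment reaction emerges purely from the subscription, which is the decoupling the whole section is about.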
Ready to Build Event-Driven Agents?
The ChaozCode platform includes built-in event infrastructure with Redis Streams, event sourcing, and CQRS patterns. Start building reactive agent systems that scale.
Start Building →