Skip to content

πŸ€– Lesson 3-1: Multi-Agent Orchestration

Learning Objectives

By the end of this lesson, you will be able to:

  • Orchestrate planner/executor/reviewer agent patterns
  • Implement multi-agent systems using LangGraph
  • Build collaborative workflows with AutoGen
  • Design task networks with CrewAI
  • Create scalable and secure multi-agent architectures

Prerequisites


πŸ—οΈ 1. Multi-Agent Architecture Patterns

1.1 Planner/Executor/Reviewer Pattern

Three-Agent Pattern

class PlannerAgent:
    def plan(self, task):
        return {"steps": ["step1", "step2", "step3"]}

class ExecutorAgent:
    def execute(self, plan):
        results = []
        for step in plan["steps"]:
            results.append(self.execute_step(step))
        return results

class ReviewerAgent:
    def review(self, results):
        return {"approved": True, "feedback": "Good work!"}

Pattern Benefits

  • Separation of concerns: Each agent has a specific role
  • Modularity: Easy to swap or upgrade individual agents
  • Quality control: Reviewer ensures output quality
  • Scalability: Can add more specialized agents

1.2 Hierarchical Agent Patterns

Hierarchical Structure

  • Manager Agent: Coordinates overall workflow
  • Specialist Agents: Handle specific domains (research, writing, analysis)
  • Worker Agents: Execute low-level tasks

Hierarchical Implementation

class ManagerAgent:
    def coordinate(self, task):
        # Decompose task
        subtasks = self.decompose(task)

        # Assign to specialists
        for subtask in subtasks:
            specialist = self.select_specialist(subtask)
            result = specialist.execute(subtask)

        # Synthesize results
        return self.synthesize(results)

πŸ”„ 2. LangGraph for Workflow Orchestration

2.1 State Management

LangGraph State

LangGraph uses a state object to manage data flow between agents.

State Definition

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: Annotated[list, "Chat messages"]
    task: Annotated[str, "Current task"]
    results: Annotated[list, "Execution results"]
    next_agent: Annotated[str, "Next agent to call"]

2.2 Workflow Definition

LangGraph Workflow

from langgraph.graph import StateGraph

# Define the workflow
workflow = StateGraph(AgentState)

# Add nodes (agents)
workflow.add_node("planner", planner_agent)
workflow.add_node("executor", executor_agent)
workflow.add_node("reviewer", reviewer_agent)

# Define edges (transitions)
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "reviewer")

# Set entry point
workflow.set_entry_point("planner")

# Compile the graph
app = workflow.compile()

2.3 Conditional Routing

Conditional Workflow

def router(state):
    if state["task_type"] == "research":
        return "researcher"
    elif state["task_type"] == "writing":
        return "writer"
    else:
        return "generalist"

workflow.add_conditional_edges(
    "planner",
    router,
    {
        "researcher": "researcher",
        "writer": "writer",
        "generalist": "generalist"
    }
)

🀝 3. AutoGen Collaborative Workflows

3.1 Agent Definition

AutoGen Agent Setup

from autogen import AssistantAgent, UserProxyAgent

# Create agents
assistant = AssistantAgent(
    name="assistant",
    llm_config={"config_list": config_list},
    system_message="You are a helpful AI assistant."
)

user_proxy = UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
    llm_config={"config_list": config_list}
)

3.2 Multi-Agent Conversations

Group Chat Implementation

from autogen import GroupChat, GroupChatManager

# Create group chat
groupchat = GroupChat(
    agents=[user_proxy, assistant, coder],
    messages=[],
    max_round=50
)

manager = GroupChatManager(
    groupchat=groupchat,
    llm_config={"config_list": config_list}
)

# Start conversation
user_proxy.initiate_chat(
    manager,
    message="Build a web scraper for news articles"
)

3.3 Tool Integration

Tool-Enabled Agents

from autogen import AssistantAgent

# Agent with tools
coder = AssistantAgent(
    name="coder",
    system_message="You are a Python developer. Use tools to write and test code.",
    llm_config={"config_list": config_list}
)

# Register tools
coder.register_function(
    function_map={
        "write_code": write_code_function,
        "run_tests": run_tests_function
    }
)

πŸš€ 4. CrewAI Task Networks

4.1 Crew Definition

Crew Setup

from crewai import Agent, Task, Crew

# Define agents
researcher = Agent(
    role='Research Analyst',
    goal='Find and analyze market data',
    backstory='Expert in market research and data analysis',
    verbose=True,
    allow_delegation=False,
    tools=[search_tool, web_scraper_tool]
)

writer = Agent(
    role='Content Writer',
    goal='Write compelling content based on research',
    backstory='Experienced content writer and editor',
    verbose=True,
    allow_delegation=False,
    tools=[writing_tool]
)

4.2 Task Definition

Task Creation

# Define tasks
research_task = Task(
    description="""
    Research the latest trends in AI and machine learning.
    Focus on practical applications and market opportunities.
    Provide detailed analysis with supporting data.
    """,
    agent=researcher,
    expected_output="Comprehensive research report with data and insights"
)

writing_task = Task(
    description="""
    Write a blog post based on the research findings.
    Make it engaging and accessible to a general audience.
    Include key insights and actionable takeaways.
    """,
    agent=writer,
    expected_output="Well-written blog post with engaging content"
)

4.3 Crew Execution

Crew Execution

# Create crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True,
    memory=True
)

# Execute
result = crew.kickoff()
print(result)

πŸ”’ 5. Security and Scalability

5.1 Agent Security

Security Considerations

  • Input validation: Validate all inputs to agents
  • Output sanitization: Sanitize agent outputs before use
  • Access control: Limit agent permissions and capabilities
  • Audit logging: Log all agent interactions for security

Security Implementation

class SecureAgent:
    def __init__(self, permissions):
        self.permissions = permissions
        self.audit_log = []

    def execute(self, task):
        # Validate permissions
        if not self.has_permission(task):
            raise PermissionError("Agent lacks permission for this task")

        # Log execution
        self.audit_log.append({
            "timestamp": time.time(),
            "task": task,
            "agent": self.name
        })

        # Execute with sandboxing
        return self.sandboxed_execute(task)

5.2 Scalability Patterns

Scaling Strategies

  • Horizontal scaling: Add more agent instances
  • Load balancing: Distribute tasks across agents
  • Caching: Cache common results and responses
  • Async processing: Use async/await for non-blocking operations

Scalable Architecture

import asyncio
from concurrent.futures import ThreadPoolExecutor

class ScalableAgentSystem:
    def __init__(self, max_workers=10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.agent_pool = []

    async def execute_task(self, task):
        # Get available agent
        agent = await self.get_available_agent()

        # Execute in thread pool
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            self.executor, 
            agent.execute, 
            task
        )

        return result

πŸ’» 6. Mini-Project: Multi-Agent Content Creation System

Content Creation Challenge

Build a multi-agent system that creates content:

  1. Researcher Agent: Gathers information on a topic
  2. Writer Agent: Creates initial content
  3. Editor Agent: Reviews and improves content
  4. Publisher Agent: Formats and publishes content

Implementation Skeleton

from crewai import Agent, Task, Crew

# Define agents
researcher = Agent(
    role='Research Specialist',
    goal='Gather comprehensive information on topics',
    tools=[web_search_tool, document_reader_tool]
)

writer = Agent(
    role='Content Writer',
    goal='Create engaging and informative content',
    tools=[writing_tool, grammar_checker_tool]
)

editor = Agent(
    role='Content Editor',
    goal='Review and improve content quality',
    tools=[editing_tool, fact_checker_tool]
)

# Define workflow
research_task = Task(description="Research the topic", agent=researcher)
writing_task = Task(description="Write content", agent=writer)
editing_task = Task(description="Edit and improve", agent=editor)

# Create and execute crew
crew = Crew(agents=[researcher, writer, editor], 
            tasks=[research_task, writing_task, editing_task])
result = crew.kickoff()

❓ 7. Self-Check Questions

Knowledge Check

  1. What are the benefits of the planner/executor/reviewer pattern?
  2. How does LangGraph manage state between agents?
  3. What's the difference between AutoGen and CrewAI?
  4. How would you implement security in a multi-agent system?
  5. What scaling strategies would you use for high-traffic agent systems?

Phase 3 Complete!

Phase 4: Production Deployment & Ops β†’

Phase 4 covers Production Deployment & Operationsβ€”deploying agents to production, monitoring, and maintaining agent systems at scale.