Skip to content

Multi-Agent OrchestrationΒΆ

AgenticAI Framework provides powerful orchestration capabilities for building sophisticated multi-agent systems. Create teams, define workflows, and coordinate agents seamlessly.

Enterprise Orchestration

The framework includes 12 workflow & orchestration modules for enterprise deployments including workflow engine, state machine, saga patterns, and job queue management.


Orchestration OverviewΒΆ

  • πŸ‘₯ Teams


    Group agents into collaborative teams with shared goals

    Learn Teams

  • πŸ”„ Workflows


    Define execution patterns: sequential, parallel, or custom

    Learn Workflows

  • 🧭 Routing


    Intelligently route tasks to the right agents

    Learn Routing

  • πŸ‘‘ Hierarchies


    Build leader-follower structures for complex tasks

    Learn Hierarchies


ArchitectureΒΆ

Text Only
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Orchestration Layer β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Team β”‚ β”‚ Workflow β”‚ β”‚ Router β”‚ β”‚
β”‚ β”‚ Manager │◄──►│ Manager │◄──►│ Engine β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ β–Ό β–Ό β–Ό β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Agent β”‚ β”‚ State β”‚ β”‚ Message β”‚ β”‚
β”‚ β”‚ Registry β”‚ β”‚ Manager β”‚ β”‚ Queue β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚ β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Agent 1 β”‚ β”‚ Agent 2 β”‚ β”‚ Agent 3 β”‚ β”‚ Agent N β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Agent TeamsΒΆ

Creating a Basic TeamΒΆ

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework import Agent, AgentConfig, Team

# Create specialized agents
researcher = Agent(
    config=AgentConfig(
        name="researcher",
        role="Research Analyst",
        goal="Gather comprehensive information on topics",
        tools=["web_search", "wikipedia"]
    )
)

writer = Agent(
    config=AgentConfig(
        name="writer",
        role="Content Writer",
        goal="Transform research into engaging content",
        tools=["text_editor"]
    )
)

editor = Agent(
    config=AgentConfig(
        name="editor",
        role="Quality Editor",
        goal="Ensure content is polished and error-free",
        tools=["grammar_check", "readability_analyzer"]
    )
)

# Create a team
team = Team(
    name="content_team",
    agents=[researcher, writer, editor],
    description="A team for creating high-quality content"
)

# Execute team task
result = team.execute("Create an article about quantum computing")
logger.info(result.final_output)

Team ConfigurationΒΆ

Python
from agenticaiframework import Team, TeamConfig

team = Team(
    name="research_team",
    config=TeamConfig(
        # Execution settings
        max_iterations=10,
        timeout_seconds=300,

        # Communication
        allow_agent_communication=True,
        shared_memory=True,

        # Error handling
        fail_fast=False, # Continue if one agent fails
        retry_failed_agents=True,
        max_retries=3,

        # Logging
        verbose=True,
        log_agent_thoughts=True
    ),
    agents=[researcher, writer, editor]
)

Workflow PatternsΒΆ

Sequential WorkflowΒΆ

Agents execute one after another, passing output to the next agent.

Python
from agenticaiframework import Team, WorkflowManager

# Sequential: researcher β†’ writer β†’ editor
team = Team(
    name="content_pipeline",
    agents=[researcher, writer, editor],
    workflow=WorkflowManager.sequential()
)

result = team.execute("Write about renewable energy")
# 1. researcher gathers information
# 2. writer creates content
# 3. editor polishes final output

Parallel WorkflowΒΆ

Agents work simultaneously on the same task.

Python
from agenticaiframework import WorkflowManager

# Parallel: all analysts work at the same time
team = Team(
    name="multi_analyst",
    agents=[market_analyst, tech_analyst, competitor_analyst],
    workflow=WorkflowManager.parallel()
)

result = team.execute("Analyze the AI industry")
# All three analysts work simultaneously
# Results are aggregated at the end

Mixed WorkflowΒΆ

Combine sequential and parallel patterns.

Python
from agenticaiframework import WorkflowManager, WorkflowStep

workflow = WorkflowManager.custom([# Step 1: Research in parallel
    WorkflowStep(
        name="research",
        agents=["researcher_1", "researcher_2"],
        execution="parallel"
    ),
    # Step 2: Aggregate results
    WorkflowStep(
        name="aggregate",
        agents=["synthesizer"],
        execution="sequential",
        depends_on=["research"]
    ),
    # Step 3: Write and review in parallel
    WorkflowStep(
        name="content",
        agents=["writer", "reviewer"],
        execution="parallel",
        depends_on=["aggregate"]
    ),
    # Step 4: Final edit
    WorkflowStep(
        name="finalize",
        agents=["editor"],
        execution="sequential",
        depends_on=["content"]
    )
])

team = Team(
    name="content_factory",
    agents=[researcher_1, researcher_2, synthesizer, writer, reviewer, editor],
    workflow=workflow
)

Conditional WorkflowΒΆ

Execute different paths based on conditions.

Python
from agenticaiframework import WorkflowManager, ConditionalStep

workflow = WorkflowManager.custom([WorkflowStep(name="analyze", agents=["analyzer"]),
    ConditionalStep(
        name="route",
        condition=lambda result: result.complexity > 0.8,
        if_true=WorkflowStep(name="expert", agents=["expert_agent"]),
        if_false=WorkflowStep(name="standard", agents=["standard_agent"])
    ),
    WorkflowStep(name="finalize", agents=["finalizer"])
])

Hierarchical TeamsΒΆ

Leader-Follower StructureΒΆ

Python
from agenticaiframework import Team, Agent, AgentConfig

# Create leader agent
leader = Agent(
    config=AgentConfig(
        name="project_manager",
        role="Project Manager",
        goal="Coordinate team members and synthesize results",
        is_leader=True
    )
)

# Create team members
developers = [Agent(config=AgentConfig(name=f"dev_{i}", role="Developer"))
    for i in range(3)
]

# Hierarchical team
team = Team(
    name="dev_team",
    leader=leader,
    agents=developers,
    workflow=WorkflowManager.hierarchical()
)

# Leader delegates and coordinates
result = team.execute("Build a REST API for user management")

Multi-Level HierarchyΒΆ

Python
from agenticaiframework import Team, TeamHierarchy

# Create department leads
research_lead = Agent(config=AgentConfig(name="research_lead", role="Research Lead"))
dev_lead = Agent(config=AgentConfig(name="dev_lead", role="Development Lead"))
qa_lead = Agent(config=AgentConfig(name="qa_lead", role="QA Lead"))

# Create sub-teams
research_team = Team(
    name="research",
    leader=research_lead,
    agents=[researcher_1, researcher_2]
)

dev_team = Team(
    name="development",
    leader=dev_lead,
    agents=[dev_1, dev_2, dev_3]
)

qa_team = Team(
    name="qa",
    leader=qa_lead,
    agents=[qa_1, qa_2]
)

# Create executive leader
cto = Agent(config=AgentConfig(name="cto", role="CTO"))

# Build hierarchy
hierarchy = TeamHierarchy(
    leader=cto,
    sub_teams=[research_team, dev_team, qa_team]
)

result = hierarchy.execute("Develop a new AI feature")

Intelligent RoutingΒΆ

Skill-Based RoutingΒΆ

Python
from agenticaiframework import Router, RoutingStrategy

# Define agent skills
router = Router(
    strategy=RoutingStrategy.SKILL_BASED,
    agents={
        "python_expert": ["python", "django", "flask"],
        "js_expert": ["javascript", "react", "nodejs"],
        "data_scientist": ["ml", "data_analysis", "statistics"],
        "devops_engineer": ["docker", "kubernetes", "ci/cd"]
    }
)

# Route task to appropriate agent
task = "Create a React component for data visualization"
selected_agent = router.route(task)
# Returns: "js_expert"

Load-Based RoutingΒΆ

Python
1
2
3
4
5
6
7
8
router = Router(
    strategy=RoutingStrategy.LOAD_BALANCED,
    agents=[agent_1, agent_2, agent_3],
    max_concurrent_per_agent=5
)

# Routes to least busy agent
agent = router.route(task)

Quality-Based RoutingΒΆ

Python
1
2
3
4
5
6
7
8
router = Router(
    strategy=RoutingStrategy.QUALITY_BASED,
    agents=agents,
    performance_history=True # Uses historical performance data
)

# Routes to agent with best track record for similar tasks
agent = router.route(task, task_type="code_review")

Custom Routing LogicΒΆ

Python
from agenticaiframework import Router

class CustomRouter(Router):
    def route(self, task, context=None):
        # Custom routing logic
        if "urgent" in task.lower():
            return self.get_fastest_agent()
        elif "complex" in task.lower():
            return self.get_most_skilled_agent(task)
        else:
            return self.get_available_agent()

router = CustomRouter(agents=agents)

Agent CommunicationΒΆ

Direct MessagingΒΆ

Python
from agenticaiframework import AgentMessenger

messenger = AgentMessenger()

# Send message between agents
messenger.send(
    from_agent="researcher",
    to_agent="writer",
    message_type="data_handoff",
    content={
        "research_data": research_results,
        "key_findings": findings,
        "sources": sources
    }
)

# Receive messages
messages = messenger.receive(agent_id="writer")

Broadcast CommunicationΒΆ

Python
1
2
3
4
5
6
# Broadcast to all team members
messenger.broadcast(
    from_agent="leader",
    message_type="announcement",
    content={"status": "Project milestone reached"}
)

Request-Response PatternΒΆ

Python
# Request help from another agent
response = messenger.request(
    from_agent="writer",
    to_agent="researcher",
    request={
        "type": "clarification",
        "question": "Can you provide more details on point 3?"
    },
    timeout_seconds=60
)

Shared ContextΒΆ

Python
from agenticaiframework import SharedContext

# Create shared context for team
shared = SharedContext(team_id="content_team")

# Agent adds to shared context
shared.set("research_findings", findings, agent="researcher")

# Another agent reads from shared context
findings = shared.get("research_findings")

# Watch for updates
shared.watch("research_findings", callback=on_update)

Task DelegationΒΆ

Automatic DelegationΒΆ

Python
from agenticaiframework import DelegationManager

delegation = DelegationManager(team=team)

# Leader delegates based on capabilities
delegation.delegate(
    task="Analyze market trends and write report",
    strategy="capability_match"
)
# Automatically breaks down task and assigns to appropriate agents

Manual DelegationΒΆ

Python
# Leader explicitly assigns tasks
delegation.assign(
    task="Research competitor pricing",
    agent="researcher",
    priority="high",
    deadline="2024-01-15"
)

delegation.assign(
    task="Write executive summary",
    agent="writer",
    depends_on="Research competitor pricing"
)

Delegation TrackingΒΆ

Python
import logging

logger = logging.getLogger(__name__)

# Track delegation status
status = delegation.get_status()
logger.info(f"Assigned: {status.assigned}")
logger.info(f"In Progress: {status.in_progress}")
logger.info(f"Completed: {status.completed}")

# Get agent workload
workload = delegation.get_agent_workload("researcher")
logger.info(f"Current tasks: {workload.current_tasks}")
logger.info(f"Capacity: {workload.remaining_capacity}")

Result AggregationΒΆ

Aggregation StrategiesΒΆ

Python
1
2
3
4
5
6
7
8
from agenticaiframework import ResultAggregator, AggregationStrategy

# Create aggregator
aggregator = ResultAggregator(strategy=AggregationStrategy.MERGE)

# Aggregate parallel results
results = [result_1, result_2, result_3]
final = aggregator.aggregate(results)

Available StrategiesΒΆ

Python
1
2
3
# Combine all outputs into one
aggregator = ResultAggregator(strategy=AggregationStrategy.MERGE)
# Result: Combined text from all agents
Python
1
2
3
# Use majority voting
aggregator = ResultAggregator(strategy=AggregationStrategy.VOTE)
# Result: Most common answer
Python
1
2
3
# Select best result by score
aggregator = ResultAggregator(strategy=AggregationStrategy.BEST)
# Result: Highest scored output
Python
1
2
3
4
5
6
# AI-powered synthesis
aggregator = ResultAggregator(
    strategy=AggregationStrategy.SYNTHESIZE,
    synthesizer_agent=synthesizer
)
# Result: Synthesized output using AI

Custom AggregationΒΆ

Python
1
2
3
4
5
6
7
8
9
class CustomAggregator(ResultAggregator):
    def aggregate(self, results):
        # Custom aggregation logic
        weighted_results = []
        for result in results:
            weight = self.get_agent_weight(result.agent)
            weighted_results.append((result, weight))

        return self.weighted_merge(weighted_results)

Error HandlingΒΆ

Team-Level Error HandlingΒΆ

Python
from agenticaiframework import Team, TeamErrorHandler

handler = TeamErrorHandler(
    on_agent_failure="retry", # Options: retry, skip, fail, delegate
    max_retries=3,
    fallback_agent="backup_agent"
)

team = Team(
    name="robust_team",
    agents=agents,
    error_handler=handler
)

Recovery StrategiesΒΆ

Python
from agenticaiframework import RecoveryStrategy

team = Team(
    name="resilient_team",
    agents=agents,
    recovery_strategies=[RecoveryStrategy.RETRY_WITH_BACKOFF,
        RecoveryStrategy.DELEGATE_TO_BACKUP,
        RecoveryStrategy.ROLLBACK_TO_CHECKPOINT,
        RecoveryStrategy.GRACEFUL_DEGRADATION
    ]
)

Circuit BreakerΒΆ

Python
from agenticaiframework import CircuitBreaker

# Add circuit breaker to agent
agent = Agent(
    config=config,
    circuit_breaker=CircuitBreaker(
        failure_threshold=5,
        reset_timeout_seconds=60
    )
)

# Circuit opens after 5 failures, resets after 60 seconds

Monitoring & ObservabilityΒΆ

Orchestration MetricsΒΆ

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework import OrchestrationMonitor

monitor = OrchestrationMonitor(team=team)

# Get team metrics
metrics = monitor.get_metrics()
logger.info(f"Total tasks: {metrics.total_tasks}")
logger.info(f"Success rate: {metrics.success_rate:.2%}")
logger.info(f"Avg completion time: {metrics.avg_completion_time_ms}ms")

# Get agent-level metrics
for agent_id, agent_metrics in metrics.agent_metrics.items():
    logger.info(f"{agent_id}: {agent_metrics.tasks_completed} tasks")

Event StreamingΒΆ

Python
import logging

logger = logging.getLogger(__name__)

# Stream orchestration events
async for event in monitor.stream_events():
    if event.type == "task_started":
        logger.info(f"Task started: {event.task_id} by {event.agent_id}")
    elif event.type == "task_completed":
        logger.info(f"Task completed: {event.task_id}")
    elif event.type == "agent_failed":
        logger.info(f"Agent failed: {event.agent_id}: {event.error}")

VisualizationΒΆ

Python
1
2
3
4
5
# Generate workflow visualization
team.visualize(output_file="workflow.png")

# Generate execution timeline
team.generate_timeline(execution_id="exec_001", output_file="timeline.html")

Best PracticesΒΆ

1. Design for FailureΒΆ

Python
1
2
3
4
5
6
7
# Always include fallback agents
team = Team(
    name="robust_team",
    agents=[primary_agent],
    fallback_agents=[backup_agent_1, backup_agent_2],
    error_handler=TeamErrorHandler(on_agent_failure="delegate")
)

2. Use Appropriate Workflow PatternsΒΆ

Python
1
2
3
4
5
6
7
8
# Sequential for dependent tasks
workflow = WorkflowManager.sequential() # research β†’ write β†’ edit

# Parallel for independent tasks
workflow = WorkflowManager.parallel() # analyze_a | analyze_b | analyze_c

# Hierarchical for complex coordination
workflow = WorkflowManager.hierarchical() # leader delegates to workers

3. Enable Shared Memory for CollaborationΒΆ

Python
1
2
3
4
5
6
7
8
team = Team(
    name="collaborative_team",
    agents=agents,
    config=TeamConfig(
        shared_memory=True,
        allow_agent_communication=True
    )
)

4. Set Appropriate TimeoutsΒΆ

Python
1
2
3
4
5
6
7
8
team = Team(
    name="time_bounded",
    agents=agents,
    config=TeamConfig(
        timeout_seconds=300, # 5 minute overall timeout
        per_agent_timeout=60 # 1 minute per agent
    )
)

API ReferenceΒΆ

For complete API documentation, see: