Skip to content

Best PracticesΒΆ

Production-ready patterns and expert recommendations

Build scalable, reliable AI agent applications with 400+ modules and 237 enterprise features

Enterprise Best Practices

For enterprise-specific patterns including multi-tenant architectures, compliance workflows, and high-availability deployments, see Enterprise Documentation.

Quick NavigationΒΆ

πŸ’‘ General PrinciplesΒΆ

1. Design for ObservabilityΒΆ

Monitor Everything

Always implement comprehensive monitoring and logging from day one.

Python
from agenticaiframework.monitoring import MonitoringSystem

# Initialize monitoring early
monitor = MonitoringSystem()

# Log all important events
monitor.log_event("ApplicationStarted", {"version": "1.0.0"})

# Track key metrics
monitor.record_metric("startup_time", startup_duration)

Benefits

  • Debug issues faster
  • Track performance trends
  • Understand user behavior
  • Optimize resource usage

2. Implement Proper Error HandlingΒΆ

Handle Failures Gracefully

Always anticipate and handle errors with meaningful feedback and recovery strategies.

Python
def safe_task_execution(task):
    try:
        result = task.run()
        monitor.log_event("TaskCompleted", {"task": task.name})
        return result
    except Exception as e:
        monitor.log_event("TaskFailed", {
            "task": task.name, 
            "error": str(e),
            "error_type": type(e).__name__
        })
        # Implement fallback or recovery logic
        return handle_task_failure(task, e)
Python
1
2
3
4
5
6
7
8
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def execute_with_retry(task):
    return task.run()

Error Handling Checklist

  • βœ… Log all errors with context
  • βœ… Implement retry logic for transient failures
  • βœ… Provide fallback mechanisms
  • βœ… Alert on critical errors

3. Use Configuration ManagementΒΆ

Centralize configuration and avoid hard-coding values.

Python
from agenticaiframework.configurations import ConfigurationManager

config = ConfigurationManager()

# Environment-specific configurations
config.set_config("Production", {
    "max_agents": 100,
    "timeout": 30,
    "log_level": "WARNING",
    "memory_backend": "redis"
})

config.set_config("Development", {
    "max_agents": 5,
    "timeout": 60,
    "log_level": "DEBUG",
    "memory_backend": "memory"
})

Agent DevelopmentΒΆ

Agent Naming and OrganizationΒΆ

Python
# Use descriptive, hierarchical names
customer_service_agent = Agent(
    name="CustomerService.EmailSupport",
    role="Email Support Specialist",
    capabilities=["email_processing", "sentiment_analysis", "response_generation"],
    config={
        "response_tone": "professional",
        "escalation_threshold": 0.8
    }
)

# Group related agents
class CustomerServiceAgents:
    @staticmethod
    def create_email_agent():
        return Agent(name="CustomerService.Email", ...)

    @staticmethod
    def create_chat_agent():
        return Agent(name="CustomerService.Chat", ...)

Agent Lifecycle ManagementΒΆ

Python
class AgentLifecycleManager:
    def __init__(self):
        self.agent_manager = AgentManager()
        self.active_agents = {}

    def create_agent(self, agent_config):
        agent = Agent(**agent_config)
        self.agent_manager.register_agent(agent)
        self.active_agents[agent.id] = agent
        return agent

    def shutdown_agent(self, agent_id):
        if agent_id in self.active_agents:
            agent = self.active_agents[agent_id]
            agent.stop()
            self.agent_manager.remove_agent(agent_id)
            del self.active_agents[agent_id]

    def health_check(self):
        for agent_id, agent in self.active_agents.items():
            if agent.status != "running":
                self.monitor.log_event("AgentUnhealthy", {"agent_id": agent_id})

Capability-Based DesignΒΆ

Python
# Define clear capabilities
TEXT_PROCESSING_CAPABILITIES = ["text_analysis",
    "sentiment_detection", 
    "entity_extraction",
    "language_detection"
]

CONVERSATION_CAPABILITIES = ["dialogue_management",
    "context_tracking",
    "response_generation"
]

# Create specialized agents
text_processor = Agent(
    name="TextProcessor",
    role="Text Analysis Specialist",
    capabilities=TEXT_PROCESSING_CAPABILITIES,
    config={"model": "text-analysis-v2"}
)

conversation_agent = Agent(
    name="ConversationAgent", 
    role="Dialogue Manager",
    capabilities=CONVERSATION_CAPABILITIES,
    config={"context_window": 10}
)

Task ManagementΒΆ

Task Design PatternsΒΆ

Python
# Use descriptive task definitions
class TaskBuilder:
    @staticmethod
    def create_analysis_task(data, analysis_type):
        return Task(
            name=f"Analysis.{analysis_type}",
            objective=f"Perform {analysis_type} analysis on provided data",
            executor=lambda: analyze_data(data, analysis_type),
            inputs={"data": data, "type": analysis_type}
        )

    @staticmethod
    def create_workflow_task(subtasks):
        return Task(
            name="Workflow.MultiStep",
            objective="Execute multiple related tasks in sequence",
            executor=lambda: execute_workflow(subtasks),
            inputs={"subtasks": subtasks}
        )

Task DependenciesΒΆ

Python
def create_dependent_tasks():
    # Data collection task
    collect_task = Task(
        name="DataCollection",
        objective="Collect raw data",
        executor=collect_data
    )

    # Processing task (depends on collection)
    process_task = Task(
        name="DataProcessing",
        objective="Process collected data", 
        executor=lambda: process_data(collect_task.result),
        dependencies=[collect_task]
    )

    # Analysis task (depends on processing)
    analyze_task = Task(
        name="DataAnalysis",
        objective="Analyze processed data",
        executor=lambda: analyze_data(process_task.result),
        dependencies=[process_task]
    )

    return [collect_task, process_task, analyze_task]

Memory ManagementΒΆ

Memory StrategiesΒΆ

Python
class MemoryStrategy:
    def __init__(self):
        self.memory = MemoryManager()

    def store_user_context(self, user_id, context, ttl=3600):
        """Store user context with expiration"""
        key = f"user_context:{user_id}"
        self.memory.store(key, context, memory_type="short_term")
        # Implement TTL logic for cleanup

    def store_system_knowledge(self, knowledge_item):
        """Store long-term system knowledge"""
        key = f"knowledge:{knowledge_item['id']}"
        self.memory.store(key, knowledge_item, memory_type="long_term")

    def cache_computation_result(self, computation_hash, result):
        """Cache expensive computation results"""
        key = f"cache:{computation_hash}"
        self.memory.store(key, result, memory_type="short_term")

Memory OptimizationΒΆ

Python
def optimize_memory_usage():
    memory = MemoryManager()

    # Regular cleanup of expired data
    def cleanup_expired_data():
        current_time = time.time()
        for key in list(memory.short_term.keys()):
            if key.startswith("temp:"):
                # Remove temporary data older than 1 hour
                if current_time - memory.short_term[key].get("timestamp", 0) > 3600:
                    del memory.short_term[key]

    # Memory usage monitoring
    def monitor_memory_usage():
        short_term_size = len(memory.short_term)
        long_term_size = len(memory.long_term)

        monitor.record_metric("memory_short_term_size", short_term_size)
        monitor.record_metric("memory_long_term_size", long_term_size)

        if short_term_size > 10000: # Threshold
            monitor.log_event("MemoryThresholdExceeded", {
                "type": "short_term",
                "size": short_term_size
            })

Security Best PracticesΒΆ

Input ValidationΒΆ

Python
from agenticaiframework.guardrails import Guardrail, GuardrailManager

class SecurityGuardrails:
    @staticmethod
    def create_input_validation():
        def validate_input(data):
            # Check for malicious content
            if any(pattern in str(data).lower() for pattern in ["<script>", "javascript:", "eval("]):
                return False

            # Check data size limits
            if len(str(data)) > 100000: # 100KB limit
                return False

            return True

        return Guardrail("InputValidation", validate_input)

    @staticmethod
    def create_output_filter():
        def filter_output(text):
            # Remove sensitive information patterns
            import re

            # Remove potential credit card numbers
            text = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[REDACTED]', text)

            # Remove potential SSNs
            text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED]', text)

            return text

        return Guardrail("OutputFilter", lambda x: filter_output(x) != "[BLOCKED]")

Access ControlΒΆ

Python
class AccessControlManager:
    def __init__(self):
        self.permissions = {}
        self.roles = {}

    def define_role(self, role_name, permissions):
        self.roles[role_name] = permissions

    def assign_role(self, agent_id, role_name):
        if role_name in self.roles:
            self.permissions[agent_id] = self.roles[role_name]

    def check_permission(self, agent_id, action):
        agent_permissions = self.permissions.get(agent_id, [])
        return action in agent_permissions

# Usage
access_control = AccessControlManager()
access_control.define_role("data_reader", ["read_data", "list_data"])
access_control.define_role("data_writer", ["read_data", "write_data", "delete_data"])

# Assign roles to agents
access_control.assign_role(agent.id, "data_reader")

Performance OptimizationΒΆ

Asynchronous OperationsΒΆ

Python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncTaskManager:
    def __init__(self, max_workers=10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def execute_tasks_async(self, tasks):
        loop = asyncio.get_event_loop()

        # Execute tasks concurrently
        futures = [loop.run_in_executor(self.executor, task.run)
            for task in tasks
        ]

        results = await asyncio.gather(*futures, return_exceptions=True)
        return results

    def execute_cpu_intensive_task(self, task):
        """For CPU-intensive tasks, use process pool"""
        from concurrent.futures import ProcessPoolExecutor

        with ProcessPoolExecutor() as executor:
            future = executor.submit(task.run)
            return future.result()

Caching StrategiesΒΆ

Python
class CachingManager:
    def __init__(self):
        self.cache = {}
        self.cache_hits = 0
        self.cache_misses = 0

    def get_or_compute(self, key, compute_fn, ttl=3600):
        current_time = time.time()

        # Check if cached and not expired
        if key in self.cache:
            cached_item = self.cache[key]
            if current_time - cached_item["timestamp"] < ttl:
                self.cache_hits += 1
                return cached_item["value"]

        # Compute and cache
        self.cache_misses += 1
        result = compute_fn()
        self.cache[key] = {
            "value": result,
            "timestamp": current_time
        }

        return result

    def get_cache_stats(self):
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_requests if total_requests > 0 else 0
        return {
            "hits": self.cache_hits,
            "misses": self.cache_misses,
            "hit_rate": hit_rate
        }

Testing Best PracticesΒΆ

Unit TestingΒΆ

Python
import unittest
from unittest.mock import Mock, patch

class TestAgentBehavior(unittest.TestCase):
    def setUp(self):
        self.agent = Agent(
            name="TestAgent",
            role="Tester",
            capabilities=["testing"],
            config={"test_mode": True}
        )

    def test_agent_initialization(self):
        self.assertEqual(self.agent.name, "TestAgent")
        self.assertEqual(self.agent.status, "initialized")

    def test_agent_start(self):
        self.agent.start()
        self.assertEqual(self.agent.status, "running")

    @patch('agenticaiframework.llms.LLMManager')
    def test_task_execution(self, mock_llm):
        # Mock LLM response
        mock_llm.generate.return_value = "Test response"

        task = Task(
            name="TestTask",
            objective="Test execution",
            executor=lambda: mock_llm.generate("test prompt")
        )

        result = task.run()
        self.assertEqual(result, "Test response")

Integration TestingΒΆ

Python
class TestAgentIntegration(unittest.TestCase):
    def setUp(self):
        self.agent_manager = AgentManager()
        self.memory_manager = MemoryManager()
        self.monitor = MonitoringSystem()

    def test_multi_agent_workflow(self):
        # Create agents
        agent1 = Agent("Agent1", "Role1", ["cap1"], {})
        agent2 = Agent("Agent2", "Role2", ["cap2"], {})

        # Register agents
        self.agent_manager.register_agent(agent1)
        self.agent_manager.register_agent(agent2)

        # Test workflow
        agent1.start()
        agent2.start()

        self.assertEqual(len(self.agent_manager.list_agents()), 2)

Monitoring and ObservabilityΒΆ

Custom MetricsΒΆ

Python
class ApplicationMetrics:
    def __init__(self, monitor):
        self.monitor = monitor

    def track_agent_performance(self, agent_id, task_duration, success):
        self.monitor.record_metric(f"agent_{agent_id}_task_duration", task_duration)
        self.monitor.record_metric(f"agent_{agent_id}_success_rate", 1 if success else 0)

    def track_system_health(self):
        import psutil

        # System metrics
        cpu_percent = psutil.cpu_percent()
        memory_percent = psutil.virtual_memory().percent

        self.monitor.record_metric("system_cpu_percent", cpu_percent)
        self.monitor.record_metric("system_memory_percent", memory_percent)

        # Application metrics
        active_agents = len(self.agent_manager.list_agents())
        self.monitor.record_metric("active_agents_count", active_agents)

Logging StandardsΒΆ

Python
import logging
import json

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Create structured formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_structured(self, level, message, **kwargs):
        structured_data = {
            "message": message,
            "metadata": kwargs
        }

        getattr(self.logger, level)(json.dumps(structured_data))

    def log_agent_action(self, agent_id, action, **context):
        self.log_structured("info", "Agent action", 
                          agent_id=agent_id, 
                          action=action, 
                          **context)

Deployment Best PracticesΒΆ

Environment ConfigurationΒΆ

Python
import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class EnvironmentConfig:
    def __init__(self):
        self.env = Environment(os.getenv("AGENTICAI_ENV", "development"))
        self.config = self._load_config()

    def _load_config(self):
        configs = {
            Environment.DEVELOPMENT: {
                "log_level": "DEBUG",
                "max_agents": 5,
                "enable_debug_features": True
            },
            Environment.STAGING: {
                "log_level": "INFO", 
                "max_agents": 20,
                "enable_debug_features": False
            },
            Environment.PRODUCTION: {
                "log_level": "WARNING",
                "max_agents": 100,
                "enable_debug_features": False
            }
        }
        return configs[self.env]

Health ChecksΒΆ

Python
class HealthChecker:
    def __init__(self, agent_manager, memory_manager):
        self.agent_manager = agent_manager
        self.memory_manager = memory_manager

    def check_system_health(self):
        health_status = {
            "status": "healthy",
            "checks": {
                "agents": self._check_agents(),
                "memory": self._check_memory(),
                "database": self._check_database()
            }
        }

        # Overall status
        if any(check["status"] != "healthy" for check in health_status["checks"].values()):
            health_status["status"] = "unhealthy"

        return health_status

    def _check_agents(self):
        agents = self.agent_manager.list_agents()
        healthy_agents = [a for a in agents if a.status == "running"]

        return {
            "status": "healthy" if len(healthy_agents) > 0 else "unhealthy",
            "total_agents": len(agents),
            "healthy_agents": len(healthy_agents)
        }

    def _check_memory(self):
        try:
            # Test memory operations
            test_key = "health_check_test"
            self.memory_manager.store(test_key, "test_value")
            retrieved = self.memory_manager.retrieve(test_key)

            return {
                "status": "healthy" if retrieved == "test_value" else "unhealthy",
                "details": "Memory read/write operations successful"
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

Security Best PracticesΒΆ

1. Enable All Security Features in ProductionΒΆ

Always enable comprehensive security in production environments:

Python
from agenticaiframework.security import SecurityManager

# Production security configuration
security = SecurityManager(
    enable_injection_detection=True,
    enable_input_validation=True,
    enable_rate_limiting=True,
    enable_content_filtering=True,
    enable_audit_logging=True
)

2. Implement Defense in DepthΒΆ

Layer multiple security controls:

Python
def secure_agent_input(user_input: str, user_id: str) -> dict:
    # Layer 1: Rate limiting
    rate_result = rate_limiter.check_rate_limit(user_id)
    if not rate_result['allowed']:
        return {"error": "Rate limit exceeded"}

    # Layer 2: Input validation
    validation = input_validator.validate(user_input)
    if not validation['is_valid']:
        return {"error": "Invalid input", "details": validation['errors']}

    # Layer 3: Injection detection
    injection = injection_detector.detect(user_input)
    if injection['is_injection']:
        audit_logger.log_event("injection_detected", {
            "user_id": user_id,
            "confidence": injection['confidence']
        })
        return {"error": "Security violation detected"}

    # Layer 4: Content filtering
    filter_result = content_filter.filter_text(user_input)
    if filter_result['blocked']:
        return {"error": "Content policy violation"}

    return {"success": True, "sanitized_input": validation['sanitized']}

3. Audit All Security EventsΒΆ

Maintain comprehensive audit logs:

Python
from agenticaiframework.security import AuditLogger

audit_logger = AuditLogger(
    log_file="/var/log/agenticai/security.log",
    retention_days=90
)

# Log all security-relevant events
audit_logger.log_event("user_authentication", {
    "user_id": user_id,
    "timestamp": datetime.now().isoformat(),
    "ip_address": request.remote_addr,
    "success": True
})

audit_logger.log_event("prompt_injection_blocked", {
    "user_id": user_id,
    "pattern_matched": "ignore_instructions",
    "confidence": 0.95
})

4. Validate and Sanitize All InputsΒΆ

Never trust user input:

Python
from agenticaiframework.security import InputValidator

validator = InputValidator(
    max_length=10000,
    allow_html=False,
    allow_scripts=False
)

def process_user_input(raw_input: str) -> str:
    # Validate
    validation = validator.validate(raw_input)
    if not validation['is_valid']:
        raise ValueError(f"Invalid input: {validation['errors']}")

    # Sanitize
    clean_input = validator.sanitize(raw_input)

    # Additional domain-specific sanitization
    clean_input = remove_sensitive_patterns(clean_input)

    return clean_input

5. Implement Rate LimitingΒΆ

Protect against abuse:

Python
from agenticaiframework.security import RateLimiter

# Different limits for different endpoints
api_limiter = RateLimiter(max_requests=100, window_seconds=60)
expensive_limiter = RateLimiter(max_requests=10, window_seconds=60)

@app.route('/api/query')
def handle_query(user_id: str):
    if not api_limiter.check_rate_limit(user_id)['allowed']:
        return {"error": "Rate limit exceeded"}, 429

    # Process request
    return process_query()

@app.route('/api/expensive_operation')
def handle_expensive(user_id: str):
    if not expensive_limiter.check_rate_limit(user_id)['allowed']:
        return {"error": "Rate limit exceeded"}, 429

    # Process expensive request
    return process_expensive_operation()

6. Use Prompt Security FeaturesΒΆ

Enable security in prompt rendering:

Python
from agenticaiframework.prompts import Prompt, PromptManager

# Create prompts with security enabled
manager = PromptManager(enable_security=True)

prompt = Prompt(
    template="Process user request: {user_input}",
    metadata={"category": "user_facing"},
    enable_security=True
)

# Render safely
try:
    result = manager.render_prompt(
        prompt.id,
        user_input=untrusted_user_input
    )
except ValueError as e:
    # Handle injection attempts
    log_security_violation(e)

7. Monitor Security MetricsΒΆ

Track and alert on security events:

Python
from agenticaiframework.security import SecurityManager
from agenticaiframework.monitoring import MonitoringSystem

security = SecurityManager()
monitor = MonitoringSystem()

# Regular security checks
def check_security_health():
    report = security.get_security_report()

    # Track metrics
    monitor.track_metric("injection_attempts", report['injection_attempts'])
    monitor.track_metric("rate_limit_violations", report['rate_limit_violations'])
    monitor.track_metric("content_violations", report['content_violations'])

    # Alert on anomalies
    if report['injection_attempts'] > threshold:
        monitor.create_alert(
            "high_injection_attempts",
            lambda: report['injection_attempts'] > threshold
        )

8. Regular Security UpdatesΒΆ

Keep detection patterns current:

Python
from agenticaiframework.security import PromptInjectionDetector

detector = PromptInjectionDetector()

# Add new threat patterns as they emerge
detector.add_pattern(
    r"new_attack_pattern_here",
    severity="high"
)

# Update blocked words
content_filter.add_blocked_word("new_spam_term", category="spam")

# Review and update regularly
def update_security_patterns():
    # Load latest patterns from threat intelligence
    patterns = load_latest_threat_patterns()
    for pattern in patterns:
        detector.add_pattern(pattern['regex'], pattern['severity'])

These best practices will help you build robust, secure, scalable, and maintainable applications with AgenticAI Framework. Remember to adapt these patterns to your specific use case and requirements.