Production-ready patterns and expert recommendations
Build scalable, reliable AI agent applications with 400+ modules and 237 enterprise features
Enterprise Best Practices
For enterprise-specific patterns — including multi-tenant architectures, compliance workflows, and high-availability deployments — see the Enterprise Documentation.
# Use descriptive, hierarchical namescustomer_service_agent=Agent(name="CustomerService.EmailSupport",role="Email Support Specialist",capabilities=["email_processing","sentiment_analysis","response_generation"],config={"response_tone":"professional","escalation_threshold":0.8})# Group related agentsclassCustomerServiceAgents:@staticmethoddefcreate_email_agent():returnAgent(name="CustomerService.Email",...)@staticmethoddefcreate_chat_agent():returnAgent(name="CustomerService.Chat",...)
# Use descriptive task definitionsclassTaskBuilder:@staticmethoddefcreate_analysis_task(data,analysis_type):returnTask(name=f"Analysis.{analysis_type}",objective=f"Perform {analysis_type} analysis on provided data",executor=lambda:analyze_data(data,analysis_type),inputs={"data":data,"type":analysis_type})@staticmethoddefcreate_workflow_task(subtasks):returnTask(name="Workflow.MultiStep",objective="Execute multiple related tasks in sequence",executor=lambda:execute_workflow(subtasks),inputs={"subtasks":subtasks})
def create_dependent_tasks():
    """Build a three-stage collect → process → analyze pipeline.

    Each downstream task declares its predecessor in ``dependencies`` and
    reads the predecessor's ``result`` lazily inside its executor lambda,
    so results are resolved only at execution time.
    """
    # Stage 1: gather the raw data.
    collection = Task(
        name="DataCollection",
        objective="Collect raw data",
        executor=collect_data
    )
    # Stage 2: transform the collected data (runs after stage 1).
    processing = Task(
        name="DataProcessing",
        objective="Process collected data",
        executor=lambda: process_data(collection.result),
        dependencies=[collection]
    )
    # Stage 3: analyze the processed output (runs after stage 2).
    analysis = Task(
        name="DataAnalysis",
        objective="Analyze processed data",
        executor=lambda: analyze_data(processing.result),
        dependencies=[processing]
    )
    return [collection, processing, analysis]
class MemoryStrategy:
    """Thin wrapper around MemoryManager that encodes the key-naming and
    short-/long-term storage conventions used by the application."""

    def __init__(self):
        self.memory = MemoryManager()

    def store_user_context(self, user_id, context, ttl=3600):
        """Store user context with expiration"""
        key = f"user_context:{user_id}"
        self.memory.store(key, context, memory_type="short_term")
        # Implement TTL logic for cleanup
        # NOTE(review): the ttl parameter is currently unused — expiration is
        # not actually enforced here. Confirm whether MemoryManager.store
        # accepts a TTL argument and wire ttl through.

    def store_system_knowledge(self, knowledge_item):
        """Store long-term system knowledge"""
        # assumes knowledge_item is a mapping with an 'id' key — TODO confirm
        key = f"knowledge:{knowledge_item['id']}"
        self.memory.store(key, knowledge_item, memory_type="long_term")

    def cache_computation_result(self, computation_hash, result):
        """Cache expensive computation results"""
        key = f"cache:{computation_hash}"
        self.memory.store(key, result, memory_type="short_term")
import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncTaskManager:
    """Runs blocking task objects concurrently on a thread pool, with a
    process-pool escape hatch for CPU-bound work.

    Each task is expected to expose a no-argument ``run()`` method.
    """

    def __init__(self, max_workers=10):
        # Shared pool for I/O-bound tasks; size chosen by the caller.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def execute_tasks_async(self, tasks):
        """Run every task's ``run()`` concurrently on the thread pool.

        Returns results in task order; exceptions raised by a task are
        returned in-place rather than propagated (``return_exceptions=True``).
        """
        # Fix: use get_running_loop() — get_event_loop() inside a coroutine is
        # deprecated since Python 3.10 and can bind the wrong loop.
        loop = asyncio.get_running_loop()
        # Execute tasks concurrently
        futures = [loop.run_in_executor(self.executor, task.run) for task in tasks]
        results = await asyncio.gather(*futures, return_exceptions=True)
        return results

    def execute_cpu_intensive_task(self, task):
        """For CPU-intensive tasks, use a process pool (true parallelism).

        Note: ``task`` must be picklable to cross the process boundary.
        """
        from concurrent.futures import ProcessPoolExecutor
        with ProcessPoolExecutor() as executor:
            future = executor.submit(task.run)
            return future.result()
class CachingManager:
    """In-memory TTL cache with hit/miss statistics.

    Values are stored as ``{"value": ..., "timestamp": ...}`` entries keyed
    by the caller-supplied key.
    """

    def __init__(self):
        self.cache = {}
        self.cache_hits = 0
        self.cache_misses = 0

    def get_or_compute(self, key, compute_fn, ttl=3600):
        """Return the cached value for ``key`` if younger than ``ttl`` seconds;
        otherwise call ``compute_fn``, cache its result, and return it.
        """
        current_time = time.time()
        # Check if cached and not expired
        if key in self.cache:
            cached_item = self.cache[key]
            if current_time - cached_item["timestamp"] < ttl:
                self.cache_hits += 1
                return cached_item["value"]
            # Fix: evict the stale entry so expired values do not accumulate
            # indefinitely (the original kept them forever — a slow leak).
            del self.cache[key]
        # Compute and cache
        self.cache_misses += 1
        result = compute_fn()
        self.cache[key] = {"value": result, "timestamp": current_time}
        return result

    def get_cache_stats(self):
        """Return hit/miss counters and the hit rate (0 before any request)."""
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total_requests if total_requests > 0 else 0
        return {"hits": self.cache_hits, "misses": self.cache_misses, "hit_rate": hit_rate}
class ApplicationMetrics:
    """Records agent- and system-level metrics through a monitor backend.

    The monitor must expose ``record_metric(name, value)``.
    """

    def __init__(self, monitor, agent_manager=None):
        # Fix: the original never assigned self.agent_manager, so
        # track_system_health() always raised AttributeError. The parameter is
        # optional and defaults to None, so existing callers are unaffected.
        self.monitor = monitor
        self.agent_manager = agent_manager

    def track_agent_performance(self, agent_id, task_duration, success):
        """Record one task's duration and a 0/1 success sample for an agent."""
        self.monitor.record_metric(f"agent_{agent_id}_task_duration", task_duration)
        self.monitor.record_metric(f"agent_{agent_id}_success_rate", 1 if success else 0)

    def track_system_health(self):
        """Sample host CPU/memory (via psutil) and the active-agent count."""
        import psutil
        # System metrics
        cpu_percent = psutil.cpu_percent()
        memory_percent = psutil.virtual_memory().percent
        self.monitor.record_metric("system_cpu_percent", cpu_percent)
        self.monitor.record_metric("system_memory_percent", memory_percent)
        # Application metrics — only available when an agent manager was given.
        if self.agent_manager is not None:
            active_agents = len(self.agent_manager.list_agents())
            self.monitor.record_metric("active_agents_count", active_agents)
fromagenticaiframework.securityimportSecurityManager# Production security configurationsecurity=SecurityManager(enable_injection_detection=True,enable_input_validation=True,enable_rate_limiting=True,enable_content_filtering=True,enable_audit_logging=True)
fromagenticaiframework.securityimportAuditLoggeraudit_logger=AuditLogger(log_file="/var/log/agenticai/security.log",retention_days=90)# Log all security-relevant eventsaudit_logger.log_event("user_authentication",{"user_id":user_id,"timestamp":datetime.now().isoformat(),"ip_address":request.remote_addr,"success":True})audit_logger.log_event("prompt_injection_blocked",{"user_id":user_id,"pattern_matched":"ignore_instructions","confidence":0.95})
fromagenticaiframework.securityimportRateLimiter# Different limits for different endpointsapi_limiter=RateLimiter(max_requests=100,window_seconds=60)expensive_limiter=RateLimiter(max_requests=10,window_seconds=60)@app.route('/api/query')defhandle_query(user_id:str):ifnotapi_limiter.check_rate_limit(user_id)['allowed']:return{"error":"Rate limit exceeded"},429# Process requestreturnprocess_query()@app.route('/api/expensive_operation')defhandle_expensive(user_id:str):ifnotexpensive_limiter.check_rate_limit(user_id)['allowed']:return{"error":"Rate limit exceeded"},429# Process expensive requestreturnprocess_expensive_operation()
fromagenticaiframework.securityimportPromptInjectionDetectordetector=PromptInjectionDetector()# Add new threat patterns as they emergedetector.add_pattern(r"new_attack_pattern_here",severity="high")# Update blocked wordscontent_filter.add_blocked_word("new_spam_term",category="spam")# Review and update regularlydefupdate_security_patterns():# Load latest patterns from threat intelligencepatterns=load_latest_threat_patterns()forpatterninpatterns:detector.add_pattern(pattern['regex'],pattern['severity'])
These best practices will help you build robust, secure, scalable, and maintainable applications with AgenticAI Framework. Remember to adapt these patterns to your specific use case and requirements.