```python
# Create task with dependencies and custom execution logic
class DataProcessingTask(Task):
    def execute(self):
        """Custom execution logic"""
        input_path = self.config.get("input_path")
        batch_size = self.config.get("batch_size", 50)

        # Implement processing logic
        processed_items = []
        for batch in self.get_data_batches(input_path, batch_size):
            processed_batch = self.process_batch(batch)
            processed_items.extend(processed_batch)

        return {
            "processed_count": len(processed_items),
            "items": processed_items,
            "status": "completed"
        }

    def get_data_batches(self, path, batch_size):
        """Load data in batches"""
        # Implementation for loading data
        pass

    def process_batch(self, batch):
        """Process a single batch"""
        # Implementation for processing
        pass

# Create and configure advanced task
advanced_task = DataProcessingTask(
    name="advanced_data_processing",
    description="Advanced data processing with custom logic",
    priority=1,
    dependencies=["data_validation", "schema_check"],
    config={
        "input_path": "/data/raw",
        "output_path": "/data/processed",
        "batch_size": 100,
        "validation_rules": ["not_null", "type_check"],
        "processing_mode": "parallel"
    }
)
```

Tasks can be scheduled to run at a specific time or on a recurring interval with the `TaskScheduler`:

```python
import logging
from datetime import datetime, timedelta

from agenticaiframework.tasks import TaskScheduler

logger = logging.getLogger(__name__)

scheduler = TaskScheduler()

# Schedule task for specific time
future_time = datetime.now() + timedelta(hours=2)
scheduler.add_task(data_task, schedule_time=future_time)

# Schedule recurring task
scheduler.schedule_recurring(
    task=backup_task,
    interval=timedelta(hours=24)  # Run daily
)

# Run pending tasks
executed_tasks = scheduler.run_pending()
logger.info(f"Executed {len(executed_tasks)} tasks")
```

Conditional scheduling defers a task until a predicate returns true:

```python
import os
from typing import Callable

class ConditionalScheduler(TaskScheduler):
    def __init__(self):
        super().__init__()
        # Tasks waiting for their condition to become true
        self.conditional_tasks = []

    def add_conditional_task(self, task: Task, condition_func: Callable):
        """Add task that executes when condition is met"""
        self.conditional_tasks.append({"task": task, "condition": condition_func})

    def check_conditions(self):
        """Check and execute tasks whose conditions are met"""
        # Iterate over a copy so items can be removed safely
        for item in list(self.conditional_tasks):
            if item["condition"]():
                self.add_task(item["task"])
                self.conditional_tasks.remove(item)

# Usage
def data_available():
    return os.path.exists("/data/new_files")

conditional_scheduler = ConditionalScheduler()
conditional_scheduler.add_conditional_task(process_new_data_task, data_available)
```

Cron-style expressions can drive recurring schedules:

```python
class CronScheduler(TaskScheduler):
    def __init__(self):
        super().__init__()
        self.cron_tasks = []

    def schedule_cron(self, task: Task, cron_expression: str):
        """Schedule task using cron expression"""
        # Parse cron expression and schedule accordingly
        schedule_info = self.parse_cron(cron_expression)
        self.cron_tasks.append({"task": task, "schedule": schedule_info})

    def parse_cron(self, expression: str):
        """Parse cron expression (simplified implementation)"""
        # Format: minute hour day month weekday
        parts = expression.split()
        return {
            "minute": parts[0] if len(parts) > 0 else "*",
            "hour": parts[1] if len(parts) > 1 else "*",
            "day": parts[2] if len(parts) > 2 else "*",
            "month": parts[3] if len(parts) > 3 else "*",
            "weekday": parts[4] if len(parts) > 4 else "*"
        }

# Usage
cron_scheduler = CronScheduler()

# Run every day at 2:30 AM
cron_scheduler.schedule_cron(daily_report_task, "30 2 * * *")

# Run every Monday at 9:00 AM
cron_scheduler.schedule_cron(weekly_backup_task, "0 9 * * 1")
```

A priority queue processes tasks in priority order, with lower numbers handled first:

```python
import logging

from agenticaiframework.tasks import Task, TaskQueue

logger = logging.getLogger(__name__)

# Create priority queue
queue = TaskQueue(queue_type="priority")

# Add tasks with different priorities
high_priority_task = Task("urgent_fix", priority=1)
medium_priority_task = Task("feature_update", priority=5)
low_priority_task = Task("cleanup", priority=9)

queue.enqueue(medium_priority_task)
queue.enqueue(high_priority_task)  # Will be processed first
queue.enqueue(low_priority_task)

# Process tasks in priority order
while queue.get_queue_size() > 0:
    task = queue.dequeue()
    logger.info(f"Processing: {task.name} (priority: {task.priority})")
    task.execute()
```

```python
# Create FIFO (First In, First Out) queue
fifo_queue = TaskQueue(queue_type="fifo")

# Add tasks in order
tasks = [
    Task("step_1", description="Initialize system"),
    Task("step_2", description="Load configuration"),
    Task("step_3", description="Start processing")
]

for task in tasks:
    fifo_queue.enqueue(task)

# Process in order
while fifo_queue.get_queue_size() > 0:
    task = fifo_queue.dequeue()
    task.execute()
```

Independent tasks can run in parallel, either on a thread pool or asynchronously:

```python
import asyncio
import concurrent.futures
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

class ParallelWorkflow:
    def __init__(self, name: str, max_workers: int = 4):
        self.name = name
        self.tasks = []
        self.max_workers = max_workers
        self.status = "not_started"

    def add_task(self, task: Task):
        """Add task to parallel workflow"""
        self.tasks.append(task)

    def execute_parallel(self):
        """Execute tasks in parallel using threads"""
        self.status = "running"
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_task = {executor.submit(task.execute): task for task in self.tasks}

            results = {}
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    results[task.name] = result
                    logger.info(f"Completed: {task.name}")
                except Exception as e:
                    results[task.name] = {"error": str(e)}
                    logger.error(f"Failed: {task.name} - {e}")

        self.status = "completed"
        return results

    async def execute_async(self):
        """Execute tasks asynchronously"""
        self.status = "running"

        async def run_task(task):
            try:
                # Run the synchronous task in an executor so it doesn't block the event loop
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(None, task.execute)
                return task.name, result
            except Exception as e:
                return task.name, {"error": str(e)}

        # Run all tasks concurrently
        task_coroutines = [run_task(task) for task in self.tasks]
        results = await asyncio.gather(*task_coroutines)

        self.status = "completed"
        return dict(results)

# Usage
parallel_workflow = ParallelWorkflow("batch_processing", max_workers=8)
parallel_workflow.add_task(Task("process_batch_1"))
parallel_workflow.add_task(Task("process_batch_2"))
parallel_workflow.add_task(Task("process_batch_3"))

# Execute with threads
results = parallel_workflow.execute_parallel()

# Or execute asynchronously
# results = asyncio.run(parallel_workflow.execute_async())
```

A circuit breaker stops calling a repeatedly failing task until a recovery timeout has elapsed:

```python
import logging
import time

logger = logging.getLogger(__name__)

class CircuitBreakerTask(Task):
    def __init__(self, name: str, failure_threshold: int = 5, recovery_timeout: int = 60, **kwargs):
        super().__init__(name, **kwargs)
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def execute_with_circuit_breaker(self):
        """Execute task with circuit breaker pattern"""
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                logger.info(f"Circuit breaker for {self.name} moving to half-open state")
            else:
                raise Exception(f"Circuit breaker open for task {self.name}")

        try:
            result = self.execute()
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
                logger.info(f"Circuit breaker for {self.name} closed - service recovered")
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
                logger.error(f"Circuit breaker for {self.name} opened due to {self.failure_count} failures")
            raise

# Usage
cb_task = CircuitBreakerTask(
    name="external_service_call",
    failure_threshold=3,
    recovery_timeout=30
)
```

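Continuing from the block above, a brief sketch of calling the task through the breaker; the retry loop and sleep interval are illustrative, not part of the framework:

```python
# Repeated failures open the breaker, after which calls fail fast until
# recovery_timeout elapses and a half-open probe call succeeds.
for attempt in range(5):
    try:
        result = cb_task.execute_with_circuit_breaker()
        break
    except Exception as exc:
        logger.warning(f"Attempt {attempt + 1} failed: {exc}")
        time.sleep(5)
```
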
Resource monitoring captures timing, memory, and CPU usage around task execution:

```python
import time

import psutil

class ResourceMonitoringTask(MetricsTask):
    def execute_with_monitoring(self):
        """Execute task while monitoring system resources"""
        # Capture initial resource state
        initial_memory = psutil.virtual_memory().used
        initial_cpu = psutil.cpu_percent()
        start_time = time.time()

        try:
            result = self.execute_with_metrics()

            # Capture final resource state
            final_memory = psutil.virtual_memory().used
            final_cpu = psutil.cpu_percent()
            duration = time.time() - start_time

            # Calculate resource usage
            memory_delta = final_memory - initial_memory

            # Store resource metrics
            self.resource_metrics = {
                "duration": duration,
                "memory_used": memory_delta,
                "peak_memory": psutil.virtual_memory().used,
                "cpu_usage": final_cpu,
                "timestamp": start_time
            }

            return result
        except Exception as e:
            # Capture resource state even on failure
            duration = time.time() - start_time
            self.resource_metrics = {
                "duration": duration,
                "memory_used": psutil.virtual_memory().used - initial_memory,
                "peak_memory": psutil.virtual_memory().used,
                "cpu_usage": psutil.cpu_percent(),
                "timestamp": start_time,
                "error": str(e)
            }
            raise

    def get_resource_stats(self):
        """Get resource usage statistics"""
        return getattr(self, 'resource_metrics', {})
```

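A short usage sketch; the task name and config are illustrative, and the constructor is assumed to accept the same arguments as the `Task`-style examples above:

```python
# Hypothetical task name and config for illustration
monitored_task = ResourceMonitoringTask(
    name="heavy_transformation",
    config={"input_path": "/data/raw"}
)

result = monitored_task.execute_with_monitoring()
stats = monitored_task.get_resource_stats()
print(f"Duration: {stats['duration']:.2f}s, "
      f"memory delta: {stats['memory_used'] / 1024 / 1024:.2f} MB")
```
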
Agents can own a task queue, execute queued tasks, and report a summary:

```python
import logging
from typing import Dict

from agenticaiframework.agents import Agent

logger = logging.getLogger(__name__)

class TaskExecutingAgent(Agent):
    def __init__(self, name: str, role: str, capabilities: list[str], config: Dict):
        super().__init__(name, role, capabilities, config)
        self.task_queue = TaskQueue()
        self.task_history = []

    def assign_task(self, task: Task):
        """Assign task to agent"""
        self.task_queue.enqueue(task)
        logger.info(f"Task {task.name} assigned to agent {self.name}")

    def process_tasks(self):
        """Process all queued tasks"""
        while self.task_queue.get_queue_size() > 0:
            task = self.task_queue.dequeue()
            try:
                logger.info(f"Agent {self.name} executing task: {task.name}")
                result = task.execute()
                task.status = "completed"
                task.result = result
            except Exception as e:
                task.status = "failed"
                task.error = str(e)
                logger.error(f"Task {task.name} failed: {e}")
            self.task_history.append(task)

    def get_task_summary(self):
        """Get summary of completed tasks"""
        completed = [t for t in self.task_history if t.status == "completed"]
        failed = [t for t in self.task_history if t.status == "failed"]
        return {
            "total_tasks": len(self.task_history),
            "completed": len(completed),
            "failed": len(failed),
            "success_rate": len(completed) / max(1, len(self.task_history))
        }

# Usage
agent = TaskExecutingAgent(
    name="TaskProcessor",
    role="Task Execution Specialist",
    capabilities=["task_execution", "data_processing"],
    config={}
)

# Assign tasks to agent
agent.assign_task(Task("process_data_batch_1"))
agent.assign_task(Task("process_data_batch_2"))
agent.assign_task(Task("generate_report"))

# Process all tasks
agent.process_tasks()

# Get summary
summary = agent.get_task_summary()
logger.info(f"Agent processed {summary['total_tasks']} tasks with {summary['success_rate']:.2%} success rate")
```

Tasks can read from and write to shared memory through the `MemoryManager`:

```python
import time

from agenticaiframework.memory import MemoryManager

class MemoryAwareTask(Task):
    def __init__(self, name: str, **kwargs):
        super().__init__(name, **kwargs)
        self.memory = MemoryManager()

    def execute_with_memory(self):
        """Execute task with memory context"""
        # Retrieve relevant context from memory
        context = self.memory.retrieve(f"context_{self.name}")

        # Execute task with context
        result = self.execute()

        # Store result in memory for future tasks
        self.memory.store(f"result_{self.name}", result)

        # Store execution metadata
        self.memory.store(f"execution_time_{self.name}", time.time())

        return result

    def get_memory_context(self, keys: list[str]):
        """Retrieve specific keys from memory"""
        context = {}
        for key in keys:
            value = self.memory.retrieve(key)
            if value is not None:
                context[key] = value
        return context
```

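A short usage sketch; the task name and memory keys are illustrative:

```python
# Hypothetical task name for illustration
memory_task = MemoryAwareTask(name="enrich_customer_records")

# Execute and persist the result under "result_enrich_customer_records"
result = memory_task.execute_with_memory()

# A later task (or the same one) can pull shared context back by key
shared = memory_task.get_memory_context(["result_enrich_customer_records"])
```
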
Robust error handling lets a task register per-exception handlers and a fallback:

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)

class RobustTask(Task):
    def __init__(self, name: str, **kwargs):
        super().__init__(name, **kwargs)
        self.error_handlers = {}
        self.fallback_function = kwargs.get("fallback_function")

    def register_error_handler(self, error_type: type, handler: Callable):
        """Register specific error handler"""
        self.error_handlers[error_type] = handler

    def execute_with_error_handling(self):
        """Execute task with comprehensive error handling"""
        try:
            return self.execute()
        except Exception as e:
            # Try specific error handler
            for error_type, handler in self.error_handlers.items():
                if isinstance(e, error_type):
                    try:
                        return handler(e, self)
                    except Exception as handler_error:
                        logger.error(f"Error handler failed: {handler_error}")

            # Try fallback function
            if self.fallback_function:
                try:
                    return self.fallback_function(e, self)
                except Exception as fallback_error:
                    logger.error(f"Fallback function failed: {fallback_error}")

            # Re-raise original exception if no handler worked
            raise

    def create_fallback_result(self, error: Exception):
        """Create a safe fallback result"""
        return {
            "status": "completed_with_errors",
            "error": str(error),
            "fallback_used": True,
            "partial_result": None
        }

# Usage with error handling
def handle_connection_error(error, task):
    logger.warning(f"Handling connection error for {task.name}: {error}")
    # Implement retry logic or alternative approach
    return task.create_fallback_result(error)

def handle_validation_error(error, task):
    logger.warning(f"Handling validation error for {task.name}: {error}")
    # Implement data cleaning or user notification
    return {"status": "validation_failed", "error": str(error)}

robust_task = RobustTask(
    name="api_data_fetch",
    fallback_function=lambda e, t: {"status": "fallback", "data": []}
)
robust_task.register_error_handler(ConnectionError, handle_connection_error)
robust_task.register_error_handler(ValueError, handle_validation_error)
```

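Continuing from the block above, a short sketch of the call path: registered handlers are tried first, then the fallback, and the original exception is re-raised only if both fail:

```python
try:
    result = robust_task.execute_with_error_handling()
    logger.info(f"Robust task result: {result}")
except Exception as exc:
    # Raised only when no handler and no fallback could produce a result
    logger.error(f"Robust task failed: {exc}")
```
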
    def has_cycle(task_name):
        if task_name in rec_stack:
            return True
        if task_name in visited:
            return False

        visited.add(task_name)
        rec_stack.add(task_name)

        task = get_task_by_name(task_name)
        for dep in task.dependencies:
            if has_cycle(dep):
                return True

        rec_stack.remove(task_name)
        return False

    for task in tasks:
        if has_cycle(task.name):
            logger.warning(f"Dependency cycle detected involving task: {task.name}")
            return True
    return False
```
Memory Leaks in Long-Running Tasks
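A minimal sketch of one common mitigation, in plain Python rather than a framework API: avoid holding every batch's results for the lifetime of the task, and release them between batches. The `LeakAwareTask` name and its use of `self.config` are illustrative assumptions based on the `Task` examples above.

```python
import gc

# Illustrative sketch only - LeakAwareTask is a hypothetical name; the Task
# base class and self.config usage follow the earlier examples.
class LeakAwareTask(Task):
    def execute(self):
        processed_count = 0
        for batch in self.config.get("batches", []):
            # Stand-in for real per-item work
            processed = [str(item) for item in batch]
            processed_count += len(processed)
            # Keep only the running count; drop the per-batch list so its
            # memory can be reclaimed instead of accumulating across batches
            del processed
            gc.collect()
        # Return a small summary rather than the full processed data
        return {"processed_count": processed_count}
```
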
Tasks that hang rather than leak can be guarded with a signal-based timeout:

```python
import signal

class TimeoutTask(Task):
    def execute_with_timeout(self, timeout_seconds: int = 300):
        """Execute the task, raising TimeoutError if it runs too long"""
        def timeout_handler(signum, frame):
            raise TimeoutError(f"Task {self.name} timed out after {timeout_seconds} seconds")

        # Set timeout (SIGALRM is available only on Unix and only in the main thread)
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout_seconds)

        try:
            result = self.execute()
            signal.alarm(0)  # Cancel timeout
            return result
        except Exception:
            signal.alarm(0)  # Cancel timeout
            raise
```

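A short usage sketch; the task name is illustrative, and because of the SIGALRM limitation noted above this works only on Unix, in the main thread:

```python
# Hypothetical task name for illustration
timeout_task = TimeoutTask(name="slow_external_call")

try:
    result = timeout_task.execute_with_timeout(timeout_seconds=60)
except TimeoutError as exc:
    # React to the timeout, e.g. re-queue the task or alert an operator
    print(f"Timed out: {exc}")
```
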
A debug-mode task records timing, memory, and configuration details for each run:

```python
import time

import psutil

class DebugTask(Task):
    def __init__(self, name: str, debug_mode: bool = False, **kwargs):
        super().__init__(name, **kwargs)
        self.debug_mode = debug_mode
        self.debug_info = []

    def debug_execute(self):
        """Execute task with comprehensive debugging"""
        if not self.debug_mode:
            return self.execute()

        # Capture execution context
        context = {
            "task_name": self.name,
            "start_time": time.time(),
            "memory_before": psutil.virtual_memory().used,
            "config": self.config.copy()
        }

        self.debug_info.append(f"Starting task {self.name}")
        self.debug_info.append(f"Config: {context['config']}")

        try:
            result = self.execute()

            context["end_time"] = time.time()
            context["duration"] = context["end_time"] - context["start_time"]
            context["memory_after"] = psutil.virtual_memory().used
            context["memory_used"] = context["memory_after"] - context["memory_before"]
            context["result_size"] = len(str(result))

            self.debug_info.append(f"Task completed in {context['duration']:.2f}s")
            self.debug_info.append(f"Memory used: {context['memory_used'] / 1024 / 1024:.2f}MB")
            self.debug_info.append(f"Result size: {context['result_size']} characters")

            return result
        except Exception as e:
            context["error"] = str(e)
            context["end_time"] = time.time()
            context["duration"] = context["end_time"] - context["start_time"]

            self.debug_info.append(f"Task failed after {context['duration']:.2f}s")
            self.debug_info.append(f"Error: {context['error']}")

            raise

    def get_debug_report(self):
        """Get comprehensive debug report"""
        return {
            "task_name": self.name,
            "debug_info": self.debug_info,
            "config": self.config,
            "status": self.status
        }
```

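A brief usage sketch; the task name is illustrative, and `debug_execute()` simply falls back to a normal `execute()` call when `debug_mode` is off:

```python
# Hypothetical task name for illustration
debug_task = DebugTask(name="transform_orders", debug_mode=True)

try:
    debug_task.debug_execute()
finally:
    # Print the collected debug trail even if the task failed
    report = debug_task.get_debug_report()
    for line in report["debug_info"]:
        print(line)
```
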
This Tasks module documentation covers everything needed to use tasks effectively within the AgenticAI Framework, from basic concepts to advanced patterns and troubleshooting techniques.