The AgenticAI Framework provides six specialized state managers that give you complete control over agent lifecycles, workflow execution, and system state, with support for multiple persistence backends.
# Example: configure a StateManager for each of the three backends shown
# (memory, file, Redis).
#
# The three assignments below are alternatives: each one rebinds the same
# `state_manager` name, so keep only the configuration you need.
from agenticaiframework.state import (
    StateManager,
    StateConfig,
    MemoryBackend,
    FileBackend,
    RedisBackend,
)

# In-memory state (development)
state_manager = StateManager(
    config=StateConfig(
        backend="memory",
        auto_save=True,
        save_interval=30,
    )
)

# File-based state (single instance)
state_manager = StateManager(
    config=StateConfig(
        backend="file",
        file_path="./state/app_state.json",
        auto_save=True,
    )
)

# Redis state (production/distributed)
state_manager = StateManager(
    config=StateConfig(
        backend="redis",
        redis_url="redis://localhost:6379",
        key_prefix="agenticai:",
        ttl=3600,
    )
)
importlogginglogger=logging.getLogger(__name__)# Save stateawaitstate_manager.save("agent:researcher",{"status":"running","current_task":"analyze_data","progress":0.45,})# Get stateagent_state=awaitstate_manager.get("agent:researcher")logger.info(f"Status: {agent_state['status']}")# Update partial stateawaitstate_manager.update("agent:researcher",{"progress":0.75,})# Delete stateawaitstate_manager.delete("agent:researcher")# List all keyskeys=awaitstate_manager.list_keys("agent:*")
# Example: register and remove a callback for state changes on keys matching
# a pattern, using an existing `state_manager`.
import logging

logger = logging.getLogger(__name__)


# Subscribe to state changes
async def on_state_change(key: str, old_value: dict, new_value: dict):
    """Log the key that changed together with its old and new values."""
    logger.info(f"State changed for {key}")
    logger.info(f" Old: {old_value}")
    logger.info(f" New: {new_value}")


state_manager.subscribe("agent:*", on_state_change)

# Unsubscribe
# Pass the same pattern and callback that were given to subscribe().
state_manager.unsubscribe("agent:*", on_state_change)
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.stateimport(AgentStateStore,AgentSnapshot,AgentCheckpoint,)# Initialize agent state storeagent_store=AgentStateStore(persistence="redis",redis_url="redis://localhost:6379",)# Create a snapshot of agent statesnapshot=AgentSnapshot(agent_id="researcher_01",timestamp=datetime.now(),memory_state=agent.memory.export(),tool_state=agent.tools.get_state(),conversation_history=agent.conversation_history,metadata={"version":"1.0","task_count":42,})# Save snapshotawaitagent_store.save_snapshot(snapshot)# List available snapshotssnapshots=awaitagent_store.list_snapshots("researcher_01")forsnapinsnapshots:logger.info(f"Snapshot: {snap.timestamp} - {snap.metadata}")
# Create checkpoint during long-running taskscheckpoint=AgentCheckpoint(agent_id="researcher_01",checkpoint_id="task_step_5",step=5,state={"current_task":"data_analysis","intermediate_results":results,"tokens_used":15000,},recoverable=True,)awaitagent_store.save_checkpoint(checkpoint)# Resume from checkpointcheckpoint=awaitagent_store.get_checkpoint(agent_id="researcher_01",checkpoint_id="task_step_5")ifcheckpoint:agent.restore_from_checkpoint(checkpoint.state)
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.stateimport(WorkflowStateManager,WorkflowState,WorkflowStatus,StepState,)# Initialize workflow state managerworkflow_state=WorkflowStateManager(persistence="redis",checkpoint_interval=5,# Checkpoint every 5 steps)# Create workflow statestate=WorkflowState(workflow_id="research_pipeline",status=WorkflowStatus.PENDING,total_steps=10,current_step=0,context={},)awaitworkflow_state.create(state)# Update step progressawaitworkflow_state.update_step(workflow_id="research_pipeline",step=StepState(step_number=1,name="data_collection",status="completed",result={"documents":150},duration_ms=5400,))# Get current statecurrent=awaitworkflow_state.get("research_pipeline")logger.info(f"Progress: {current.current_step}/{current.total_steps}")
fromagenticaiframework.stateimportWorkflowCheckpoint# Create checkpointcheckpoint=WorkflowCheckpoint(workflow_id="research_pipeline",checkpoint_id="step_5_complete",step=5,state=current_state,context=workflow_context,timestamp=datetime.now(),)awaitworkflow_state.checkpoint(checkpoint)# List checkpointscheckpoints=awaitworkflow_state.list_checkpoints("research_pipeline")# Resume from checkpointawaitworkflow_state.resume_from(workflow_id="research_pipeline",checkpoint_id="step_5_complete")
importlogginglogger=logging.getLogger(__name__)# Update agent statusawaitorch_state.update_agent_status(team_id="research_team",agent_id="researcher",status="working",current_task="task_001",)# Get team overviewteam=awaitorch_state.get_team("research_team")foragent_id,agent_stateinteam.agents.items():logger.info(f"{agent_id}: {agent_state.status}")# Get available agentsavailable=awaitorch_state.get_available_agents(team_id="research_team",capability="research")
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.stateimportSourceState# Track source sync statussource_state=SourceState(source_id="confluence_docs",knowledge_base_id="company_docs",sync_status=SyncStatus.SYNCING,last_sync=datetime.now()-timedelta(hours=1),documents_synced=250,documents_pending=50,)awaitkb_state.update_source_state(source_state)# Get all sources for knowledge basesources=awaitkb_state.get_sources("company_docs")forsourceinsources:logger.info(f"{source.source_id}: {source.sync_status}")
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.stateimportSTTState,TranscriptionStatus# Track STT statestt_state=STTState(session_id=session_id,status=TranscriptionStatus.TRANSCRIBING,audio_received_bytes=45000,transcription_progress=0.6,interim_transcript="Hello, I would like to...",)awaitspeech_state.update_stt_state(stt_state)# Get current STT statecurrent_stt=awaitspeech_state.get_stt_state(session_id)logger.info(f"Progress: {current_stt.transcription_progress:.0%}")logger.info(f"Interim: {current_stt.interim_transcript}")
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.stateimportTTSState# Track TTS statetts_state=TTSState(session_id=session_id,text_queue=["Hello! How can I help you today?"],audio_generated_bytes=12000,audio_played_bytes=8000,is_speaking=True,)awaitspeech_state.update_tts_state(tts_state)# Check if speakingcurrent_tts=awaitspeech_state.get_tts_state(session_id)ifcurrent_tts.is_speaking:logger.info("Agent is currently speaking...")
fromagenticaiframework.stateimportVoiceConversationState# Track full conversation stateconversation_state=VoiceConversationState(session_id=session_id,turn_count=5,current_speaker="user",stt_state=stt_state,tts_state=tts_state,transcript=[{"role":"user","content":"Hello"},{"role":"assistant","content":"Hi! How can I help?"},# ...])awaitspeech_state.update_conversation_state(conversation_state)# End sessionawaitspeech_state.end_session(session_id=session_id,reason="user_ended",)
# Example: construct a file-based persistence backend.
from agenticaiframework.state import FileBackend

backend = FileBackend(
    base_path="./state",
    format="json",  # or "msgpack" for better performance
    compression=True,
)

# Persists to local files
# Best for: Single instance, small deployments
# Example: construct a Redis persistence backend.
from agenticaiframework.state import RedisBackend

backend = RedisBackend(
    url="redis://localhost:6379",
    db=0,
    key_prefix="agenticai:",
    connection_pool_size=10,
)

# Distributed, high performance
# Supports pub/sub for state sync
# Best for: Production, multi-instance