
State Management

Comprehensive State Management

Production-ready state management with persistence, recovery, and real-time tracking for agents, workflows, and orchestration

Enterprise State Management

Part of 400+ modules with 7 specialized state managers and 14 enterprise caching features. See Enterprise Documentation.

7 State Managers | 14 Enterprise Caching | Auto Recovery | Real-time Checkpointing

Overview

AgenticAI Framework provides seven state managers (the core StateManager plus six specialized managers) for complete control over agent lifecycles, workflow execution, and system state, with multiple persistence backends.

Python
from agenticaiframework.state import (
    StateManager, # Core state management
    AgentStateStore, # Agent state & snapshots
    WorkflowStateManager, # Workflow execution state
    OrchestrationStateManager, # Multi-agent coordination
    KnowledgeStateManager, # Knowledge base state
    ToolStateManager, # Tool execution state
    SpeechStateManager, # Speech session state
)

State Manager Architecture

StateManager

Core State Engine

Central state management with pluggable backends

AgentStateStore

Agent Lifecycle

Snapshots, checkpoints, and recovery for agents

WorkflowStateManager

Workflow Tracking

Step-by-step execution state and checkpoints

OrchestrationStateManager

Team Coordination

Multi-agent state and task queue management

KnowledgeStateManager

Knowledge Base

Indexing progress and sync status tracking

ToolStateManager

Tool Execution

Execution state, caching, and retry management


Core StateManager

The central state management engine with pluggable persistence backends.

Initialization

Python
from agenticaiframework.state import (
    StateManager,
    StateConfig,
    MemoryBackend,
    FileBackend,
    RedisBackend,
)

# In-memory state (development)
state_manager = StateManager(
    config=StateConfig(
        backend="memory",
        auto_save=True,
        save_interval=30,
    )
)

# File-based state (single instance)
state_manager = StateManager(
    config=StateConfig(
        backend="file",
        file_path="./state/app_state.json",
        auto_save=True,
    )
)

# Redis state (production/distributed)
state_manager = StateManager(
    config=StateConfig(
        backend="redis",
        redis_url="redis://localhost:6379",
        key_prefix="agenticai:",
        ttl=3600,
    )
)

Basic Operations

Python
import logging

logger = logging.getLogger(__name__)

# Save state
await state_manager.save("agent:researcher", {
    "status": "running",
    "current_task": "analyze_data",
    "progress": 0.45,
})

# Get state
agent_state = await state_manager.get("agent:researcher")
logger.info(f"Status: {agent_state['status']}")

# Update partial state
await state_manager.update("agent:researcher", {
    "progress": 0.75,
})

# Delete state
await state_manager.delete("agent:researcher")

# List all keys
keys = await state_manager.list_keys("agent:*")
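
The operations above follow a common key-value pattern: full save, read, partial update, delete, and pattern-based key listing. The following is a minimal sketch of those semantics using only the standard library. `InMemoryStateSketch` is a hypothetical stand-in, not the framework's `StateManager`; the shallow-merge behaviour of `update` and the glob matching in `list_keys` are assumptions made for illustration.

```python
import asyncio
from fnmatch import fnmatchcase


class InMemoryStateSketch:
    """Hypothetical stand-in for a memory-backed state manager."""

    def __init__(self):
        self._data = {}

    async def save(self, key, value):
        # Store a copy so later caller-side mutations do not leak in.
        self._data[key] = dict(value)

    async def get(self, key):
        value = self._data.get(key)
        return dict(value) if value is not None else None

    async def update(self, key, partial):
        # Assumed semantics: shallow merge, only supplied fields change.
        merged = {**self._data.get(key, {}), **partial}
        self._data[key] = merged
        return dict(merged)

    async def delete(self, key):
        # Returns True if the key existed.
        return self._data.pop(key, None) is not None

    async def list_keys(self, pattern="*"):
        # Assumed semantics: shell-style glob matching on the key.
        return sorted(k for k in self._data if fnmatchcase(k, pattern))


async def demo():
    store = InMemoryStateSketch()
    await store.save("agent:researcher", {"status": "running", "progress": 0.45})
    await store.save("workflow:pipeline", {"status": "pending"})
    await store.update("agent:researcher", {"progress": 0.75})
    state = await store.get("agent:researcher")
    keys = await store.list_keys("agent:*")
    deleted = await store.delete("agent:researcher")
    return state, keys, deleted


state, keys, deleted = asyncio.run(demo())
```

With a shallow merge, the `status` field survives the partial update while `progress` is overwritten.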

State Subscriptions

Python
import logging

logger = logging.getLogger(__name__)

# Subscribe to state changes
async def on_state_change(key: str, old_value: dict, new_value: dict):
    logger.info(f"State changed for {key}")
    logger.info(f" Old: {old_value}")
    logger.info(f" New: {new_value}")

state_manager.subscribe("agent:*", on_state_change)

# Unsubscribe
state_manager.unsubscribe("agent:*", on_state_change)
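
To make the subscription model concrete, here is a small sketch of pattern-based callback dispatch. It assumes glob-style patterns such as `agent:*` and async callbacks with the `(key, old_value, new_value)` signature shown above. `SubscriptionSketch` and its `notify` method are hypothetical names, not part of the framework.

```python
import asyncio
from fnmatch import fnmatchcase


class SubscriptionSketch:
    """Hypothetical registry that dispatches state changes to subscribers."""

    def __init__(self):
        self._subscriptions = []  # list of (pattern, callback) pairs

    def subscribe(self, pattern, callback):
        self._subscriptions.append((pattern, callback))

    def unsubscribe(self, pattern, callback):
        self._subscriptions = [
            pair for pair in self._subscriptions if pair != (pattern, callback)
        ]

    async def notify(self, key, old_value, new_value):
        # Iterate over a copy so callbacks may unsubscribe while running.
        for pattern, callback in list(self._subscriptions):
            if fnmatchcase(key, pattern):
                await callback(key, old_value, new_value)


events = []


async def record_change(key, old_value, new_value):
    events.append((key, old_value, new_value))


async def demo():
    hub = SubscriptionSketch()
    hub.subscribe("agent:*", record_change)
    await hub.notify("agent:researcher", {"progress": 0.45}, {"progress": 0.75})
    # Does not match "agent:*", so the callback is not invoked.
    await hub.notify("workflow:pipeline", None, {"status": "pending"})
    hub.unsubscribe("agent:*", record_change)
    # No subscribers remain, so this change is not recorded.
    await hub.notify("agent:researcher", {"progress": 0.75}, {"progress": 1.0})


asyncio.run(demo())
```

Only the first change is recorded: the second key does not match the pattern, and the third arrives after the callback was removed.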

AgentStateStore

Manages agent snapshots, checkpoints, and recovery.

Creating Agent Snapshots

Python
import logging
from datetime import datetime

from agenticaiframework.state import (
    AgentStateStore,
    AgentSnapshot,
    AgentCheckpoint,
)

logger = logging.getLogger(__name__)

# Initialize agent state store
agent_store = AgentStateStore(
    persistence="redis",
    redis_url="redis://localhost:6379",
)

# Create a snapshot of agent state
snapshot = AgentSnapshot(
    agent_id="researcher_01",
    timestamp=datetime.now(),
    memory_state=agent.memory.export(),
    tool_state=agent.tools.get_state(),
    conversation_history=agent.conversation_history,
    metadata={
        "version": "1.0",
        "task_count": 42,
    }
)

# Save snapshot
await agent_store.save_snapshot(snapshot)

# List available snapshots
snapshots = await agent_store.list_snapshots("researcher_01")
for snap in snapshots:
    logger.info(f"Snapshot: {snap.timestamp} - {snap.metadata}")

Checkpointing

Python
# Create checkpoint during long-running tasks
checkpoint = AgentCheckpoint(
    agent_id="researcher_01",
    checkpoint_id="task_step_5",
    step=5,
    state={
        "current_task": "data_analysis",
        "intermediate_results": results,
        "tokens_used": 15000,
    },
    recoverable=True,
)

await agent_store.save_checkpoint(checkpoint)

# Resume from checkpoint
checkpoint = await agent_store.get_checkpoint(
    agent_id="researcher_01",
    checkpoint_id="task_step_5"
)

if checkpoint:
    agent.restore_from_checkpoint(checkpoint.state)

Agent Recovery

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import AgentRecoveryManager

recovery_manager = AgentRecoveryManager(
    agent_store=agent_store,
    auto_recover=True,
    max_recovery_attempts=3,
)

# Register agent for recovery
recovery_manager.register(agent)

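# Manual recovery
try:
    result = await agent.run(task)
except Exception as e:
    logger.warning(f"Agent run failed: {e}")
    # Attempt recovery from the most recent checkpoint
    recovered = await recovery_manager.recover(
        agent_id=agent.id,
        from_checkpoint="latest",
    )

    if recovered:
        logger.info("Agent recovered successfully")
        result = await agent.resume()
    else:
        # Recovery failed; re-raise so the caller sees the original error
        raise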

WorkflowStateManager

Tracks workflow execution and per-step state, and supports pausing, resuming, and cancelling workflows.

Workflow State Tracking

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import (
    WorkflowStateManager,
    WorkflowState,
    WorkflowStatus,
    StepState,
)

# Initialize workflow state manager
workflow_state = WorkflowStateManager(
    persistence="redis",
    checkpoint_interval=5, # Checkpoint every 5 steps
)

# Create workflow state
state = WorkflowState(
    workflow_id="research_pipeline",
    status=WorkflowStatus.PENDING,
    total_steps=10,
    current_step=0,
    context={},
)

await workflow_state.create(state)

# Update step progress
await workflow_state.update_step(
    workflow_id="research_pipeline",
    step=StepState(
        step_number=1,
        name="data_collection",
        status="completed",
        result={"documents": 150},
        duration_ms=5400,
    )
)

# Get current state
current = await workflow_state.get("research_pipeline")
logger.info(f"Progress: {current.current_step}/{current.total_steps}")

Workflow Checkpointing

Python
from datetime import datetime

from agenticaiframework.state import WorkflowCheckpoint

# Create checkpoint
checkpoint = WorkflowCheckpoint(
    workflow_id="research_pipeline",
    checkpoint_id="step_5_complete",
    step=5,
    state=current_state,
    context=workflow_context,
    timestamp=datetime.now(),
)

await workflow_state.checkpoint(checkpoint)

# List checkpoints
checkpoints = await workflow_state.list_checkpoints("research_pipeline")

# Resume from checkpoint
await workflow_state.resume_from(
    workflow_id="research_pipeline",
    checkpoint_id="step_5_complete"
)
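
The checkpoint-and-resume flow can be illustrated without the framework. The sketch below checkpoints every N completed steps (mirroring the `checkpoint_interval=5` setting shown earlier) and, after a simulated failure, resumes from the most recent checkpoint. The function `run_with_checkpoints` and the dictionary layout of a checkpoint are hypothetical, chosen only for this example.

```python
import copy


def run_with_checkpoints(steps, checkpoint_interval, checkpoints,
                         start_step=0, context=None):
    """Run steps in order, recording a checkpoint every N completed steps."""
    context = copy.deepcopy(context) if context is not None else {}
    for index in range(start_step, len(steps)):
        name, fn = steps[index]
        context[name] = fn(context)
        completed = index + 1
        if completed % checkpoint_interval == 0:
            # Deep copy so later steps cannot mutate the saved context.
            checkpoints.append(
                {"step": completed, "context": copy.deepcopy(context)}
            )
    return context


failed_once = {"done": False}


def make_step(number):
    def step(context):
        # Step 7 fails on its first execution to simulate a crash.
        if number == 7 and not failed_once["done"]:
            failed_once["done"] = True
            raise RuntimeError("simulated failure at step 7")
        return number
    return step


steps = [(f"step_{n}", make_step(n)) for n in range(1, 11)]
checkpoints = []

try:
    final_context = run_with_checkpoints(steps, 5, checkpoints)
except RuntimeError:
    # Resume from the latest checkpoint instead of restarting from step 1.
    latest = checkpoints[-1]
    final_context = run_with_checkpoints(
        steps, 5, checkpoints,
        start_step=latest["step"], context=latest["context"],
    )
```

In this run, step 6 is executed twice because it completed after the last checkpoint. Steps that are re-run after a resume should therefore be idempotent, or checkpoints should be taken more often.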

Pause and Resume

Python
import logging

logger = logging.getLogger(__name__)

# Pause workflow
await workflow_state.pause("research_pipeline")
status = await workflow_state.get_status("research_pipeline")
logger.info(f"Status: {status}") # WorkflowStatus.PAUSED

# Resume workflow
await workflow_state.resume("research_pipeline")

# Cancel workflow
await workflow_state.cancel(
    workflow_id="research_pipeline",
    reason="User requested cancellation"
)

OrchestrationStateManager

Manages multi-agent coordination and team state.

Team State Management

Python
from agenticaiframework.state import (
    OrchestrationStateManager,
    TeamState,
    AgentCoordinationState,
    TaskQueueState,
)

# Initialize orchestration state
orch_state = OrchestrationStateManager(
    persistence="redis",
    sync_interval=1.0,
)

# Create team state
team_state = TeamState(
    team_id="research_team",
    agents={
        "researcher": AgentCoordinationState(
            agent_id="researcher",
            status="idle",
            capabilities=["research", "analysis"],
            current_task=None,
        ),
        "writer": AgentCoordinationState(
            agent_id="writer",
            status="idle",
            capabilities=["writing", "editing"],
            current_task=None,
        ),
    },
    task_queue=TaskQueueState(
        pending=[],
        in_progress=[],
        completed=[],
    ),
)

await orch_state.create_team(team_state)

Agent Coordination

Python
import logging

logger = logging.getLogger(__name__)

# Update agent status
await orch_state.update_agent_status(
    team_id="research_team",
    agent_id="researcher",
    status="working",
    current_task="task_001",
)

# Get team overview
team = await orch_state.get_team("research_team")
for agent_id, agent_state in team.agents.items():
    logger.info(f"{agent_id}: {agent_state.status}")

# Get available agents
available = await orch_state.get_available_agents(
    team_id="research_team",
    capability="research"
)
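
As a rough illustration of what an availability query might do, the sketch below filters a team's agents by status and capability. It assumes that "available" means the agent's status is `idle`; the plain-dictionary layout and the `available_agents` helper are hypothetical and do not reflect the framework's internals.

```python
def available_agents(agents, capability):
    """Return the ids of idle agents that advertise the given capability."""
    return sorted(
        agent_id
        for agent_id, state in agents.items()
        if state["status"] == "idle" and capability in state["capabilities"]
    )


team_agents = {
    "researcher": {"status": "working", "capabilities": ["research", "analysis"]},
    "analyst": {"status": "idle", "capabilities": ["research", "analysis"]},
    "writer": {"status": "idle", "capabilities": ["writing", "editing"]},
}

research_ready = available_agents(team_agents, "research")
writing_ready = available_agents(team_agents, "writing")
```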

Task Queue Management

Python
import logging

logger = logging.getLogger(__name__)

# Add task to queue
await orch_state.enqueue_task(
    team_id="research_team",
    task={
        "id": "task_002",
        "type": "analysis",
        "priority": "high",
        "data": {"topic": "AI trends"},
    }
)

# Assign task to agent
await orch_state.assign_task(
    team_id="research_team",
    task_id="task_002",
    agent_id="researcher",
)

# Complete task
await orch_state.complete_task(
    team_id="research_team",
    task_id="task_002",
    result={"analysis": "..."},
)

# Get queue status
queue = await orch_state.get_queue_status("research_team")
logger.info(f"Pending: {len(queue.pending)}")
logger.info(f"In Progress: {len(queue.in_progress)}")
logger.info(f"Completed: {len(queue.completed)}")
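
The queue calls above move a task through three stages: pending, in progress, and completed. A minimal sketch of that lifecycle follows. `TaskQueueSketch` is a hypothetical class that reuses the three list names from `TaskQueueState`; the real manager may track additional fields.

```python
from dataclasses import dataclass, field


@dataclass
class TaskQueueSketch:
    """Hypothetical three-stage task queue: pending, in_progress, completed."""

    pending: list = field(default_factory=list)
    in_progress: list = field(default_factory=list)
    completed: list = field(default_factory=list)

    def enqueue(self, task):
        self.pending.append(dict(task))

    def assign(self, task_id, agent_id):
        # A task can only be assigned while it is still pending.
        task = self._pop(self.pending, task_id)
        task["agent_id"] = agent_id
        self.in_progress.append(task)

    def complete(self, task_id, result):
        # A task can only be completed after it has been assigned.
        task = self._pop(self.in_progress, task_id)
        task["result"] = result
        self.completed.append(task)

    def counts(self):
        return len(self.pending), len(self.in_progress), len(self.completed)

    @staticmethod
    def _pop(tasks, task_id):
        for index, task in enumerate(tasks):
            if task["id"] == task_id:
                return tasks.pop(index)
        raise KeyError(f"task {task_id!r} not found in this stage")


queue = TaskQueueSketch()
queue.enqueue({"id": "task_002", "type": "analysis", "priority": "high"})
queue.enqueue({"id": "task_003", "type": "writing", "priority": "low"})

queue.assign("task_002", "researcher")
counts_after_assign = queue.counts()

queue.complete("task_002", {"analysis": "done"})
counts_after_complete = queue.counts()
```

Raising `KeyError` on an out-of-order transition (for example, completing a task that was never assigned) is one way to keep the three lists consistent.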

KnowledgeStateManager

Tracks knowledge base indexing and synchronization.

Indexing Progress

Python
import logging
from datetime import datetime

from agenticaiframework.state import (
    KnowledgeStateManager,
    IndexingProgress,
    IndexingStatus,
    SyncStatus,
)

logger = logging.getLogger(__name__)

# Initialize knowledge state manager
kb_state = KnowledgeStateManager(
    persistence="redis",
)

# Track indexing progress
progress = IndexingProgress(
    knowledge_base_id="company_docs",
    status=IndexingStatus.IN_PROGRESS,
    total_documents=1000,
    processed_documents=450,
    failed_documents=5,
    started_at=datetime.now(),
)

await kb_state.update_indexing_progress(progress)

# Check progress
current = await kb_state.get_indexing_progress("company_docs")
logger.info(f"Progress: {current.processed_documents}/{current.total_documents}")
logger.info(f"Failed: {current.failed_documents}")

Source Synchronization

Python
import logging
from datetime import datetime, timedelta

from agenticaiframework.state import SourceState

logger = logging.getLogger(__name__)

# Track source sync status
source_state = SourceState(
    source_id="confluence_docs",
    knowledge_base_id="company_docs",
    sync_status=SyncStatus.SYNCING,
    last_sync=datetime.now() - timedelta(hours=1),
    documents_synced=250,
    documents_pending=50,
)

await kb_state.update_source_state(source_state)

# Get all sources for knowledge base
sources = await kb_state.get_sources("company_docs")
for source in sources:
    logger.info(f"{source.source_id}: {source.sync_status}")

Knowledge Base State

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import KnowledgeBaseState

# Get overall knowledge base state
kb_overview = await kb_state.get_knowledge_base_state("company_docs")

logger.info(f"Total Documents: {kb_overview.total_documents}")
logger.info(f"Total Embeddings: {kb_overview.total_embeddings}")
logger.info(f"Last Updated: {kb_overview.last_updated}")
logger.info(f"Storage Size: {kb_overview.storage_size_mb}MB")

ToolStateManager

Manages tool execution state and caching.

Tool Execution Tracking

Python
from datetime import datetime

from agenticaiframework.state import (
    ToolStateManager,
    ToolExecution,
    ToolExecutionStatus,
)

# Initialize tool state manager
tool_state = ToolStateManager(
    persistence="redis",
    cache_results=True,
    cache_ttl=3600,
)

# Track tool execution
execution = ToolExecution(
    execution_id="exec_001",
    tool_name="search_web",
    status=ToolExecutionStatus.RUNNING,
    started_at=datetime.now(),
    parameters={"query": "AI news"},
)

await tool_state.start_execution(execution)

# Update execution
await tool_state.complete_execution(
    execution_id="exec_001",
    result={"results": [...]},
    duration_ms=1500,
)

# Get execution history
history = await tool_state.get_execution_history(
    tool_name="search_web",
    limit=100,
)

Result Caching

Python
import logging
from datetime import datetime

from agenticaiframework.state import ToolCacheEntry

logger = logging.getLogger(__name__)

# Cache tool result
cache_entry = ToolCacheEntry(
    tool_name="search_web",
    parameters_hash="abc123",
    result={"results": [...]},
    created_at=datetime.now(),
    ttl=3600,
)

await tool_state.cache_result(cache_entry)

# Check cache before execution
cached = await tool_state.get_cached_result(
    tool_name="search_web",
    parameters={"query": "AI news"},
)

if cached:
    logger.info("Using cached result")
    result = cached.result
else:
    result = await tool.execute({"query": "AI news"})
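
The example stores a `parameters_hash` but looks results up by raw parameters, so some deterministic mapping from parameters to a hash is needed. One plausible approach, sketched below, is to hash a canonical JSON encoding of the parameters. The `parameters_hash` helper is hypothetical; how the framework actually derives the hash is not documented here.

```python
import hashlib
import json


def parameters_hash(tool_name, parameters):
    """Derive a stable cache key from a tool name and JSON-serializable parameters."""
    # sort_keys makes the encoding independent of dict insertion order.
    canonical = json.dumps(parameters, sort_keys=True, separators=(",", ":"))
    payload = f"{tool_name}:{canonical}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


hash_a = parameters_hash("search_web", {"query": "AI news", "limit": 10})
hash_b = parameters_hash("search_web", {"limit": 10, "query": "AI news"})
hash_c = parameters_hash("search_web", {"query": "AI trends", "limit": 10})

# A plain dict stands in for the cache store in this sketch.
cache = {hash_a: {"results": ["cached item"]}}
hit = cache.get(hash_b)   # same parameters in a different key order
miss = cache.get(hash_c)  # different query
```

Sorting keys before hashing means that two calls with the same parameters in a different order share one cache entry.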

Retry Management

Python
from datetime import datetime, timedelta

from agenticaiframework.state import RetryState

# Track retry state
retry_state = RetryState(
    execution_id="exec_002",
    attempts=2,
    max_attempts=5,
    last_error="Connection timeout",
    next_retry_at=datetime.now() + timedelta(seconds=30),
    backoff_factor=2.0,
)

await tool_state.update_retry_state(retry_state)

# Get tools needing retry
pending_retries = await tool_state.get_pending_retries()
for retry in pending_retries:
    if retry.next_retry_at <= datetime.now():
        await retry_tool(retry)
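
The `backoff_factor` and `next_retry_at` fields suggest exponential backoff. The sketch below shows one common way to compute the next retry time and to select retries that are due. The formula, the 30-second base delay, the 600-second cap, and both helper names are assumptions for illustration, not the framework's documented behaviour.

```python
from datetime import datetime, timedelta


def compute_next_retry_at(now, attempts, base_delay_seconds=30.0,
                          backoff_factor=2.0, max_delay_seconds=600.0):
    """Exponential backoff: delay = base * factor ** (attempts - 1), capped."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delay = base_delay_seconds * backoff_factor ** (attempts - 1)
    return now + timedelta(seconds=min(delay, max_delay_seconds))


def due_retries(retries, now):
    """Select retries that are due and have not exhausted their attempts."""
    return [
        retry for retry in retries
        if retry["attempts"] < retry["max_attempts"]
        and retry["next_retry_at"] <= now
    ]


now = datetime(2024, 1, 1, 12, 0, 0)

# Delay in seconds after 1, 2, 3, and 6 failed attempts.
delays = [
    (compute_next_retry_at(now, attempts) - now).total_seconds()
    for attempts in (1, 2, 3, 6)
]

retries = [
    {"execution_id": "exec_002", "attempts": 2, "max_attempts": 5,
     "next_retry_at": now - timedelta(seconds=5)},
    {"execution_id": "exec_003", "attempts": 5, "max_attempts": 5,
     "next_retry_at": now - timedelta(seconds=5)},
    {"execution_id": "exec_004", "attempts": 1, "max_attempts": 5,
     "next_retry_at": now + timedelta(seconds=30)},
]
due_ids = [retry["execution_id"] for retry in due_retries(retries, now)]
```

Capping the delay keeps a long failure streak from pushing the next attempt arbitrarily far into the future.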

Tool Statistics

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import ToolStats

# Get tool statistics
stats = await tool_state.get_tool_stats("search_web")

logger.info(f"Total Executions: {stats.total_executions}")
logger.info(f"Success Rate: {stats.success_rate:.2%}")
logger.info(f"Avg Duration: {stats.avg_duration_ms}ms")
logger.info(f"Cache Hit Rate: {stats.cache_hit_rate:.2%}")
logger.info(f"Error Rate: {stats.error_rate:.2%}")
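
For readers wondering how such statistics could be derived, the sketch below aggregates them from a list of execution records. The record fields (`status`, `duration_ms`, `cache_hit`) and the `summarize_executions` helper are hypothetical; the result keys simply mirror the `ToolStats` attributes used above.

```python
def summarize_executions(executions):
    """Aggregate simple statistics from a list of execution records."""
    total = len(executions)
    if total == 0:
        # Avoid division by zero when there is no history yet.
        return {
            "total_executions": 0,
            "success_rate": 0.0,
            "error_rate": 0.0,
            "avg_duration_ms": 0.0,
            "cache_hit_rate": 0.0,
        }
    successes = sum(1 for e in executions if e["status"] == "completed")
    cache_hits = sum(1 for e in executions if e.get("cache_hit", False))
    total_duration = sum(e["duration_ms"] for e in executions)
    return {
        "total_executions": total,
        "success_rate": successes / total,
        "error_rate": (total - successes) / total,
        "avg_duration_ms": total_duration / total,
        "cache_hit_rate": cache_hits / total,
    }


history = [
    {"status": "completed", "duration_ms": 1000, "cache_hit": False},
    {"status": "completed", "duration_ms": 2000, "cache_hit": False},
    {"status": "failed", "duration_ms": 1500, "cache_hit": False},
    {"status": "completed", "duration_ms": 500, "cache_hit": True},
]
summary = summarize_executions(history)
```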

SpeechStateManager

Manages speech session state for STT/TTS operations.

Session Management

Python
from agenticaiframework.state import (
    SpeechStateManager,
    AudioSessionStatus,
    StreamingMode,
)

# Initialize speech state manager
speech_state = SpeechStateManager(
    persistence="redis",
)

# Create audio session
session_id = await speech_state.create_session(
    session_type="voice_conversation",
    streaming_mode=StreamingMode.BIDIRECTIONAL,
    config={
        "stt_provider": "openai",
        "tts_provider": "elevenlabs",
        "language": "en-US",
    }
)

# Update session status
await speech_state.update_session_status(
    session_id=session_id,
    status=AudioSessionStatus.ACTIVE,
)

STT State Tracking

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import STTState, TranscriptionStatus

# Track STT state
stt_state = STTState(
    session_id=session_id,
    status=TranscriptionStatus.TRANSCRIBING,
    audio_received_bytes=45000,
    transcription_progress=0.6,
    interim_transcript="Hello, I would like to...",
)

await speech_state.update_stt_state(stt_state)

# Get current STT state
current_stt = await speech_state.get_stt_state(session_id)
logger.info(f"Progress: {current_stt.transcription_progress:.0%}")
logger.info(f"Interim: {current_stt.interim_transcript}")

TTS State Tracking

Python
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.state import TTSState

# Track TTS state
tts_state = TTSState(
    session_id=session_id,
    text_queue=["Hello! How can I help you today?"],
    audio_generated_bytes=12000,
    audio_played_bytes=8000,
    is_speaking=True,
)

await speech_state.update_tts_state(tts_state)

# Check if speaking
current_tts = await speech_state.get_tts_state(session_id)
if current_tts.is_speaking:
    logger.info("Agent is currently speaking...")

Voice Conversation State

Python
from agenticaiframework.state import VoiceConversationState

# Track full conversation state
conversation_state = VoiceConversationState(
    session_id=session_id,
    turn_count=5,
    current_speaker="user",
    stt_state=stt_state,
    tts_state=tts_state,
    transcript=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi! How can I help?"},
        # ...
    ],
)

await speech_state.update_conversation_state(conversation_state)

# End session
await speech_state.end_session(
    session_id=session_id,
    reason="user_ended",
)

Persistence Backends

Memory Backend (Development)

Python
from agenticaiframework.state import MemoryBackend

backend = MemoryBackend()

# Fast, no persistence
# Data lost on restart
# Best for: Development, testing

File Backend (Single Instance)

Python
from agenticaiframework.state import FileBackend

backend = FileBackend(
    base_path="./state",
    format="json", # or "msgpack" for better performance
    compression=True,
)

# Persists to local files
# Best for: Single instance, small deployments
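
A practical concern with any file-based store is a crash mid-write leaving a truncated state file. The sketch below shows a standard mitigation: write to a temporary file in the same directory, then atomically replace the target. It uses only the standard library; `write_state_atomically` and `read_state` are hypothetical helpers, and whether `FileBackend` does something similar is not stated in this guide.

```python
import json
import os
import tempfile


def write_state_atomically(path, state):
    """Write JSON state so readers never observe a partially written file."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # The temp file must be on the same filesystem for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if anything failed before the replace.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def read_state(path):
    with open(path, "r", encoding="utf-8") as handle:
        return json.load(handle)


with tempfile.TemporaryDirectory() as state_dir:
    state_path = os.path.join(state_dir, "app_state.json")
    write_state_atomically(state_path, {"status": "running", "progress": 0.45})
    write_state_atomically(state_path, {"status": "running", "progress": 0.75})
    loaded = read_state(state_path)
    leftover_temp_files = [
        name for name in os.listdir(state_dir) if name.endswith(".tmp")
    ]
```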

Redis Backend (Production)

Python
from agenticaiframework.state import RedisBackend

backend = RedisBackend(
    url="redis://localhost:6379",
    db=0,
    key_prefix="agenticai:",
    connection_pool_size=10,
)

# Distributed, high performance
# Supports pub/sub for state sync
# Best for: Production, multi-instance

Best Practices

Checkpoint Regularly

Create checkpoints at logical boundaries in long-running tasks to enable recovery without losing progress.

Choose the Right Backend

Use the memory backend for development, the file backend for single-instance deployments, and Redis for distributed production deployments.

Clean Old State

Implement TTLs and cleanup routines to prevent unbounded state growth.

Monitor State Size

Track state storage size and set alerts for unusual growth patterns.
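
The TTL and cleanup advice above can be sketched for a store that has no built-in expiry (Redis handles key expiry natively). The example below stamps each entry with an expiry time, drops expired entries on read, and offers a periodic purge. `TTLStateSketch` is a hypothetical class, and the injectable clock exists only to make the behaviour easy to demonstrate.

```python
import time


class TTLStateSketch:
    """Hypothetical in-memory store with per-entry time-to-live."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def save(self, key, value, ttl_seconds):
        self._entries[key] = (self._clock() + ttl_seconds, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Lazy expiry: drop the stale entry when it is read.
            del self._entries[key]
            return None
        return value

    def purge_expired(self):
        """Remove all expired entries; returns how many were removed."""
        now = self._clock()
        expired = [
            key for key, (expires_at, _) in self._entries.items()
            if now >= expires_at
        ]
        for key in expired:
            del self._entries[key]
        return len(expired)

    def size(self):
        return len(self._entries)


fake_now = [0.0]
store = TTLStateSketch(clock=lambda: fake_now[0])
store.save("agent:researcher", {"status": "running"}, ttl_seconds=60)
store.save("agent:writer", {"status": "idle"}, ttl_seconds=3600)

fake_now[0] = 120.0  # advance the fake clock by two minutes
removed = store.purge_expired()
remaining = store.size()
expired_value = store.get("agent:researcher")
live_value = store.get("agent:writer")
```

Running `purge_expired` on a schedule bounds growth even for keys that are never read again, and `size` gives a simple number to monitor.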


Next Steps

Memory

Learn about memory management for agents

Orchestration

Multi-agent team coordination

Monitoring

Set up observability for your agents

Tracing

Debug agent execution flows