asyncwithHTTPClient()asclient:# GET requestresponse=awaitclient.get("https://api.example.com/users",params={"limit":10,"offset":0})# POST request with JSON bodyresponse=awaitclient.post("https://api.example.com/users",json={"name":"Alice","email":"alice@example.com"})# PUT requestresponse=awaitclient.put("https://api.example.com/users/123",json={"name":"Alice Updated"})# DELETE requestresponse=awaitclient.delete("https://api.example.com/users/123")# PATCH requestresponse=awaitclient.patch("https://api.example.com/users/123",json={"status":"active"})
importlogginglogger=logging.getLogger(__name__)asyncwithHTTPClient()asclient:response=awaitclient.get("https://api.example.com/data")# Check statusifresponse.is_success:data=response.json()elifresponse.status_code==404:logger.info("Resource not found")else:logger.info(f"Error: {response.status_code}")# Access headerscontent_type=response.headers.get("content-type")# Get raw contenttext=response.textbinary=response.content
importlogginglogger=logging.getLogger(__name__)asyncwithWebSocketClient("wss://chat.example.com/ws")asws:# Send chat messageawaitws.send({"type":"chat","message":"Hello, how can you help me?","user_id":"user_123"})# Receive streaming responseresponse_text=""asyncformessageinws:ifmessage["type"]=="token":response_text+=message["content"]print(message["content"],end="",flush=True)elifmessage["type"]=="done":breaklogger.info(f"\nFull response: {response_text}")
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.communicationimportSSEClientasyncdefstream_completion(prompt:str):"""Stream AI completion tokens."""asyncwithSSEClient("https://api.example.com/completions",method="POST",json={"prompt":prompt,"stream":True})assse:full_response=""asyncforeventinsse:ifevent.event=="token":token=event.datafull_response+=tokenyieldtokenelifevent.event=="done":breakreturnfull_response# Usageasyncfortokeninstream_completion("Tell me a story"):logger.info(token,end="",flush=True)
fromagenticaiframework.communicationimportMQTTClient,MQTTConfigconfig=MQTTConfig(# Connectionhost="broker.example.com",port=1883,client_id="agent_client_01",# Authenticationusername="user",password="password",# TLS/SSLuse_tls=True,ca_certs="/path/to/ca.pem",# Quality of Servicedefault_qos=1,# 0: At most once, 1: At least once, 2: Exactly once# Keep alivekeepalive=60,# Clean sessionclean_session=True)mqtt=MQTTClient(config=config)
asyncwithMQTTClient(broker_url)asmqtt:# Subscribe to single topicawaitmqtt.subscribe("agents/agent_01/status")# Subscribe with single-level wildcard (+)awaitmqtt.subscribe("agents/+/status")# Any agent's status# Subscribe with multi-level wildcard (#)awaitmqtt.subscribe("agents/#")# All agent messages# Multiple subscriptionsawaitmqtt.subscribe([("agents/+/status",1),# QoS 1("tasks/+/result",2),# QoS 2])
asyncwithGRPCClient(target)asclient:# Server streams responsesasyncforresponseinclient.stream_call(service="AgentService",method="StreamTokens",request={"prompt":"Tell me a story"}):print(response["token"],end="",flush=True)
# Generate from .proto filefromagenticaiframework.communicationimportgenerate_grpc_client# Auto-generate client from protoclient=generate_grpc_client(proto_file="agent_service.proto",target="localhost:50051")# Use generated methodsresponse=awaitclient.AgentService.ExecuteTask(task_id="123",input="Process this")
fromagenticaiframework.communicationimportConnectionPool# Create connection poolpool=ConnectionPool(protocol="http",max_connections=10,max_connections_per_host=5,connection_timeout=30,idle_timeout=300)# Use pooled connectionsasyncwithpool.get_connection("https://api.example.com")asconn:response=awaitconn.get("/data")
importlogginglogger=logging.getLogger(__name__)fromagenticaiframework.communicationimportConnectionManagermanager=ConnectionManager()# Register connectionsmanager.register("api",HTTPClient("https://api.example.com"))manager.register("broker",MQTTClient("mqtt://broker.example.com"))# Health check all connectionshealth=awaitmanager.health_check()forname,statusinhealth.items():logger.info(f"{name}: {'healthy'ifstatus.is_healthyelse'unhealthy'}")