Comprehensive integration guide for APIs, databases, and message queues across 400+ modules
Enterprise Integrations
This guide is part of the enterprise module set (237 modules), which ships 18 pre-built connectors for ServiceNow, GitHub, Azure DevOps, Slack, Salesforce, and more. See the Enterprise Documentation.
This guide demonstrates various integration patterns for connecting AgenticAI Framework with external systems, APIs, databases, and services.
Enterprise Connectors
The framework includes 18 pre-built enterprise integration connectors for ServiceNow, GitHub, Azure DevOps, Slack, Teams, Salesforce, AWS, Azure, GCP, and more.
import aiohttp
import asyncio


class RESTAPIClient:
    """Async REST API client built on a shared aiohttp session.

    Use as an async context manager so the underlying HTTP session is
    opened on entry and closed on exit.
    """

    def __init__(self, base_url: str, api_key: str = None):
        """
        Args:
            base_url: Base URL of the API, without a trailing slash.
            api_key: Optional bearer token. When omitted, no Authorization
                header is included at all — aiohttp rejects ``None`` header
                values, so the key must not be present in the dict.
        """
        self.base_url = base_url
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"
        self.session = None  # created in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Issue a request, raise on HTTP error status, decode JSON body."""
        async with self.session.request(
            method, f"{self.base_url}/{endpoint}", **kwargs
        ) as response:
            response.raise_for_status()
            return await response.json()

    async def get(self, endpoint: str, params: dict = None) -> dict:
        """GET request."""
        return await self._request("GET", endpoint, params=params)

    async def post(self, endpoint: str, data: dict) -> dict:
        """POST request with a JSON body."""
        return await self._request("POST", endpoint, json=data)

    async def put(self, endpoint: str, data: dict) -> dict:
        """PUT request with a JSON body."""
        return await self._request("PUT", endpoint, json=data)

    async def delete(self, endpoint: str) -> dict:
        """DELETE request."""
        return await self._request("DELETE", endpoint)


# Usage
async def fetch_data_example():
    async with RESTAPIClient("https://api.example.com", api_key="your-key") as client:
        # GET request
        users = await client.get("users", params={"limit": 10})
        # POST request
        new_user = await client.post(
            "users", data={"name": "John", "email": "john@example.com"}
        )
        return users, new_user
from agenticaiframework.agents import Agent


class APIIntegrationAgent(Agent):
    """Agent that integrates with external APIs"""

    def __init__(self, name: str, api_client: RESTAPIClient):
        super().__init__(name=name, role="api_integrator", capabilities=["api_calls"])
        self.api_client = api_client

    async def fetch_user_data(self, user_id: str):
        """Fetch user data from external API"""
        try:
            user_data = await self.api_client.get(f"users/{user_id}")
            # Keep a copy in agent memory for later lookups.
            self.memory_manager.store(key=f"user_{user_id}", value=user_data)
            return user_data
        except Exception as e:
            self.logger.error(f"Failed to fetch user data: {e}")
            raise

    async def create_resource(self, resource_type: str, data: dict):
        """Create resource via API"""
        return await self.api_client.post(resource_type, data=data)


# Usage
async def main():
    async with RESTAPIClient("https://api.example.com", "api-key") as client:
        agent = APIIntegrationAgent("api_agent", api_client=client)
        # Fetch data
        user = await agent.fetch_user_data("user_123")
        # Create resource
        resource = await agent.create_resource(
            "projects",
            {"name": "New Project", "description": "AI-powered project"},
        )
import asyncpg
import json


class PostgreSQLAdapter:
    """PostgreSQL database adapter backed by an asyncpg connection pool."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created by connect()

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=10,
            max_size=50,
            command_timeout=60,
        )

    async def close(self):
        """Close connection pool."""
        await self.pool.close()

    async def execute(self, query: str, *args) -> str:
        """Execute query without returning results; returns the status tag."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch_one(self, query: str, *args) -> dict:
        """Fetch single row as a dict, or None when no row matches."""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(query, *args)
            return dict(row) if row else None

    async def fetch_all(self, query: str, *args) -> list[dict]:
        """Fetch all rows as a list of dicts."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *args)
            return [dict(row) for row in rows]

    async def transaction(self, queries: list[tuple]):
        """Execute multiple (query, args) pairs atomically in one transaction."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                results = []
                for query, args in queries:
                    result = await conn.execute(query, *args)
                    results.append(result)
                return results


# Usage with Agent
class DatabaseAgent(Agent):
    """Agent with database integration"""

    def __init__(self, name: str, db_adapter: PostgreSQLAdapter):
        super().__init__(name=name, role="data_manager", capabilities=["database"])
        self.db = db_adapter

    async def store_result(self, task_id: str, result: dict):
        """Store task result in database; returns the inserted row's id."""
        query = """
            INSERT INTO task_results (task_id, result, created_at)
            VALUES ($1, $2, NOW())
            RETURNING id
        """
        result_id = await self.db.fetch_one(query, task_id, json.dumps(result))
        return result_id

    async def get_user_history(self, user_id: str):
        """Retrieve the 100 most recent user history rows from the database."""
        query = """
            SELECT * FROM user_history
            WHERE user_id = $1
            ORDER BY created_at DESC
            LIMIT 100
        """
        history = await self.db.fetch_all(query, user_id)
        return history
# NOTE(review): the aioredis package is deprecated upstream; new code
# should prefer redis.asyncio — left as-is to avoid a dependency change.
import aioredis
import json
from typing import Any


class RedisAdapter:
    """Redis cache adapter storing JSON-serialized values."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None  # created by connect()

    async def connect(self):
        """Connect to Redis."""
        self.redis = await aioredis.from_url(
            self.redis_url, encoding="utf-8", decode_responses=True
        )

    async def close(self):
        """Close Redis connection."""
        await self.redis.close()

    async def get(self, key: str) -> Any:
        """Get value from cache; returns None for missing keys."""
        value = await self.redis.get(key)
        return json.loads(value) if value else None

    async def set(self, key: str, value: Any, ttl: int = None):
        """Set value in cache, optionally with a TTL in seconds."""
        serialized = json.dumps(value)
        if ttl:
            await self.redis.setex(key, ttl, serialized)
        else:
            await self.redis.set(key, serialized)

    async def delete(self, key: str):
        """Delete key from cache."""
        await self.redis.delete(key)

    async def exists(self, key: str) -> bool:
        """Check if key exists."""
        # Redis EXISTS returns an integer count; coerce to the annotated bool.
        return bool(await self.redis.exists(key))

    async def expire(self, key: str, ttl: int):
        """Set expiration on key."""
        await self.redis.expire(key, ttl)

    async def get_many(self, keys: list[str]) -> dict[str, Any]:
        """Get multiple values, keyed by their cache key."""
        values = await self.redis.mget(keys)
        return {
            key: json.loads(value) if value else None
            for key, value in zip(keys, values)
        }

    async def set_many(self, mapping: dict[str, Any], ttl: int = None):
        """Set multiple values, optionally applying the same TTL to each."""
        serialized = {k: json.dumps(v) for k, v in mapping.items()}
        await self.redis.mset(serialized)
        if ttl:
            for key in mapping.keys():
                await self.redis.expire(key, ttl)


# Usage with caching
class CachedAgent(Agent):
    """Agent with Redis caching"""

    def __init__(self, name: str, redis: RedisAdapter):
        super().__init__(name=name, role="cached_agent", capabilities=["caching"])
        self.redis = redis

    async def get_with_cache(self, key: str, fetch_fn):
        """Get data with caching"""
        # Check cache — compare against None so cached falsy values
        # (0, [], "") still count as hits instead of forcing a re-fetch.
        cached = await self.redis.get(key)
        if cached is not None:
            return cached
        # Cache miss - fetch data
        data = await fetch_fn()
        # Store in cache (TTL: 1 hour)
        await self.redis.set(key, data, ttl=3600)
        return data
import logging

logger = logging.getLogger(__name__)

from agenticaiframework.integrations import (
    IntegrationManager,
    IntegrationConfig,
    IntegrationStatus,
    integration_manager,
)

# Register a ServiceNow integration with the shared manager.
config = integration_manager.add_integration(
    name="production_snow",
    integration_type="servicenow",
    endpoint="https://yourinstance.service-now.com/api",
    auth_type="api_key",
    credentials={"api_key": "your-api-key"},
    settings={"timeout": 30},
)

# Establish the connection.
integration_manager.connect(config.integration_id)

# Probe the integration and report its status.
health = integration_manager.health_check(config.integration_id)
logger.info(f"Status: {health['status']}")

# Enumerate every registered integration.
all_integrations = integration_manager.list_integrations()
import logging
import time

from agenticaiframework.integrations import (
    GitHubIntegration,
    IntegrationConfig,
    IntegrationStatus,
)

logger = logging.getLogger(__name__)

# Describe the GitHub connection. ``time`` and ``IntegrationStatus`` are
# imported above because created_at/status below require them.
config = IntegrationConfig(
    integration_id="gh-001",
    name="GitHub Enterprise",
    integration_type="github",
    endpoint="https://api.github.com",
    auth_type="token",
    credentials={"token": "ghp_xxxxxxxxxxxx"},
    settings={},
    status=IntegrationStatus.PENDING,
    created_at=time.time(),
)

github = GitHubIntegration(config)
github.connect()

# Create an issue
issue = github.create_issue(
    owner="myorg",
    repo="myrepo",
    title="AI Agent identified bug",
    body="Automated bug detection by AgenticAI Framework.",
    labels=["bug", "ai-detected"],
    assignees=["developer1"],
)
logger.info(f"Created issue: {issue['html_url']}")

# Create a pull request
pr = github.create_pull_request(
    owner="myorg",
    repo="myrepo",
    title="AI-generated fix for #123",
    body="This PR was created by an AI agent.",
    head="feature/ai-fix",
    base="main",
)
import time

from agenticaiframework.integrations import (
    AzureDevOpsIntegration,
    IntegrationConfig,
    IntegrationStatus,
)

# ``time`` and ``IntegrationStatus`` are imported above because the
# created_at/status fields below require them.
config = IntegrationConfig(
    integration_id="ado-001",
    name="Azure DevOps",
    integration_type="azure_devops",
    endpoint="https://dev.azure.com/yourorg",
    auth_type="pat",
    credentials={"pat": "your-personal-access-token"},
    settings={"project": "MyProject"},
    status=IntegrationStatus.PENDING,
    created_at=time.time(),
)

ado = AzureDevOpsIntegration(config)
ado.connect()

# Create a work item
work_item = ado.create_work_item(
    work_item_type="Bug",
    title="AI-detected issue",
    description="Found during automated analysis.",
    assigned_to="developer@company.com",
)

# Trigger a pipeline
pipeline_run = ado.trigger_pipeline(
    pipeline_id=123,
    branch="main",
    parameters={"environment": "staging"},
)
import asyncio

from agenticaiframework.integrations import (
    SnowflakeConnector,
    DatabricksConnector,
)


async def run_data_platform_examples():
    """Demonstrate the Snowflake and Databricks connectors.

    The connector calls are coroutines, so they must run inside an async
    function — top-level ``await`` at module scope is a SyntaxError.
    """
    # Snowflake
    snowflake = SnowflakeConnector(
        account="your-account",
        user="username",
        password="password",
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        schema="PUBLIC",
    )
    results = await snowflake.execute_query(
        "SELECT * FROM sales WHERE date > '2024-01-01'"
    )

    # Databricks
    databricks = DatabricksConnector(
        workspace_url="https://your-workspace.cloud.databricks.com",
        token="dapi_xxxxx",
    )
    job_run = await databricks.run_job(
        job_id=12345,
        parameters={"input_path": "/data/raw"},
    )
    return results, job_run


if __name__ == "__main__":
    asyncio.run(run_data_platform_examples())
import asyncio

from agenticaiframework.integrations import WebhookManager, webhook_manager


async def handle_github_event(payload):
    """Process an incoming GitHub webhook payload."""
    # Process webhook
    return {"status": "processed"}


# Register a webhook endpoint. ``handler`` must be a reference to an
# async callable — ``async def`` is a statement and cannot appear inline
# as a keyword-argument value.
webhook_manager.register_endpoint(
    name="github-events",
    path="/webhooks/github",
    secret="webhook-secret",
    handler=handle_github_event,
)


async def send_notification_example():
    """Send an outbound webhook; ``send`` is a coroutine and needs await."""
    await webhook_manager.send(
        url="https://api.slack.com/incoming-webhook",
        payload={"text": "Task completed by AI agent"},
        headers={"Authorization": "Bearer token"},
    )


if __name__ == "__main__":
    asyncio.run(send_notification_example())