Skip to content

Commit

Permalink
Agents api - Add backend plumbing for agent manager (#30)
Browse files Browse the repository at this point in the history
* Add api changes for agent management (Admin)

* Rename a function and change return type

* Resolve review comments

* Throw bad requests when bad data received

* Update Create route to retrun 409 and 201

* Update doc string and return types

* Updat doc strings

---------

Co-authored-by: AJ (Ashitosh Jedhe) <ajedhe@microsoft.com>
  • Loading branch information
jedheaj314 and AJ (Ashitosh Jedhe) authored Jan 23, 2025
1 parent 99c391b commit edb3189
Show file tree
Hide file tree
Showing 10 changed files with 565 additions and 6 deletions.
1 change: 1 addition & 0 deletions app/api/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Settings(BaseSettings):
cosmos_key: str = ""
database_name: str = "state"
issues_container: str = "issues"
agents_container: str = "agents"
feedback_container: str = "feedback"
storage_account_url: str = ""
storage_container_name: str = "documents"
Expand Down
114 changes: 114 additions & 0 deletions app/api/database/agents_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from common.logger import get_logger
from typing import Any, Dict, List
from common.models import Agent
from config.config import settings
from database.db_client import CosmosDBClient

logging = get_logger(__name__)

class AgentsRepository:
def __init__(self) -> None:
"""
Initialize the AgentsRepository.
Sets up a connection to the CosmosDB container using the database client.
"""
self.db_client = CosmosDBClient(settings.agents_container)

async def get_agent_by_id(self, agent_id: str) -> Agent:
"""
Retrieve an agent by its ID.
Args:
agent_id (str): The ID of the agent to retrieve.
Returns:
Agent: The agent corresponding to the provided ID, or None if not found.
"""
logging.info(f"Retrieving agent by ID: {agent_id}")
agent = await self.db_client.retrieve_item_by_id(agent_id, agent_id)
if not agent:
return None
logging.info(f"Retrieved agent by ID: {agent}")
return Agent(**agent)

async def get_agents_by_name_and_type(self, name: str, type: str) -> List[Agent]:
"""
Retrieve agents that match the specified name and type.
Args:
name (str): The name of the agents to retrieve.
type (str): The type of the agents to retrieve.
Returns:
List[Agent]: A list of agents matching the specified name and type.
"""
logging.info(f"Retrieving agents by name and type {name}, {type}")
agents = await self.db_client.retrieve_items_by_values({"name": name, "type": type})
logging.info(f"Retrieved agents by name and type: {agents}")
return [Agent(**agent) for agent in agents]

async def get_all_agents(self) -> List[Agent]:
"""
Retrieve all agents.
Returns:
List[Agent]: A list of all agents stored in the repository.
"""
logging.info("Retrieving all agents.")
agents = await self.db_client.retrieve_items()
logging.info(f"Retrieved {len(agents)} agents.")
return [Agent(**agent) for agent in agents]

async def create_agent(self, agent: Agent) -> Agent:
"""
Create and store a new agent.
Args:
agent (Agent): The agent object to be created.
Returns:
Agent: The newly created agent.
"""
logging.info("Creating an agent.")
new_agent_data = await self.db_client.store_item(agent.model_dump())
new_agent = Agent(**new_agent_data)
logging.info(f"Agent created: {new_agent.id}")
return new_agent

async def delete_agent(self, agent_id: str) -> None:
"""
Delete an agent by its ID.
Args:
agent_id (str): The ID of the agent to delete.
"""
logging.info(f"Deleting agent: {agent_id}")
await self.db_client.delete_item(agent_id)
logging.info(f"Deleted agent: {agent_id}")

async def update_agent(self, agent_id: str, fields: Dict[str, Any]) -> Agent:
"""
Update an existing agent information.
Args:
agent_id (str): The ID of the agent to update.
fields (Dict[str, Any]): A dictionary containing the fields to update and their new values.
Returns:
Agent: The updated agent object.
Raises:
ValueError: If the agent with the given ID is not found.
"""
logging.info(f"Updating agent {agent_id}")
agent = await self.db_client.retrieve_item_by_id(agent_id, agent_id)
if agent:
for field, value in fields.items():
agent[field] = value

updated_agent = await self.db_client.store_item(agent)
logging.info(f"Agent updated: {agent_id}")
return Agent(**updated_agent)
else:
raise ValueError(f"Agent {agent_id} not found.")
39 changes: 37 additions & 2 deletions app/api/database/db_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,37 @@ def __init__(self, container_name: str) -> None:
self.container = self.database.get_container_client(container_name)


async def store_item(self, item: Dict[str, any]) -> None:
async def store_item(self, item: Dict[str, any]) -> Dict[str, any]:
"""
Store an item in the Cosmos DB container.
:param item: A dictionary representing the item to store. Must contain an 'id' field.
"""
try:
self.container.upsert_item(body=item)
item = self.container.upsert_item(body=item)
logging.info("Item stored successfully.")
return item
except CosmosHttpResponseError as e:
logging.error(f"An error occurred while storing the item: {e}")
raise e

async def retrieve_items(self) -> Optional[List[Dict[str, Any]]]:
"""
Retrieve all items from the Cosmos DB container.
:return: A list of all items, or None if an error occurs.
"""
try:
items = self.container.query_items(
query="SELECT * FROM c",
enable_cross_partition_query=True,
)
items_list = list(items)
return items_list
except CosmosHttpResponseError as e:
logging.error(f"An error occurred while retrieving items: {e}")
raise e


async def retrieve_item_by_id(self, item_id: str, partition_key: str) -> Optional[Dict[str, Any]]:
"""
Expand Down Expand Up @@ -76,3 +94,20 @@ async def retrieve_items_by_values(self, filters: Dict[str, Any]) -> Optional[Li
except CosmosHttpResponseError as e:
logging.error(f"An error occurred while retrieving items: {e}")
return None


async def delete_item(self, item_id: str) -> None:
"""
Delete an item from the Cosmos DB container by its ID.
:param item_id: The ID of the item to delete.
"""
try:
self.container.delete_item(item=item_id, partition_key=item_id)
logging.info(f"Item with ID {item_id} deleted successfully.")
except CosmosHttpResponseError as e:
if e.status_code == 404:
logging.warning(f"Item with ID {item_id} not found.")
else:
logging.error(f"An error occurred while deleting the item: {e}")
raise e
3 changes: 2 additions & 1 deletion app/api/database/issues_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ async def update_issue(self, doc_id: str, issue_id: str, fields: Dict[str, Any])
logging.info(f"Issue {issue_id} updated.")
return Issue(**issue)
else:
raise ValueError(f"Issue {issue_id} not found.")
raise ValueError(f"Issue not found.")

4 changes: 4 additions & 0 deletions app/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from database.agents_repository import AgentsRepository
from services.agents_service import AgentsService
from services.aml_client import AMLClient
from database.issues_repository import IssuesRepository
from services.issues_service import IssuesService
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
from config.config import settings

def get_agents_service() -> AgentsService:
return AgentsService(AgentsRepository())

def get_issues_service() -> IssuesService:
return IssuesService(IssuesRepository(), get_aml_client())
Expand Down
4 changes: 2 additions & 2 deletions app/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from config.config import settings
from fastapi.staticfiles import StaticFiles
from middleware.logging import LoggingMiddleware, setup_logging
from routers import issues
from routers import issues, agents


# Set up logging configuration
Expand Down Expand Up @@ -35,7 +35,7 @@

# Include routers
app.include_router(issues.router)

app.include_router(agents.router)

# Health check endpoint
@app.get(
Expand Down
157 changes: 157 additions & 0 deletions app/api/routers/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from fastapi.responses import JSONResponse
from common.Exceptions import ConflictError, ResourceNotFoundError
from dependencies import get_agents_service
from common.logger import get_logger
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from security.auth import validate_authenticated
from common.models import Agent, CreateAgent, UpdateAgent

router = APIRouter()
logging = get_logger(__name__)

@router.get(
"/api/v1/admin/agents",
summary="Get all agents",
responses={
200: {"description": "Agents Collection"},
401: {"description": "Unauthorized"},
500: {"description": "Internal server error"},
},
)
async def get_agents(
user=Depends(validate_authenticated),
agents_service=Depends(get_agents_service)
) -> List[Agent]:
"""
Retrieve all agents.
Args:
user (Depends): The authenticated user.
agents_service (Depends): The service to interact with agents.
Returns:
List[Agent]: A list of all agents.
"""
try:
agents = await agents_service.get_all_agents()
return agents
except Exception as e:
logging.error(f"Error retrieving agents: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")


@router.post(
"/api/v1/admin/agents",
summary="Create an agent",
responses={
201: {"description": "Created Agent"},
401: {"description": "Unauthorized"},
500: {"description": "Internal server error"},
400: {"description": "Bad request"},
409: {"description": "Conflict"},
},
)
async def create_agent(
agent: CreateAgent,
user=Depends(validate_authenticated),
agents_service=Depends(get_agents_service)
) -> str:
"""
Create a new agent.
Args:
agent (CreateAgent): The agent data to create.
user (Depends): The authenticated user.
agents_service (Depends): The service to interact with agents.
Returns:
str: The JSON representation of the created agent.
"""
try:
created_agent = await agents_service.create_agent(agent, user)
return JSONResponse(status_code=201, content=created_agent.dict())
except ValueError as exc:
raise HTTPException(status_code=400, detail={"description": str(exc)})
except ConflictError as ex:
raise HTTPException(status_code=409, detail={"description": str(ex)})
except Exception as e:
logging.error(f"Error creating agent: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")


@router.delete(
"/api/v1/admin/agents/{agent_id}",
summary="Delete an agent",
responses={
204: {"description": "Agent deleted successfully"},
401: {"description": "Unauthorized"},
500: {"description": "Internal server error"},
},
status_code=204, # Explicitly set the response code
)
async def delete_agent(
agent_id: str,
user=Depends(validate_authenticated),
agents_service=Depends(get_agents_service)
):
"""
Delete an agent by its ID.
Args:
agent_id (str): The ID of the agent to delete.
user (Depends): The authenticated user.
agents_service (Depends): The service to interact with agents.
Returns:
None
"""
try:
await agents_service.delete_agent(agent_id)
except Exception as e:
logging.error(f"Error deleting agent: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")


@router.patch(
"/api/v1/admin/agents/{agent_id}",
summary="Update an agent",
responses={
200: {"description": "Updated Agent"},
401: {"description": "Unauthorized"},
500: {"description": "Internal server error"},
400: {"description": "Bad request"},
409: {"description": "Conflict"},
404: {"description": "Not found"},
},
)
async def update_agent(
agent_id: str,
agent: UpdateAgent,
user=Depends(validate_authenticated),
agents_service=Depends(get_agents_service)
):
"""
Update an existing agent's data.
Args:
agent_id (str): The ID of the agent to update.
agent (UpdateAgent): The updated data for the agent.
user (Depends): The authenticated user.
agents_service (Depends): The service to interact with agents.
Returns:
str: The JSON representation of the updated agent.
"""
try:
updated_agent = await agents_service.update_agent(agent_id, agent, user)
return JSONResponse(status_code=200, content=updated_agent.dict())
except ValueError as ex:
raise HTTPException(status_code=400, detail={"description": str(ex)})
except ConflictError as ex:
raise HTTPException(status_code=409, detail={"description": str(ex)})
except ResourceNotFoundError as ex:
raise HTTPException(status_code=404, detail={"description": str(ex)})
except Exception as e:
logging.error(f"Error updating agent: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
Loading

0 comments on commit edb3189

Please sign in to comment.