From ed5a25ff662dbc66d3b64563d862d4a4ac485b17 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Thu, 24 Oct 2024 23:39:52 +0700 Subject: [PATCH 1/2] fix(backend): Avoid long synchronous call to block FastAPI event-loop --- .../backend/backend/server/integrations/router.py | 12 ++++++------ autogpt_platform/backend/backend/server/rest_api.py | 13 +++++++++---- .../backend/backend/usecases/block_autogen.py | 2 +- .../backend/backend/usecases/reddit_marketing.py | 2 +- autogpt_platform/backend/backend/usecases/sample.py | 2 +- .../backend/test/executor/test_manager.py | 2 +- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/autogpt_platform/backend/backend/server/integrations/router.py b/autogpt_platform/backend/backend/server/integrations/router.py index 5163de0b2fa3..1e3d01e0bfc0 100644 --- a/autogpt_platform/backend/backend/server/integrations/router.py +++ b/autogpt_platform/backend/backend/server/integrations/router.py @@ -29,7 +29,7 @@ class LoginResponse(BaseModel): @router.get("/{provider}/login") -async def login( +def login( provider: Annotated[str, Path(title="The provider to initiate an OAuth flow for")], user_id: Annotated[str, Depends(get_user_id)], request: Request, @@ -60,7 +60,7 @@ class CredentialsMetaResponse(BaseModel): @router.post("/{provider}/callback") -async def callback( +def callback( provider: Annotated[str, Path(title="The target provider for this OAuth exchange")], code: Annotated[str, Body(title="Authorization code acquired by user login")], state_token: Annotated[str, Body(title="Anti-CSRF nonce")], @@ -115,7 +115,7 @@ async def callback( @router.get("/{provider}/credentials") -async def list_credentials( +def list_credentials( provider: Annotated[str, Path(title="The provider to list credentials for")], user_id: Annotated[str, Depends(get_user_id)], ) -> list[CredentialsMetaResponse]: @@ -133,7 +133,7 @@ @router.get("/{provider}/credentials/{cred_id}") -async def get_credential( +def 
get_credential( provider: Annotated[str, Path(title="The provider to retrieve credentials for")], cred_id: Annotated[str, Path(title="The ID of the credentials to retrieve")], user_id: Annotated[str, Depends(get_user_id)], @@ -149,7 +149,7 @@ async def get_credential( @router.post("/{provider}/credentials", status_code=201) -async def create_api_key_credentials( +def create_api_key_credentials( user_id: Annotated[str, Depends(get_user_id)], provider: Annotated[str, Path(title="The provider to create credentials for")], api_key: Annotated[str, Body(title="The API key to store")], @@ -184,7 +184,7 @@ class CredentialsDeletionResponse(BaseModel): @router.delete("/{provider}/credentials/{cred_id}") -async def delete_credentials( +def delete_credentials( request: Request, provider: Annotated[str, Path(title="The provider to delete credentials for")], cred_id: Annotated[str, Path(title="The ID of the credentials to delete")], diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index f0d922c19686..c908ad9fb55a 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -1,3 +1,4 @@ +import asyncio import inspect import logging from collections import defaultdict @@ -516,7 +517,7 @@ async def set_graph_active_version( user_id=user_id, ) - async def execute_graph( + def execute_graph( self, graph_id: str, node_input: dict[Any, Any], @@ -539,7 +540,9 @@ async def stop_graph_run( 404, detail=f"Agent execution #{graph_exec_id} not found" ) - self.execution_manager_client.cancel_execution(graph_exec_id) + await asyncio.to_thread( + self.execution_manager_client.cancel_execution(graph_exec_id) + ) # Retrieve & return canceled graph execution in its final state return await execution_db.get_execution_results(graph_exec_id) @@ -616,8 +619,10 @@ async def create_schedule( raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.") 
execution_scheduler = self.execution_scheduler_client return { - "id": execution_scheduler.add_execution_schedule( - graph_id, graph.version, cron, input_data, user_id=user_id + "id": await asyncio.to_thread( + execution_scheduler.add_execution_schedule( + graph_id, graph.version, cron, input_data, user_id=user_id + ) ) } diff --git a/autogpt_platform/backend/backend/usecases/block_autogen.py b/autogpt_platform/backend/backend/usecases/block_autogen.py index 55fdece6b463..9f5ae435286d 100644 --- a/autogpt_platform/backend/backend/usecases/block_autogen.py +++ b/autogpt_platform/backend/backend/usecases/block_autogen.py @@ -252,7 +252,7 @@ async def block_autogen_agent(): test_user = await create_test_user() test_graph = await create_graph(create_test_graph(), user_id=test_user.id) input_data = {"input": "Write me a block that writes a string into a file."} - response = await server.agent_server.execute_graph( + response = server.agent_server.execute_graph( test_graph.id, input_data, test_user.id ) print(response) diff --git a/autogpt_platform/backend/backend/usecases/reddit_marketing.py b/autogpt_platform/backend/backend/usecases/reddit_marketing.py index 54413b1eef1f..1d297d5bc9ac 100644 --- a/autogpt_platform/backend/backend/usecases/reddit_marketing.py +++ b/autogpt_platform/backend/backend/usecases/reddit_marketing.py @@ -156,7 +156,7 @@ async def reddit_marketing_agent(): test_user = await create_test_user() test_graph = await create_graph(create_test_graph(), user_id=test_user.id) input_data = {"subreddit": "AutoGPT"} - response = await server.agent_server.execute_graph( + response = server.agent_server.execute_graph( test_graph.id, input_data, test_user.id ) print(response) diff --git a/autogpt_platform/backend/backend/usecases/sample.py b/autogpt_platform/backend/backend/usecases/sample.py index 3b4087a5bd54..37fa7407de59 100644 --- a/autogpt_platform/backend/backend/usecases/sample.py +++ b/autogpt_platform/backend/backend/usecases/sample.py @@ -78,7 +78,7 
@@ async def sample_agent(): test_user = await create_test_user() test_graph = await create_graph(create_test_graph(), test_user.id) input_data = {"input_1": "Hello", "input_2": "World"} - response = await server.agent_server.execute_graph( + response = server.agent_server.execute_graph( test_graph.id, input_data, test_user.id ) print(response) diff --git a/autogpt_platform/backend/test/executor/test_manager.py b/autogpt_platform/backend/test/executor/test_manager.py index cabb5cad0491..5f5c3c358351 100644 --- a/autogpt_platform/backend/test/executor/test_manager.py +++ b/autogpt_platform/backend/test/executor/test_manager.py @@ -22,7 +22,7 @@ async def execute_graph( num_execs: int = 4, ) -> str: # --- Test adding new executions --- # - response = await agent_server.execute_graph(test_graph.id, input_data, test_user.id) + response = agent_server.execute_graph(test_graph.id, input_data, test_user.id) graph_exec_id = response["id"] # Execution queue should be empty From cca81c450a4bfad572012c7745d675f9118ee39d Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Fri, 25 Oct 2024 17:05:59 +0700 Subject: [PATCH 2/2] Fix type hinting --- .../store.py | 7 ++++--- .../autogpt_libs/autogpt_libs/utils/cache.py | 9 +-------- .../backend/backend/executor/scheduler.py | 5 +++-- .../backend/backend/server/rest_api.py | 20 ++++++++++++------- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py b/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py index 787683623313..6a4bb354fc87 100644 --- a/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py +++ b/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py @@ -6,7 +6,7 @@ from redis import Redis from backend.executor.database import DatabaseManager -from autogpt_libs.utils.cache import thread_cached_property +from 
autogpt_libs.utils.cache import thread_cached from autogpt_libs.utils.synchronize import RedisKeyedMutex from .types import ( @@ -21,8 +21,9 @@ class SupabaseIntegrationCredentialsStore: def __init__(self, redis: "Redis"): self.locks = RedisKeyedMutex(redis) - - @thread_cached_property + + @property + @thread_cached def db_manager(self) -> "DatabaseManager": from backend.executor.database import DatabaseManager from backend.util.service import get_service_client diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py b/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py index b4506dda47b8..9c69da9411e2 100644 --- a/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py +++ b/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py @@ -1,8 +1,6 @@ +from typing import Callable, TypeVar, ParamSpec import threading -from functools import wraps -from typing import Callable, ParamSpec, TypeVar -T = TypeVar("T") P = ParamSpec("P") R = TypeVar("R") @@ -10,7 +8,6 @@ def thread_cached(func: Callable[P, R]) -> Callable[P, R]: thread_local = threading.local() - @wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: cache = getattr(thread_local, "cache", None) if cache is None: @@ -21,7 +18,3 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: return cache[key] return wrapper - - -def thread_cached_property(func: Callable[[T], R]) -> property: - return property(thread_cached(func)) diff --git a/autogpt_platform/backend/backend/executor/scheduler.py b/autogpt_platform/backend/backend/executor/scheduler.py index 979631c0585e..5080e16031bf 100644 --- a/autogpt_platform/backend/backend/executor/scheduler.py +++ b/autogpt_platform/backend/backend/executor/scheduler.py @@ -4,7 +4,7 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger -from autogpt_libs.utils.cache import thread_cached_property +from autogpt_libs.utils.cache import thread_cached from backend.data.block import 
BlockInput from backend.data.schedule import ( @@ -37,7 +37,8 @@ def __init__(self, refresh_interval=10): def get_port(cls) -> int: return Config().execution_scheduler_port - @thread_cached_property + @property + @thread_cached def execution_client(self) -> ExecutionManager: return get_service_client(ExecutionManager) diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index c908ad9fb55a..8c3ed3dcba14 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -8,7 +8,7 @@ import uvicorn from autogpt_libs.auth.middleware import auth_middleware -from autogpt_libs.utils.cache import thread_cached_property +from autogpt_libs.utils.cache import thread_cached from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -308,11 +308,13 @@ async def wrapper(*args, **kwargs): return wrapper - @thread_cached_property + @property + @thread_cached def execution_manager_client(self) -> ExecutionManager: return get_service_client(ExecutionManager) - @thread_cached_property + @property + @thread_cached def execution_scheduler_client(self) -> ExecutionScheduler: return get_service_client(ExecutionScheduler) @@ -541,7 +543,7 @@ async def stop_graph_run( ) await asyncio.to_thread( - self.execution_manager_client.cancel_execution(graph_exec_id) + lambda: self.execution_manager_client.cancel_execution(graph_exec_id) ) # Retrieve & return canceled graph execution in its final state @@ -617,11 +619,15 @@ async def create_schedule( graph = await graph_db.get_graph(graph_id, user_id=user_id) if not graph: raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.") - execution_scheduler = self.execution_scheduler_client + return { "id": await asyncio.to_thread( - execution_scheduler.add_execution_schedule( - graph_id, 
graph.version, cron, input_data, user_id=user_id + lambda: self.execution_scheduler_client.add_execution_schedule( + graph_id=graph_id, + graph_version=graph.version, + cron=cron, + input_data=input_data, + user_id=user_id, ) ) }