Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): Fix .env file read contention on pyro connection setup #8736

Merged
merged 7 commits into from
Nov 25, 2024
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/data/credit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from backend.data.cost import BlockCost, BlockCostType
from backend.util.settings import Config

config = Config()


class UserCreditBase(ABC):
def __init__(self, num_user_credits_refill: int):
Expand Down Expand Up @@ -202,7 +204,6 @@ async def top_up_credits(self, *args, **kwargs):


def get_user_credit_model() -> UserCreditBase:
config = Config()
if config.enable_credit.lower() == "true":
return UserCredit(config.num_user_credits_refill)
else:
Expand Down
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/executor/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

P = ParamSpec("P")
R = TypeVar("R")
config = Config()


class DatabaseManager(AppService):
Expand All @@ -38,7 +39,7 @@ def __init__(self):

@classmethod
def get_port(cls) -> int:
return Config().database_api_port
return config.database_api_port

@expose
def send_execution_update(self, execution_result: ExecutionResult):
Expand Down
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/executor/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def _extract_schema_from_url(database_url) -> tuple[str, str]:


logger = logging.getLogger(__name__)
config = Config()


def log(msg, **kwargs):
Expand Down Expand Up @@ -96,7 +97,7 @@ class ExecutionScheduler(AppService):

@classmethod
def get_port(cls) -> int:
return Config().execution_scheduler_port
return config.execution_scheduler_port

@property
@thread_cached
Expand Down
5 changes: 3 additions & 2 deletions autogpt_platform/backend/backend/util/retry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import threading
from functools import wraps
from uuid import uuid4

Expand All @@ -16,7 +17,7 @@ def _log_prefix(resource_name: str, conn_id: str):
This needs to be called on the fly to get the current process ID & service name,
not the parent process ID & service name.
"""
return f"[PID-{os.getpid()}|{get_service_name()}|{resource_name}-{conn_id}]"
return f"[PID-{os.getpid()}|THREAD-{threading.get_native_id()}|{get_service_name()}|{resource_name}-{conn_id}]"


def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
Expand All @@ -25,7 +26,7 @@ def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
def on_retry(retry_state):
prefix = _log_prefix(resource_name, conn_id)
exception = retry_state.outcome.exception()
logger.info(f"{prefix} {action_name} failed: {exception}. Retrying now...")
logger.error(f"{prefix} {action_name} failed: {exception}. Retrying now...")

def decorator(func):
@wraps(func)
Expand Down
9 changes: 4 additions & 5 deletions autogpt_platform/backend/backend/util/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def get_port(cls) -> int:

@classmethod
def get_host(cls) -> str:
return os.environ.get(f"{cls.service_name.upper()}_HOST", Config().pyro_host)
return os.environ.get(f"{cls.service_name.upper()}_HOST", config.pyro_host)

def run_service(self) -> None:
while True:
Expand Down Expand Up @@ -170,14 +170,13 @@ def cleanup(self):

@conn_retry("Pyro", "Starting Pyro Service")
def __start_pyro(self):
conf = Config()
maximum_connection_thread_count = max(
Pyro5.config.THREADPOOL_SIZE,
conf.num_node_workers * conf.num_graph_workers,
config.num_node_workers * config.num_graph_workers,
)

Pyro5.config.THREADPOOL_SIZE = maximum_connection_thread_count # type: ignore
daemon = Pyro5.api.Daemon(host=conf.pyro_host, port=self.get_port())
daemon = Pyro5.api.Daemon(host=config.pyro_host, port=self.get_port())
self.uri = daemon.register(self, objectId=self.service_name)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}")
daemon.requestLoop()
Expand Down Expand Up @@ -209,7 +208,7 @@ def get_service_client(service_type: Type[AS]) -> AS:
class DynamicClient(PyroClient):
@conn_retry("Pyro", f"Connecting to [{service_name}]")
def __init__(self):
host = os.environ.get(f"{service_name.upper()}_HOST", "localhost")
host = os.environ.get(f"{service_name.upper()}_HOST", pyro_host)
uri = f"PYRO:{service_type.service_name}@{host}:{service_type.get_port()}"
logger.debug(f"Connecting to service [{service_name}]. URI = {uri}")
self.proxy = Pyro5.api.Proxy(uri)
Expand Down
Loading