Skip to content

Commit

Permalink
dev(auth): remove strange features and make grpc async
Browse files Browse the repository at this point in the history
  • Loading branch information
kitanoyoru committed Oct 6, 2023
1 parent 97d9d53 commit c20aac7
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 120 deletions.
13 changes: 0 additions & 13 deletions .editorconfig

This file was deleted.

File renamed without changes.
10 changes: 9 additions & 1 deletion apps/auth/src/cli/commands/start_server_command.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

import asyncio

import hydra

from src.config import AppConfig
Expand All @@ -18,4 +20,10 @@
)
def start_server_command(config: AppConfig):
s = Server(config)
s.serve()
loop = asyncio.get_event_loop()

try:
loop.run_until_complete(s.serve())
finally:
loop.run_until_complete(*s.cleanup_coroutines)
loop.close()
102 changes: 9 additions & 93 deletions apps/auth/src/config.py
Original file line number Diff line number Diff line change
@@ -1,120 +1,36 @@
from typing import Any, Callable, Iterable, Optional, Union
from hydra.core.config_store import ConfigStore

from dataclasses import dataclass, field, fields


class _DefaultType:
def __repr__(self) -> str:
return "<default>"


_DEFAULT = _DefaultType()

_ValidatorType = Callable[[str, Any], None]

_ConfigurationType = TypeVar("_ConfigurationType")

_GRPC_WORKER_MIN = 4
_GRPC_WORKER_MAX = 36


def _optional(validator: _ValidatorType) -> _ValidatorType:
def proc(name: str, value: Any) -> None:
if value is not None:
validator(name, value)

return proc


def _chain(*validators: _ValidatorType) -> _ValidatorType:
def proc(name: str, value: Any) -> None:
for validator in validators:
validator(name, value)

return proc

from dataclasses import dataclass

def _of_type(*types: type) -> _ValidatorType:
def proc(name: str, value: Any) -> None:
if not isinstance(value, types):
types_repr = " or ".join(str(t) for t in types)
raise TypeError(f'"{name}" should be of type {types_repr}')

return proc


def _positive(name: str, value: Union[float, int]) -> None:
if value <= 0:
raise ValueError(f'"{name}" should be positive')


def _non_negative(name: str, value: Union[float, int]) -> None:
if value < 0:
raise ValueError(f'"{name}" should not be negative')


def _range(min_: int, max_: int) -> _ValidatorType:
def proc(name: str, value: Union[float, int]) -> None:
if value < min_:
raise ValueError(f'"{name}" should be higher or equal to {min_}')
if value > max_:
raise ValueError(f'"{name}" should be less or equal to {max_}')

return proc

def _validate(config: 'AppConfig') -> None:
for f in fields(config):
validate_fn = f.metadata.get('validate')
if validate_fn is not None:
value = getattr(config, f.name)
if value is not _DEFAULT:
validate_fn(f.name, value)
from hydra.core.config_store import ConfigStore


@dataclass(slots=True)
@dataclass(slots=True, frozen=True)
class DatabaseConfig:
url: str
username: str
password: str


@dataclass(slots=True)
@dataclass(slots=True, frozen=True)
class CacheConfig:
url: str
username: str
password: str


@dataclass(slots=True)
@dataclass(slots=True, frozen=True)
class MessageBrokerConfig:
brokers_url: list[str]


@dataclass(slots=True)
@dataclass(slots=True, frozen=True)
class AppConfig:
port: int
max_grpc_workers: int

db: DatabaseConfig
cache: CacheConfig
broker: MessageBrokerConfig

port: Optional[int] = field(
default=16602,
metadata={
"validate": _chain(_of_type(int), _positive),
}
)

max_grpc_workers: Optional[int] = field(
default=12
metadata= ={
"validate": _chain(_optional, _of_type(int, _positive, _range(_GRPC_WORKER_MIN, _GRPC_WORKER_MAX))),
}
)

def __post_init__(self):
_validate(self)



cs = ConfigStore.instance()

Expand Down
Empty file.
Empty file added apps/auth/src/models/event.py
Empty file.
32 changes: 21 additions & 11 deletions apps/auth/src/server.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import abc

import signal
import logging

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine

import grpc

from omegaconf import OmegaConf

from grpclib.server import Server as GRPCServer

from src.config import AppConfig
from src.services import AuthService, HealthService
from src.constants import Constants
Expand All @@ -20,22 +20,24 @@
logger = logging.getLogger(__name__)


class IServer(ABC):
@abstractmethod
def serve(self):
class IServer(abc.ABC):
@abc.abstractmethod
async def serve(self):
pass

@abstractmethod
@abc.abstractmethod
def _register_custom_signal_handlers(self):
pass


class Server(IServer):
cleanup_coroutines: list[Coroutine[Any, Any, None]] = []

def __init__(self, config: AppConfig):
self._config = config

def serve(self):
grpc_server = grpc.server(
async def serve(self):
grpc_server = grpc.aio.server(
ThreadPoolExecutor(max_workers=Constants.MAX_GRPC_WORKERS),
options=[
("grpc.max_send_message_length", Constants.MAX_GRPC_SEND_MESSAGE_LENGTH),
Expand All @@ -50,8 +52,16 @@ def serve(self):

grpc_server.add_insecure_port("[::]:" + str(self._config.port))

logger.info(f"gRPC is running on port {self._config.port}")
grpc_server.wait_for_termination()
logger.info(f"gRPC server is running on port {self._config.port}")
await grpc_server.start()

async def server_graceful_shutdown():
logger.info("gRPC server is shutting down")
await grpc_server.stop(5)

self.cleanup_coroutines.append(server_graceful_shutdown())

await grpc_server.wait_for_termination()

def _register_custom_signal_handlers(self):
def sighup_handler():
Expand Down
1 change: 1 addition & 0 deletions apps/auth/src/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .health_service import HealthService
2 changes: 0 additions & 2 deletions apps/auth/src/services/health_service.py

This file was deleted.

Empty file.
File renamed without changes.
Empty file.

0 comments on commit c20aac7

Please sign in to comment.