Skip to content

Commit

Permalink
Merge pull request #42 from melvinkcx/features/local_handler_dependen…
Browse files Browse the repository at this point in the history
…cies

FastAPI Dependency Support For Local Handler
  • Loading branch information
melvinkcx authored Apr 6, 2023
2 parents 91b298f + cc72eb8 commit 83016a0
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 1 deletion.
149 changes: 148 additions & 1 deletion fastapi_events/handlers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,155 @@
import fnmatch
import functools
import inspect
import sys
from typing import Any, Callable, Dict, ForwardRef, List, Optional, Tuple, cast

# TODO Try to completely eliminate the need of using dependent libs
from fastapi import params # FIXME
from pydantic.error_wrappers import ErrorWrapper

from fastapi_events.handlers.base import BaseEventHandler
from fastapi_events.otel.utils import create_span_for_handle_fn
from fastapi_events.typing import Event


def evaluate_forwardref(type_: ForwardRef, globalns: Any, localns: Any) -> Any:
"""
Adopted from pydantic source code
"""
if sys.version_info < (3, 9):
return type_._evaluate(globalns, localns)
else:
# Even though it is the right signature for python 3.9, mypy complains with
# `error: Too many arguments for "_evaluate" of "ForwardRef"` hence the cast...
return cast(Any, type_)._evaluate(globalns, localns, set())


def get_typed_annotation(
annotation: Any,
globalns: Dict[str, Any]
) -> Any:
"""
Adopted from fastapi source code
"""
if isinstance(annotation, str):
annotation = ForwardRef(annotation)
annotation = evaluate_forwardref(annotation, globalns, globalns)
return annotation


def get_typed_signature(
call: Callable[..., Any]
) -> inspect.Signature:
"""
Adopted from fastapi source code
"""
signature = inspect.signature(call)
globalns = getattr(call, "__globals__", {})
typed_params = [
inspect.Parameter(
name=param.name,
kind=param.kind,
default=param.default,
annotation=get_typed_annotation(param.annotation, globalns),
)
for param in signature.parameters.values()
]
typed_signature = inspect.Signature(typed_params)
return typed_signature


class Dependant:
def __init__(
self,
call: Callable[..., Any],
name: Optional[str],
dependencies: Optional[List["Dependant"]] = None,
):
self.call = call
self.name = name
self.dependencies = dependencies or []


def get_param_sub_dependant(
*,
param: inspect.Parameter,
name: str,
) -> Dependant:
depends: params.Depends = param.default
if depends.dependency:
dependency = depends.dependency
else:
dependency = param.annotation

return get_dependant(
name=name,
call=dependency,
)


def get_dependant(
*,
call: Callable[..., Any],
name: Optional[str] = None,
) -> Dependant:
handler_signature = get_typed_signature(call)
signature_params = handler_signature.parameters

dependant = Dependant(
call=call,
name=name,
)

for param_name, param in signature_params.items():
if isinstance(param.default, params.Depends): # FIXME create a Protocol for params.Depends?
sub_dependant = get_param_sub_dependant(
param=param,
name=param_name,
)
dependant.dependencies.append(sub_dependant)
continue

return dependant


async def solve_dependencies(
*,
event: Event,
dependant: Dependant,
) -> Tuple[
Dict[str, Any],
List[ErrorWrapper]
]:
values: Dict[str, Any] = {}
errors: List[ErrorWrapper] = []

for sub_dependant in dependant.dependencies:
use_sub_dependant = sub_dependant
call = sub_dependant.call

sub_values, sub_errors = await solve_dependencies(
event=event,
dependant=use_sub_dependant,
)
if sub_errors:
errors.extend(sub_errors)
continue

# TODO support dependencies with `yield`

elif asyncio.iscoroutinefunction(call):
solved = await call(**sub_values)
else:
loop = asyncio.get_event_loop()
solved = await loop.run_in_executor(None, functools.partial(call, event, **sub_values))

if sub_dependant.name is not None:
values[sub_dependant.name] = solved

return values, errors


class LocalHandler(BaseEventHandler):
def __init__(self):
self._registry = {}
Expand All @@ -31,8 +174,12 @@ async def handle(self, event: Event) -> None:
payload=payload,
):
for handler in self._get_handlers_for_event(event_name=event_name):
# #41 resolve dependencies
dependant = get_dependant(call=handler)
values, errors = await solve_dependencies(event=event, dependant=dependant)

if inspect.iscoroutinefunction(handler):
await handler(event)
await handler(event, **values)
else:
# Making sure sync function will never block the event loop
loop = asyncio.get_event_loop()
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def get_long_description():
"starlette>=0.21.0",
"starlite>=1.38.0",
"httpx>=0.23.0",
"fastapi>=0.92.0", # FIXME
],
"aws": ["boto3>=1.14"],
"google": ["google-cloud-pubsub>=2.13.6"],
Expand Down
70 changes: 70 additions & 0 deletions tests/handlers/test_local_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from enum import Enum
from typing import Callable, Tuple
from unittest.mock import MagicMock

import pytest
from fastapi import Depends
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
Expand Down Expand Up @@ -172,3 +174,71 @@ async def handle_events(event: Event):
spans_created = otel_test_manager.get_finished_spans()
assert spans_created[-1].name == "handling event TEST_EVENT with LocalHandler"
assert spans_created[-1].attributes[SpanAttributes.HANDLER] == "fastapi_events.handlers.local.LocalHandler"


def test_local_handler_with_fastapi_dependencies(
setup_test
):
"""
to verify the support of FastAPI dependencies
Relevant Github issue: #41
"""
app, handler = setup_test()

_mock_db = MagicMock()
_mock_service_client = MagicMock()

async def get_db():
return _mock_db

async def get_service_client():
return _mock_service_client

@handler.register(event_name="TEST_EVENT")
async def handle_event_with_dependency(
event: Event,
db=Depends(get_db),
service_client=Depends(get_service_client)
):
assert db == _mock_db
assert service_client == _mock_service_client

client = TestClient(app)
client.get("/events?event=TEST_EVENT")


def test_local_handler_with_nested_dependencies(
setup_test
):
"""
to verify the support of nested FastAPI dependencies
Relevant Github issue: #41
"""
app, handler = setup_test()

_mock_service_client = MagicMock()
_mock_db = MagicMock()
_mock_connection_pool = MagicMock()

async def get_connection_pool():
return _mock_connection_pool

async def get_db(
connection_pool=Depends(get_connection_pool)
):
return _mock_db, connection_pool

async def get_service_client():
return _mock_service_client

@handler.register(event_name="TEST_EVENT")
async def handle_event_with_dependency(
event: Event,
db=Depends(get_db),
service_client=Depends(get_service_client)
):
assert db == (_mock_db, _mock_connection_pool)
assert service_client == _mock_service_client

client = TestClient(app)
client.get("/events?event=TEST_EVENT")

0 comments on commit 83016a0

Please sign in to comment.