Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance test #12

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ jobs:

- name: Check types
if: ${{ (matrix.python-version == '3.12') && (matrix.os == 'ubuntu-latest') }}
run: mypy src/anycorn/ tests/
run: |
python -m pip install git+https://github.com/agronholm/anyio.git@master
mypy src/anycorn/ tests/

- name: Run tests
run: pytest
run: |
python -m pip install git+https://github.com/agronholm/anyio.git@master
pytest -k "not test_performances"
pytest -k "test_performances" -s


h2spec:
Expand All @@ -70,7 +75,9 @@ jobs:
python -m pip install -U wheel
python -m pip install -U setuptools
python -m pip install -U pip
- run: python -m pip install trio .
- run: |
python -m pip install git+https://github.com/agronholm/anyio.git@master
python -m pip install trio .

- name: Run server
working-directory: compliance/h2spec
Expand Down Expand Up @@ -101,7 +108,9 @@ jobs:
python -m pip install -U wheel
python -m pip install -U setuptools
python -m pip install -U pip
- run: python -m pip install trio .
- run: |
python -m pip install git+https://github.com/agronholm/anyio.git@master
python -m pip install trio .
- name: Run server
working-directory: compliance/autobahn
run: nohup anycorn ws_server:app &
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ test = [
"mock",
"trio",
"mypy",
"httpx",
"hypercorn",
]

[project.scripts]
Expand Down
5 changes: 3 additions & 2 deletions src/anycorn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import warnings
from typing import Awaitable, Callable, Literal

import anyio
from anyio import TASK_STATUS_IGNORED
from anyio.abc import TaskStatus

from .config import Config
from .run import worker_serve
Expand All @@ -19,7 +20,7 @@ async def serve(
config: Config,
*,
shutdown_trigger: Callable[..., Awaitable[None]] | None = None,
task_status: anyio.abc.TaskStatus[list[str]] = anyio.TASK_STATUS_IGNORED,
task_status: TaskStatus[list[str]] = TASK_STATUS_IGNORED,
mode: Literal["asgi", "wsgi"] | None = None,
) -> None:
"""Serve an ASGI framework app given the config.
Expand Down
12 changes: 6 additions & 6 deletions src/anycorn/lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys

import anyio
from anyio import TASK_STATUS_IGNORED, Event, create_memory_object_stream
from anyio.abc import TaskStatus

from .config import Config
from .typing import AppWrapper, ASGIReceiveEvent, ASGISendEvent, LifespanScope, LifespanState
Expand All @@ -20,17 +22,15 @@ class Lifespan:
def __init__(self, app: AppWrapper, config: Config, state: LifespanState) -> None:
self.app = app
self.config = config
self.startup = anyio.Event()
self.shutdown = anyio.Event()
self.app_send_channel, self.app_receive_channel = anyio.create_memory_object_stream[
self.startup = Event()
self.shutdown = Event()
self.app_send_channel, self.app_receive_channel = create_memory_object_stream[
ASGIReceiveEvent
](config.max_app_queue_size)
self.state = state
self.supported = True

async def handle_lifespan(
self, *, task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED
) -> None:
async def handle_lifespan(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
task_status.started()
scope: LifespanScope = {
"type": "lifespan",
Expand Down
5 changes: 3 additions & 2 deletions src/anycorn/middleware/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def _handle_lifespan(self, scope: Scope, receive: Callable, send: Callable
class DispatcherMiddleware(_DispatcherMiddleware):
async def _handle_lifespan(self, scope: Scope, receive: Callable, send: Callable) -> None:
import anyio
from anyio import create_memory_object_stream, create_task_group

self.app_queues: dict[
str,
Expand All @@ -44,13 +45,13 @@ async def _handle_lifespan(self, scope: Scope, receive: Callable, send: Callable
anyio.streams.memory.MemoryObjectReceiveStream,
],
] = {
path: anyio.create_memory_object_stream[ASGIReceiveEvent](MAX_QUEUE_SIZE)
path: create_memory_object_stream[ASGIReceiveEvent](MAX_QUEUE_SIZE)
for path in self.mounts
}
self.startup_complete = {path: False for path in self.mounts}
self.shutdown_complete = {path: False for path in self.mounts}

async with anyio.create_task_group() as tg:
async with create_task_group() as tg:
for path, app in self.mounts.items():
tg.start_soon(
app,
Expand Down
25 changes: 14 additions & 11 deletions src/anycorn/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from typing import Any, Awaitable, Callable

import anyio
from anyio import TASK_STATUS_IGNORED, Event, create_task_group, current_time, sleep
from anyio._core._eventloop import get_async_backend
from anyio.abc import SocketListener, TaskStatus
from anyio.streams.tls import TLSListener

from .config import Config, Sockets
from .lifespan import Lifespan
Expand Down Expand Up @@ -161,7 +165,7 @@ async def worker_serve(
*,
sockets: Sockets | None = None,
shutdown_trigger: Callable[..., Awaitable[None]] | None = None,
task_status: anyio.abc.TaskStatus[list[str]] = anyio.TASK_STATUS_IGNORED,
task_status: TaskStatus[list[str]] = TASK_STATUS_IGNORED,
) -> None:
config.set_statsd_logger_class(StatsdLogger)

Expand All @@ -172,11 +176,11 @@ async def worker_serve(
max_requests = config.max_requests + randint(0, config.max_requests_jitter)
context = WorkerContext(max_requests)

async with anyio.create_task_group() as lifespan_tg:
async with create_task_group() as lifespan_tg:
await lifespan_tg.start(lifespan.handle_lifespan)
await lifespan.wait_for_startup()

async with anyio.create_task_group() as server_tg:
async with create_task_group() as server_tg:
if sockets is None:
sockets = config.create_sockets()
for sock in sockets.secure_sockets:
Expand All @@ -185,11 +189,11 @@ async def worker_serve(
sock.listen(config.backlog)

ssl_context = config.create_ssl_context()
listeners: list[anyio.abc.SocketListener | anyio.streams.tls.TLSListener] = []
listeners: list[SocketListener | TLSListener] = []
binds = []
asynclib = get_async_backend()
for secure_sock in sockets.secure_sockets:
asynclib = anyio._core._eventloop.get_async_backend()
secure_listener = anyio.streams.tls.TLSListener(
secure_listener = TLSListener(
asynclib.create_tcp_listener(secure_sock),
ssl_context,
True,
Expand All @@ -202,7 +206,6 @@ async def worker_serve(
await config.log.info("Running on %s (CTRL + C to quit)", url)

for insecure_sock in sockets.insecure_sockets:
asynclib = anyio._core._eventloop.get_async_backend()
insecure_listener = asynclib.create_tcp_listener(insecure_sock)
listeners.append(insecure_listener)
bind = repr_socket_addr(insecure_sock.family, insecure_sock.getsockname())
Expand All @@ -218,7 +221,7 @@ async def worker_serve(

task_status.started(binds)
try:
async with anyio.create_task_group() as tg:
async with create_task_group() as tg:
if shutdown_trigger is not None:
tg.start_soon(raise_shutdown, shutdown_trigger)
tg.start_soon(raise_shutdown, context.terminate.wait)
Expand All @@ -233,14 +236,14 @@ async def worker_serve(
),
)

await anyio.Event().wait()
await Event().wait()
except BaseExceptionGroup as error:
_, other_errors = error.split((ShutdownError, KeyboardInterrupt))
if other_errors is not None:
raise other_errors
finally:
await context.terminated.set()
server_tg.cancel_scope.deadline = anyio.current_time() + config.graceful_timeout
server_tg.cancel_scope.deadline = current_time() + config.graceful_timeout

await lifespan.wait_for_shutdown()
lifespan_tg.cancel_scope.cancel()
Expand All @@ -258,7 +261,7 @@ def anyio_worker(

shutdown_trigger = None
if shutdown_event is not None:
shutdown_trigger = partial(check_multiprocess_shutdown_event, shutdown_event, anyio.sleep)
shutdown_trigger = partial(check_multiprocess_shutdown_event, shutdown_event, sleep)

anyio.run(
partial(worker_serve, app, config, sockets=sockets, shutdown_trigger=shutdown_trigger),
Expand Down
7 changes: 4 additions & 3 deletions src/anycorn/statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import socket
from typing import TYPE_CHECKING, Any

import anyio
from anyio import create_connected_udp_socket
from anyio.abc import ConnectedUDPSocket

from .logging import Logger

Expand Down Expand Up @@ -100,7 +101,7 @@ async def _socket_send(self, message: bytes) -> None:


class StatsdLogger(BaseStatsdLogger):
socket: anyio.abc.ConnectedUDPSocket | None
socket: ConnectedUDPSocket | None

def __init__(self, config: Config) -> None:
super().__init__(config)
Expand All @@ -109,7 +110,7 @@ def __init__(self, config: Config) -> None:

async def _socket_send(self, message: bytes) -> None:
if self.socket is None:
self.socket = await anyio.create_connected_udp_socket(
self.socket = await create_connected_udp_socket(
self.address[0], int(self.address[1]), family=socket.AddressFamily.AF_INET
)
await self.socket.send(message)
9 changes: 5 additions & 4 deletions src/anycorn/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any, Awaitable, Callable

import anyio
from anyio import create_memory_object_stream, create_task_group, get_cancelled_exc_class

from .config import Config
from .typing import AppWrapper, ASGIReceiveCallable, ASGIReceiveEvent, ASGISendEvent, Scope
Expand All @@ -24,10 +25,10 @@ async def _handle(
) -> None:
try:
await app(scope, receive, send, sync_spawn, call_soon)
except anyio.get_cancelled_exc_class():
except get_cancelled_exc_class():
raise
except BaseExceptionGroup as error:
_, other_errors = error.split(anyio.get_cancelled_exc_class())
_, other_errors = error.split(get_cancelled_exc_class())
if other_errors is not None:
await config.log.exception("Error in ASGI Framework")
await send(None)
Expand All @@ -50,7 +51,7 @@ async def spawn_app(
scope: Scope,
send: Callable[[ASGISendEvent | None], Awaitable[None]],
) -> Callable[[ASGIReceiveEvent], Awaitable[None]]:
app_send_channel, app_receive_channel = anyio.create_memory_object_stream[ASGIReceiveEvent](
app_send_channel, app_receive_channel = create_memory_object_stream[ASGIReceiveEvent](
config.max_app_queue_size
)
self._task_group.start_soon(
Expand All @@ -69,7 +70,7 @@ def spawn(self, func: Callable, *args: Any) -> None:
self._task_group.start_soon(func, *args)

async def __aenter__(self) -> TaskGroup:
tg = anyio.create_task_group()
tg = create_task_group()
self._task_group = await tg.__aenter__()
return self

Expand Down
Loading
Loading