Skip to content

Commit

Permalink
Changed how StatelessServer handles event loops
Browse files Browse the repository at this point in the history
Co-authored-by: bright.hsu <bright.hsu@tdsc.com.tw>
  • Loading branch information
bright2227 and bright.hsu authored Sep 17, 2021
1 parent 7bc055c commit 4364f9b
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 8 deletions.
6 changes: 3 additions & 3 deletions asgiref/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import traceback

from .compatibility import get_running_loop, guarantee_single_callable, run_future
from .compatibility import guarantee_single_callable

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,7 +56,7 @@ def run(self):
"""
Runs the asyncio event loop with our handler loop.
"""
event_loop = get_running_loop()
event_loop = asyncio.get_event_loop()
asyncio.ensure_future(self.application_checker())
try:
event_loop.run_until_complete(self.handle())
Expand Down Expand Up @@ -88,7 +88,7 @@ def get_or_create_application_instance(self, scope_id, scope):
input_queue = asyncio.Queue()
application_instance = guarantee_single_callable(self.application)
# Run it, and stash the future for later checking
future = run_future(
future = asyncio.ensure_future(
application_instance(
scope=scope,
receive=input_queue.get,
Expand Down
148 changes: 143 additions & 5 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,149 @@
import asyncio
import socket as sock
from functools import partial

import pytest

from asgiref.server import StatelessServer


def test_stateless_server():
"""StatelessServer can be instantiated with an ASGI 3 application."""
async def sock_recvfrom(sock, n):
while True:
try:
return sock.recvfrom(n)
except BlockingIOError:
await asyncio.sleep(0)


class Server(StatelessServer):
def __init__(self, application, max_applications=1000):
super().__init__(
application,
max_applications=max_applications,
)
self._sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM | sock.SOCK_NONBLOCK)
self._sock.bind(("127.0.0.1", 0))

@property
def address(self):
return self._sock.getsockname()

async def handle(self):
while True:
data, addr = await sock_recvfrom(self._sock, 4096)
data = data.decode("utf-8")

if data.startswith("Register"):
_, usr_name = data.split(" ")
input_quene = self.get_or_create_application_instance(usr_name, addr)
input_quene.put_nowait(b"Welcome")

elif data.startswith("To"):
_, usr_name, msg = data.split(" ", 2)
input_quene = self.get_or_create_application_instance(usr_name, addr)
input_quene.put_nowait(msg.encode("utf-8"))

async def application_send(self, scope, message):
self._sock.sendto(message, scope)

def close(self):
self._sock.close()
for details in self.application_instances.values():
details["future"].cancel()


class Client:
def __init__(self, name):
self._sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM | sock.SOCK_NONBLOCK)
self.name = name

async def register(self, server_addr, name=None):
name = name or self.name
self._sock.sendto(f"Register {name}".encode("utf-8"), server_addr)

async def send(self, server_addr, to, msg):
self._sock.sendto(f"To {to} {msg}".encode("utf-8"), server_addr)

async def get_msg(self):
msg, server_addr = await sock_recvfrom(self._sock, 4096)
return msg, server_addr

def close(self):
self._sock.close()


@pytest.fixture(scope="function")
def server():
async def app(scope, receive, send):
pass
while True:
msg = await receive()
await send(msg)

server = Server(app, 10)
yield server
server.close()


async def check_client_msg(client, expected_address, expected_msg):
msg, server_addr = await asyncio.wait_for(client.get_msg(), timeout=1.0)
assert msg == expected_msg
assert server_addr == expected_address


async def server_auto_close(fut, timeout):
"""Server run based on run_until_complete. It will block forever with handle
function because it is a while True loop without break. Use this method to close
server automatically."""
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(fut, loop=loop)
await asyncio.sleep(timeout)
task.cancel()


def test_stateless_server(server):
"""StatelessServer can be instantiated with an ASGI 3 application."""
"""Create a UDP Server can register instance based on name from message of client.
Clients can communicate to other client by name through server"""

loop = asyncio.get_event_loop()
server.handle = partial(server_auto_close, fut=server.handle(), timeout=1.0)

client1 = Client(name="client1")
client2 = Client(name="client2")

async def check_client1_behavior():
await client1.register(server.address)
await check_client_msg(client1, server.address, b"Welcome")
await client1.send(server.address, "client2", "Hello")

async def check_client2_behavior():
await client2.register(server.address)
await check_client_msg(client2, server.address, b"Welcome")
await check_client_msg(client2, server.address, b"Hello")

task1 = loop.create_task(check_client1_behavior())
task2 = loop.create_task(check_client2_behavior())

server.run()

assert task1.done()
assert task2.done()


def test_server_delete_instance(server):
"""The max_applications of Server is 10. After 20 times register, application number should be 10."""
loop = asyncio.get_event_loop()
server.handle = partial(server_auto_close, fut=server.handle(), timeout=1.0)

client1 = Client(name="client1")

async def client1_multiple_register():
for i in range(20):
await client1.register(server.address, name=f"client{i}")
print(f"client{i}")
await check_client_msg(client1, server.address, b"Welcome")

task = loop.create_task(client1_multiple_register())
server.run()

server = StatelessServer(app)
server.get_or_create_application_instance("scope_id", {})
assert task.done()

0 comments on commit 4364f9b

Please sign in to comment.