diff --git a/noxfile.py b/noxfile.py index 367ce0a..a478087 100644 --- a/noxfile.py +++ b/noxfile.py @@ -18,44 +18,77 @@ def test(session: nox_poetry.Session): ) -@nox_poetry.session(python="3.8") +@nox_poetry.session(python=["3.9"]) +def devtest(session: nox_poetry.Session): + session.run_always("poetry", "install", "-E", "all", external=True) + session.run( + "pytest", + "-vv", + "--cov=sending", + "tests/test_inmemory_backend.py", + env={"SENDING__ENABLE_LOGGING": "True"}, + ) + session.run( + "pytest", + "-vv", + "--cov=sending", + "tests/test_jupyter_backend.py", + env={"SENDING__ENABLE_LOGGING": "True"}, + ) + session.run( + "pytest", + "-vv", + "--cov=sending", + "tests/test_redis_backend.py", + env={"SENDING__ENABLE_LOGGING": "True"}, + ) + session.run( + "pytest", + "-vv", + "--cov=sending", + "tests/test_websocket_backend.py", + env={"SENDING__ENABLE_LOGGING": "True"}, + ) + + +@nox_poetry.session(python="3.9") def lint(session: nox_poetry.Session): session.notify("black_check") session.notify("flake8") session.notify("isort_check") -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def flake8(session: nox_poetry.Session): session.install("flake8") session.run("flake8", *LINT_PATHS, "--count", "--show-source", "--statistics", "--benchmark") -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def black_check(session: nox_poetry.Session): session.install("black") session.run("black", "--check", *LINT_PATHS) -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def isort_check(session: nox_poetry.Session): session.install("isort") session.run("isort", "--diff", "--check", *LINT_PATHS) -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def blacken(session: nox_poetry.Session): session.install("black") session.run("black", *LINT_PATHS) -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def isort_apply(session: nox_poetry.Session): session.install("isort") 
session.run("isort", *LINT_PATHS) -@nox_poetry.session(python="3.8") +@nox_poetry.session(python="3.9") def generate_coverage_xml(session: nox_poetry.Session): session.install("coverage[toml]") session.run("coverage", "xml") diff --git a/poetry.lock b/poetry.lock index ecb8d51..313cd0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1100,8 +1100,8 @@ websockets = ["websockets"] [metadata] lock-version = "1.1" -python-versions = "^3.9" -content-hash = "287f273fc5f84cc6012a5c1a8388ae8c367895ad9040d8d3fd0e9c51d40a1e7f" +python-versions = "^3.8" +content-hash = "c9fb473a4e4aa657b5d69864cc51a1de89a4cb1fa73f89b212c9aacd7e953763" [metadata.files] aioredis = [ diff --git a/sending/backends/memory.py b/sending/backends/memory.py index b4cfef8..835d203 100644 --- a/sending/backends/memory.py +++ b/sending/backends/memory.py @@ -1,19 +1,24 @@ +"""A Publish Subscribe Manager which operates in memory.""" import asyncio from ..base import AbstractPubSubManager, QueuedMessage class InMemoryPubSubManager(AbstractPubSubManager): + """A Publish Subscribe Manager which operates in memory.""" + def __init__(self): super().__init__() self.message_queue: asyncio.Queue[QueuedMessage] = None async def initialize(self, *args, **kwargs): + """Initialize the message queue and its size.""" queue_size = kwargs.get("queue_size", 0) self.message_queue = asyncio.Queue(queue_size) return await super().initialize(*args, **kwargs) async def shutdown(self, now=False): + """Shutdown the manager with a default graceful cleanup (now=False).""" await super().shutdown(now=now) self.message_queue = None diff --git a/sending/backends/redis.py b/sending/backends/redis.py index 7d2d074..71769a6 100644 --- a/sending/backends/redis.py +++ b/sending/backends/redis.py @@ -1,19 +1,24 @@ +"""A Publish Subscribe Manager which operates using Redis.""" import aioredis from ..base import AbstractPubSubManager, QueuedMessage class RedisPubSubManager(AbstractPubSubManager): + """A Publish Subscribe Manager which operates 
using Redis.""" + def __init__(self, dsn: str): super().__init__() self._dsn = dsn async def initialize(self, *args, **kwargs): + """Initialize the redis db and the pubsub manager.""" self._redis = aioredis.from_url(self._dsn) self._redis_pubsub = self._redis.pubsub(ignore_subscribe_messages=True) return await super().initialize(*args, **kwargs) async def shutdown(self, now=False): + """Shutdown the pubsub manager and redis connection.""" await super().shutdown(now=now) await self._redis.close() self._redis = None diff --git a/sending/backends/websocket.py b/sending/backends/websocket.py index c3684ec..b655f1e 100644 --- a/sending/backends/websocket.py +++ b/sending/backends/websocket.py @@ -1,3 +1,4 @@ +"""Publish Subscribe Manager using Websockets.""" import asyncio import logging from typing import Any, Callable, Optional @@ -11,99 +12,105 @@ class WebsocketManager(AbstractPubSubManager): + """Websocket-based Sending backend. + + This class handles creating the initial websocket connection and + reconnecting on server-side disconnect. + + Unlike other backends, this Backend ignores the concept of `topic`. + All messages seen over the wire are dropped onto the inbound queue with + topic '', and .send() is overridden to only take a message, defaulting + outbound topic to ''. + + Real-world applications are likely to subclass this and add hooks + either as class methods or attach them to an instance after init. + + - inbound_message_hook: deserialize incoming messages over websocket + - outbound_message_hook: serialize outgoing messages over websocket + - init_hook: called after websocket connection is established + - auth_hook: called just before init_hook, useful if you need to send + some kind of auth request. This affects .send(); make sure + you also register a callback to .on_auth that is called + when you receive an auth response. + """ + def __init__(self, ws_url: str): - """ - Websocket-based Sending backend. 
This class handles creating the initial - websocket connection and reconnecting on server-side disconnect. - - One key difference between this Backend and others is that it ignores - the concept of `topic`. All messages seen over the wire are dropped - onto the inbound queue with topic '', and .send() is overwritten - to only take a message, defaulting outbound topic to ''. - - Real-world applications are likely to subclass this and add hooks - either as class methods or attach them to an instance after init. - - - inbound_message_hook: deserialize incoming messages over websocket - - outbound_message_hook: serialize outgoing messages over websocket - - init_hook: called after websocket connection is established - - auth_hook: called just before init_hook, useful if you need to send - some kind of auth request. This effects .send(), make sure - you also register a callback to .on_auth that is called - when you receive an auth response. - """ super().__init__() self.ws_url = ws_url - # An unauth_ws and authed_ws pair of Futures are created so that - # sub-classes can easily implement a pattern where messages are only + # This class offers a pair of Futures (`unauth_ws` and `authed_ws`). + # Sub-classes can implement a pattern where messages are only # sent to the server after the session has been authenticated. self.unauth_ws = asyncio.Future() self.authed_ws = asyncio.Future() - # Can use await mgr.connected.wait() to block until the websocket is connected - # in tests or in situations where you want connect_hook / context_hook to have + + # Use `await mgr.connected.wait()` to block until the websocket is connected + # In tests or in situations where you want `connect_hook` / `context_hook` to have # information available to it from the websocket response (e.g. RTU session id) self.connected = asyncio.Event() # When an outbound worker is ready to send a message over the wire, it - # calls ._publish which will await the unauth_ws or authed_ws Future. 
- # However, if something goes wrong and those Futures never resolve then - # it can leave you in a hard-to-debug state. So ._publish should use an - # asyncio.wait_for(await , timeout=self.publish_timeout) + # calls `._publish` which will await the unauth_ws or authed_ws Future. + # If something goes wrong and the Futures never resolve, you will be + # left in a hard-to-debug state. Use a timeout to avoid this. + # `._publish` should use `asyncio.wait_for(await , timeout=self.publish_timeout)` self.publish_timeout: float = 5.0 # Used to prevent automatic websocket reconnect when we're trying to shutdown + # TODO: True does xxx; False does yyyy self._shutting_down = False - # For debug and testing, .next_event gets set/cleared after every received message + # For debug and testing, `.next_event` gets set/cleared after every received message self.next_event = asyncio.Event() self.last_seen_message = None - self.reconnections = 0 - # If this is set, then the WebsocketManager will stop reconnecting after this - # many reconnections. + self.reconnections = 0 # reconnection attempt count + # If max_reconnections is set, then the WebsocketManager will stop + # trying to reconnect after the number of tries set. self.max_reconnections = None - # Optional hooks that can be defined in a subclass or attached to an instance. - # - connect_hook is called first when websocket is established, useful to - # set contextvars or store state before init / auth hooks are called - # - # - auth_hook is called next, and also effects how .send() works. - # If auth_hook is defined then .send() won't actually transmit data over the wire - # until on_auth callback has been triggered. - # You want to define an auth_hook if the websocket server expects a first message - # to be some kind of authentication - # - # - init_hook is called next after auth_hook, useful to kick off messages after - # auth_hook, or if authentication is not part of the websocket server flow. 
- # - disconnect_hook is called when the websocket connection is lost - # - + # **Optional hooks** may be defined in a subclass or attached to an instance. + # `connect_hook` is called first when websocket is established and used to + # set contextvars or store state before init / auth hooks are called + if not hasattr(self, "connect_hook"): + self.connect_hook: Optional[Callable] = None + # `auth_hook` is called next, and also affects how .send() works. + # If auth_hook is defined then .send() won't actually transmit data over the wire + # until on_auth callback has been triggered. + # You want to define an auth_hook if the websocket server expects a first message + # to be some kind of authentication if not hasattr(self, "auth_hook"): self.auth_hook: Optional[Callable] = None + # `init_hook` is called next after auth_hook, useful to kick off + # messages after auth_hook, or if authentication is not part of the + # websocket server flow. if not hasattr(self, "init_hook"): self.init_hook: Optional[Callable] = None - if not hasattr(self, "connect_hook"): - self.connect_hook: Optional[Callable] = None + # `disconnect_hook` is called when the websocket connection is lost if not hasattr(self, "disconnect_hook"): self.disconnect_hook: Optional[Callable] = None self.register_callback(self.record_last_seen_message) async def record_last_seen_message(self, message: Any): - """ - Automatically registered callback. Used for debugging and testing. + """Automatically registered callback. + + Used for debugging and testing. + ``` await mgr.next_event.wait() assert mgr.last_seen_message == + ``` Alternatively, if messages may come out of order, iterate until you see the type of message you want to test for. 
+ ``` while True: await asyncio.wait_for(mgr.next_event.wait(), timeout=1) if mgr.last_seen_message['key_field'] == key_of_interest: break assert mgr.last_seen_message['value_field'] == expected_value + ``` """ self.last_seen_message = message self.next_event.set() @@ -116,30 +123,33 @@ async def on_auth(self, message: Any): self.authed_ws.set_result(self.unauth_ws.result()) def send(self, message: Any): - """ - Override the default Sending behavior to only accept a message instead of topic + message. - Topic is not a concept supported in this Backend. + """Send a message to the outbound queue. + + This method overrides the default Sending behavior to ignore topics and + only accept messages. Topic is not a concept supported in this Backend. """ # QueuedMessage is a NamedTuple of topic, contents, session_id self.outbound_queue.put_nowait(QueuedMessage("", message, None)) async def _publish(self, message: QueuedMessage): - """ - Once a message has been picked up from the inbound queue, processed by a callback, - then dropped onto the outbound queue, the outbound worker will call this method. + """Publish a message. - QueuedMessage is namedtuple of topic, contents, session_id. Only contents matter - for this Backend. + Once a message has been picked up from the inbound queue, + processed by a callback, then dropped onto the outbound queue, + the outbound worker will call this method. - Subclasses that need to serialize outbound data can define a .outbound_message_hook - instead of overriding this method. For instance, if using self.send() - you may want to define .outbound_message_hook = lambda model: model.json() + QueuedMessage is namedtuple of topic, contents, session_id. + Only contents matter for this Backend since topics are set to ''. + + Subclasses that need to serialize outbound data can define a + `.outbound_message_hook` instead of overriding this method. 
+ For instance, if using self.send() + you may want to define `.outbound_message_hook = lambda model: model.json()` """ # Assume if an implementation has an auth_hook then it wants to delay # sending outbound messages over the wire until the session is authenticated. # use asyncio.wait_for so we don't end up in a hard-to-debug state if a Future # doesn't resolve for some reason. - if self.auth_hook: if not self.authed_ws.done(): # special logging here because this is a sign that you might be in @@ -149,20 +159,29 @@ async def _publish(self, message: QueuedMessage): ws = await asyncio.wait_for(self.authed_ws, timeout=self.publish_timeout) else: ws = await asyncio.wait_for(self.unauth_ws, timeout=self.publish_timeout) + logger.debug(f"Sending: {message.contents}") await ws.send(message.contents) async def _poll_loop(self): - """ + """Poll websockets for activity. + + TODO: determine if initialize overrides the base class at all in the subclass + When WebsocketManager.initialize() is awaited, it creates an asyncio Task that runs - this function. This is the meat of the WebsocketManager class. It handles creating - the websocket connection, reconnecting if the server disconnects, receiving messages - over the wire, and putting them onto the inbound message queue. + this function. + + This is the meat of the WebsocketManager class. + It handles creating the websocket connection, + reconnecting if the server disconnects, + receiving messages over the wire, and + putting them onto the inbound message queue. It also uses authentication pattern hooks if they're implemented. 
""" # Automatic reconnect https://websockets.readthedocs.io/en/stable/reference/client.html async for websocket in websockets.connect(self.ws_url): + # Run unauthenticated steps as well as connect and context hooks self.unauth_ws.set_result(websocket) if self.connect_hook: fn = ensure_async(self.connect_hook) @@ -170,6 +189,7 @@ async def _poll_loop(self): if self.context_hook: await self.context_hook() self.connected.set() + try: # Call the auth and init hooks (casting to async if necessary), passing in 'self' if self.auth_hook: @@ -178,14 +198,15 @@ async def _poll_loop(self): if self.init_hook: fn = ensure_async(self.init_hook) await fn(self) + async for message in websocket: logger.debug(f"Received: {message}") self.schedule_for_delivery(topic="", contents=message) + + # Raised if there's an error trying to connect. except websockets.ConnectionClosed: - # This will get raised if there's an error trying to connect, - # keeping it separate from unknown exceptions that a subclass might want - # to handle differently. continue + # Handle unknown exceptions. except Exception as e: await self.on_exception(e) continue @@ -205,10 +226,11 @@ async def _poll_loop(self): self.reconnections += 1 async def shutdown(self, now: bool = False): - """ - Custom shutdown logic to take account of closing our automatically-reconnecting websocket. - In an ideal world, we drain all outbound messages, stop the task that's reading new - inbound messages, and then perform the websocket close handshake. + """Perform shutdown and custom logic to account for reconnecting websocket. + + In an ideal world, we drain all outbound messages, + stop the task that's reading new inbound messages, + and then perform the websocket close handshake. 
""" self._shutting_down = True await super().shutdown(now) @@ -217,16 +239,15 @@ async def shutdown(self, now: bool = False): await ws.close() async def on_exception(self, exc: Exception): - # Called when we get an exception iterating over websocket messages before - # we reconnect, in case a Subclass wants to do something with it + """Perform additional pre-reconnect behavior when an exception happens.""" logger.exception(exc) async def _create_topic_subscription(self, topic_name: str): - # Required method by the ABC base, but topics are irrelevant to this Backend + # noop since topics are irrelevant to this Backend pass async def _cleanup_topic_subscription(self, topic_name: str): - # Required method by the ABC base, but topics are irrelevant to this Backend + # noop since topics are irrelevant to this Backend pass async def _poll(self): diff --git a/sending/base.py b/sending/base.py index 8be8dc4..97f7b41 100644 --- a/sending/base.py +++ b/sending/base.py @@ -1,5 +1,15 @@ -"""This module defines an abstract PubSubManager class that must be used when - implementing custom managers or other features.""" +"""Publish Subscribe Managers and PubSub sessions. + +This module defines an abstract `AbstractPubSubManager` class that must be +subclassed when implementing custom managers or other features. + +This module also implements a `DetachedPubSubSession` that receives messages +only for topics to which it has a subscription. + +The module also implements a more robust `PubSubSession`, a subclass of +`DetachedPubSubSession`, which receives messages from topics to which it or its +parent has a subscription. +""" import abc import asyncio import enum @@ -21,15 +31,18 @@ class SystemEvents(enum.Enum): - # We've forcibly disconnected from the pub/sub server. - # This is for situations where the underlying backend has forced a disconnect - # without the user asking. ZMQ MAXMSGSIZE cycling connections is a good example. 
+ """System events related to the pub/sub manager + + We've forcibly disconnected from the pub/sub server. This is for situations + where the underlying backend has forced a disconnect without the user + asking. ZMQ `MAXMSGSIZE` cycling connections is a good example. + """ + FORCED_DISCONNECT = enum.auto() class AbstractPubSubManager(abc.ABC): - """ - Manages the publish-subscribe workflow. + """A manager for the publish-subscribe workflow. This abstract class provides a common base class for a publish-subscribe workflow and the management of components in the workflow. @@ -74,13 +87,15 @@ async def initialize( poll_workers=1, enable_polling=True, ): - """ - Initialize a pub-sub channel, specifically its queues and workers. - If enable_polling is False, it will only start the inbound and outbound - workers, not the poll worker. That is useful if you're writing tests - and don't want a connection to external IO to be started. + """Initialize a pub-sub channel, specifically its queues and workers. + + `enable_polling` if set to True, the default, will start the poll + workers. If `enable_polling` is False, it will only start the inbound + and outbound workers and not start the poll worker. The False setting + is useful if you're writing tests and don't want a connection to + external IO to be started. - E.g. + ``` mgr = SomeBackend() publish = mocker.patch.object(mgr, "_publish") @@ -94,6 +109,7 @@ def echo(msg: str): publish.assert_called_once_with( QueuedMessage(topic="test-topic, contents="echo test", session_id=None) ) + ``` """ if self.context_hook: await self.context_hook() @@ -111,7 +127,11 @@ def echo(msg: str): self.poll_workers.append(asyncio.create_task(self._poll_loop())) async def shutdown(self, now=False): - """Shut down a pub-sub channel and its related queues and workers""" + """Shut down a pub-sub channel and its related queues and workers. + + The `now` parameter will drain queues gracefully if set to the default + False. 
If set to True, the queues are cleared and set to None. + """ if not now: await self._drain_queues() @@ -143,6 +163,7 @@ async def shutdown(self, now=False): self.callback_ids_by_session.clear() async def _drain_queues(self): + """Private method which waits for queues to clear.""" await self.inbound_queue.join() await self.outbound_queue.join() @@ -367,13 +388,13 @@ def _emit_system_event(self, topic: str, event: SystemEvents): class DetachedPubSubSession: - """A session that does not receive messages for topics other than what it has explicitly - subscribed to. + """A session that receives messages from subscribed topics. - It still relies on the parent as the centralized queuing mechanism for processing inbound - and outbound messages, but it ensures total isolation for what messages get passed down to - the session's callbacks. This is helpful if you don't want to have a separate polling process - for each session. + It still relies on the parent PubSubManager as the centralized queuing + mechanism for processing inbound and outbound messages. It also ensures + total isolation for what messages get passed down to the session's callbacks + (detached from its parent's subscriptions). This is helpful when a separate + polling process is needed for each session. """ def __init__(self, parent: AbstractPubSubManager) -> None: @@ -448,9 +469,9 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): class PubSubSession(DetachedPubSubSession): - """A holder for callbacks and topic subscriptions. Also receives messages from topics - subscribed to by the parent manager. + """A holder for callbacks and topic subscriptions. + Also receives messages from topics subscribed to by the parent manager. This is helpful if you have a global topic that all sessions should subscribe to automatically without client input. """