From 51ddbec9b0bc71f44561af0247f814f2675ebef9 Mon Sep 17 00:00:00 2001 From: Matt Kafonek Date: Thu, 25 Aug 2022 16:15:59 -0400 Subject: [PATCH] Add a sleep in Jupyter Backend poll loop (#37) * Add a sleep in Jupyter Backend poll loop * docstring * make the sleep time a class attribute so we can change it in PA * fix docstring --- CHANGELOG.md | 1 + sending/backends/jupyter.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93abeec..7197e6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Use `managed_service_fixtures` for Redis tests - `WebsocketManager` backend uses vanilla `logging` instead of `structlog`, remove need for `structlog` dependency once `managed-service-fixtures` also drops it +- `JupyterBackend` introduces a short sleep in its poll loop while investigating 100% CPU usage ## [0.2.2] - 2022-07-28 ### Changed diff --git a/sending/backends/jupyter.py b/sending/backends/jupyter.py index 65f8c96..5016d32 100644 --- a/sending/backends/jupyter.py +++ b/sending/backends/jupyter.py @@ -1,7 +1,9 @@ +import asyncio from queue import Empty from typing import Optional, Union from jupyter_client import AsyncKernelClient +from jupyter_client.channels import ZMQSocketChannel from zmq import NOBLOCK, Event, Socket, SocketOption, pyzmq_version, zmq_version from zmq.asyncio import Context from zmq.utils.monitor import recv_monitor_message @@ -11,11 +13,18 @@ class JupyterKernelManager(AbstractPubSubManager): - def __init__(self, connection_info: dict, *, max_message_size: int = None): + def __init__( + self, + connection_info: dict, + *, + max_message_size: int = None, + sleep_between_polls: float = 0.005, + ): super().__init__() self.connection_info = connection_info self._monitor_sockets_for_topic: dict[str, Socket] = {} self.max_message_size = max_message_size + 
self.sleep_between_polls = sleep_between_polls async def initialize( self, *, queue_size=0, inbound_workers=1, outbound_workers=1, poll_workers=1 @@ -93,7 +102,7 @@ def _cycle_socket(self, topic): async def _poll(self): for topic_name in self.subscribed_topics: - channel_obj = getattr(self._client, f"{topic_name}_channel") + channel_obj: ZMQSocketChannel = getattr(self._client, f"{topic_name}_channel") while True: try: @@ -117,3 +126,18 @@ async def _poll(self): # when it violates some constraint such as the max message size. logger.info(f"ZMQ disconnected for topic '{topic}', cycling socket") self._cycle_socket(topic) + + async def _poll_loop(self): + """ + Override base Manager _poll_loop to switch the final asyncio.sleep from 0 to + something more than that (definable at init or after instantiation, default 0.005). + While observing JupyterManager in real world, containers are using 100% CPU. + Possibly due to this loop being too aggressive? + """ + while True: + try: + await self._poll() + except Exception: + logger.exception("Uncaught exception encountered while polling backend") + finally: + await asyncio.sleep(self.sleep_between_polls)