Skip to content

Commit

Permalink
Add a sleep in Jupyter Backend poll loop (#37)
Browse files Browse the repository at this point in the history
* Add a sleep in Jupyter Backend poll loop

* docstring

* make the sleep time a class attribute so we can change it in PA

* fix docstring
  • Loading branch information
Matt Kafonek authored Aug 25, 2022
1 parent c95f327 commit 51ddbec
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Use `managed_service_fixtures` for Redis tests
- `WebsocketManager` backend uses vanilla `logging` instead of `structlog`, remove need for `structlog` dependency once `managed-service-fixtures` also drops it
- `JupyterBackend` introduce a short sleep in its poll loop while investigating 100% CPU usage

## [0.2.2] - 2022-07-28
### Changed
Expand Down
28 changes: 26 additions & 2 deletions sending/backends/jupyter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
from queue import Empty
from typing import Optional, Union

from jupyter_client import AsyncKernelClient
from jupyter_client.channels import ZMQSocketChannel
from zmq import NOBLOCK, Event, Socket, SocketOption, pyzmq_version, zmq_version
from zmq.asyncio import Context
from zmq.utils.monitor import recv_monitor_message
Expand All @@ -11,11 +13,18 @@


class JupyterKernelManager(AbstractPubSubManager):
def __init__(self, connection_info: dict, *, max_message_size: int = None):
def __init__(
self,
connection_info: dict,
*,
max_message_size: int = None,
sleep_between_polls: float = 0.005,
):
super().__init__()
self.connection_info = connection_info
self._monitor_sockets_for_topic: dict[str, Socket] = {}
self.max_message_size = max_message_size
self.sleep_between_polls = sleep_between_polls

async def initialize(
self, *, queue_size=0, inbound_workers=1, outbound_workers=1, poll_workers=1
Expand Down Expand Up @@ -93,7 +102,7 @@ def _cycle_socket(self, topic):

async def _poll(self):
for topic_name in self.subscribed_topics:
channel_obj = getattr(self._client, f"{topic_name}_channel")
channel_obj: ZMQSocketChannel = getattr(self._client, f"{topic_name}_channel")

while True:
try:
Expand All @@ -117,3 +126,18 @@ async def _poll(self):
# when it violates some constraint such as the max message size.
logger.info(f"ZMQ disconnected for topic '{topic}', cycling socket")
self._cycle_socket(topic)

async def _poll_loop(self):
"""
Override base Manager _poll_loop to switch the final asyncio.sleep from 0 to
something more than that (definable at init or after instantiation, default 0.005).
While observing JupyterManager in real world, containers are using 100% CPU.
Possibly due to this loop being too aggressive?
"""
while True:
try:
await self._poll()
except Exception:
logger.exception("Uncaught exception encountered while polling backend")
finally:
await asyncio.sleep(self.sleep_between_polls)

0 comments on commit 51ddbec

Please sign in to comment.