Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a sleep in Jupyter Backend poll loop #37

Merged
merged 4 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Use `managed_service_fixtures` for Redis tests
- `WebsocketManager` backend uses vanilla `logging` instead of `structlog`, remove need for `structlog` dependency once `managed-service-fixtures` also drops it
- `JupyterBackend` introduce a short sleep in its poll loop while investigating 100% CPU usage

## [0.2.2] - 2022-07-28
### Changed
Expand Down
28 changes: 26 additions & 2 deletions sending/backends/jupyter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
from queue import Empty
from typing import Optional, Union

from jupyter_client import AsyncKernelClient
from jupyter_client.channels import ZMQSocketChannel
from zmq import NOBLOCK, Event, Socket, SocketOption, pyzmq_version, zmq_version
from zmq.asyncio import Context
from zmq.utils.monitor import recv_monitor_message
Expand All @@ -11,11 +13,18 @@


class JupyterKernelManager(AbstractPubSubManager):
def __init__(self, connection_info: dict, *, max_message_size: int = None):
def __init__(
self,
connection_info: dict,
*,
max_message_size: int = None,
sleep_between_polls: float = 0.005,
):
super().__init__()
self.connection_info = connection_info
self._monitor_sockets_for_topic: dict[str, Socket] = {}
self.max_message_size = max_message_size
self.sleep_between_polls = sleep_between_polls

async def initialize(
self, *, queue_size=0, inbound_workers=1, outbound_workers=1, poll_workers=1
Expand Down Expand Up @@ -93,7 +102,7 @@ def _cycle_socket(self, topic):

async def _poll(self):
for topic_name in self.subscribed_topics:
channel_obj = getattr(self._client, f"{topic_name}_channel")
channel_obj: ZMQSocketChannel = getattr(self._client, f"{topic_name}_channel")

while True:
try:
Expand All @@ -117,3 +126,18 @@ async def _poll(self):
# when it violates some constraint such as the max message size.
logger.info(f"ZMQ disconnected for topic '{topic}', cycling socket")
self._cycle_socket(topic)

async def _poll_loop(self):
"""
Override base Manager _poll_loop to switch the final asyncio.sleep from 0 to
something more than that (definable at init or after instantiation, default 0.005).
While observing JupyterManager in real world, containers are using 100% CPU.
Possibly due to this loop being too aggressive?
"""
while True:
try:
await self._poll()
except Exception:
logger.exception("Uncaught exception encountered while polling backend")
finally:
await asyncio.sleep(self.sleep_between_polls)