Skip to content

Commit

Permalink
ThreadedZMQStream: close stream when it's done
Browse files Browse the repository at this point in the history
not just the underlying socket,
because the stream represents event listeners on the socket,
and can produce errors or at least warnings if the socket is closed before being unregistered
  • Loading branch information
minrk committed Mar 16, 2023
1 parent f6a03ac commit 9fc5a57
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import asyncio
import atexit
import time
from concurrent.futures import Future
from threading import Event, Thread
from typing import Any, Dict, List, Optional

import zmq
from tornado.ioloop import IOLoop
from traitlets import Instance, Type
from traitlets.log import get_logger
from zmq.eventloop import zmqstream

from .channels import HBChannel
Expand Down Expand Up @@ -45,7 +47,7 @@ def __init__(
session : :class:`session.Session`
The session to use.
loop
A pyzmq ioloop to connect the socket to using a ZMQStream
A tornado ioloop to connect the socket to using a ZMQStream
"""
super().__init__()

Expand Down Expand Up @@ -80,6 +82,29 @@ def stop(self) -> None:

def close(self) -> None:
""" "Close the channel."""
if self.stream is not None and self.ioloop is not None:
# c.f.Future for threadsafe results
f: Future = Future()

def close_stream():
try:
if self.stream is not None:
self.stream.close(linger=0)
self.stream = None
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

self.ioloop.add_callback(close_stream)
# wait for result
try:
f.result(timeout=5)
except Exception as e:
log = get_logger()
msg = f"Error closing stream {self.stream}: {e}"
log.warning(msg, RuntimeWarning, stacklevel=2)

if self.socket is not None:
try:
self.socket.close(linger=0)
Expand Down

0 comments on commit 9fc5a57

Please sign in to comment.