Skip to content

Commit

Permalink
ThreadedZMQStream: close stream when it's done
Browse files Browse the repository at this point in the history
not just the underlying socket,
because the stream represents event listeners on the socket,
and can produce errors or at least warnings if the socket is closed before being unregistered
  • Loading branch information
minrk committed Mar 16, 2023
1 parent f6a03ac commit f528bc5
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import asyncio
import atexit
import time
import warnings
from concurrent.futures import Future
from threading import Event, Thread
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -45,7 +47,7 @@ def __init__(
session : :class:`session.Session`
The session to use.
loop
A pyzmq ioloop to connect the socket to using a ZMQStream
A tornado ioloop to connect the socket to using a ZMQStream
"""
super().__init__()

Expand Down Expand Up @@ -80,6 +82,29 @@ def stop(self) -> None:

def close(self) -> None:
""" "Close the channel."""
if self.stream is not None and self.ioloop is not None:
# c.f.Future for threadsafe results
f = Future()

def close_stream():
try:
self.stream.close()
self.socket.close(linger=0)
self.socket = None
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

self.ioloop.add_callback(close_stream)
try:
f.result(timeout=5)
except Exception as e:
# This should probably be log.warning, but we don't have self.log
warnings.warn(
f"Error closing stream {self.stream}: {e}", RuntimeWarning, stacklevel=2
)

if self.socket is not None:
try:
self.socket.close(linger=0)
Expand Down

0 comments on commit f528bc5

Please sign in to comment.