Skip to content

Commit

Permalink
[fix] block in close
Browse files Browse the repository at this point in the history
In case that close() is called while doing
read operation, the close() call is blocked and whole application
is stucked. Shut-down of socket will resolve this issue and releases
thread blocked at readline().

Regression test for graceful shutdown also included.
  • Loading branch information
richard78917 authored and Richard Wolfert committed Feb 14, 2020
1 parent f426d37 commit b6ea4c0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 0.7.0 (2020-XX-XX)

- Fixed issue when threads are blocked on close while reading, #11 by @richard78917

## 0.6.0 (2020-01-08)

- Added Python 3.8.* support, #10
Expand Down
9 changes: 7 additions & 2 deletions pynats/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import pkg_resources

from pynats.exceptions import NATSInvalidResponse, NATSUnexpectedResponse
from pynats.exceptions import NATSInvalidResponse, NATSUnexpectedResponse, NATSSocketError
from pynats.nuid import NUID

__all__ = ("NATSSubscription", "NATSMessage", "NATSClient")
Expand Down Expand Up @@ -154,6 +154,7 @@ def connect(self) -> None:
self._recv(INFO_RE)

def close(self) -> None:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket_file.close()
self._socket.close()

Expand Down Expand Up @@ -279,7 +280,11 @@ def _readline(self, *, size: int = None) -> bytes:
read = io.BytesIO()

while True:
line = cast(bytes, self._socket_file.readline())
raw_bytes = self._socket_file.readline()
if not raw_bytes:
raise NATSSocketError("unable to read from socket")

line = cast(bytes, raw_bytes)
read.write(line)

if size is not None:
Expand Down
5 changes: 5 additions & 0 deletions pynats/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ class NATSInvalidResponse(NATSError):
def __init__(self, line: bytes, *args, **kwargs) -> None:
self.line = line
super().__init__()

class NATSSocketError(NATSError):
def __init__(self, line: bytes, *args, **kwargs) -> None:
self.line = line
super().__init__()
21 changes: 21 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest

from pynats import NATSClient
from pynats.exceptions import NATSSocketError


@pytest.fixture
Expand Down Expand Up @@ -179,3 +180,23 @@ def test_request_timeout(nats_url):
with NATSClient(nats_url, socket_timeout=2) as client:
with pytest.raises(socket.timeout):
client.request("test-subject")

def test_graceful_shutdown(nats_url):
def worker(client, connected_event):
client.connect()
connected_event.set()
try:
client.wait()
except NATSSocketError:
assert True
except Exception:
assert False, "unexpected Exception raised"

client = NATSClient(nats_url)
connected_event = threading.Event()
thread = threading.Thread(target=worker, args=[client, connected_event])
thread.start()
assert connected_event.wait(5), "unable to connect"
client.close()
thread.join(5)
assert not thread.is_alive(), "thread did not finish"

0 comments on commit b6ea4c0

Please sign in to comment.