Skip to content

Commit

Permalink
fix: convert all RPC error types to exceptions (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut authored Sep 16, 2020
1 parent f34e5b5 commit 89c671a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,22 @@
"""The load threshold below which to resume the incoming message stream."""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
if isinstance(exception, grpc.RpcError):
return exceptions.from_grpc_error(exception)
return exception
def _wrap_as_exception(maybe_exception):
"""Wrap an object as a Python exception, if needed.
Args:
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
Returns:
The argument itself if an instance of ``BaseException``, otherwise
the argument represented as an instance of ``Exception`` (sub)class.
"""
if isinstance(maybe_exception, grpc.RpcError):
return exceptions.from_grpc_error(maybe_exception)
elif isinstance(maybe_exception, BaseException):
return maybe_exception

return Exception(maybe_exception)


def _wrap_callback_errors(callback, on_callback_error, message):
Expand Down Expand Up @@ -656,7 +667,7 @@ def _should_recover(self, exception):
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of retryable / idempotent exceptions.
"""
exception = _maybe_wrap_exception(exception)
exception = _wrap_as_exception(exception)
# If this is in the list of idempotent exceptions, then we want to
# recover.
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
Expand All @@ -678,7 +689,7 @@ def _should_terminate(self, exception):
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of terminating exceptions.
"""
exception = _maybe_wrap_exception(exception)
exception = _wrap_as_exception(exception)
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.info("Observed terminating stream error %s", exception)
return True
Expand All @@ -697,9 +708,9 @@ def _on_rpc_done(self, future):
background consumer and preventing it from being ``joined()``.
"""
_LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
future = _maybe_wrap_exception(future)
error = _wrap_as_exception(future)
thread = threading.Thread(
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future}
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error}
)
thread.daemon = True
thread.start()
12 changes: 9 additions & 3 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
mock.create_autospec(grpc.RpcError, instance=True),
exceptions.GoogleAPICallError,
),
({"error": "RPC terminated"}, Exception),
("something broke", Exception),
],
)
def test__maybe_wrap_exception(exception, expected_cls):
def test__wrap_as_exception(exception, expected_cls):
assert isinstance(
streaming_pull_manager._maybe_wrap_exception(exception), expected_cls
streaming_pull_manager._wrap_as_exception(exception), expected_cls
)


Expand Down Expand Up @@ -948,8 +950,12 @@ def test__on_rpc_done(thread):
manager._on_rpc_done(mock.sentinel.error)

thread.assert_called_once_with(
name=mock.ANY, target=manager.close, kwargs={"reason": mock.sentinel.error}
name=mock.ANY, target=manager.close, kwargs={"reason": mock.ANY}
)
_, kwargs = thread.call_args
reason = kwargs["kwargs"]["reason"]
assert isinstance(reason, Exception)
assert reason.args == (mock.sentinel.error,) # Exception wraps the original error


def test_activate_ordering_keys():
Expand Down

0 comments on commit 89c671a

Please sign in to comment.