From fa0754bf77da3583d5a69d0f571a1de58581ec59 Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Tue, 8 Jan 2019 16:06:35 -0800 Subject: [PATCH 1/2] PubSub: Propagate RetryError in PublisherClient.publish --- pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index f187024b7cf7..d3fd0d956a90 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -199,7 +199,7 @@ def _commit(self): try: response = self._client.api.publish(self._topic, self._messages) - except google.api_core.exceptions.GoogleAPICallError as exc: + except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, set the exception on all futures and # exit. self._status = base.BatchStatus.ERROR From d9fdaa5a3f4a4b9c1c692b57cb9ca7092e5ae70b Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Wed, 9 Jan 2019 13:55:01 -0800 Subject: [PATCH 2/2] Add unit test --- .../pubsub_v1/publisher/batch/test_thread.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index af04f865dd40..d323c2ed2d24 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -221,6 +221,25 @@ def test_block__commmit_api_error(): assert future.exception() == error +def test_block__commmit_retry_error(): + batch = create_batch() + futures = ( + batch.publish({"data": b"blah blah blah"}), + batch.publish({"data": b"blah blah blah blah"}), + ) + + # Make the API throw an error when publishing. + error = google.api_core.exceptions.RetryError("uh oh", None) + patch = mock.patch.object(type(batch.client.api), "publish", side_effect=error) + + with patch: + batch._commit() + + for future in futures: + assert future.done() + assert future.exception() == error + + def test_monitor(): batch = create_batch(max_latency=5.0) with mock.patch.object(time, "sleep") as sleep: