From 253ced28f308450c7a1a93cc38f6d101ecd7d4c0 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Wed, 9 Mar 2022 13:48:08 -0500 Subject: [PATCH] feat: return singleton success future for exactly-once methods in Message (#608) * Return singleton success future for exactly-once methods in subscriber.Message --- google/cloud/pubsub_v1/subscriber/message.py | 24 ++++++++++++------- .../unit/pubsub_v1/subscriber/test_message.py | 3 +++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 5744aa71c..ab17bab78 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -40,6 +40,9 @@ attributes: {} }}""" +_SUCCESS_FUTURE = futures.Future() +_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS) + def _indent(lines: str, prefix: str = " ") -> str: """Indent some text. @@ -291,12 +294,13 @@ def ack_with_response(self) -> "futures.Future": pubsub_v1.subscriber.exceptions.AcknowledgeError exception will be thrown. """ - future = futures.Future() - req_future = None + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None time_to_ack = math.ceil(time.time() - self._received_timestamp) self._request_queue.put( requests.AckRequest( @@ -390,12 +394,13 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": will be thrown. """ - future = futures.Future() - req_future = None + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None self._request_queue.put( requests.ModAckRequest( @@ -451,12 +456,13 @@ def nack_with_response(self) -> "futures.Future": will be thrown. """ - future = futures.Future() - req_future = None + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None self._request_queue.put( requests.NackRequest( diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index f5c7bf3c7..0debabaf3 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -156,6 +156,7 @@ def test_ack_with_response_exactly_once_delivery_disabled(): ) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.AckRequest) @@ -205,6 +206,7 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled(): requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.ModAckRequest) @@ -242,6 +244,7 @@ def test_nack_with_response_exactly_once_delivery_disabled(): ) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.NackRequest)