diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index 5c3cc1a75..63c2edbfa 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import logging import typing from typing import Any, Callable, Iterable, Optional @@ -20,6 +21,9 @@ from google.cloud.pubsub_v1 import subscriber +_LOGGER = logging.getLogger(__name__) + + class MessagesOnHold(object): """Tracks messages on hold by ordering key. Not thread-safe.""" @@ -113,14 +117,17 @@ def activate_ordering_keys( Args: ordering_keys: - The ordering keys to activate. May be empty. + The ordering keys to activate. May be empty, or contain duplicates. schedule_message_callback: The callback to call to schedule a message to be sent to the user. """ for key in ordering_keys: - assert ( - self._pending_ordered_messages.get(key) is not None - ), "A message queue should exist for every ordered message in flight." + pending_ordered_messages = self._pending_ordered_messages.get(key) + if pending_ordered_messages is None: + _LOGGER.warning( + "No message queue exists for message ordering key: %s.", key + ) + continue next_msg = self._get_next_for_ordering_key(key) if next_msg: # Schedule the next message because the previous was dropped. @@ -154,15 +161,20 @@ def _get_next_for_ordering_key( def _clean_up_ordering_key(self, ordering_key: str) -> None: """Clean up state for an ordering key with no pending messages. - Args: + Args ordering_key: The ordering key to clean up. """ message_queue = self._pending_ordered_messages.get(ordering_key) - assert ( - message_queue is not None - ), "Cleaning up ordering key that does not exist." - assert not len(message_queue), ( - "Ordering key must only be removed if there are no messages " - "left for that key." - ) + if message_queue is None: + _LOGGER.warning( + "Tried to clean up ordering key that does not exist: %s", ordering_key + ) + return + if len(message_queue) > 0: + _LOGGER.warning( + "Tried to clean up ordering key: %s with %d messages remaining.", + ordering_key, + len(message_queue), + ) + return del self._pending_ordered_messages[ordering_key] diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 0d28ec447..5e1dcf91b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -109,6 +109,72 @@ def test_ordered_messages_one_key(): assert moh.size == 0 +def test_ordered_messages_drop_duplicate_keys(caplog): + moh = messages_on_hold.MessagesOnHold() + + msg1 = make_message(ack_id="ack1", ordering_key="key1") + moh.put(msg1) + assert moh.size == 1 + + msg2 = make_message(ack_id="ack2", ordering_key="key1") + moh.put(msg2) + assert moh.size == 2 + + # Get first message for "key1" + assert moh.get() == msg1 + assert moh.size == 1 + + # Still waiting on the previously-sent message for "key1", and there are no + # other messages, so return None. + assert moh.get() is None + assert moh.size == 1 + + # Activate "key1". + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1", "key1"], callback_tracker) + assert callback_tracker.called + assert callback_tracker.message == msg2 + assert moh.size == 0 + assert len(moh._pending_ordered_messages) == 0 + + # Activate "key1" again + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again. There are no other messages for that key, so clean + # up state for that key. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + msg3 = make_message(ack_id="ack3", ordering_key="key1") + moh.put(msg3) + assert moh.size == 1 + + # Get next message for "key1" + assert moh.get() == msg3 + assert moh.size == 0 + + # Activate "key1". + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again. There are no other messages for that key, so clean + # up state for that key. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + + # Activate "key1" again after being cleaned up. There are no other messages for that key, so clean + # up state for that key. + callback_tracker = ScheduleMessageCallbackTracker() + moh.activate_ordering_keys(["key1"], callback_tracker) + assert not callback_tracker.called + assert "No message queue exists for message ordering key: key1" in caplog.text + + def test_ordered_messages_two_keys(): moh = messages_on_hold.MessagesOnHold() @@ -278,3 +344,39 @@ def test_ordered_and_unordered_messages_interleaved(): # No messages left. assert moh.get() is None assert moh.size == 0 + + +def test_cleanup_nonexistent_key(caplog): + moh = messages_on_hold.MessagesOnHold() + moh._clean_up_ordering_key("non-existent-key") + assert ( + "Tried to clean up ordering key that does not exist: non-existent-key" + in caplog.text + ) + + +def test_cleanup_key_with_messages(caplog): + moh = messages_on_hold.MessagesOnHold() + + # Put message with "key1". + msg1 = make_message(ack_id="ack1", ordering_key="key1") + moh.put(msg1) + assert moh.size == 1 + + # Put another message "key1" + msg2 = make_message(ack_id="ack2", ordering_key="key1") + moh.put(msg2) + assert moh.size == 2 + + # Get first message for "key1" + assert moh.get() == msg1 + assert moh.size == 1 + + # Get first message for "key1" + assert moh.get() is None + assert moh.size == 1 + + moh._clean_up_ordering_key("key1") + assert ( + "Tried to clean up ordering key: key1 with 1 messages remaining." in caplog.text + )