Pub/Sub: buffered messages delivered multiple times ignoring acknowledgment deadline #9252

Closed
ghost opened this issue Sep 19, 2019 · 20 comments · Fixed by #9268 or #9525
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@ghost

ghost commented Sep 19, 2019

I noticed a problem where buffered messages in the Pub/Sub library are delivered multiple times, ignoring the acknowledgment deadline. I have a subscription with the acknowledgment deadline set to 5 minutes, but messages are delivered multiple times to the same subscriber after about 30 seconds.

My code example starts a subscriber client with max_messages set to 1. When two messages are sent to the subscription, the second message is always duplicated. The same behavior also happens with higher max_messages values. For example, using the default value of 100 for max_messages, when 150 messages are sent to the subscription, some of the messages are duplicated if processing the first 100 messages takes more than 30 seconds.

I understand that with a large backlog of small messages, messages can get redelivered (see "Dealing with large backlogs of small messages"); however, shouldn't the messages be re-delivered only after the ack_deadline is exceeded?

Environment details

  • OS type and version: Ubuntu 18.04.3 LTS
  • Python version and virtual environment information: Python 3.7.3
  • google-cloud-pubsub version: google-cloud-pubsub==1.0.0

Steps to reproduce

  1. Create a subscription with a high ack_deadline.
  2. Start subscriber (see code examples): python subscriber.py -p PROJECT_ID SUBSCRIPTION_NAME.
  3. Send two messages to the pubsub topic.
  4. The second message is duplicated.

Code example

Example subscriber code here: https://gist.github.com/qvik-olli/0bfd4ace2d06def1675a76fbc20493e5
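
For reference, a rough sketch approximating the linked gist (the gist is the authoritative version); PROJECT_ID, SUBSCRIPTION_NAME, the 40-second sleep, and the seen-ID bookkeeping are assumptions based on the logs below:

import logging
import time

from google.cloud import pubsub_v1

logging.basicConfig(level=logging.INFO)
seen_ids = set()

def callback(message):
    if message.message_id in seen_ids:
        logging.error("[%s] Duplicate message!!!", message.message_id)
    else:
        seen_ids.add(message.message_id)
        logging.info("[%s] got message with content: %s", message.message_id, message.data)
        logging.info("[%s] sleeping", message.message_id)
        time.sleep(40)  # simulate slow processing, longer than the stream ACK deadline
        logging.info("[%s] done sleeping", message.message_id)
    message.ack()

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("PROJECT_ID", "SUBSCRIPTION_NAME")
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
future = subscriber.subscribe(subscription_path, callback, flow_control=flow_control)
future.result()  # block the main thread while callbacks run in the background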

Logs

From the subscriber script:

2019-09-19 14:10:37,740 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] got message with content: b'0'
2019-09-19 14:10:37,740 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] sleeping
2019-09-19 14:11:17,747 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] done sleeping
2019-09-19 14:11:17,819 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] got message with content: b'1'
2019-09-19 14:11:17,819 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] sleeping
2019-09-19 14:11:57,859 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] done sleeping
2019-09-19 14:11:57,986 ERROR ThreadPoolExecutor-ThreadScheduler_0: [652240118977135] Duplicate message!!!

With debug logging from the pubsub library:

2019-09-19 14:10:35,175 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:35,176 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 6.301498 seconds.
2019-09-19 14:10:37,634 DEBUG Thread-ConsumeBidirectionalStream: recved response.
2019-09-19 14:10:37,634 DEBUG Thread-ConsumeBidirectionalStream: Processing 2 received message(s), currenty on hold 0.
2019-09-19 14:10:37,740 DEBUG Thread-ConsumeBidirectionalStream: Sent request(s) over unary RPC.
2019-09-19 14:10:37,740 DEBUG Thread-ConsumeBidirectionalStream: Message backlog over load at 1.00, pausing.
2019-09-19 14:10:37,740 DEBUG Thread-ConsumeBidirectionalStream: Scheduling callbacks for 1 new messages, new total on hold 1.
2019-09-19 14:10:37,740 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] got message with content: b'0'
2019-09-19 14:10:37,740 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] sleeping
2019-09-19 14:10:37,740 DEBUG Thread-ConsumeBidirectionalStream: paused, waiting for waking.
2019-09-19 14:10:41,477 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:41,477 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:10:41,539 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:10:41,539 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 5.628947 seconds.
2019-09-19 14:10:47,168 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:47,168 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:10:47,238 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:10:47,238 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 1.232672 seconds.
2019-09-19 14:10:48,471 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:48,471 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:10:48,584 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:10:48,584 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 0.579531 seconds.
2019-09-19 14:10:49,164 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:49,164 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:10:49,242 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:10:49,242 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 5.177725 seconds.
2019-09-19 14:10:54,420 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:10:54,420 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:10:54,477 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:10:54,477 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 6.448523 seconds.
2019-09-19 14:11:00,926 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:11:00,926 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:01,002 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:01,002 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 7.749161 seconds.
2019-09-19 14:11:02,469 DEBUG Thread-Heartbeater: Sent heartbeat.
2019-09-19 14:11:08,752 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:11:08,752 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:08,813 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:08,813 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 4.640038 seconds.
2019-09-19 14:11:13,454 DEBUG Thread-LeaseMaintainer: The current p99 value is 10 seconds.
2019-09-19 14:11:13,454 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:13,532 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:13,532 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 4.966822 seconds.
2019-09-19 14:11:17,747 INFO ThreadPoolExecutor-ThreadScheduler_0: [652240118977134] done sleeping
2019-09-19 14:11:17,748 DEBUG Thread-CallbackRequestDispatcher: Handling 1 batched requests
2019-09-19 14:11:17,818 DEBUG Thread-CallbackRequestDispatcher: Sent request(s) over unary RPC.
2019-09-19 14:11:17,818 DEBUG Thread-CallbackRequestDispatcher: Current load: 0.00
2019-09-19 14:11:17,818 DEBUG Thread-CallbackRequestDispatcher: Released held message to leaser, scheduling callback for it, still on hold 0.
2019-09-19 14:11:17,819 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] got message with content: b'1'
2019-09-19 14:11:17,819 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] sleeping
2019-09-19 14:11:17,819 DEBUG Thread-CallbackRequestDispatcher: Did not resume, current load is 1.00.
2019-09-19 14:11:18,499 DEBUG Thread-LeaseMaintainer: The current p99 value is 41 seconds.
2019-09-19 14:11:18,499 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:18,575 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:18,575 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 18.568330 seconds.
2019-09-19 14:11:32,469 DEBUG Thread-Heartbeater: Sent heartbeat.
2019-09-19 14:11:37,144 DEBUG Thread-LeaseMaintainer: The current p99 value is 41 seconds.
2019-09-19 14:11:37,144 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:37,470 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:37,470 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 5.602221 seconds.
2019-09-19 14:11:43,072 DEBUG Thread-LeaseMaintainer: The current p99 value is 41 seconds.
2019-09-19 14:11:43,072 DEBUG Thread-LeaseMaintainer: Renewing lease for 1 ack IDs.
2019-09-19 14:11:43,146 DEBUG Thread-LeaseMaintainer: Sent request(s) over unary RPC.
2019-09-19 14:11:43,146 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 22.866717 seconds.
2019-09-19 14:11:57,859 INFO ThreadPoolExecutor-ThreadScheduler_1: [652240118977135] done sleeping
2019-09-19 14:11:57,859 DEBUG Thread-CallbackRequestDispatcher: Handling 1 batched requests
2019-09-19 14:11:57,923 DEBUG Thread-CallbackRequestDispatcher: Sent request(s) over unary RPC.
2019-09-19 14:11:57,924 DEBUG Thread-CallbackRequestDispatcher: Current load: 0.00
2019-09-19 14:11:57,924 DEBUG Thread-CallbackRequestDispatcher: Current load is 0.00, resuming consumer.
2019-09-19 14:11:57,924 DEBUG Thread-ConsumeBidirectionalStream: woken.
2019-09-19 14:11:57,924 DEBUG Thread-ConsumeBidirectionalStream: waiting for recv.
2019-09-19 14:11:57,924 DEBUG Thread-ConsumeBidirectionalStream: recved response.
2019-09-19 14:11:57,924 DEBUG Thread-ConsumeBidirectionalStream: Processing 1 received message(s), currenty on hold 0.
2019-09-19 14:11:57,985 DEBUG Thread-ConsumeBidirectionalStream: Sent request(s) over unary RPC.
2019-09-19 14:11:57,986 DEBUG Thread-ConsumeBidirectionalStream: Message backlog over load at 1.00, pausing.
2019-09-19 14:11:57,986 DEBUG Thread-ConsumeBidirectionalStream: Scheduling callbacks for 1 new messages, new total on hold 0.
2019-09-19 14:11:57,986 ERROR ThreadPoolExecutor-ThreadScheduler_0: [652240118977135] Duplicate message!!!
2019-09-19 14:11:57,986 DEBUG Thread-ConsumeBidirectionalStream: paused, waiting for waking.
2019-09-19 14:11:57,986 DEBUG Thread-CallbackRequestDispatcher: Handling 1 batched requests
2019-09-19 14:11:58,087 DEBUG Thread-CallbackRequestDispatcher: Sent request(s) over unary RPC.
2019-09-19 14:11:58,088 DEBUG Thread-CallbackRequestDispatcher: Current load: 0.00
2019-09-19 14:11:58,088 DEBUG Thread-CallbackRequestDispatcher: Current load is 0.00, resuming consumer.
2019-09-19 14:11:58,088 DEBUG Thread-ConsumeBidirectionalStream: woken.
2019-09-19 14:11:58,088 DEBUG Thread-ConsumeBidirectionalStream: waiting for recv.
2019-09-19 14:12:02,470 DEBUG Thread-Heartbeater: Sent heartbeat.
2019-09-19 14:12:06,013 DEBUG Thread-LeaseMaintainer: The current p99 value is 81 seconds.
2019-09-19 14:12:06,013 DEBUG Thread-LeaseMaintainer: Snoozing lease management for 71.733369 seconds.
@plamut plamut added api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue. labels Sep 19, 2019
@plamut
Contributor

plamut commented Sep 19, 2019

@qvik-olli Thanks for the detailed report!

This sounds as if the request(s) to modify the message acknowledgment deadline do not arrive in time, and the backend re-sends the message. Can you check if the subscription's max ACK deadline is indeed set to 300 seconds (5 minutes), and not just to, e.g., the reported 30 seconds because of a typo or something?
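
One way to double-check what is actually configured on the backend (a sketch, assuming default credentials and placeholder names):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("PROJECT_ID", "SUBSCRIPTION_NAME")
subscription = subscriber.get_subscription(subscription_path)
print(subscription.ack_deadline_seconds)  # should print 300 if the deadline really is five minutes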

Technical details:
Received messages that exceed the max_messages setting are put into a waiting buffer until the lease manager's load drops below a certain threshold, at which point some of these messages are released to lease management.

The messages in the buffer are not lease-managed, and if they need to wait there for longer than the acknowledgment deadline, the backend assumes they got lost and re-sends them.

Processing the first message takes ~40 seconds, which exceeds the default subscription's ACK deadline of 10 seconds. However, if the deadline is indeed correctly set to 300 seconds (5 minutes), the reported behavior is actually surprising and would require a deeper look.

@ghost
Author

ghost commented Sep 20, 2019

Thank you @plamut for your quick response. I'm sure about the ack deadline and I double-checked that it is 300 seconds (5 minutes).

You can forget what I said about 30 seconds. I did some further testing, and there seems to be some dynamic timeout that changes depending on how long it takes to process a batch of messages. This timeout appears to ignore the ack deadline.

I did some tests with a larger number of messages, using the script from my first message. I set the message handler to sleep for 1 second when a message arrived. max_messages was set to 1 and the ack deadline was set to 600 seconds:

2019-09-20 09:09:29,188 INFO MainThread: Subscriber started
2019-09-20 09:09:34,777 INFO ThreadPoolExecutor-ThreadScheduler_0: [652265204686881] got message with content: b'1'
2019-09-20 09:09:34,778 INFO ThreadPoolExecutor-ThreadScheduler_0: [652265204686881] sleeping
2019-09-20 09:09:35,779 INFO ThreadPoolExecutor-ThreadScheduler_0: [652265204686881] done sleeping
2019-09-20 09:09:35,850 INFO ThreadPoolExecutor-ThreadScheduler_1: [652265204686882] got message with content: b'2'
2019-09-20 09:09:35,851 INFO ThreadPoolExecutor-ThreadScheduler_1: [652265204686882] sleeping
2019-09-20 09:09:36,852 INFO ThreadPoolExecutor-ThreadScheduler_1: [652265204686882] done sleeping
---snip---sending 60 messages with each taking 1 second to handle---
2019-09-20 09:10:36,946 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265204686939] got message with content: b'59'
2019-09-20 09:10:36,947 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265204686939] sleeping
2019-09-20 09:10:37,948 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265204686939] done sleeping
2019-09-20 09:10:37,997 INFO ThreadPoolExecutor-ThreadScheduler_6: [652265204686940] got message with content: b'60'
2019-09-20 09:10:37,997 INFO ThreadPoolExecutor-ThreadScheduler_6: [652265204686940] sleeping
2019-09-20 09:10:38,998 INFO ThreadPoolExecutor-ThreadScheduler_6: [652265204686940] done sleeping
2019-09-20 09:10:39,170 ERROR ThreadPoolExecutor-ThreadScheduler_3: [652265204686891] Duplicate message!!!
2019-09-20 09:10:39,259 ERROR ThreadPoolExecutor-ThreadScheduler_0: [652265204686892] Duplicate message!!!
2019-09-20 09:10:39,377 ERROR ThreadPoolExecutor-ThreadScheduler_5: [652265204686893] Duplicate message!!!
---snip---all together 145 duplicate messages---
2019-09-20 09:10:54,913 ERROR ThreadPoolExecutor-ThreadScheduler_8: [652265204686938] Duplicate message!!!
2019-09-20 09:10:55,011 ERROR ThreadPoolExecutor-ThreadScheduler_9: [652265204686939] Duplicate message!!!
2019-09-20 09:10:55,125 ERROR ThreadPoolExecutor-ThreadScheduler_6: [652265204686940] Duplicate message!!!
2019-09-20 09:10:59,245 INFO ThreadPoolExecutor-ThreadScheduler_7: [652265042758581] got message with content: b'1'
2019-09-20 09:10:59,245 INFO ThreadPoolExecutor-ThreadScheduler_7: [652265042758581] sleeping
2019-09-20 09:11:00,246 INFO ThreadPoolExecutor-ThreadScheduler_7: [652265042758581] done sleeping
2019-09-20 09:11:00,318 INFO ThreadPoolExecutor-ThreadScheduler_4: [652265042758582] got message with content: b'2'
2019-09-20 09:11:00,318 INFO ThreadPoolExecutor-ThreadScheduler_4: [652265042758582] sleeping
2019-09-20 09:11:01,320 INFO ThreadPoolExecutor-ThreadScheduler_4: [652265042758582] done sleeping
---snip---sending another 60 messages---
2019-09-20 09:12:00,801 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265042758639] got message with content: b'59'
2019-09-20 09:12:00,801 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265042758639] sleeping
2019-09-20 09:12:01,803 INFO ThreadPoolExecutor-ThreadScheduler_9: [652265042758639] done sleeping
2019-09-20 09:12:01,886 INFO ThreadPoolExecutor-ThreadScheduler_2: [652265042758640] got message with content: b'60'
2019-09-20 09:12:01,886 INFO ThreadPoolExecutor-ThreadScheduler_2: [652265042758640] sleeping
2019-09-20 09:12:02,887 INFO ThreadPoolExecutor-ThreadScheduler_2: [652265042758640] done sleeping
2019-09-20 09:12:03,077 ERROR ThreadPoolExecutor-ThreadScheduler_7: [652265042758640] Duplicate message!!!
2019-09-20 09:12:14,436 INFO ThreadPoolExecutor-ThreadScheduler_4: [652264914144087] got message with content: b'1'
2019-09-20 09:12:14,436 INFO ThreadPoolExecutor-ThreadScheduler_4: [652264914144087] sleeping
2019-09-20 09:12:15,437 INFO ThreadPoolExecutor-ThreadScheduler_4: [652264914144087] done sleeping
2019-09-20 09:12:15,609 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264914144088] got message with content: b'2'
2019-09-20 09:12:15,609 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264914144088] sleeping
2019-09-20 09:12:16,610 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264914144088] done sleeping
---snip---sending another 60 messages---
2019-09-20 09:13:16,882 INFO ThreadPoolExecutor-ThreadScheduler_2: [652264914144145] got message with content: b'59'
2019-09-20 09:13:16,882 INFO ThreadPoolExecutor-ThreadScheduler_2: [652264914144145] sleeping
2019-09-20 09:13:17,883 INFO ThreadPoolExecutor-ThreadScheduler_2: [652264914144145] done sleeping
2019-09-20 09:13:17,948 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264914144146] got message with content: b'60'
2019-09-20 09:13:17,948 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264914144146] sleeping
2019-09-20 09:13:18,949 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264914144146] done sleeping
2019-09-20 09:13:19,194 ERROR ThreadPoolExecutor-ThreadScheduler_4: [652264914144146] Duplicate message!!!
2019-09-20 09:13:46,466 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264749734941] got message with content: b'1'
2019-09-20 09:13:46,466 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264749734941] sleeping
2019-09-20 09:13:47,467 INFO ThreadPoolExecutor-ThreadScheduler_8: [652264749734941] done sleeping
2019-09-20 09:13:47,512 INFO ThreadPoolExecutor-ThreadScheduler_1: [652264749734942] got message with content: b'2'
2019-09-20 09:13:47,512 INFO ThreadPoolExecutor-ThreadScheduler_1: [652264749734942] sleeping
2019-09-20 09:13:48,513 INFO ThreadPoolExecutor-ThreadScheduler_1: [652264749734942] done sleeping
---snip---sending 100 messages---
2019-09-20 09:15:30,324 INFO ThreadPoolExecutor-ThreadScheduler_2: [652264749735038] sleeping
2019-09-20 09:15:31,325 INFO ThreadPoolExecutor-ThreadScheduler_2: [652264749735038] done sleeping
2019-09-20 09:15:31,372 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264749735039] got message with content: b'99'
2019-09-20 09:15:31,372 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264749735039] sleeping
2019-09-20 09:15:32,373 INFO ThreadPoolExecutor-ThreadScheduler_3: [652264749735039] done sleeping
2019-09-20 09:15:32,418 INFO ThreadPoolExecutor-ThreadScheduler_0: [652264749735040] got message with content: b'100'
2019-09-20 09:15:32,418 INFO ThreadPoolExecutor-ThreadScheduler_0: [652264749735040] sleeping
2019-09-20 09:15:33,419 INFO ThreadPoolExecutor-ThreadScheduler_0: [652264749735040] done sleeping
2019-09-20 09:15:33,573 ERROR ThreadPoolExecutor-ThreadScheduler_8: [652264749735001] Duplicate message!!!
2019-09-20 09:15:33,693 ERROR ThreadPoolExecutor-ThreadScheduler_1: [652264749735002] Duplicate message!!!
2019-09-20 09:15:33,811 ERROR ThreadPoolExecutor-ThreadScheduler_6: [652264749735003] Duplicate message!!!
---snip---all together 95 duplicate messages---
2019-09-20 09:15:43,785 ERROR ThreadPoolExecutor-ThreadScheduler_6: [652264749735038] Duplicate message!!!
2019-09-20 09:15:43,858 ERROR ThreadPoolExecutor-ThreadScheduler_7: [652264749735039] Duplicate message!!!
2019-09-20 09:15:43,938 ERROR ThreadPoolExecutor-ThreadScheduler_0: [652264749735040] Duplicate message!!!

So when I started the subscriber and sent the first 60 messages, I got 145 duplicates. When I sent another 60 messages, I got just one duplicate message. After that, I sent 100 messages and again got a high number of duplicates.

@plamut
Contributor

plamut commented Sep 20, 2019

Interesting, I will try to run it today myself and see if I can reproduce it. I do not see anything apparent in the client itself, but I have not looked too deeply yet.

Update:
I was able to reproduce the reported behavior, and it's indeed unexpected. The key bit is that both messages need to be delivered in a single server response (so that one of them is temporarily put on hold into a buffer).

This can be achieved by rapidly publishing them one after another, i.e. without waiting on each individual publish future, which results in both messages being sent to the backend in the same publish batch.
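
For anyone trying to reproduce this, a sketch of that publishing pattern (topic path and payloads are placeholders); not waiting on the first future lets the batching logic put both messages into one publish request:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_NAME")

# Publish back-to-back without blocking on the first future, so both messages
# end up in the same publish batch (and thus the same streaming pull response).
futures = [publisher.publish(topic_path, data) for data in (b"0", b"1")]
for future in futures:
    future.result()  # only wait after both publishes have been queued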

@plamut plamut added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed type: question Request for information or clarification. Not an issue. labels Sep 20, 2019
@plamut
Contributor

plamut commented Sep 20, 2019

After experimenting with various things, I was not able to pinpoint the exact source of the issue (in the client at least).

It appears that if the delay in the message handler is considerably shorter than 10 seconds, the issue is not reproducible. The probability of a duplicate message increases when the time.sleep() interval is around 10 seconds (8-12 seconds), and approaches 1 with delays longer than that.

I also tried disabling the automatic modifications of the ACK deadlines, just in case the related client's logic is flawed:

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
index 2b257482930..d6b1d2c54d0 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -152,7 +156,10 @@ class Dispatcher(object):
         """
         ack_ids = [item.ack_id for item in items]
         seconds = [item.seconds for item in items]
-
+        #################
+        _LOGGER.debug(f"\x1b[33m(DISABLED) Modifying ACK deadline to {seconds}, ACK IDs: {ack_ids}\x1b[0m")
+        return
+        ######################
         request = types.StreamingPullRequest(
             modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds
         )

The result was the same, which makes me think it might be something with the subscription on the backend (the ACK deadline is set to 300 seconds), and that the deadline expected by the server is actually around 10 seconds, in my case at least.

I did some further testing, and there seems to be some dynamic timeout that changes depending on how long it takes to process a batch of messages. This timeout appears to ignore the ack deadline.

I got a similar impression. The actual server-side deadline does not seem to be static. If it were really, say, 10 seconds, a sleep delay of 8 seconds should almost never reproduce the issue, while a delay of 12 seconds should always reproduce it - but the outcome of the test script was not deterministic.

Labeling as a backend issue until further evidence.

@plamut plamut added the backend label Sep 20, 2019
@kamalaboulhosn

I don't think this is a backend issue. Looking at some logs, it appears that the server is delivering the message based on its understanding of the ack deadline with no unexpected duplicates. I think there are two things at play here:

  1. The client library sets the ack deadline on the stream based on the 99th percentile ack latency. This overrides the ack deadline set on the subscription at the time it is created. By default, this deadline is 10 seconds. We do this because there is a tradeoff with ack deadlines between more time to process and delays in redelivery if a client goes down or becomes unhealthy. The client library changes the ack deadline for the streams it creates and then calls ModifyAckDeadline up to the specified max_lease_duration in the FlowControl settings (a configuration sketch follows this list).

  2. It looks like these ModifyAckDeadline requests are arriving a little late, a second or two beyond the deadline expiry. I suspect this means that the client library is not sending the ModifyAckDeadline requests soon enough, possibly because they are buffered or something like that.
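
Relating to point 1, the lease-management ceiling is configurable through the FlowControl settings; a sketch (the values and callback are placeholders, not recommendations):

from google.cloud import pubsub_v1

flow_control = pubsub_v1.types.FlowControl(
    max_messages=100,                # messages dispatched/leased at once
    max_lease_duration=2 * 60 * 60,  # stop extending ACK deadlines after 2 hours
)
subscriber = pubsub_v1.SubscriberClient()
subscriber.subscribe(
    "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
    callback,  # defined elsewhere by the application
    flow_control=flow_control,
)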

@plamut
Contributor

plamut commented Sep 20, 2019

The client library sets the ack deadline on the stream based on the 99th percentile ack latency.

Indeed, although the issue was reproducible even with these calls disabled. However, it appears that the subscription's ACK deadline is overridden even before message pulling starts, i.e. when the stream is established. This key piece of information is missing from the client debug logs and needs to be added (along with the fix for the observed behavior).

Thanks for checking the backend logs, @kamalaboulhosn !

@plamut
Contributor

plamut commented Sep 27, 2019

@qvik-olli We have just released a new version of the PubSub client (1.0.1), which includes the fix for this issue. The ACK deadline for the messages should now more closely match the deadline set on the subscription, resulting in fewer duplicate re-deliveries.

plamut added a commit to plamut/google-cloud-python that referenced this issue Sep 30, 2019
Pulling the messages with the streaming pull should work with the
default pubsub.subscriber role.

This commit removes the call to fetch a subscription, and replaces the
subscription's ACK deadline with a fixed deadline of 60 seconds.

That *will* re-introduce the issue googleapis#9252, but at least in a less severe
manner.
plamut added a commit that referenced this issue Sep 30, 2019
…on (#9360)

Pulling the messages with the streaming pull should work with the
default pubsub.subscriber role.

This commit removes the call to fetch a subscription, and replaces the
subscription's ACK deadline with a fixed deadline of 60 seconds.

That *will* re-introduce the issue #9252, but at least in a less severe
manner.
@plamut
Contributor

plamut commented Sep 30, 2019

@qvik-olli Unfortunately, it turned out that the fix for this caused a different, more severe issue (#9339), and thus had to be reverted in PubSub 1.0.2 (released ~15 minutes ago). Fetching a subscription to read its ACK deadline requires additional permissions, and that broke some users' applications.

For the time being, the default stream ACK deadline is set to 60 seconds. Some of the messages can still time out a bit too soon and be re-delivered, but the time window is now wider, giving user code more time to process received messages that exceed the FlowControl.max_messages setting.

A different fix will have to be made, although that will likely require some more substantial changes to the streaming pull core logic.

@plamut plamut reopened this Sep 30, 2019
@ghost
Author

ghost commented Oct 1, 2019

Okay, I see. Increasing the default stream ACK deadline to 60 seconds is helpful, but I do hope that the issue will get fixed.

Our systems were experiencing this issue because we were controlling concurrency by setting max_messages to a low value (less than 10). We have now updated our code to use the default max_messages and a custom scheduler to control concurrency. This doesn't prevent the issue from occurring, but it removes a lot of the duplicates we were experiencing earlier.

I'm sharing this snippet in case someone else is affected by this issue. Creating a custom scheduler to control concurrency:

import concurrent.futures
from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

subscriber = pubsub.SubscriberClient()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)  # caps concurrent callbacks
scheduler = ThreadScheduler(executor=executor)
# subscription_name and callback are defined elsewhere in the application.
subscriber.subscribe(subscription_name, callback, scheduler=scheduler)

@plamut
Contributor

plamut commented Oct 1, 2019

For CPU-intensive computations in callbacks, providing a shallower thread pool for invoking callbacks is indeed one way of limiting the concurrency (the default thread pool size is 10, FWIW).

Nevertheless, I reopened this issue until it gets a proper fix, which will probably require auto-leasing (i.e. auto-extending the ACK deadlines of) all received messages, and also decoupling the tracking of the current load from the number of actively leased messages.

(the load is used to decide how many messages to dispatch to the callbacks, and when to pause/resume the message stream)

@saitej09

@plamut Is this issue resolved in later versions? I am still seeing duplicated messages when I set the ack_deadline to the maximum of 600 seconds; I sometimes get a duplicate message within 10 seconds of the previous one.

@plamut
Contributor

plamut commented Jul 20, 2021

@saitej09 How often does that happen?

By design, some duplication is actually expected. IIRC something like 0.1 % of messages being re-sent is considered normal, and applications must be prepared to handle this. Are you perhaps seeing much higher re-delivery rates?

Re: 10 seconds - that's the initial ACK deadline when a new streaming pull is started (the subscriber.subscribe() call). The client automatically keeps extending this deadline in the background, but it can of course happen that an ACK or "extend ACK deadline" request sometimes gets lost in the network, resulting in some of the messages expiring and the server re-sending them.
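
Since some redelivery is expected by design, one defensive pattern is an idempotent callback. A sketch, assuming message_id as the dedup key, sqlite3 as a stand-in persistent store, and a hypothetical handle() doing the real work:

import sqlite3
import threading

_db_lock = threading.Lock()
conn = sqlite3.connect("processed_messages.db", check_same_thread=False)
conn.execute("CREATE TABLE IF NOT EXISTS processed (message_id TEXT PRIMARY KEY)")

def idempotent_callback(message):
    with _db_lock:
        already_done = conn.execute(
            "SELECT 1 FROM processed WHERE message_id = ?", (message.message_id,)
        ).fetchone()
    if already_done:
        message.ack()  # a redelivery of work that already finished; just ack again
        return
    handle(message.data)  # hypothetical application-specific processing
    with _db_lock, conn:
        conn.execute("INSERT OR IGNORE INTO processed VALUES (?)", (message.message_id,))
    message.ack()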

@saitej09

@plamut I have set the deadline to 600 seconds in the subscription settings. I am not setting any value in the subscriber.subscribe() call, so will it be reset by any chance? If not, then Pub/Sub re-sending the message before 600 seconds cannot be attributed to network loss etc., as I am not updating the deadline.

I am not currently calling any method (modify_ack_deadline) to extend the deadline. More than 1% of the messages are being duplicated, but the behavior is very random. Does it have any correlation with flow control?

@kamalaboulhosn

@saitej09 The ack deadline is not a guarantee that messages will not be redelivered prior to the deadline's expiration, so it is possible that, no matter what, messages will be redelivered before the 600 seconds pass.

@plamut Do we always start with an ack deadline of 10 seconds? If so, this is something we might want to consider changing. I think we did, or were going to, introduce a notion of "minimum ack deadline" in the same way we have "maximum ack deadline", which would allow one to specify the minimum amount of time sent back with modAckDeadline, including the initial one.

@plamut
Contributor

plamut commented Jul 20, 2021

@saitej09 The library sets the initial deadline internally. Initially it uses the minimum ACK deadline, not the 600 seconds. This is to optimize for throughput, because otherwise, if the subscriber crashes and is restarted, messages would not be re-delivered for the full 10 minutes.

This ACK deadline is then gradually adjusted based on the history, i.e. how much time the subscriber needed to process past messages.
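
Conceptually (an illustrative sketch, not the library's actual implementation), the adjustment clamps a percentile of observed ack latencies between a minimum and a maximum deadline:

MIN_ACK_DEADLINE = 10   # seconds; also the initial deadline with no history
MAX_ACK_DEADLINE = 600  # seconds

def next_ack_deadline(ack_latencies):
    """Pick a deadline from past ack latencies, clamped to the allowed range."""
    if not ack_latencies:
        return MIN_ACK_DEADLINE  # no histogram data yet: start at the minimum
    latencies = sorted(ack_latencies)
    p99 = latencies[min(len(latencies) - 1, int(len(latencies) * 0.99))]
    return max(MIN_ACK_DEADLINE, min(MAX_ACK_DEADLINE, p99))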

@plamut
Contributor

plamut commented Jul 20, 2021

@kamalaboulhosn Yes, we always start with the minimum deadline (when there's no histogram data yet) and then gradually adjust it.

We could change that to use a different value, although IIRC it is desirable to start with a short deadline for potentially better throughput on average?

@kamalaboulhosn

@plamut I think what we want to do is have a minimum and a maximum and only adjust within that range. We never want to go below the minimum specified, even if we have histogram data. We basically want the user to have control over this if they want it.

@plamut
Contributor

plamut commented Jul 20, 2021

I think what we want to do is have a minimum and a maximum and only adjust within that range. We never want to go below the minimum specified, even if we have histogram data.

@kamalaboulhosn We already do that, but if I understood correctly, the user wants their modify_ack_deadline setting to be used initially, not the (effectively hard-coded) MIN_ACK_DEADLINE, which is 10 seconds?

@kamalaboulhosn

@plamut Right, so I think we need a property in the client library for MIN_ACK_DEADLINE so that it can be controlled by the user. In the same way we have max_duration_per_lease_extension, we need min_duration_per_lease_extension.
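
(A field along these lines was later added to FlowControl as min_duration_per_lease_extension; the sketch below assumes a release that exposes it and uses placeholder values.)

from google.cloud import pubsub_v1

flow_control = pubsub_v1.types.FlowControl(
    min_duration_per_lease_extension=60,   # never modack below 60 seconds
    max_duration_per_lease_extension=600,  # never modack above the allowed maximum
)
subscriber = pubsub_v1.SubscriberClient()
subscriber.subscribe(
    "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
    callback,  # defined elsewhere by the application
    flow_control=flow_control,
)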

@plamut
Contributor

plamut commented Jul 20, 2021

Sounds right, I'll create a feature request issue in the new repo.
