
Long tasks lead to duplicate messages #593

Closed · jtressle opened this issue Mar 1, 2022 · 66 comments

Labels: api: pubsub (Issues related to the googleapis/python-pubsub API.) · type: question (Request for information or clarification. Not an issue.)

Comments

@jtressle commented Mar 1, 2022

Hi,

I'm having an issue where I have a long process (up to 80 minutes) running in a Docker container on Kubernetes. The instance is running Ubuntu 20.04, and I'm using Python 3.8.10. The container runs a Python worker script, which runs a subprocess. The subprocess is multi-threaded and can use all threads during some CPU-intensive tasks.

I'm getting a lot of duplicates (about 5 to 10 duplicates). This is repeatable and probably due to the intense CPU usage. What is the correct way to handle this? Thanks in advance.

My pip versions are:

google-api-core               1.30.0
google-api-python-client      2.38.0
google-auth                   1.30.2
google-auth-httplib2          0.1.0
google-auth-oauthlib          0.4.4
google-cloud-core             1.7.0
google-cloud-firestore        2.3.4
google-cloud-pubsub           2.9.0

My worker code is similar to this:

import shutil
import argparse
import time
import json
import subprocess, shlex
from pathlib import Path
import concurrent.futures
from google.cloud import pubsub_v1

def run_command(cmd_title, cmd):
	"""Run command string as subprocess"""
	try:
		cmd_list = shlex.split(cmd)
		output = subprocess.run(cmd_list,check=True)
		print (output)
	except subprocess.CalledProcessError as e:
		print ('Error running command ',str(cmd))
		print (e.output)
		pass

	return

def worker(
	project_id,
	subscription):
	''' receives messages from a pull subscription'''

	#create subscriber
	subscriber = pubsub_v1.SubscriberClient()
	subscription_path = subscriber.subscription_path(project_id, subscription)
	flow_control = pubsub_v1.types.FlowControl(
		max_messages=1, 
		max_duration_per_lease_extension = 60,
		max_lease_duration = 7200)

	def callback(message):
		print ('Received message {} of message ID {}'.format(message, message.message_id))

		cmd = '......'
		run_command('Long Task', cmd)

		#ack message
		message.ack()
		print('Acknowledged message of message ID {}\n'.format(message.message_id))

	future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
	print('Listening for messages on {}..\n'.format(subscription_path))

	# Wrap subscriber in a 'with' block to automatically call close() when done.
	with subscriber:
		try:
			# When `timeout` is not set, result() will block indefinitely,
			# unless an exception is encountered first.
			future.result()
		except:
			print ('Exiting subscriber')
			future.cancel()

if __name__ == '__main__':
	parser = argparse.ArgumentParser(description = 'Worker process')
	parser.add_argument('-p', '--project_id',default='project_id')
	
	parser.add_argument('-s', '--subscription', default='subscription')
	args = parser.parse_args()

	#run worker
	worker(args.project_id, args.subscription)

Thanks in advance,

product-auto-label bot added the api: pubsub label on Mar 1, 2022
@meredithslota added the type: question label on Mar 1, 2022
@meredithslota (Contributor)

Thanks for filing an issue! I will see if someone has time to look through your code to see where your issue might be coming from, but in the meantime there's a recently published blog post tackling this subject that includes some options for identifying where your duplicates are coming from, as well as options for resolving them: https://cloud.google.com/blog/products/data-analytics/handling-duplicate-data-in-streaming-pipeline-using-pubsub-dataflow

@jtressle (Author) commented Mar 1, 2022

@meredithslota thanks for the link! I went ahead and implemented a similar deduplication strategy. I used Firestore to monitor duplicate messages.

I forgot to mention that my duplicates all have the same message IDs, so it looks like my workers are not extending the ack deadline. Could this be because my subprocesses are CPU-intensive, and the Pub/Sub client isn't able to send the extension request?
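
For reference, here is a minimal sketch of the Firestore-based deduplication mentioned above. The collection name and helper are hypothetical, and the get-then-set check is not atomic (a transaction would be needed to fully close the race window discussed later in this thread):

from google.cloud import firestore

db = firestore.Client()

def already_processed(message_id: str) -> bool:
    """Return True if this Pub/Sub message ID was already recorded; record it otherwise."""
    doc_ref = db.collection("processed_messages").document(message_id)
    if doc_ref.get().exists:
        return True
    doc_ref.set({"processed_at": firestore.SERVER_TIMESTAMP})
    return False

def callback(message):
    if already_processed(message.message_id):
        message.ack()  # duplicate delivery: ack again and skip the long task
        return
    # ... run the long task, then ack ...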

@jtressle (Author) commented Mar 2, 2022

Here are more notes and observations.

My subscription's acknowledgement deadline is set to 600 seconds.

I just observed a duplicate message being sent out. The original message was acknowledged 659.52 seconds after it was received. Immediately after acknowledgement (24 ms later), a duplicate was resent. Is there a case where a subscriber would send a bad ack signal back to Pub/Sub that would cause Pub/Sub to resend the message?

Thanks,

@jtressle (Author) commented Mar 8, 2022

@meredithslota just following up on this to see if anyone has taken a look. I'm also happy to add logging and/or troubleshoot to see what the issue is. My main concern is that pubsub isn't behaving correctly on a machine under heavy CPU load. Specifically, if the lease extension fails to reach the Pub/Sub service, is there a retry? Or does one failed attempt immediately trigger redelivery of the message? Thanks

@pradn (Contributor) commented Mar 9, 2022

The library will keep modacking the message as long as the max_lease_duration time isn't up. However, acks/modacks are best-effort and duplicates may be sent for any reason. Cloud Pub/Sub isn't optimized for such high-latency acks, but it should work regardless: you should be able to keep modacking up to the retention duration of the subscription.

If a duplicate was given to your callback immediately after you acked the message, it's likely that that duplicate was already queued in the client library while the previous message was being processed. Your flow control settings allow just one message at a time, so this is possible.

You can try playing with setting min_duration_per_lease_extension and max_duration_per_lease_extension fairly high, perhaps 10 minutes each, and seeing if that helps. min_duration_per_lease_extension was added in 2.10.0.

But to mitigate duplicates, you need to either use the brand-new exactly-once delivery feature (currently in preview) or keep a look-aside DB like you're doing with Firestore.
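
For illustration, a minimal sketch of the flow-control settings suggested above (10-minute lease extensions, with the 7200-second max lease carried over from the original snippet), assuming google-cloud-pubsub >= 2.10.0:

from google.cloud import pubsub_v1

flow_control = pubsub_v1.types.FlowControl(
    max_messages=1,                        # one outstanding message at a time
    min_duration_per_lease_extension=600,  # modack for at least 10 minutes at a time
    max_duration_per_lease_extension=600,  # and at most 10 minutes at a time
    max_lease_duration=7200,               # keep extending the lease for up to 2 hours
)

# Passed to subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)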

@jtressle (Author)

@pradn thanks for the tips. I'll test out both the min_duration_per_lease_extension and the new exactly-once feature. Will report back after running it for a bit.

@pradn (Contributor) commented Mar 30, 2022

@jtressle, have you had a chance to test with the new settings?

@jtressle (Author) commented Apr 5, 2022

@pradn I'm about to start testing on our servers. I'll update this in the next couple of days. Thanks and sorry for the delay.

@pradn (Contributor) commented Apr 5, 2022

Ok, let us know how it goes!

@jtressle (Author) commented Apr 6, 2022

Hi @pradn,

I ended up changing the subscriber initialization to:

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription)
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1,
    min_duration_per_lease_extension=600,
    max_duration_per_lease_extension=600,
    max_lease_duration=7200)

I also changed pubsub to only issue exactly one message. I did receive the following error on one of my runs. It happened after an ack(). I'll continue to test to see if it repeats.

Thanks

Error in queue callback worker: 'NoneType' object has no attribute 'set_exception'
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py", line 118, in __call__
    self._callback(items)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 159, in dispatch_callback
    self.ack(ack_requests)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 190, in ack
    requests_completed, requests_to_retry = self._manager.send_unary_ack(
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 615, in send_unary_ack
    requests_completed, requests_to_retry = _process_requests(
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 187, in _process_requests
    future.set_exception(exc)
AttributeError: 'NoneType' object has no attribute 'set_exception'

AcknowledgeError when lease-modacking a message.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 920, in _send_lease_modacks
    req.future.result()
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result
    return super().result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID

@pradn (Contributor) commented Apr 6, 2022

@jtressle, this is a bug that has been fixed but hasn't been released yet. We can continue when that happens. I'll let you know.

@jtressle (Author) commented Apr 6, 2022

@pradn, what is the correct way of handling this so it doesn't kill the subscriber and instead just issues a warning? My code is above. I'm using Pub/Sub in Kubernetes workers, and this bug kills my workers and therefore stops my pipeline.

If we can't ignore the error, is this an issue with the exactly-once implementation, or with min_duration_per_lease_extension? I can roll back my changes until the fix is released, or until there's a branch I can test.

Thanks for your help.

@pradn (Contributor) commented Apr 6, 2022

We released the 2.12.0 version just now. Please try with that version. This will fix the bug in #593 (comment).

Did you enable exactly-once delivery on your subscription?

@jtressle (Author) commented Apr 7, 2022

@pradn, I just started testing now and will update here after the weekend.

I set min_duration_per_lease_extension and max_duration_per_lease_extension to 600 and enabled the exactly-once delivery on each of my subscribers. I also disabled my database duplicate check to ensure we can catch the duplicates.

So far, I've had none of the NoneType issues with version 2.12.0.

Thanks for your help.

@jtressle (Author) commented Apr 8, 2022

@pradn I wanted to report back. Version 2.12.0 reduced the number of NoneType errors, but I did get three identical errors. They occurred at the same place; the frequency has been reduced.

Error:
File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 971, in _send_lease_modacks req.future.result() File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result return super().result(timeout=timeout) File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result return self.__get_result() File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result raise self._exception google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID

My python packages are:

google-api-core                 2.7.1
google-api-python-client        2.43.0
google-auth                     2.6.3
google-auth-httplib2            0.1.0
google-cloud-appengine-logging  1.1.1
google-cloud-audit-log          0.2.0
google-cloud-core               2.2.3
google-cloud-firestore          2.4.0
google-cloud-logging            3.0.0
google-cloud-pubsub             2.12.0
google-cloud-storage            2.2.1
google-crc32c                   1.3.0
google-resumable-media          2.3.2
googleapis-common-protos        1.56.0
grpc-google-iam-v1              0.12.4

I'm also still getting the same number of duplicates as I had before turning on exactly-once delivery. I had one message run 3 times and another message run 5 times. The min and max lease extensions were set to 600, but I did see one message delivered twice, 428 seconds apart. Is there anything else I can change?

Also, is there a configuration change, or an earlier pubsub version, I can use to isolate the errors above? I have another Pub/Sub system that has been running for years without this issue, but those tasks are a lot shorter. The pubsub errors unfortunately error out my long-running tasks and also make the Kubernetes workers non-responsive because pubsub stops subscribing.

Thanks for your help,
Jan-Michael

@jtressle (Author)

Hi @pradn,

I wanted to update you regarding this issue. After disabling exactly-once delivery, all the "AcknowledgeStatus.INVALID_ACK_ID" issues have gone away. Is this expected?

Is there a way to have my subscriber immediately ack the message and then perform my work afterwards? Once my work is done, I'd turn the subscriber back on to wait for the next message. This would alleviate the issue of running pubsub alongside other CPU-intensive, long-running tasks.

Thanks and much appreciated,
Jan-Michael

@acocuzzo (Contributor) commented Apr 11, 2022

Hi @jtressle

Please take a look at the sample EOD code here: https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-exactly-once
In particular, note that with EOD, the acking behavior should be changed to:

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

timeout = 60.0  # seconds to wait for the ack result below

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

I believe this should help you with gracefully handling INVALID_ACK_ID errors.

@pradn (Contributor) commented Apr 12, 2022

Some more context around why we want you to use ack_with_response instead of ack: when exactly-once delivery is enabled on a subscription, we return more errors to the user for acks/modacks. The reason is that you need to know whether your ack/modack went through or not. If you see that it succeeded, you are guaranteed that you won't get your message again. If you see that it failed, you need to decide what to do with that message: you may want to retry the ack/modack or do something else. But, in the case of failure, the message may be redelivered. Since the actual impact of an ack/modack failure depends on the impact of a re-delivery, we require you to make that choice. Only you know what to do in this case, per your usage of exactly-once delivery.
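
As an illustration of that choice, here is a minimal sketch (not from this thread) of one way to act on the ack result: retry the ack a few times on transient failures and treat INVALID_ACK_ID as a signal that the message will be redelivered. The retry count, delay, and the assumption that AcknowledgeStatus is importable from the subscriber exceptions module should be checked against your client-library version:

import time

from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

def ack_with_retries(message, attempts=3, delay=5.0):
    """Ack under exactly-once delivery, retrying transient failures a few times."""
    for _ in range(attempts):
        try:
            message.ack_with_response().result(timeout=60)
            return True  # ack confirmed: this message will not be redelivered
        except sub_exceptions.AcknowledgeError as e:
            if e.error_code == AcknowledgeStatus.INVALID_ACK_ID:
                # The ack deadline already expired; a redelivery is expected,
                # so make the processing idempotent rather than retrying the ack.
                return False
            time.sleep(delay)  # other errors: back off and try the ack again
    return False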

@jtressle (Author)

@pradn @acocuzzo thanks for this. I've updated my servers and will run this over the next couple of days. Initial tests have been good. I'll update here afterwards.

@jtressle (Author) commented Apr 19, 2022

UPDATE:

I'm doing further testing and I've received 11 INVALID_ACK_ID errors in the last 20 minutes. It's happening on my largest instance, which has the most intensive CPU load (all threads). The errors are all basically this:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 971, in _send_lease_modacks
    req.future.result()
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result
    return super().result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID

ORIGINAL:
@pradn @acocuzzo I tested the ack_with_response and the workers were able to recover from a few INVALID_ACK_ID errors I received.

Many of the errors happen while I'm processing data, which returned errors to our users; the reason is that the data processing is self-contained in the callback function. For context, I have another app that has been using Pub/Sub for years without issue in a GKE environment. Its Pub/Sub failure rate is basically 0%, as it has the exactly-once delivery feature disabled, so I haven't had to add communication between Pub/Sub and my other processes.

I have a few questions:

  1. You stated that in the case of failure, the message may be redelivered. What dictates whether a message is retried or not? Is it only whether that message is dead-lettered or not?

  2. Is there a way to not stop the callback when we receive these errors? If we can't ignore these errors, the best solution might be to have the subscriber immediately return an ACK, then have it go off-line until processing has completed. The subscriber can go back online after processing has finished.

Thanks

@jtressle (Author) commented Apr 20, 2022

@pradn @acocuzzo I wanted to provide more insight based on my testing. When running on a compute instance, I see no INVALID_ACK_ID errors. However, when I run these same workers on GKE, I see INVALID_ACK_ID issues every minute or so.

I've updated my subscriber to ignore the invalid errors, but I'm getting multiple retries and multiple runs of the same message.

Next steps are to verify GKE resources are configured correctly. If they are, I'll go back to my original configuration and work on starting/stopping Pubsub.

Thanks,

@pradn (Contributor) commented May 2, 2022

In the update in this comment, are you saying that your program crashes when this error occurs? If you look in the code, we call future.result() and then catch the resulting exception (an AcknowledgeError). When caught, we just log the error. So if your program is crashing / seeing an unexpected exception bubble up, that wasn't the intention and should be investigated.

You stated that in the case of failure, the message may be redelivered? What dictates whether a message retries or not? Is it only if that message is dead-lettered or not?

Cloud Pub/Sub by default always delivers messages until they're acked. An ack is best-effort, so the system might lose the ack. So then you get the message again. Moreover, if a message isn't acked within the ack deadline, an ack expiration and re-delivery occur. Note your ack might race with the ack expiration time. Ack status for a message is stored in memory and periodically synced to disk on the server-side, so our server restarts may lose message ack statuses. So, there's several reasons for re-delivery.

When a DLQ is enabled, we keep failure counts for ack expirations and explicit nacks sent for a message. If the failure count goes above a threshold, we move it to the DLQ topic. If a DLQ is not enabled, we keep re-delivering the message until it expires (based on the subscription's retention settings, defaulting to 7 days).
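
For reference, a minimal sketch (not from this thread) of attaching a dead-letter policy when creating a subscription with the Python client; the project, topic, and subscription names and the 5-attempt threshold are placeholders:

from google.cloud import pubsub_v1

project_id = "my-project"  # placeholder
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, "work-topic")
dead_letter_topic_path = publisher.topic_path(project_id, "work-dead-letter")
subscription_path = subscriber.subscription_path(project_id, "work-subscription")

with subscriber:
    subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "ack_deadline_seconds": 600,
            "dead_letter_policy": {
                "dead_letter_topic": dead_letter_topic_path,
                "max_delivery_attempts": 5,  # forward to the DLQ after 5 failed deliveries
            },
        }
    )

Note that the Pub/Sub service account also needs permission to publish to the dead-letter topic and to subscribe to the source subscription for forwarding to work.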

Is there a way to not stop the callback when we receive these errors? If we can't ignore these errors, the best solution might be to have the subscriber immediately return an ACK, then have it go off-line until processing has completed. The subscriber can go back online after processing has finished.

It's up to you what you do with the exceptions returned by ack_with_response. You may choose to catch them and keep the processing going. The exception will only occur when you call the result function.

I wanted to provide more insight based on my testing. When running on a compute instance, I see no INVALID_ACK_ID errors. However, when I run these same workers on GKE, I see INVALID_ACK_ID issues every minute or so.

It's possible there's environmental differences that account for that - maybe network latency. But you need to be able to deal with them occurring, in any environment.

I've updated my subscriber to ignore the invalid errors, but I'm getting multiple retries and multiple runs of the same message.

This is expected. The "exactly-once" guarantee is that if you successfully ack the message, it won't come back. These invalid ack errors indicate an unsuccessful ack.

Next steps are to verify GKE resources are configured correctly. If they are, I'll go back to my original configuration and work on starting/stopping Pubsub.

Ok, sounds good. Keep us posted.

@jtressle (Author) commented May 3, 2022

@pradn please see below:

In the update in #593 (comment), are you saying that your program crashes when this error occurs? If you look in the code, we call future.result() and then catch the resulting exception (an AcknowledgeError). When caught, we just log the error. So if your program is crashing / seeing an unexpected exception bubble up, that wasn't the intention and should be investigated.

What happens when I get the AcknowledgeError is that the callback completes, causing my long-running processes to end prematurely. Below is the code I'm using:

def worker(
    project_id,
    subscription):

    #create subscriber
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription)
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=1,
        max_duration_per_lease_extension=60,
        max_lease_duration=7200)

    def callback(message):
        print('Received message {} of message ID {}'.format(message, message.message_id))

        cmd = '......'
        run_command('Long Task', cmd)

        #ack message
        ack_future = message.ack_with_response()

        try:
            # Block on result of acknowledge call.
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            ack_future.result()
            print(f"Ack for message {message.message_id} successful.")
        except sub_exceptions.AcknowledgeError as e:
            print(f"Ack for message {message.message_id} failed with error: {e.error_code}")

    future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    print('Listening for messages on {}..\n'.format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            future.result()
        except:
            print('Exiting subscriber')
            future.cancel()

I updated my code to what was specified just above. Is this the correct way of handling the warning without stopping the callback?

Note your ack might race with the ack expiration time. Ack status for a message is stored in memory and periodically synced to disk on the server-side, so our server restarts may lose message ack statuses. So, there's several reasons for re-delivery.

My subscriber is set as:

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription)
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1,
    min_duration_per_lease_extension=600,
    max_duration_per_lease_extension=600,
    max_lease_duration=7200)

My understanding is that I should never get a failure or warning before 600 seconds, and the lease should be held for up to 7200 seconds. However, I'm getting duplicates (when a DLQ is not enabled) and AcknowledgeError warnings well before 7200 seconds, and even before the minimum time of 600 seconds. How can I configure Pub/Sub to adhere to these times?

It's up to you what you do with the exceptions returned by ack_with_response. You may choose to catch them and keep the processing going. The exception will only occur when you call the result function.

Can you please provide example code on how to do this? The code I presented above ends the callback when I get the AcknowledgeError.

It's possible there's environmental differences that account for that - maybe network latency. But you need to be able to deal with them occurring, in any environment.

How should one go about handling the network latency? If my lease extensions are 600 seconds, shouldn't this be sufficient for PubSub to retry connections?

Thanks

@pradn (Contributor) commented May 5, 2022

I updated my code to what was specified just above. Is this the correct way of handling the warning without stopping the callback?

Yes, you should be calling ack_future.result() in the try block. Your code looks as expected.

My understanding is I should never get a failure or warning anytime before 600 seconds and should hold the lease until 7200.

It's possible to have a problem with the background modacking process. In your case, the background leaser would send a modack every 600 seconds up to 7200 seconds. If there's a modack error, we log the error. So when your code finishes working and calls ack_with_response for that message, that ack might have expired. There's not really a way around this since acks/modacks may actually fail when exactly-once is enabled. But at least you do know about this and can re-try the work when the message is re-delivered. I realize this weakens the appearance of the "exactly-once" property, but that's the best we can do given that failures are a fact in distributed systems.

Can you please provide example code on how to do this? The code I presented above ends the callback when I get the AcknowledgeError.

There isn't any code after the except catch block, so the callback should end, yes? Are you saying the program itself stops or the scheduler/thread-pool stops calling the callback for subsequent messages?

How should one go about handling the network latency? If my lease extensions are 600 seconds, shouldn't this be sufficient for PubSub to retry connections?

I think setting a long lease timeout is the best you can do. I can't think of other mitigations.

@ramirek commented Jun 8, 2022

Hi,
Are there any updates on this issue? I'm facing the same problem: a large number of messages are redelivered due to the INVALID_ACK_ID error. I've also enabled the "exactly-once" feature and am using version 2.12.0.
This is problematic because we have processes that can take hours; we don't want to receive the same message multiple times and waste time recomputing things.
Is there any workaround to force-acknowledge the message?
Thank you 🙏

@jtressle (Author)

@ramirek I never got it to work properly. What I'm currently doing is turning off "exactly-once", living with the duplicates and handling them by logging messages in a database.

I think the best solution would be to have pubsub immediately return an ACK, and then take that worker offline until the process has completed.

@hjtran commented Jun 24, 2022

Hi all,

I think this may be an issue we're also dealing with. We take messages and run computations on batches of them before acking them. The batch calculation can be CPU-heavy and, in the exact example I've been testing with, takes about 9 minutes. We're getting tons of redeliveries.

The exact test I'm running uses 127 clients with a subscription that has a backlog of 1M messages. The clients process messages at about 3 per second and run for an hour.

With this setup, I'm consistently seeing 50%+ of acks get lost (the workers record the message_id of each message before calling msg.ack(), and I compare these message_ids with the messages remaining in the queue).

This is adding a ton of required compute to get through a subscription, and even when the clients run until they can't pull any more messages, messages are left over anyway.

Is this issue well understood at all? Unfortunately we really can't use Pub/Sub with our application like this. Happy to give more debugging details; I've turned on the Pub/Sub client library's logging along with timestamps, so there's quite a lot of info to sift through. The modacks seem to be going out at the right time and there's no info reported about the ack requests, so I'm really not sure where the issue might lie.

@YungChunLu

@acocuzzo Our team upgraded to v2.13.6 this week, but we do not see any decrease in ack/modack failure rates. We have also forwarded more detail through the support ticket.

@jtressle (Author)

@acocuzzo 2.13.6 has not improved the duplicates issue or the ACK retries. Two questions:

(1) Is it possible for your team to reproduce the error by running a long-running, CPU-intensive task on a Google Compute Engine instance? Could something like this be added to the release tests?

(2) Is there a recommended way to turn a Pub/Sub worker on and off? In our case, we don't need all the leasing logic that is causing this issue. I'd rather receive a message, immediately ack it, turn off Pub/Sub on the worker to prevent pulling any more messages, have the worker run the long-running task, and then turn Pub/Sub back on for the next message.

Thanks.

@acocuzzo (Contributor)

Hi @jtressle

Thanks for checking on the new version.

(1) I am currently working on a reproduction with processes of 1-2 hours; it would be helpful to know the specific memory/CPU usage for a proper reproduction. I'm not sure we could add this to the release tests, but I can investigate what our options are.

(2) If you don't want the leasing logic, you can try using synchronous pull instead of streaming (asynchronous) pull; please see the docs for reference: https://cloud.google.com/pubsub/docs/pull#synchronous_pull

One thing I would caution for synchronous pull is that it is necessary to "overpull", meaning you should send many more pull requests than you might expect, as they will often return an empty response or fewer messages than requested rather than waiting for all of the messages requested. This is done to lower delivery latency. For example: https://medium.com/javarevisited/gcp-how-to-achieve-high-performance-synchronous-pull-with-pub-sub-and-spring-boot-12cb220c4d65
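
For illustration, a minimal synchronous-pull sketch along the lines suggested above, pulling and acking one message at a time before doing the long-running work. Acking before processing is the trade-off the issue author described (no redelivery during the task, but the message is lost if the worker dies mid-task); the project and subscription names are placeholders:

from google.api_core.exceptions import DeadlineExceeded
from google.cloud import pubsub_v1

project_id = "my-project"          # placeholder
subscription = "my-subscription"   # placeholder

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription)

with subscriber:
    while True:
        try:
            # Pull at most one message; empty responses are normal and just mean "poll again".
            response = subscriber.pull(
                request={"subscription": subscription_path, "max_messages": 1},
                timeout=30,
            )
        except DeadlineExceeded:
            continue
        if not response.received_messages:
            continue

        received = response.received_messages[0]
        # Ack immediately so the message is not redelivered during the long task.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
        )

        # ... run the long-running task here (e.g. the subprocess from the original snippet) ...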

@jtressle (Author)

Hi @acocuzzo,

This is excellent news! I've noticed the issue on both of our servers that run longer processes. The servers in question are c2-standard-4 and c2-standard-30. The larger one runs a longer process and has more frequent duplicates. For both servers, the CPU usage is basically 100% on all cores. One thing to note is that both servers are running in a GKE Docker configuration, and I've set aside about 10% of the CPU for GKE-related tasks.

Thanks for the pull documentation. This might be a good alternative. I'll test implementing it and will report back.

Thanks.

@acocuzzo (Contributor) commented Sep 23, 2022

@jtressle @YungChunLu @hjtran @ramirek
Through my 2hr+ testing I discovered a bug related to flow control buffering, which I've added logic to account for. If you are using exactly-once delivery, please update to google-cloud-pubsub v2.13.7; it should reduce INVALID_ACK_ID error logs to once per redelivered message and prevent duplicates from being delivered to the user's specified callback. Please let me know if this fix helps!

@hjtran commented Sep 23, 2022

@acocuzzo
That's exciting news. Unfortunately my application does not use EOD. We tried some tests with it yesterday, but the throughput of messages sent to subscribers was too low for our application relative to a non-EOD subscription.

@acocuzzo (Contributor)

@hjtran Unfortunately, without EOD the INVALID_ACK_ID errors are not surfaced for us to check.
I am looking into the underlying reasons for this flow-control issue (which are either server-side or in the gRPC layer), and hopefully I'll be able to get some answers there.

@hjtran commented Sep 23, 2022

Thanks @acocuzzo! I appreciate the help. FWIW, I've also been talking with our GCP account manager about this for a few months, in case you want to be linked to that; we've gone back and forth trying to debug this for a while. If that's helpful, I can include you on the email chain.

@Gwojda commented Oct 25, 2022

Hi,
I've been facing the same problem for 2 months. Any news?
Thanks,
Gautier

@hjtran commented Oct 25, 2022

I'm still facing the same issues, so no news, I don't think. There have been a couple of new releases of the Python client that I was advised to upgrade to and rerun my load tests with, but I'm still seeing lots of expired acks.

@Gwojda commented Oct 25, 2022

Exactly the same problem. I never faced it before switching to the "exactly once" delivery system.

@hjtran commented Oct 25, 2022

Ah, I see the issue without exactly-once. I'm about to run a load test with exactly-once to see if that fixes anything.

@Gwojda commented Oct 25, 2022

Oh, interesting. I was sure this error had appeared in our application because of that option.

@hjtran commented Oct 25, 2022

@acocuzzo any developments?

@acocuzzo (Contributor)

@Gwojda I would highly recommend reaching out via Cloud support for help on your individual issue, as causes differ case to case.
The latest version of the client library should help (as a mitigation) if you have EOD enabled, and you have high message latency and low flow-control settings.
The remaining mitigations and investigation would have to go through Cloud support as the causes differ between cases.

@Gwojda commented Oct 26, 2022

Hi,
I don't think the causes differ; I have the impression that we are all facing the same problem. (Edit: except @hjtran, who's not using EOD. But @jtressle is facing exactly the same problem as me.)
It's impossible for me to contact GCP support, which is paid and which we do not have in our subscription. Can anyone else concerned escalate the issue?

@acocuzzo (Contributor)

@Gwojda Can you please clarify your issue (since @jtressle had different issues at different times), and are you on the latest client library version? Thanks

@jtressle (Author) commented Oct 30, 2022

@Gwojda I would highly recommend reaching out via Cloud support for help on your individual issue, as causes differ case to case.
The latest version of the client library should help (as a mitigation) if you have EOD enabled, and you have high message latency and low flow-control settings.
The remaining mitigations and investigation would have to go through Cloud support as the causes differ between cases.

@acocuzzo I initially submitted a ticket with Cloud support right after opening this issue. Their recommendation was to seek support through this issue, as Pub/Sub was outside their expertise.

Regarding issues, I have two that are both related to long-running tasks. With EOD, I get the INVALID_ACK_ID issue. With EOD turned off, I get duplicate messages being sent before the lease is done. Both of these issues prevent me from running Pub/Sub reliably. In the end, I'm using a database to track duplicate messages, which works except in the case of some race conditions.

First, I think the best solution would be one where we can run EOD with a simplified leasing mechanism. The current leasing logic fails and places optimization over reliability. If INVALID_ACK_ID is raised only for already-expired messages, why are any messages expiring before my timeout of 7200 seconds? Alternatively, please just provide a mechanism to ignore ALL INVALID_ACK_ID messages.

I've yet to implement your synchronous pull recommendation, but I believe it may be the best solution for my circumstances (long-running tasks, and one message at a time).

Thanks,

@Gwojda commented Oct 31, 2022

Regarding issues, I have two that are both related to long-running tasks. With EOD, I get the INVALID_ACK_ID issue. With EOD turned OFF, I get duplicate messages being sent before the lease is done. Both these issues prohibit me from running PubSub reliably. At the end, I'm using a database to track duplicate messages, which works except in the case of some race conditions.
=> Facing the same problem. I also tried a solution with synchronous pull, but it's really dirty and slower for my use case. So, for now, I just ignore INVALID_ACK_ID and I'm living with the errors.
Ah, and it looks like INVALID_ACK_ID appears when I need to scale up (many microservices are subscribed to the same subscription).
Thanks,

@jtressle (Author) commented Nov 3, 2022

@Gwojda how are you ignoring the INVALID_ACK_ID errors? In my case, they disrupted the process I was running.

Thanks

@acocuzzo (Contributor) commented Nov 11, 2022

@Gwojda @jtressle @hjtran @YungChunLu
There is a new library release which I think should reduce some of the leasing time:
https://github.com/googleapis/python-pubsub/releases/tag/v2.13.11

In particular, this release:

  1. With and without exactly-once delivery:
    Reduces some of the leasing time for subscriber clients with a high number of max_messages (leasing time is related to the number of messages being leased). If you are still experiencing duplicates and ack expirations with a high number of max_messages, please try reducing your max_messages and increasing the number of subscriber clients.

  2. With exactly-once delivery:
    Reduces the number of "invalid_ack_id" warnings that occur per expired message to 1. This is because when a message has expired, we drop it from leasing (since subsequent modacks would also fail). This should reduce logging noise; each warning should now correspond to only one expired message.

Currently, the leasing behavior in the library still creates new threads, but not new processes, as communication is needed between threads; we therefore get concurrency, but not parallelism. In order to optimize the leasing further, we would need to change the behavior significantly to allow for parallelism, and that fix will require more significant design changes. In particular, we use the Python grpc library, so we are limited in our ability to parallelize calls to the service within the same client.

If your subscription requires high throughput, I recommend either (1) increasing the number of clients and reducing flow control settings, or (2) switching to a client library that offers thread parallelism.
Please see this article on maximizing streaming performance with Pub/Sub: https://cloud.google.com/blog/products/data-analytics/testing-cloud-pubsub-clients-to-maximize-streaming-performance
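
To illustrate recommendation (1), a minimal sketch (one possible setup, not an official pattern) that spreads load across several streaming-pull clients, each with a small flow-control window; the client count and settings are assumptions to tune:

from google.cloud import pubsub_v1

project_id = "my-project"          # placeholder
subscription = "my-subscription"   # placeholder
NUM_CLIENTS = 4                    # assumption: tune to your workload

def callback(message):
    # ... process, then ack (or ack_with_response under exactly-once delivery) ...
    message.ack()

clients, futures = [], []
for _ in range(NUM_CLIENTS):
    client = pubsub_v1.SubscriberClient()
    path = client.subscription_path(project_id, subscription)
    flow_control = pubsub_v1.types.FlowControl(max_messages=2)  # keep each window small
    futures.append(client.subscribe(path, callback=callback, flow_control=flow_control))
    clients.append(client)

try:
    futures[0].result()  # block the main thread; every client keeps pulling in the background
except KeyboardInterrupt:
    for f in futures:
        f.cancel()
        f.result()  # block until the shutdown is complete
    for c in clients:
        c.close()

In practice these clients would usually be spread across separate processes or pods, since threads within one Python process still share the GIL.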

Thanks everyone for their input and patience.

@acocuzzo (Contributor) commented Nov 11, 2022

@jtressle
In order to prevent "INVALID_ACK_ID" exceptions from interrupting your client, please use a callback similar to:


    import logging

    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

    logger = logging.getLogger(__name__)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        try:
            ack_future = message.ack_with_response()
            ack_future.result(timeout=30)
            logger.info("Succeeded in acking the message.")
        except sub_exceptions.AcknowledgeError as e:
            logger.error(
                f"Failed to ack the message. {message.message_id}, {e.error_code}, {e.response}"
            )
            

@YungChunLu


Thanks for the detailed explanation. Will give the new client a try.

@acocuzzo added the status: blocked (Resolving the issue is dependent on other work.) label on Jul 11, 2023
@acocuzzo (Contributor)

Adding a blocked tag pending some exactly-once delivery server-side changes.

@acocuzzo (Contributor)

Closing this bug, as many of the related issues have been fixed, both client- and server-side, including:

  1. Expired messages delivered to callbacks due to low flow control settings and long processing times. This is fixed for exactly-once delivery (without exactly-once delivery there is no way to verify that a message has not expired before sending it to the callback).
  2. Excessive warning logs for expired messages with EOD
  3. Server-side issues which led to messages being redelivered before expiration
  4. Some inefficiencies in client library leasing behavior

Remaining issues:

  1. The remaining issues we sometimes see with client-side behavior are due to processing that prevents the library from leasing messages because of thread starvation. This is not due to the library itself, but rather to limitations of the Python threading model, which allows concurrency but not parallelism. If you have questions about improving your processing, please see the standard resources on Python concurrency, parallelism, and the GIL; a sketch of one mitigation (offloading CPU-heavy work to a separate process) follows below.
  2. If you have exactly-once delivery enabled, your processing times are low, and you are still receiving duplicates, please file a Cloud support request. In that ticket it can be determined whether this starvation is occurring and preventing the library from leasing messages, or whether the issue originates from the server.
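
As an illustration of that thread-starvation point, a minimal sketch (one possible mitigation under the stated assumptions, not an official recommendation) that offloads the CPU-heavy work to a separate process from inside the callback, so the subscriber's leasing and gRPC threads are not starved by the GIL:

from concurrent.futures import ProcessPoolExecutor
from google.cloud import pubsub_v1

# One worker process for the heavy computation; the subscriber's leasing and
# gRPC threads stay responsive in the main process while it runs.
executor = ProcessPoolExecutor(max_workers=1)

def heavy_task(payload: bytes) -> None:
    # ... CPU-intensive work runs here, in a separate process ...
    pass

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Block this callback thread on the child process; blocking a callback
    # thread is fine, because lease extensions happen on other threads.
    executor.submit(heavy_task, message.data).result()
    message.ack()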

@acocuzzo removed the status: blocked (Resolving the issue is dependent on other work.) label on Jul 13, 2023