
Azure Service Bus SDK - long running process and failed lock renewal process/closed handler #35717

Closed
gpiotrowski opened this issue May 21, 2024 · 19 comments · Fixed by #35889

Comments

@gpiotrowski

  • Package Name: azure-servicebus
  • Package Version: 7.12.2
  • Operating System: Windows, Linux
  • Python Version: 3.9.13

Describe the bug
My message consumer is a long-running process. From time to time it has a problem with renewing the message lock and, after that, with completing/rejecting that message. I started to investigate possible reasons and these are my findings:

  1. It happens when the consumer takes a very long time to process the message (in my tests, cases that took 45 minutes and 75 minutes).
  2. Log from lock renewal:
INFO - AMQP error occurred: (None), condition: (b'amqp:unauthorized-access'), description: ('Message lock renew failed. b"Unauthorized access. \'Listen\' claim(s) are required to perform this operation. Resource: \'sb://{namespace}.servicebus.windows.net/{topic}/subscriptions/{subscription}/$management\'. TrackingId:b700e3ac-fca7-42bf-b21f-bb9370b30651_G52, SystemTracker:NoSystemTracker, Timestamp:2024-05-10T07:02:12".').
[...]
DEBUG - Failed to auto-renew lock: ServiceBusAuthorizationError('Message lock renew failed. b"Unauthorized access. \'Listen\' claim(s) are required to perform this operation. Resource: \'sb://{namespace}.servicebus.windows.net/{topic}/subscriptions/{subscription}/$management\'. TrackingId:c9f7ef23-d8f1-42e4-80a8-dd1ec074f1fc_G55, SystemTracker:NoSystemTracker, Timestamp:2024-05-20T07:58:41". Error condition: amqp:unauthorized-access. Status Code: 401.'). Closing thread.
  3. After that, when the message has been processed and the service tries to complete it, I receive the following error:
ERROR - Error during processing message: {"message": "Test message 5"}. Error: Failed to complete the message as the handler has already been shutdown.Please use ServiceBusClient to create a new instance.
Traceback (most recent call last):
  File "\sample.py", line 32, in run_consumer
    subscription_receiver.complete_message(msg)
  File "\.venv\lib\site-packages\azure\servicebus\_servicebus_receiver.py", line 879, in complete_message
    self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "\.venv\lib\site-packages\azure\servicebus\_servicebus_receiver.py", line 498, in _settle_message_with_retry
    self._check_message_alive(message, settle_operation)
  File "\.venv\lib\site-packages\azure\servicebus\_common\receiver_mixins.py", line 103, in _check_message_alive
    raise ValueError(
ValueError: Failed to complete the message as the handler has already been shutdown.Please use ServiceBusClient to create a new instance.
  4. I tested this both by creating the client from a connection string (ServiceBusClient.from_connection_string(conn_str=connection_string)) and with the azure-identity library (ServiceBusClient(host_name, credential=DefaultAzureCredential())) - in both cases the error appears.
  5. My tests cover various combinations of possible ways of implementing message consumers with various configurations:
    • sync code (with sleep())
    • async code using asyncio, with await asyncio.sleep() and sleep()
    • Service Bus lock duration set to 1, 3 and 5 minutes
    • various simulated consumer work times (5, 10, 15, 30, 45, 60, 75 minutes) + random times
  6. When I use a consumer that waits 45 minutes, it fails while processing the second message. With 75 minutes I could reproduce the behavior every time the app processes a message. It is worth mentioning that sleeping for 60 minutes works just fine, even when running for several hours. For consumers processing for 30 minutes or less, everything works fine.
  7. When testing with random sleep times (which is closer to what happens in my real application), I hit the problem from time to time. This is how it looks in a sample test run:
    • First message took 1094 seconds (~18 minutes). Completed successfully
    • Second message took 1493 seconds (~25 minutes). Completed successfully
    • Third message took 1700 seconds (~28 minutes). While processing that message I received the error described in this issue.
  8. The first error-related logs appear around 1 hour after the consumer starts processing the long-running operation (for example, the consumer starts processing a message at 13:43:21,774 and the message lock renewal fails at 14:48:45,475 with the unauthorized-access error). This suggests that it happens when the token expires during that long-running operation. The previous point looks very similar when we sum the times: the first and second messages took 43 minutes in total, and the next message took 28 minutes - during that time the token probably expired.

To Reproduce
Steps to reproduce the behavior:

  1. Run the following code:
import logging
from time import sleep
from azure.servicebus import ServiceBusClient, ServiceBusReceivedMessage, AutoLockRenewer
from azure.servicebus.exceptions import ServiceBusError

def run_consumer(connection_string: str, topic_name: str, subscription_name: str, processing_time_seconds: int):
    try:
        servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
                    
        with servicebus_client:
            renewer = AutoLockRenewer(max_lock_renewal_duration=60000, on_lock_renew_failure=_lock_renew_failed)

            subscription_receiver = servicebus_client.get_subscription_receiver(topic_name=topic_name, subscription_name=subscription_name, auto_lock_renewer=renewer)
            
            with subscription_receiver:
                while True:
                    received_msgs: list[ServiceBusReceivedMessage] = subscription_receiver.receive_messages(max_wait_time=30, max_message_count=1)
                    
                    for msg in received_msgs:
                        try:
                            logging.info(f"Received message: {msg}")
                            sleep(processing_time_seconds)
                            logging.info(f"Processed message: {msg}")

                            subscription_receiver.complete_message(msg)
                        except Exception as e:
                            logging.exception(f"Error during processing message: {msg}. Error: {e}")
                            try:
                                subscription_receiver.abandon_message(msg)
                            except ServiceBusError as service_bus_exception:
                                logging.exception(f"Error during abandoning message: {msg}. Error: {service_bus_exception}")
                                raise
    except Exception as exception:
        logging.exception(f"Consumer for {topic_name}/{subscription_name} failed. Error: {exception}")

def _lock_renew_failed(msg, exception):
    logging.error(f"Failed to renew message lock. Message: {msg}. Error: {exception}")

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

connection_string='Endpoint=sb://{namespace}.servicebus.windows.net/;SharedAccessKeyName={key_name};SharedAccessKey={key};EntityPath={entity_path}'
topic_name = ''
subscription_name= ''
processing_time_minutes = 75

run_consumer(connection_string, topic_name, subscription_name, processing_time_minutes*60)
  2. Wait around 1 hour; after that time you should see something like this in the logs:
INFO - AMQP error occurred: (None), condition: (b'amqp:unauthorized-access'), description: ('Message lock renew failed. b"Unauthorized access. \'Listen\' claim(s) are required to perform this operation. Resource: \'sb://{namespace}.servicebus.windows.net/{topic}/subscriptions/{subscription}/$management\'. TrackingId:b2e0e289-4788-4985-8119-35b8d7f69845_G28, SystemTracker:NoSystemTracker, Timestamp:2024-05-15T12:48:44".').
[...]
DEBUG - Failed to auto-renew lock: ServiceBusAuthorizationError('Message lock renew failed. b"Unauthorized access. \'Listen\' claim(s) are required to perform this operation. Resource: \'sb://{namespace}.servicebus.windows.net/{topic}/subscriptions/{subscription}/$management\'. TrackingId:b2e0e289-4788-4985-8119-35b8d7f69845_G28, SystemTracker:NoSystemTracker, Timestamp:2024-05-15T12:48:44". Error condition: amqp:unauthorized-access. Status Code: 401.'). Closing thread.
ERROR - Failed to renew message lock. Message: {"message": "Test message 0"}. Error: Failed to auto-renew lock
  3. After another 15 minutes, when the message has been processed, you should observe the following logs:
INFO - Processed message: {"message": "Test message 0"}
ERROR - Error during processing message: {"message": "Test message 0"}. Error: Failed to complete the message as the handler has already been shutdown.Please use ServiceBusClient to create a new instance.
Traceback (most recent call last):
  File "\sample.py", line 32, in run_consumer
    subscription_receiver.complete_message(msg)
  File "\.venv\lib\site-packages\azure\servicebus\_servicebus_receiver.py", line 879, in complete_message
    self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "\.venv\lib\site-packages\azure\servicebus\_servicebus_receiver.py", line 498, in _settle_message_with_retry
    self._check_message_alive(message, settle_operation)
  File "\.venv\lib\site-packages\azure\servicebus\_common\receiver_mixins.py", line 103, in _check_message_alive
    raise ValueError(
ValueError: Failed to complete the message as the handler has already been shutdown.Please use ServiceBusClient to create a new instance.

Expected behavior
The application should behave the same way no matter how long the consumer takes. Token refresh (if that is the issue) should be handled correctly during long-running operations.

Additional context
Currently I handle that situation by reconnecting to Azure Service Bus whenever an error occurs and subscribing to messages again (see the sketch below). Additionally, I implemented an inbox to know which messages were already processed (from the application's perspective, not the broker's). But this workaround only works if I have a single consumer instance. With multiple instances, when the lock renewal process fails, another instance will pick up the same message while the first consumer is still working on it, and the inbox will not help in that situation.
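Roughly, that workaround looks like the minimal sketch below (an illustration only, not a recommended pattern); process and inbox are placeholder names for the application-side processing function and the set of already-handled message IDs:

import logging
from azure.servicebus import ServiceBusClient

def consume_with_reconnect(connection_string, topic_name, subscription_name, process, inbox):
    while True:  # outer reconnect loop: rebuild client/receiver after any failure
        try:
            servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
            with servicebus_client:
                receiver = servicebus_client.get_subscription_receiver(
                    topic_name=topic_name, subscription_name=subscription_name
                )
                with receiver:
                    while True:  # normal receive loop
                        for msg in receiver.receive_messages(max_wait_time=30, max_message_count=1):
                            if msg.message_id in inbox:
                                receiver.complete_message(msg)  # already handled, just settle
                                continue
                            process(msg)                        # long-running work
                            inbox.add(msg.message_id)           # record before settling
                            receiver.complete_message(msg)
        except Exception:
            logging.exception("Consumer for %s/%s failed; reconnecting", topic_name, subscription_name)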

I attach the full logs from a sample run (processing time set to 75 minutes): log.log


Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

@kashifkhan
Member

Thank you for the feedback, @gpiotrowski. We will investigate and get back to you ASAP.

@kashifkhan
Member

@gpiotrowski Would you also be able to provide DEBUG-level logs with frame-level logging turned on, please? That way we can see the interaction between the client and the service.

import logging
import sys

handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

...

from azure.servicebus import ServiceBusClient

client = ServiceBusClient(..., logging_enable=True)

For the issue around authentication, please make sure that either mechanism has the proper access rights:

Connection String - Manage level - https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
Entra ID - https://learn.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory


Hi @gpiotrowski. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

@gpiotrowski
Author

Hi @kashifkhan, sure! You can find the log here: logging-enabled.log. It was a test run using a connection string, with sleep() for 75 minutes.

Both mechanisms have the proper access rights - for the connection string it is Manage (so both Send and Listen are enabled). In fact, as described in the issue, shorter jobs work flawlessly for several hours without any authentication issue, so I assume this is connected to long-running jobs.

@kashifkhan
Member

@gpiotrowski I was able to reproduce this on my end and am working on a PR to address it. I'll update the issue when we have it ready for release.


Hi @gpiotrowski. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.


Hi @gpiotrowski, since you haven’t asked that we /unresolve the issue, we’ll close this out. If you believe further discussion is needed, please add a comment /unresolve to reopen the issue.

@cgoma

cgoma commented Aug 27, 2024

Hi @kashifkhan, I am also facing the same issue with my long-running processes. Could you please let me know how I can use the version of the Service Bus SDK that has this fix implemented?

I am unable to install pre-release packages using the command "pip install --pre azure-servicebus==7.12.3"; it throws an error. Also, when I try to install "pip install --pre azure-servicebus==7.13.0b1", it does not have this fix implemented.

Could you please help me with this?

@l0lawrence
Member

l0lawrence commented Aug 27, 2024

Hi @cgoma, the version with the fix has not been released yet. We will update this thread when it is.

@tj---

tj--- commented Sep 11, 2024

There are so many threads on this issue, so I am posting here only. I spent hours finding the cause 😔
One problem is this configuration in AutoLockRenewer, which cannot be changed:

        self._sleep_time = 0.5
        self._renew_period = 10

I pull a large number of messages that take a while to process. The renewer starts renewing a lock only when 10 s remain (which is too late), because there is a funny "sleep" of 0.5 seconds between each message.
So my only option is to create the renewer with a pool of size num_messages / (renew_period * sleep_time + some_buffer), roughly as in the sketch below. This is not intuitive, because I end up using that many threads just to renew the messages!
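A minimal sketch of that pool sizing, using the formula above; sleep_time and renew_period mirror the current internal AutoLockRenewer constants (which could change between releases), and some_buffer is an arbitrary amount of headroom:

import math
from azure.servicebus import AutoLockRenewer

num_messages = 400   # messages fetched per receive_messages() call
sleep_time = 0.5     # internal pause between per-message renewal checks (seconds)
renew_period = 10    # renewal is only attempted when ~10 s of lock time remain
some_buffer = 5      # extra headroom, chosen arbitrarily

# Size the thread pool from the batch size so renewals can keep up.
max_workers = math.ceil(num_messages / (renew_period * sleep_time + some_buffer))

message_lock_renewer = AutoLockRenewer(
    max_lock_renewal_duration=15 * 60,  # keep renewing for up to 15 minutes
    max_workers=max_workers,
)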

It would be great if you could make _renew_period configurable -> maybe up to a minute.

@kashifkhan
@l0lawrence

@tj---

tj--- commented Sep 11, 2024

Also, why can't a larger lock duration (> 5 min) be set on the Service Bus queue itself? This seems like such a roundabout approach and has potential for errors.

@l0lawrence
Member

@tj--- could you please share a code sample of what you are running where you are hitting issues with renewing the locks, as well as the size of the messages you are receiving?

@tj---

tj--- commented Sep 11, 2024

I'll give you pseudocode, as the actual code is part of my project and distributed across modules.
Message size: ~1.2 KB each. This is a standard Azure Storage Account blob-creation message delivered by an Event Grid system topic to a Service Bus queue - we created those subscriptions. Effectively, each message carries the blob name.
Lock duration (set on the queue): PT4M

Pseudocode:

from azure.servicebus import ServiceBusClient, AutoLockRenewer, ServiceBusReceiveMode

message_lock_renewer = AutoLockRenewer(
    max_lock_renewal_duration=15*60,
    max_workers=1,  # using 1, to demonstrate the problem
    # on_lock_renew_failure=self.lock_renewal_failure_callback
)
client = ServiceBusClient.from_connection_string(
    conn_str="<connection string>"
)
receiver = client.get_queue_receiver(
    queue_name="some_test_queue",
    max_wait_time=30,
    auto_lock_renewer=message_lock_renewer,
    receive_mode=ServiceBusReceiveMode.PEEK_LOCK
)
messages = receiver.receive_messages(
       max_message_count=400,
       max_wait_time=30
)
# Process a batch of messages. Processing involves:
# Download blobs in smaller batches, apply certain transformations, write to Snowflake.
# this operation takes 5+ minutes
# or you could do this instead ---->  time.sleep(6*60)
process_messages(messages)

# Complete the messages
for message in messages:
    receiver.complete_message(message)

@kashifkhan
Member

@tj--- we believe we fixed this problem for the original issue. We plan on releasing the fix officially next week, but in the meantime you could build a wheel from main and see if it resolves your scenario.

@tj---

tj--- commented Sep 11, 2024

If you are referring to this: #35889, I don't think it addresses this problem.

@kashifkhan
Member

@tj--- please go ahead and open a new issue, and attach DEBUG logs with frame-level logging as described here: #35717 (comment)

@tj---

tj--- commented Sep 11, 2024

Hmm, okay, will do. But it's very easy to reproduce using the pseudocode I provided.

@kashifkhan
Member

@cgoma @gpiotrowski the fix is now on PyPI.
