Subscriber future hangs with large number of messages #242

Closed

Spawn opened this issue Nov 19, 2020 · 6 comments
Labels

api: pubsub · Issues related to the googleapis/python-pubsub API.
type: question · Request for information or clarification. Not an issue.

Comments

Spawn commented Nov 19, 2020

Environment details

  • OS type and version: python:3.6-alpine
  • Python version: 3.6
  • pip version: 20.2.3
  • google-cloud-pubsub version: 1.7.0

Problem

We have a high-load worker in Kubernetes which listens to a Pub/Sub queue. The problem is that when the queue receives several thousand messages, the worker stops processing without any errors (it just freezes). Other workers whose processing time is short work normally, but the problem worker can sometimes take up to several minutes to process one message (HTTP requests, database requests), and after processing several hundred messages the logs stop printing anything.

So to reproduce the issue, you need a Pub/Sub subscription with several thousand messages and a Python subscriber whose callback takes up to a minute, and sometimes longer, to process one message.

To handle the long processing time I specified timeout=60 for the future, so when processing takes too long a TimeoutError is raised, at which point I call future.cancel(). And here is the problem: cancel() waits for a graceful shutdown of the message-consuming thread, and when I did some debugging I found out that sometimes cancel() never returns. For instance, on one server this kind of freeze lasted 10 days without any logs, errors, etc.

I'm wondering, is there a way to force-cancel the future?

Subscription config

(screenshot of the subscription configuration)

Code example

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1


def run():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    future = subscriber.subscribe(subscription_path, callback=some_long_working_code)
    logger.info('Subscriber initialized, listening...')

    with subscriber:
        try:
            future.result(timeout=60)
        except TimeoutError:
            logger.warning('Timeout reached, resubscribing...')
            future.cancel()  # blocks until the streaming pull shuts down

Thank you for your time!

@product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Nov 19, 2020
@yoshi-automation added the triage me I really want to be triaged. and 🚨 This issue needs some love. labels Nov 20, 2020
pradn (Contributor) commented Dec 10, 2020

The fact that cancel() is hanging probably indicates it's a threading issue: perhaps your work item is holding up a thread indefinitely, and cancel() can't finish waiting for it? The subscriber runs callbacks on a set of threads, and perhaps they are all being used up. You can pass in a scheduler here; the default thread pool uses a max_workers of just 10. Try raising this limit and see if it helps, as in the sketch below. If the number of long-running tasks is greater than the number of threads, you might still run into starvation issues. Perhaps add a metric to see how long each work item takes, and also to check whether some never finish? That might be the root cause.
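A minimal sketch of both suggestions, reusing the names from the issue's code example (subscription_path, logger, some_long_working_code); the executor size of 50 and the timed_callback wrapper are illustrative choices, not library defaults:

import time
from concurrent.futures import ThreadPoolExecutor

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Raise the callback thread count above the default of 10.
scheduler = ThreadScheduler(executor=ThreadPoolExecutor(max_workers=50))

def timed_callback(message):
    # Hypothetical metric wrapper: log how long each work item takes.
    # A "started" line with no matching "finished" line points to a
    # callback that never returns.
    start = time.monotonic()
    logger.info('Started processing message %s', message.message_id)
    some_long_working_code(message)
    logger.info(
        'Finished message %s in %.1fs', message.message_id, time.monotonic() - start
    )

subscriber = pubsub_v1.SubscriberClient()
future = subscriber.subscribe(
    subscription_path, callback=timed_callback, scheduler=scheduler
)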

@meredithslota meredithslota added type: question Request for information or clarification. Not an issue. and removed 🚨 This issue needs some love. triage me I really want to be triaged. labels Dec 10, 2020
plamut (Contributor) commented Feb 16, 2021

@Spawn
While developing a graceful (blocking) streaming pull shutdown in #292, I realized that streaming_future.cancel() already blocks until all currently executing callbacks are done processing. I fixed that along the way, so that by default the cancel() method will return as soon as the streaming pull is shut down (i.e. the helper threads are terminated). Already-executing message callbacks will keep running, but cancel() will no longer block on them unless explicitly told to do so; the resulting pattern is sketched below.
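A minimal sketch of that shutdown pattern, assuming a google-cloud-pubsub release that includes the #292 change (the 2.x line, to the best of my knowledge):

try:
    future.result(timeout=60)
except TimeoutError:
    future.cancel()  # triggers shutdown; no longer blocks on running callbacks
    future.result()  # optionally block until the shutdown itself completes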

As @pradn mentioned, the hangs observed are most likely caused by some of the currently executing message callbacks blocking indefinitely. BTW, have you identified any potentially problematic spots in the code in the meantime?

Spawn (Author) commented Feb 17, 2021

Hey @plamut, thank you for the update.

BTW, have you identified any potentially problematic spots in the code in the meantime?

Yes, I did some investigation into this. As far as I can tell, the only place that could block execution is the database commit() call. I put log messages before and after the commit, roughly as sketched below, and noticed that both appear in the logs. The log message after the commit is the last line of code in the callback, and it is also the last message in the logs, so it seems the runtime freezes elsewhere.
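For concreteness, a minimal sketch of the instrumentation described; the session object and surrounding names are hypothetical:

logger.info('Committing transaction...')
session.commit()  # hypothetical DB handle; the only call suspected of blocking
logger.info('Commit done, callback finished')  # this line does appear, so the hang is elsewhere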

plamut (Contributor) commented Feb 17, 2021

The log message after the commit is the last line of code in the callback, and it is also the last message in the logs, so it seems the runtime freezes elsewhere.

Noted, thanks.

acocuzzo (Contributor) commented:

@Spawn are you still experiencing this issue with the latest version of google-cloud-pubsub?

acocuzzo (Contributor) commented:

Closing as there was no response; please re-open if you are still experiencing this issue.

@acocuzzo reopened this Sep 15, 2022
@acocuzzo closed this as completed Apr 3, 2023