Multiple celery fork pool workers don't work #1785

Open
lzgabel opened this issue Sep 9, 2023 · 17 comments

lzgabel commented Sep 9, 2023

ENV:
Broker: Redis
Celery: 5.2.7
Python: 3.9.0

👋 Hi all. FYI, since kombu 5.3.2 was released, running multiple Celery fork pool workers no longer works for us, so we rolled back to kombu 5.3.1 and everything returned to normal.

Version: 5.3.2
[two screenshots, not reproduced here]

Version: 5.3.1
[two screenshots, not reproduced here]

🤔 We compared the changes between the kombu versions, and after reverting PR #1733 on top of 5.3.2, all workers worked normally.

cc @auvipy @Nusnus @mfaw @mbierma.

Contributor

mbierma commented Sep 9, 2023

@lzgabel do you have an example that reproduces the issue?

Author

lzgabel commented Sep 11, 2023

Hi @mbierma. After investigating, we found that the problem is caused by the following code in our system:

@celeryd_init.connect
def recover_job(sender=None, conf=None, **kwargs) -> None:
    i = app.control.inspect()
    running_list = []
    for worker_list in i.active().values():
        for task_item in worker_list:
            name = task_item.get('name')
            # do something
            args = task_item.get('args')
            job_id = args[1]
            if job_id:
                running_list.append(job_id)
        logger.info(f'lost job : {running_list}')

With this code, the worker blocks in _brpop_read() because of the change made by PR #1733.
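
Independently of the blocking problem, the handler above would fail if `inspect().active()` returned `None` (which happens when no worker replies) or if a task had fewer than two positional arguments. The following is a minimal sketch of a more defensive extraction step. The helper name `collect_job_ids` and the `job_id_index` parameter are hypothetical, and the sketch assumes `args` arrives as a list. It does not address the blocking in `_brpop_read()`.

```python
from typing import Any, Mapping, Optional, Sequence


def collect_job_ids(
    active: Optional[Mapping[str, Sequence[Mapping[str, Any]]]],
    job_id_index: int = 1,
) -> list:
    """Collect job IDs from a mapping shaped like the result of inspect().active().

    `collect_job_ids` and `job_id_index` are illustrative names, not Celery API.
    """
    job_ids = []
    # inspect().active() returns None when no worker replies in time.
    for tasks in (active or {}).values():
        for task in tasks:
            args = task.get("args") or []
            # Skip tasks whose args are too short to hold a job ID.
            if len(args) > job_id_index and args[job_id_index]:
                job_ids.append(args[job_id_index])
    return job_ids
```

Inside the signal handler this could be called as `collect_job_ids(app.control.inspect().active())`, which also avoids issuing the broadcast more than once.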

mfaw commented Sep 17, 2023

Hi @lzgabel, I think this is different from my issue. My issue is not with fork pool workers but with the Celery workers themselves. I can run two Celery workers when I use RabbitMQ as the broker, but when I use Kafka as the broker, only one Celery worker works.

ojnas commented Nov 16, 2023

We have exactly the same problem. Downgrading kombu to 5.3.1 solves it.
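
For anyone who wants to check at startup whether the installed kombu falls in the range reported as affected here, a small sketch follows. It assumes only what this thread reports: 5.3.1 works and 5.3.2 does not (a later comment reports 5.4.2 as affected too). Whether any newer release changes this is not known here. The names `FIRST_AFFECTED`, `parse_version`, `reported_affected`, and `installed_kombu_version` are hypothetical.

```python
import re
from importlib import metadata

# First kombu release reported as affected in this thread (assumption
# taken from the reports above, not from a changelog).
FIRST_AFFECTED = (5, 3, 2)


def parse_version(version: str) -> tuple:
    """Turn '5.3.2' (or '5.4.0rc1') into a tuple of its three leading integers."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def reported_affected(version: str) -> bool:
    """True if the version is at or after the first release reported as affected."""
    return parse_version(version) >= FIRST_AFFECTED


def installed_kombu_version():
    """Return the installed kombu version string, or None if kombu is absent."""
    try:
        return metadata.version("kombu")
    except metadata.PackageNotFoundError:
        return None
```

The downgrade workaround itself corresponds to pinning `kombu==5.3.1` in the project's requirements.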

thuibr commented Apr 1, 2024

> Hi @lzgabel, I think this is different from my issue. My issue is not with fork pool workers but with the Celery workers themselves. I can run two Celery workers when I use RabbitMQ as the broker, but when I use Kafka as the broker, only one Celery worker works.

@mfaw is this still the case? If so, maybe I'll list it as a limitation in my Celery documentation ticket, celery/celery#8935.

thuibr commented Apr 1, 2024

It seems that if you specify another queue, such as add instead of the default celery queue, the task is routed to a new topic called add. You can then start a second worker listening on the add queue, which gives you two working workers. It is unfortunate that two workers can't listen to the same queue, though. That definitely seems like a regression. I wonder if we can construct a test case for this.
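
As a rough illustration of the per-queue setup described above, here is a configuration sketch. The module name `celeryconfig`, the task name `tasks.add`, and the worker node names are assumptions for illustration. `task_routes` and the worker's `-Q` option are standard Celery settings.

```python
# celeryconfig.py (hypothetical module name, loaded with
# app.config_from_object("celeryconfig"))

# Send the task named "tasks.add" to a queue called "add" instead of the
# default "celery" queue. The task name is an assumption for illustration.
task_routes = {
    "tasks.add": {"queue": "add"},
}

# Workers can then be started per queue, for example:
#   celery -A tasks worker -Q celery -n default@%h
#   celery -A tasks worker -Q add -n add@%h
```

This only spreads different tasks across workers. It does not let two workers share one queue, which is the limitation discussed here.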

thuibr commented Apr 1, 2024

I wonder if we can use partitions to allow for multiple workers.
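
To make the partition idea concrete: within one Kafka consumer group, each partition is consumed by at most one consumer, so a single-partition topic can keep only one consumer busy. The sketch below is a simplified round-robin model of that behaviour, not Kafka's actual assignor. Whether kombu's Kafka transport places all workers in one group and uses single-partition topics is an assumption that would need checking. `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions: int, consumers: list) -> dict:
    """Round-robin partitions over the consumers of one group (simplified model)."""
    if not consumers:
        return {}
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        # Each partition gets exactly one owner within the group.
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


one_partition = assign_partitions(1, ["worker1", "worker2"])
two_partitions = assign_partitions(2, ["worker1", "worker2"])
```

In this model `one_partition` leaves `worker2` with no partitions, while `two_partitions` gives each worker one, which is why more partitions might allow more than one worker to consume from the same topic.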

ojnas commented May 21, 2024

Is there any plan to fix the issue reported by @lzgabel or is there any known workaround? It's still preventing us from upgrading kombu and celery beyond 5.3.1.

thuibr commented May 22, 2024

> Is there any plan to fix the issue reported by @lzgabel or is there any known workaround? It's still preventing us from upgrading kombu and celery beyond 5.3.1.

I don't think that there is a maintainer for Kafka at the moment.

ojnas commented May 23, 2024

> > Is there any plan to fix the issue reported by @lzgabel or is there any known workaround? It's still preventing us from upgrading kombu and celery beyond 5.3.1.
>
> I don't think that there is a maintainer for Kafka at the moment.

The original issue reported by @lzgabel is not about Kafka. It occurs with Redis as the broker, which we also use.

thuibr commented Jul 28, 2024

@lzgabel @ojnas I am unable to reproduce the issue with Celery 5.2.7, Kombu 5.3.2, redis-py 5.0.7, and Python 3.9.0. What version of Redis are you using? I am using:

$ redis-server --version
Redis server v=6.0.16 sha=00000000:0 malloc=jemalloc-5.2.1 bits=64 build=a3fdef44459b3ad6

[2024-07-28 10:38:49,622: INFO/ForkPoolWorker-1] Task tasks.add[8adf78ef-53ff-4845-80d3-2582089060fb] succeeded in 10.009252441999706s: 8
[2024-07-28 10:38:59,371: INFO/ForkPoolWorker-2] Task tasks.add[b59745df-bef0-48be-9667-691dc1893aa9] succeeded in 10.008264554000107s: 8

Additionally, have you tried the latest versions of Kombu and Celery?

ojnas commented Sep 26, 2024

@thuibr I just tried the latest versions, Celery 5.4.0 and Kombu 5.4.2, and the issue persists for us. Our Redis version is 7.4.0.

ojnas commented Sep 26, 2024

@thuibr If you want to reproduce the issue: as @lzgabel mentioned in their comment here #1785 (comment), it seems related to app.control.inspect(). Once it has been called at any point, multiple workers stop working.

ojnas commented Sep 29, 2024

These issues seem to be related:
celery/celery#8671
celery/celery#8874

thuibr commented Oct 15, 2024

@ojnas I was able to trigger a different issue, which I suspect has the same root cause. A warm shutdown hangs with the following code:

import logging

from celery import Celery
from celery.signals import celeryd_init

app = Celery('tasks', broker='redis://localhost:6379/0')

logger = logging.getLogger(__name__)

@app.task
def add(x, y):
    return x + y


@celeryd_init.connect
def recover_job(sender=None, conf=None, **kwargs) -> None:
    i = app.control.inspect()
    running_list = []
    if i.active():
        for worker_list in i.active().values():
            for task_item in worker_list:
                name = task_item.get('name')
                args = task_item.get('args')
                job_id = args[1]
                if job_id:
                    running_list.append(job_id)
            logger.info(f'lost job : {running_list}')

Adding a breakpoint here in kombu/transport/redis.py for some reason fixes the issue:

    def close(self):
        self._closing = True
        if self._in_poll:
            try:
                breakpoint()
                self._brpop_read()
            except Empty:
                pass
        if not self.closed:
            # remove from channel poller.
            self.connection.cycle.discard(self)

            # delete fanout bindings
            client = self.__dict__.get('client')  # only if property cached
            if client is not None:
                for queue in self._fanout_queues:
                    if queue in self.auto_delete_queues:
                        self.queue_delete(queue, client=client)
            self._disconnect_pools()
            self._close_clients()
        super().close()

thuibr commented Oct 15, 2024

I also notice that we get stuck here:

worker: Warm shutdown (MainProcess)
[2024-10-15 06:33:14,725: DEBUG/MainProcess] | Worker: Closing Hub...
[2024-10-15 06:33:14,725: DEBUG/MainProcess] | Worker: Closing Pool...
[2024-10-15 06:33:14,725: DEBUG/MainProcess] | Worker: Closing Consumer...
[2024-10-15 06:33:14,726: DEBUG/MainProcess] | Worker: Stopping Consumer...
[2024-10-15 06:33:14,726: DEBUG/MainProcess] | Consumer: Closing Connection...
[2024-10-15 06:33:14,726: DEBUG/MainProcess] | Consumer: Closing Events...
[2024-10-15 06:33:14,726: DEBUG/MainProcess] | Consumer: Closing Mingle...
[2024-10-15 06:33:14,726: DEBUG/MainProcess] | Consumer: Closing Gossip...
[2024-10-15 06:33:14,727: DEBUG/MainProcess] | Consumer: Closing Tasks...
[2024-10-15 06:33:14,727: DEBUG/MainProcess] | Consumer: Closing Control...
[2024-10-15 06:33:14,727: DEBUG/MainProcess] | Consumer: Closing Heart...
[2024-10-15 06:33:14,727: DEBUG/MainProcess] | Consumer: Closing event loop...
[2024-10-15 06:33:14,728: DEBUG/MainProcess] | Consumer: Stopping event loop...
[2024-10-15 06:33:14,728: DEBUG/MainProcess] | Consumer: Stopping Heart...
[2024-10-15 06:33:14,730: DEBUG/MainProcess] | Consumer: Stopping Control...
[2024-10-15 06:33:14,734: DEBUG/MainProcess] | Consumer: Stopping Tasks...
[2024-10-15 06:33:14,734: DEBUG/MainProcess] Canceling task consumer...
[2024-10-15 06:33:14,734: DEBUG/MainProcess] | Consumer: Stopping Gossip...
[2024-10-15 06:33:14,738: DEBUG/MainProcess] | Consumer: Stopping Mingle...
[2024-10-15 06:33:14,738: DEBUG/MainProcess] | Consumer: Stopping Events...
[2024-10-15 06:33:14,738: DEBUG/MainProcess] | Consumer: Stopping Connection...
[2024-10-15 06:33:14,739: DEBUG/MainProcess] | Worker: Stopping Pool...

but we make it past that with the breakpoint:

worker: Warm shutdown (MainProcess)
[2024-10-15 06:37:19,521: DEBUG/MainProcess] | Worker: Closing Hub...
[2024-10-15 06:37:19,522: DEBUG/MainProcess] | Worker: Closing Pool...
[2024-10-15 06:37:19,522: DEBUG/MainProcess] | Worker: Closing Consumer...
[2024-10-15 06:37:19,522: DEBUG/MainProcess] | Worker: Stopping Consumer...
[2024-10-15 06:37:19,522: DEBUG/MainProcess] | Consumer: Closing Connection...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Events...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Mingle...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Gossip...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Tasks...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Control...
[2024-10-15 06:37:19,523: DEBUG/MainProcess] | Consumer: Closing Heart...
[2024-10-15 06:37:19,524: DEBUG/MainProcess] | Consumer: Closing event loop...
[2024-10-15 06:37:19,524: DEBUG/MainProcess] | Consumer: Stopping event loop...
[2024-10-15 06:37:19,524: DEBUG/MainProcess] | Consumer: Stopping Heart...
[2024-10-15 06:37:19,526: DEBUG/MainProcess] | Consumer: Stopping Control...
[2024-10-15 06:37:19,530: DEBUG/MainProcess] | Consumer: Stopping Tasks...
[2024-10-15 06:37:19,530: DEBUG/MainProcess] Canceling task consumer...
[2024-10-15 06:37:19,531: DEBUG/MainProcess] | Consumer: Stopping Gossip...
[2024-10-15 06:37:19,534: DEBUG/MainProcess] | Consumer: Stopping Mingle...
[2024-10-15 06:37:19,535: DEBUG/MainProcess] | Consumer: Stopping Events...
[2024-10-15 06:37:19,535: DEBUG/MainProcess] | Consumer: Stopping Connection...
[2024-10-15 06:37:19,535: DEBUG/MainProcess] | Worker: Stopping Pool...
[2024-10-15 06:37:20,544: DEBUG/MainProcess] | Worker: Stopping Hub...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Heart...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Control...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Tasks...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] Canceling task consumer...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] Closing consumer channel...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Gossip...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Events...
[2024-10-15 06:37:20,545: DEBUG/MainProcess] | Consumer: Shutdown Connection...
[2024-10-15 06:37:20,548: WARNING/MainProcess] > /home/tom/code/kombu/kombu/transport/redis.py(1093)close()
-> self._brpop_read()
[2024-10-15 06:37:20,548: WARNING/MainProcess] (Pdb) 
c
[2024-10-15 06:37:22,115: DEBUG/MainProcess] removing tasks from inqueue until task handler finished

thuibr commented Oct 23, 2024

I am wondering whether disconnecting all Redis connections after an inspect operation would mitigate this issue, but for some reason the Redis connections do not disconnect when I call client.connection_pool.disconnect():

@celeryd_init.connect
def recover_job(sender=None, conf=None, **kwargs) -> None:
    #breakpoint()
    i = app.control.inspect()
    running_list = []
    # breakpoint()

    # Get number of active_connections from redis
    client = app.connection().channel().client
    info = client.info()
    active_connections = info.get("connected_clients", 0)
    # breakpoint()

    i.ping()

    # Disconnect all redis connections
    client.connection_pool.disconnect()

    info = client.info()
    active_connections = info.get("connected_clients", 0)
    breakpoint()
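
The before/after measurement in the snippet above can be wrapped in a small helper, sketched below. `connected_clients_delta` is a hypothetical name. The helper assumes only that the client exposes `info()` returning a dict with a `connected_clients` key, as redis-py clients do. Two caveats apply, and the second is an unverified assumption. First, the `info()` call made after `disconnect()` needs a connection itself, so the client reconnects and is counted again. Second, the client obtained from `app.connection().channel()` may use a different pool than the one the inspect call used, in which case disconnecting it would not close those connections.

```python
from typing import Callable


def connected_clients_delta(client, action: Callable[[], object]) -> int:
    """Return the change in Redis 'connected_clients' across action().

    `client` is any object with an info() method returning a dict, such as a
    redis-py client. This helper is illustrative and not part of kombu.
    """
    before = client.info().get("connected_clients", 0)
    action()
    # This second info() call needs a live connection of its own.
    after = client.info().get("connected_clients", 0)
    return after - before
```

A possible use would be `connected_clients_delta(client, client.connection_pool.disconnect)`, logging the result instead of stopping at a breakpoint.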
