From 5642b8c2cd2823b49169963cc2ff76385caf254e Mon Sep 17 00:00:00 2001
From: javierdelapuente
Date: Tue, 1 Oct 2024 12:32:01 +0200
Subject: [PATCH] Create celery_app even if there is no broker_url (#50)

---
 examples/flask/test_rock/app.py         | 79 +++++++++++++------------
 examples/flask/test_rock/rockcraft.yaml |  6 +-
 2 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/examples/flask/test_rock/app.py b/examples/flask/test_rock/app.py
index db803da..7d0b087 100644
--- a/examples/flask/test_rock/app.py
+++ b/examples/flask/test_rock/app.py
@@ -57,46 +57,47 @@ def __call__(self, *args: object, **kwargs: object) -> object:
 
 broker_url = os.environ.get("REDIS_DB_CONNECT_STRING")
 # Configure Celery only if Redis is configured
-if broker_url:
-    celery_app = celery_init_app(app, broker_url)
-    redis_client = redis.Redis.from_url(broker_url)
+celery_app = celery_init_app(app, broker_url)
+redis_client = redis.Redis.from_url(broker_url) if broker_url else None
 
-    @celery_app.on_after_configure.connect
-    def setup_periodic_tasks(sender, **kwargs):
-        """Set up periodic tasks in the scheduler."""
-        try:
-            # This will only have an effect in the beat scheduler.
-            sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
-        except NameError as e:
-            logging.exception("Failed to configure the periodic task")
-
-    @celery_app.task
-    def scheduled_task(scheduler_hostname):
-        """Function to run a schedule task in a worker.
-
-        The worker that will run this task will add the scheduler hostname argument
-        to the "schedulers" set in Redis, and the worker's hostname to the "workers"
-        set in Redis.
-        """
-        worker_hostname = hostname()
-        logging.info(
-            "scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
-        )
-        redis_client.sadd("schedulers", scheduler_hostname)
-        redis_client.sadd("workers", worker_hostname)
-        logging.info("schedulers: %s", redis_client.smembers("schedulers"))
-        logging.info("workers: %s", redis_client.smembers("workers"))
-        # The goal is to have all workers busy in all processes.
-        # For that it maybe necessary to exhaust all workers, but not to get the pending tasks
-        # too big, so all schedulers can manage to run their scheduled tasks.
-        # Celery prefetches tasks, and if they cannot be run they are put in reserved.
-        # If all processes have tasks in reserved, this task will finish immediately to not make
-        # queues any longer.
-        inspect_obj = celery_app.control.inspect()
-        reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
-        logging.info("number of reserved tasks %s", reserved_sizes)
-        delay = 0 if min(reserved_sizes) > 0 else 5
-        time.sleep(delay)
+
+@celery_app.on_after_configure.connect
+def setup_periodic_tasks(sender, **kwargs):
+    """Set up periodic tasks in the scheduler."""
+    try:
+        # This will only have an effect in the beat scheduler.
+        sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
+    except NameError as e:
+        logging.exception("Failed to configure the periodic task")
+
+
+@celery_app.task
+def scheduled_task(scheduler_hostname):
+    """Function to run a scheduled task in a worker.
+
+    The worker that will run this task will add the scheduler hostname argument
+    to the "schedulers" set in Redis, and the worker's hostname to the "workers"
+    set in Redis.
+    """
+    worker_hostname = hostname()
+    logging.info(
+        "scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
+    )
+    redis_client.sadd("schedulers", scheduler_hostname)
+    redis_client.sadd("workers", worker_hostname)
+    logging.info("schedulers: %s", redis_client.smembers("schedulers"))
+    logging.info("workers: %s", redis_client.smembers("workers"))
+    # The goal is to have all workers busy in all processes.
+    # For that it may be necessary to exhaust all workers, but not to let the pending tasks
+    # grow too big, so all schedulers can manage to run their scheduled tasks.
+    # Celery prefetches tasks, and if they cannot be run they are put in reserved.
+    # If all processes have tasks in reserved, this task will finish immediately so as not to
+    # make the queues any longer.
+    inspect_obj = celery_app.control.inspect()
+    reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
+    logging.info("number of reserved tasks %s", reserved_sizes)
+    delay = 0 if min(reserved_sizes) > 0 else 5
+    time.sleep(delay)
 
 
 def get_mysql_database():
diff --git a/examples/flask/test_rock/rockcraft.yaml b/examples/flask/test_rock/rockcraft.yaml
index 91ad18b..8dcdf56 100644
--- a/examples/flask/test_rock/rockcraft.yaml
+++ b/examples/flask/test_rock/rockcraft.yaml
@@ -15,15 +15,13 @@ extensions:
 services:
   celery-worker:
     override: replace
-    # redis is not mandatory in the charm. We do not want the charm to fail immediately, so the sleep
-    command: bash -c "sleep 5; celery -A app:celery_app worker -c 2 --loglevel DEBUG"
+    command: celery -A app:celery_app worker -c 2 --loglevel DEBUG
     startup: enabled
     user: _daemon_
     working-dir: /flask/app
   celery-beat-scheduler:
     override: replace
-    # redis is not mandatory in the charm. We do not want the charm to fail immediately, so the sleep
-    command: bash -c "sleep 5; celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule"
+    command: celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule
     startup: enabled
     user: _daemon_
     working-dir: /flask/app