This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Create celery_app even if there is no broker_url (#50)
javierdelapuente authored Oct 1, 2024
1 parent 01e0f88 commit 5642b8c
Showing 2 changed files with 42 additions and 43 deletions.
examples/flask/test_rock/app.py — 79 changes: 40 additions & 39 deletions
@@ -57,46 +57,47 @@ def __call__(self, *args: object, **kwargs: object) -> object:
 
 broker_url = os.environ.get("REDIS_DB_CONNECT_STRING")
-# Configure Celery only if Redis is configured
-if broker_url:
-    celery_app = celery_init_app(app, broker_url)
-    redis_client = redis.Redis.from_url(broker_url)
+celery_app = celery_init_app(app, broker_url)
+redis_client = redis.Redis.from_url(broker_url) if broker_url else None
 
-    @celery_app.on_after_configure.connect
-    def setup_periodic_tasks(sender, **kwargs):
-        """Set up periodic tasks in the scheduler."""
-        try:
-            # This will only have an effect in the beat scheduler.
-            sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
-        except NameError as e:
-            logging.exception("Failed to configure the periodic task")
-
-    @celery_app.task
-    def scheduled_task(scheduler_hostname):
-        """Function to run a scheduled task in a worker.
-        The worker that will run this task will add the scheduler hostname argument
-        to the "schedulers" set in Redis, and the worker's hostname to the "workers"
-        set in Redis.
-        """
-        worker_hostname = hostname()
-        logging.info(
-            "scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
-        )
-        redis_client.sadd("schedulers", scheduler_hostname)
-        redis_client.sadd("workers", worker_hostname)
-        logging.info("schedulers: %s", redis_client.smembers("schedulers"))
-        logging.info("workers: %s", redis_client.smembers("workers"))
-        # The goal is to have all workers busy in all processes.
-        # For that it may be necessary to exhaust all workers, but the pending tasks should
-        # not grow too large, so all schedulers can manage to run their scheduled tasks.
-        # Celery prefetches tasks, and if they cannot be run they are put in reserved.
-        # If all processes have tasks in reserved, this task will finish immediately to not
-        # make the queues any longer.
-        inspect_obj = celery_app.control.inspect()
-        reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
-        logging.info("number of reserved tasks %s", reserved_sizes)
-        delay = 0 if min(reserved_sizes) > 0 else 5
-        time.sleep(delay)
+@celery_app.on_after_configure.connect
+def setup_periodic_tasks(sender, **kwargs):
+    """Set up periodic tasks in the scheduler."""
+    try:
+        # This will only have an effect in the beat scheduler.
+        sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
+    except NameError as e:
+        logging.exception("Failed to configure the periodic task")
+
+
+@celery_app.task
+def scheduled_task(scheduler_hostname):
+    """Function to run a scheduled task in a worker.
+    The worker that will run this task will add the scheduler hostname argument
+    to the "schedulers" set in Redis, and the worker's hostname to the "workers"
+    set in Redis.
+    """
+    worker_hostname = hostname()
+    logging.info(
+        "scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
+    )
+    redis_client.sadd("schedulers", scheduler_hostname)
+    redis_client.sadd("workers", worker_hostname)
+    logging.info("schedulers: %s", redis_client.smembers("schedulers"))
+    logging.info("workers: %s", redis_client.smembers("workers"))
+    # The goal is to have all workers busy in all processes.
+    # For that it may be necessary to exhaust all workers, but the pending tasks should
+    # not grow too large, so all schedulers can manage to run their scheduled tasks.
+    # Celery prefetches tasks, and if they cannot be run they are put in reserved.
+    # If all processes have tasks in reserved, this task will finish immediately to not
+    # make the queues any longer.
+    inspect_obj = celery_app.control.inspect()
+    reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
+    logging.info("number of reserved tasks %s", reserved_sizes)
+    delay = 0 if min(reserved_sizes) > 0 else 5
+    time.sleep(delay)
 
 
 def get_mysql_database():
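
For context: celery_init_app is defined earlier in app.py (the hunk header above shows its FlaskTask.__call__ wrapper). A minimal sketch of that helper, assuming it follows the standard Celery integration pattern from the Flask documentation; the repository's actual implementation may differ:

from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask, broker_url: str | None) -> Celery:
    """Create a Celery app whose tasks run inside the Flask app context."""

    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            # Run every task inside the Flask application context.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    # broker_url may be None when Redis is not configured yet. The Celery
    # object can still be created, so `celery -A app:celery_app ...` imports
    # cleanly; connecting to the broker only fails later, at runtime.
    celery_app.conf.update(broker_url=broker_url)
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

This is why the commit can create celery_app unconditionally: only redis_client, which needs a real connection string, remains guarded by the `if broker_url else None` expression.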
examples/flask/test_rock/rockcraft.yaml — 6 changes: 2 additions & 4 deletions
@@ -15,15 +15,13 @@ extensions:
 services:
   celery-worker:
     override: replace
-    # redis is not mandatory in the charm. We do not want the charm to fail immediately, so the sleep
-    command: bash -c "sleep 5; celery -A app:celery_app worker -c 2 --loglevel DEBUG"
+    command: celery -A app:celery_app worker -c 2 --loglevel DEBUG
     startup: enabled
     user: _daemon_
     working-dir: /flask/app
   celery-beat-scheduler:
     override: replace
-    # redis is not mandatory in the charm. We do not want the charm to fail immediately, so the sleep
-    command: bash -c "sleep 5; celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule"
+    command: celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule
     startup: enabled
     user: _daemon_
     working-dir: /flask/app
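
Why the `sleep 5` workaround could be dropped: before this commit, celery_app existed only when REDIS_DB_CONNECT_STRING was set, so `celery -A app:celery_app ...` crashed at startup while the Redis relation was still pending, and the sleep merely slowed the restart loop. A hypothetical check (illustrative only, not part of the repository):

import os

# Simulate a unit where the Redis relation is not established yet.
os.environ.pop("REDIS_DB_CONNECT_STRING", None)

# Old code: `app` had no `celery_app` attribute, so this import (and hence
# `celery -A app:celery_app worker`) failed immediately at startup.
# New code: the attribute always exists, with broker_url left as None.
from app import celery_app

print(celery_app.conf.broker_url)  # expected: None when Redis is absent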
