diff --git a/geonode/br/tasks.py b/geonode/br/tasks.py index 572ae812788..ead8bb74a4e 100644 --- a/geonode/br/tasks.py +++ b/geonode/br/tasks.py @@ -25,7 +25,12 @@ from geonode.celery_app import app -@app.task(queue='email') +@app.task( + bind=True, + name='geonode.br.tasks.restore_notification', + queue='email', + autoretry_for=(Exception, ), + retry_kwargs={'max_retries': 3, 'countdown': 180}) def restore_notification(recipients: List, backup_file: str, backup_md5: str, exception: str = None): """ Function sending a CC email report of the restore procedure to a provided emails. diff --git a/geonode/celery_app.py b/geonode/celery_app.py index 72f556d20c6..0df20e35547 100644 --- a/geonode/celery_app.py +++ b/geonode/celery_app.py @@ -48,11 +48,17 @@ def setup_periodic_tasks(sender, **kwargs): # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) -@app.task +@app.task( + bind=True, + name='{{project_name}}.test', + queue='default') def test(arg): _log(arg) -@app.task(bind=True) +@app.task( + bind=True, + name='{{project_name}}.debug_task', + queue='default') def debug_task(self): _log("Request: {!r}".format(self.request)) """ diff --git a/geonode/documents/tasks.py b/geonode/documents/tasks.py index d0b95f53450..d11c20dd33d 100644 --- a/geonode/documents/tasks.py +++ b/geonode/documents/tasks.py @@ -33,7 +33,10 @@ logger = get_task_logger(__name__) -@app.task(bind=True, queue='update') +@app.task( + bind=True, + name='geonode.documents.tasks.create_document_thumbnail', + queue='update') def create_document_thumbnail(self, object_id): """ Create thumbnail for a document. diff --git a/geonode/monitoring/tasks.py b/geonode/monitoring/tasks.py index bcd71c56af2..38ef3a4650a 100644 --- a/geonode/monitoring/tasks.py +++ b/geonode/monitoring/tasks.py @@ -17,13 +17,18 @@ # along with this program. If not, see . # ######################################################################### - -from celery import shared_task from django.core.management import call_command +from geonode.celery_app import app + -@shared_task -def collect_metrics(): +@app.task( + bind=True, + name='geonode.monitoring.tasks.collect_metrics', + queue='update', + autoretry_for=(Exception, ), + retry_kwargs={'max_retries': 5, 'countdown': 180}) +def collect_metrics(self): """ Collect metrics events data """ diff --git a/geonode/security/tasks.py b/geonode/security/tasks.py index e2746f7425c..0e7664e1654 100644 --- a/geonode/security/tasks.py +++ b/geonode/security/tasks.py @@ -17,13 +17,18 @@ # along with this program. If not, see . # ######################################################################### - -from celery import shared_task from django.conf import settings from .utils import sync_resources_with_guardian +from geonode.celery_app import app + -@shared_task +@app.task( + bind=True, + name='geonode.security.tasks.synch_guardian', + queue='update', + autoretry_for=(Exception, ), + retry_kwargs={'max_retries': 5, 'countdown': 180}) def synch_guardian(): """ Sync resources with Guardian and clear their dirty state diff --git a/geonode/services/tasks.py b/geonode/services/tasks.py index 84187e8a618..1e02c91014e 100644 --- a/geonode/services/tasks.py +++ b/geonode/services/tasks.py @@ -34,9 +34,10 @@ logger = logging.getLogger(__name__) -@app.task(bind=True, - name='geonode.services.tasks.update.harvest_resource', - queue='update',) +@app.task( + bind=True, + name='geonode.services.tasks.update.harvest_resource', + queue='update') def harvest_resource(self, harvest_job_id): harvest_job = models.HarvestJob.objects.get(pk=harvest_job_id) harvest_job.update_status( diff --git a/geonode/settings.py b/geonode/settings.py index d502f77285b..b97075d476b 100644 --- a/geonode/settings.py +++ b/geonode/settings.py @@ -1675,8 +1675,9 @@ def get_geonode_catalogue_service(): CELERY_ACKS_LATE = ast.literal_eval(os.environ.get('CELERY_ACKS_LATE', 'True')) # Set this to False in order to run async -CELERY_TASK_ALWAYS_EAGER = ast.literal_eval(os.environ.get('CELERY_TASK_ALWAYS_EAGER', 'True')) -CELERY_TASK_EAGER_PROPAGATES = ast.literal_eval(os.environ.get('CELERY_TASK_EAGER_PROPAGATES', 'True')) +_EAGER_FLAG = 'False' if ASYNC_SIGNALS else 'True' +CELERY_TASK_ALWAYS_EAGER = ast.literal_eval(os.environ.get('CELERY_TASK_ALWAYS_EAGER', _EAGER_FLAG)) +CELERY_TASK_EAGER_PROPAGATES = ast.literal_eval(os.environ.get('CELERY_TASK_EAGER_PROPAGATES', _EAGER_FLAG)) CELERY_TASK_IGNORE_RESULT = ast.literal_eval(os.environ.get('CELERY_TASK_IGNORE_RESULT', 'True')) # I use these to debug kombu crashes; we get a more informative message.