Skip to content

Commit

Permalink
Improve Celery Async Tasks configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Oct 27, 2020
1 parent ba6233e commit 50e208a
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 16 deletions.
7 changes: 6 additions & 1 deletion geonode/br/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from geonode.celery_app import app


@app.task(queue='email')
@app.task(
bind=True,
name='geonode.br.tasks.restore_notification',
queue='email',
autoretry_for=(Exception, ),
retry_kwargs={'max_retries': 3, 'countdown': 180})
def restore_notification(recipients: List, backup_file: str, backup_md5: str, exception: str = None):
"""
Function sending a CC email report of the restore procedure to a provided emails.
Expand Down
10 changes: 8 additions & 2 deletions geonode/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ def setup_periodic_tasks(sender, **kwargs):
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
@app.task
@app.task(
bind=True,
name='{{project_name}}.test',
queue='default')
def test(arg):
_log(arg)
@app.task(bind=True)
@app.task(
bind=True,
name='{{project_name}}.debug_task',
queue='default')
def debug_task(self):
_log("Request: {!r}".format(self.request))
"""
5 changes: 4 additions & 1 deletion geonode/documents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
logger = get_task_logger(__name__)


@app.task(bind=True, queue='update')
@app.task(
bind=True,
name='geonode.documents.tasks.create_document_thumbnail',
queue='update')
def create_document_thumbnail(self, object_id):
"""
Create thumbnail for a document.
Expand Down
13 changes: 9 additions & 4 deletions geonode/monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

from celery import shared_task
from django.core.management import call_command

from geonode.celery_app import app


@shared_task
def collect_metrics():
@app.task(
bind=True,
name='geonode.monitoring.tasks.collect_metrics',
queue='update',
autoretry_for=(Exception, ),
retry_kwargs={'max_retries': 5, 'countdown': 180})
def collect_metrics(self):
"""
Collect metrics events data
"""
Expand Down
11 changes: 8 additions & 3 deletions geonode/security/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

from celery import shared_task
from django.conf import settings
from .utils import sync_resources_with_guardian

from geonode.celery_app import app


@shared_task
@app.task(
bind=True,
name='geonode.security.tasks.synch_guardian',
queue='update',
autoretry_for=(Exception, ),
retry_kwargs={'max_retries': 5, 'countdown': 180})
def synch_guardian():
"""
Sync resources with Guardian and clear their dirty state
Expand Down
7 changes: 4 additions & 3 deletions geonode/services/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
logger = logging.getLogger(__name__)


@app.task(bind=True,
name='geonode.services.tasks.update.harvest_resource',
queue='update',)
@app.task(
bind=True,
name='geonode.services.tasks.update.harvest_resource',
queue='update')
def harvest_resource(self, harvest_job_id):
harvest_job = models.HarvestJob.objects.get(pk=harvest_job_id)
harvest_job.update_status(
Expand Down
5 changes: 3 additions & 2 deletions geonode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1675,8 +1675,9 @@ def get_geonode_catalogue_service():
CELERY_ACKS_LATE = ast.literal_eval(os.environ.get('CELERY_ACKS_LATE', 'True'))

# Set this to False in order to run async
CELERY_TASK_ALWAYS_EAGER = ast.literal_eval(os.environ.get('CELERY_TASK_ALWAYS_EAGER', 'True'))
CELERY_TASK_EAGER_PROPAGATES = ast.literal_eval(os.environ.get('CELERY_TASK_EAGER_PROPAGATES', 'True'))
_EAGER_FLAG = 'False' if ASYNC_SIGNALS else 'True'
CELERY_TASK_ALWAYS_EAGER = ast.literal_eval(os.environ.get('CELERY_TASK_ALWAYS_EAGER', _EAGER_FLAG))
CELERY_TASK_EAGER_PROPAGATES = ast.literal_eval(os.environ.get('CELERY_TASK_EAGER_PROPAGATES', _EAGER_FLAG))
CELERY_TASK_IGNORE_RESULT = ast.literal_eval(os.environ.get('CELERY_TASK_IGNORE_RESULT', 'True'))

# I use these to debug kombu crashes; we get a more informative message.
Expand Down

0 comments on commit 50e208a

Please sign in to comment.