From 07bb15ab3d4b7072c4c7cbfbcd0c763469ffdab6 Mon Sep 17 00:00:00 2001
From: Marat <98183742+dungeon-master-666@users.noreply.github.com>
Date: Fri, 24 Mar 2023 09:55:15 +0000
Subject: [PATCH] Disable Celery pub-sub mechanism with Redis backend

---
 indexer/celery.py     | 13 +++++++++++--
 scheduler/__main__.py |  5 +++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/indexer/celery.py b/indexer/celery.py
index e401046..5fb9838 100644
--- a/indexer/celery.py
+++ b/indexer/celery.py
@@ -1,5 +1,6 @@
 import os
 from celery import Celery
+from celery.backends.redis import RedisBackend
 from config import settings
 
 RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
@@ -9,7 +10,15 @@
 REDIS_PORT = os.getenv("REDIS_PORT")
 
 CELERY_BROKER_URL = f"amqp://{RABBITMQ_HOST}:{RABBITMQ_PORT}"
-CELERY_BACKEND_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"
+CELERY_BACKEND_URL = f"indexer.celery.CustomBackend://{REDIS_HOST}:{REDIS_PORT}"
+
+# To prevent Celery issue https://github.com/celery/celery/issues/4983, which causes
+# clients to disconnect with this log: "Client ... scheduled to be closed ASAP for overcoming of output buffer limits.",
+# this workaround disables the pub-sub mechanism.
+# More details here: https://github.com/celery/celery/issues/4983#issuecomment-518302708
+class CustomBackend(RedisBackend):
+    def on_task_call(self, producer, task_id):
+        pass
 
 app = Celery('indexer',
              broker=CELERY_BROKER_URL,
@@ -21,7 +30,7 @@
              accept_content=['pickle'],
              task_serializer='pickle',
              result_serializer='pickle',
-             result_expires=3600,
+             result_expires=600,
              result_extended=True,
              worker_max_tasks_per_child=settings.indexer.max_tasks_per_child,  # recreate worker process after every max_tasks_per_child tasks
              task_time_limit=settings.indexer.task_time_limit,
diff --git a/scheduler/__main__.py b/scheduler/__main__.py
index cdc6b44..7329834 100644
--- a/scheduler/__main__.py
+++ b/scheduler/__main__.py
@@ -139,6 +139,7 @@ async def _fetch_blocks_read_results(self):
             async_result = task.result()
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.seqnos_to_process_queue.extendleft(async_result.args[0])
@@ -176,6 +177,7 @@ async def _insert_blocks_read_results(self):
             param = async_result.args[0]
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.blocks_to_insert_queue.extendleft(param)
@@ -233,6 +235,7 @@ async def _index_interfaces_read_results(self):
             param = async_result.args[0]
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.blocks_to_index_interfaces_queue.extendleft(param)
@@ -254,6 +257,7 @@ async def _check_liteserver_health(self):
         try:
             last_mc_block_async_result = await asyncify(get_last_mc_block, [], serializer='pickle', queue=self.celery_queue)
             last_mc_block_async_result.get()
+            last_mc_block_async_result.forget()
         except LiteServerTimeout:
             if self.is_liteserver_up or self.is_liteserver_up is None:
                 logger.critical(f"Lite Server is not responding. Pausing indexing until it's not alive.")
@@ -353,6 +357,7 @@ async def _get_new_blocks(self):
 
             last_mc_block_async_result = await asyncify(get_last_mc_block, [], serializer='pickle', queue=self.celery_queue)
             last_mc_block = last_mc_block_async_result.get()
+            last_mc_block_async_result.forget()
             if last_mc_block['seqno'] < self.current_seqno:
                 await asyncio.sleep(0.2)
                 continue
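
For readers who want to try the workaround outside this repository, below is a minimal, self-contained sketch of the same technique. The module name tasks.py, the broker/backend hosts and ports, and the add task are hypothetical; the on_task_call override and the dotted-class-path backend URL are exactly what the patch above uses.

# Standalone sketch of the pub/sub-disabling backend (assumed module: tasks.py).
# Hosts, ports, and the example task are hypothetical.
from celery import Celery
from celery.backends.redis import RedisBackend


class NoPubSubBackend(RedisBackend):
    """Redis result backend that skips the eager pub/sub subscription.

    RedisBackend.on_task_call normally subscribes the calling client to the
    task's result channel as soon as the task is published; with many
    in-flight tasks those subscriptions can overflow Redis' pub/sub output
    buffers (celery/celery#4983). Overriding it with a no-op avoids that.
    """

    def on_task_call(self, producer, task_id):
        pass  # intentionally do nothing


app = Celery(
    'tasks',
    broker='amqp://localhost:5672',
    # Celery accepts a dotted class path as the URL scheme, so the custom
    # backend class is loaded while the Redis host/port still come from the URL.
    backend='tasks.NoPubSubBackend://localhost:6379',
)


@app.task
def add(x, y):
    return x + y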
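The scheduler half of the patch pairs every successful AsyncResult.get() with AsyncResult.forget(), which deletes the stored result from the backend immediately instead of holding it in Redis until result_expires (lowered here from 3600 to 600 seconds) evicts it. A usage sketch, reusing the hypothetical add task from the previous example:

# Hypothetical caller, mirroring the scheduler changes: read the result once,
# then drop it from Redis right away rather than waiting for result_expires.
async_result = add.delay(2, 2)
try:
    value = async_result.get(timeout=30)  # blocks until the worker finishes
    async_result.forget()                 # free the result key in Redis
except Exception:
    # On failure the result is not forgotten, matching the except branches in
    # scheduler/__main__.py, where the task's chunk is rescheduled instead.
    raise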