Disable Celery pub-sub mechanism with Redis backend
dungeon-master-666 authored Mar 24, 2023
1 parent dc6f2aa commit 07bb15a
Showing 2 changed files with 16 additions and 2 deletions.
13 changes: 11 additions & 2 deletions indexer/celery.py
@@ -1,5 +1,6 @@
 import os
 from celery import Celery
+from celery.backends.redis import RedisBackend
 from config import settings
 
 RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
@@ -9,7 +10,15 @@
 REDIS_PORT = os.getenv("REDIS_PORT")
 
 CELERY_BROKER_URL = f"amqp://{RABBITMQ_HOST}:{RABBITMQ_PORT}"
-CELERY_BACKEND_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"
+CELERY_BACKEND_URL = f"indexer.celery.CustomBackend://{REDIS_HOST}:{REDIS_PORT}"
+
+# To prevent Celery issue https://github.com/celery/celery/issues/4983, which causes
+# clients to disconnect with the log "Client ... scheduled to be closed ASAP for overcoming of output buffer limits.",
+# this workaround disables the pub-sub mechanism.
+# More details here: https://github.com/celery/celery/issues/4983#issuecomment-518302708
+class CustomBackend(RedisBackend):
+    def on_task_call(self, producer, task_id):
+        pass
 
 app = Celery('indexer',
              broker=CELERY_BROKER_URL,
@@ -21,7 +30,7 @@
              accept_content=['pickle'],
              task_serializer='pickle',
              result_serializer='pickle',
-             result_expires=3600,
+             result_expires=600,
              result_extended=True,
              worker_max_tasks_per_child=settings.indexer.max_tasks_per_child,  # recreate the worker process after every max_tasks_per_child tasks
              task_time_limit=settings.indexer.task_time_limit,
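
Note: on_task_call is the hook through which the Redis backend subscribes the publishing
client to each task's result channel at send time; making it a no-op stops those
subscriptions from accumulating (the cause of the overrun output buffers), while results
remain retrievable through get(). A minimal sketch, assuming indexer.celery is importable
and the Redis env vars above are set, confirming that Celery resolves the subclass from
the backend URL scheme:

    # Celery instantiates the backend named by the URL scheme, so app.backend
    # should be the CustomBackend subclass rather than the stock RedisBackend.
    from indexer.celery import app, CustomBackend

    assert isinstance(app.backend, CustomBackend)
    # on_task_call is now a no-op: sending a task no longer subscribes this
    # client to the task's pub-sub result channel.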
5 changes: 5 additions & 0 deletions scheduler/__main__.py
@@ -139,6 +139,7 @@ async def _fetch_blocks_read_results(self):
             async_result = task.result()
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.seqnos_to_process_queue.extendleft(async_result.args[0])
@@ -176,6 +177,7 @@ async def _insert_blocks_read_results(self):
             param = async_result.args[0]
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.blocks_to_insert_queue.extendleft(param)
@@ -233,6 +235,7 @@ async def _index_interfaces_read_results(self):
             param = async_result.args[0]
             try:
                 result = async_result.get()
+                async_result.forget()
             except BaseException as e:
                 logger.error("Task {async_result} raised unknown exception: {exception}. Rescheduling the task's chunk.", async_result=async_result, exception=e)
                 self.blocks_to_index_interfaces_queue.extendleft(param)
@@ -254,6 +257,7 @@ async def _check_liteserver_health(self):
         try:
             last_mc_block_async_result = await asyncify(get_last_mc_block, [], serializer='pickle', queue=self.celery_queue)
             last_mc_block_async_result.get()
+            last_mc_block_async_result.forget()
         except LiteServerTimeout:
             if self.is_liteserver_up or self.is_liteserver_up is None:
                 logger.critical("Lite Server is not responding. Pausing indexing until it is alive again.")
@@ -353,6 +357,7 @@ async def _get_new_blocks(self):
 
             last_mc_block_async_result = await asyncify(get_last_mc_block, [], serializer='pickle', queue=self.celery_queue)
             last_mc_block = last_mc_block_async_result.get()
+            last_mc_block_async_result.forget()
             if last_mc_block['seqno'] < self.current_seqno:
                 await asyncio.sleep(0.2)
                 continue
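
Note: every call site now follows the same get-then-forget pattern: read the result, then
delete its celery-task-meta key from Redis immediately instead of letting it sit for the
full result_expires window (shortened above from 3600 s to 600 s, which still covers
results that are never forgotten, e.g. on failure). A sketch of the pattern, with a
hypothetical task name and argument that are not from this repo:

    from indexer.celery import app

    # 'indexer.tasks.get_block' and its argument are illustrative only.
    async_result = app.send_task('indexer.tasks.get_block', args=[12345],
                                 serializer='pickle', queue='indexer')
    result = async_result.get()  # blocks until the worker stores the result
    async_result.forget()        # drop the result key from Redis right away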
