From 8c8702bba5ed78eb553e3b433957a84c3ec73550 Mon Sep 17 00:00:00 2001 From: Sebastian Waldbauer Date: Thu, 3 Feb 2022 16:05:33 +0100 Subject: [PATCH] FIX: Use redis BLMOVE instead of BRPOPLPUSH (deprecated) Fixes #1827 Signed-off-by: Sebastian Waldbauer --- CHANGELOG.md | 2 ++ intelmq/lib/pipeline.py | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 934b9405f..65f538932 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ CHANGELOG The `LogLevel` and `ReturnType` Enums were added to `intelmq.lib.datatypes`. - `intelmq.lib.bot`: - Enhance behaviour if an unconfigured bot is started (PR#2054 by Sebastian Wagner). +- `intelmq.lib.pipeline`: + - Changed `BRPOPLPUSH` to `BLMOVE`, because `BRPOPLPUSH` has been marked as deprecated by redis in favor of `BLMOVE` (PR#2149 by Sebastian Waldbauer, fixes #1827) ### Development diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index cf58b453c..a95ea908d 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -182,6 +182,7 @@ class Redis(Pipeline): destination_pipeline_db = 2 source_pipeline_password = None destination_pipeline_password = None + _redis_server_version = None def load_configurations(self, queues_type): self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "127.0.0.1") @@ -213,6 +214,7 @@ def connect(self): kwargs['single_connection_client'] = True self.pipe = redis.Redis(db=self.db, password=self.password, **kwargs) + self._redis_server_version = tuple(int(x) for x in self.pipe.execute_command('INFO')['redis_version'].split('.')) def disconnect(self): pass @@ -261,8 +263,10 @@ def _receive(self) -> bytes: else: break if not retval: - retval = self.pipe.brpoplpush(self.source_queue, - self.internal_queue, 0) + if self._redis_server_version >= (6, 2, 0): + retval = self.pipe.blmove(self.source_queue, self.internal_queue, 0, 'RIGHT', 'LEFT') + else: + retval = self.pipe.brpoplpush(self.source_queue, self.internal_queue, 0) except Exception as exc: raise exceptions.PipelineError(exc) else: