Skip to content

Commit

Permalink
Merge pull request #2149 from certtools/enh-1827
Browse files Browse the repository at this point in the history
FIX: Use redis BLMOVE instead of BRPOPLPUSH (deprecated)
  • Loading branch information
sebix authored Jul 25, 2022
2 parents 28ccf11 + d6a09f0 commit 60b2f07
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ CHANGELOG
- Enhance behaviour if an unconfigured bot is started (PR#2054 by Sebastian Wagner).
- Fix line recovery and message dumping of the `ParserBot` (PR#2192 by Sebastian Wagner).
- Previously the dumped message was always the last message of a report if the report contained muliple lines leading to data-loss.
- `intelmq.lib.pipeline`:
- Changed `BRPOPLPUSH` to `BLMOVE`, because `BRPOPLPUSH` has been marked as deprecated by redis in favor of `BLMOVE` (PR#2149 by Sebastian Waldbauer, fixes #1827)

### Development

Expand Down
8 changes: 6 additions & 2 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Redis(Pipeline):
destination_pipeline_db = 2
source_pipeline_password = None
destination_pipeline_password = None
_redis_server_version = None

def load_configurations(self, queues_type):
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "127.0.0.1")
Expand Down Expand Up @@ -213,6 +214,7 @@ def connect(self):
kwargs['single_connection_client'] = True

self.pipe = redis.Redis(db=self.db, password=self.password, **kwargs)
self._redis_server_version = tuple(int(x) for x in self.pipe.execute_command('INFO')['redis_version'].split('.'))

def disconnect(self):
pass
Expand Down Expand Up @@ -261,8 +263,10 @@ def _receive(self) -> bytes:
else:
break
if not retval:
retval = self.pipe.brpoplpush(self.source_queue,
self.internal_queue, 0)
if self._redis_server_version >= (6, 2, 0):
retval = self.pipe.blmove(self.source_queue, self.internal_queue, 0, 'RIGHT', 'LEFT')
else:
retval = self.pipe.brpoplpush(self.source_queue, self.internal_queue, 0)
except Exception as exc:
raise exceptions.PipelineError(exc)
else:
Expand Down

0 comments on commit 60b2f07

Please sign in to comment.