Skip to content

Commit

Permalink
RESP3 modules support (#2803)
Browse files Browse the repository at this point in the history
* start cleaning

* clean sone callbacks

* response callbacks

* modules

* tests

* finish sync search tests

* linters

* async modules

* linters

* revert redismod-url change
  • Loading branch information
dvora-h authored Jun 18, 2023
1 parent 2a935eb commit e4faf3a
Show file tree
Hide file tree
Showing 16 changed files with 3,460 additions and 1,579 deletions.
55 changes: 49 additions & 6 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,22 @@ def __init__(self, client, **kwargs):
# CMS_INCRBY: spaceHolder,
# CMS_QUERY: spaceHolder,
CMS_MERGE: bool_ok,
}

RESP2_MODULE_CALLBACKS = {
CMS_INFO: CMSInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = CMSCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -114,18 +123,27 @@ def __init__(self, client, **kwargs):
# Set the module commands' callbacks
MODULE_CALLBACKS = {
TOPK_RESERVE: bool_ok,
TOPK_ADD: parse_to_list,
TOPK_INCRBY: parse_to_list,
# TOPK_QUERY: spaceHolder,
# TOPK_COUNT: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
TOPK_ADD: parse_to_list,
TOPK_INCRBY: parse_to_list,
TOPK_LIST: parse_to_list,
TOPK_INFO: TopKInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = TOPKCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -145,13 +163,22 @@ def __init__(self, client, **kwargs):
# CF_COUNT: spaceHolder,
# CF_SCANDUMP: spaceHolder,
# CF_LOADCHUNK: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
CF_INFO: CFInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = CFCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -165,22 +192,29 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = TDigestCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -199,12 +233,21 @@ def __init__(self, client, **kwargs):
# BF_SCANDUMP: spaceHolder,
# BF_LOADCHUNK: spaceHolder,
# BF_CARD: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
BF_INFO: BFInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = BFCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)
3 changes: 0 additions & 3 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
class BFCommands:
"""Bloom Filter commands."""

# region Bloom Filter Functions
def create(self, key, errorRate, capacity, expansion=None, noScale=None):
"""
Create a new Bloom Filter `key` with desired probability of false positives
Expand Down Expand Up @@ -178,7 +177,6 @@ def card(self, key):
class CFCommands:
"""Cuckoo Filter commands."""

# region Cuckoo Filter Functions
def create(
self, key, capacity, expansion=None, bucket_size=None, max_iterations=None
):
Expand Down Expand Up @@ -488,7 +486,6 @@ def byrevrank(self, key, rank, *ranks):
class CMSCommands:
"""Count-Min Sketch Commands"""

# region Count-Min Sketch Functions
def initbydim(self, key, width, depth):
"""
Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user.
Expand Down
33 changes: 33 additions & 0 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def __init__(self, args):
self.insertedNum = response["Number of items inserted"]
self.expansionRate = response["Expansion rate"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)


class CFInfo(object):
size = None
Expand All @@ -38,6 +47,15 @@ def __init__(self, args):
self.expansionRate = response["Expansion rate"]
self.maxIteration = response["Max iterations"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)


class CMSInfo(object):
width = None
Expand All @@ -50,6 +68,9 @@ def __init__(self, args):
self.depth = response["depth"]
self.count = response["count"]

def __getitem__(self, item):
return getattr(self, item)


class TopKInfo(object):
k = None
Expand All @@ -64,6 +85,9 @@ def __init__(self, args):
self.depth = response["depth"]
self.decay = response["decay"]

def __getitem__(self, item):
return getattr(self, item)


class TDigestInfo(object):
compression = None
Expand All @@ -85,3 +109,12 @@ def __init__(self, args):
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)
43 changes: 30 additions & 13 deletions redis/commands/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,50 @@ def __init__(
"""
# Set the module commands' callbacks
self.MODULE_CALLBACKS = {
"JSON.CLEAR": int,
"JSON.DEL": int,
"JSON.FORGET": int,
"JSON.GET": self._decode,
"JSON.ARRPOP": self._decode,
"JSON.MGET": bulk_of_jsons(self._decode),
"JSON.SET": lambda r: r and nativestr(r) == "OK",
"JSON.NUMINCRBY": self._decode,
"JSON.NUMMULTBY": self._decode,
"JSON.DEBUG": self._decode,
"JSON.TOGGLE": self._decode,
"JSON.STRAPPEND": self._decode,
"JSON.STRLEN": self._decode,
"JSON.RESP": self._decode,
}

RESP2_MODULE_CALLBACKS = {
"JSON.ARRTRIM": self._decode,
"JSON.OBJLEN": self._decode,
"JSON.ARRAPPEND": self._decode,
"JSON.ARRINDEX": self._decode,
"JSON.ARRINSERT": self._decode,
"JSON.TOGGLE": self._decode,
"JSON.STRAPPEND": self._decode,
"JSON.STRLEN": self._decode,
"JSON.ARRLEN": self._decode,
"JSON.ARRPOP": self._decode,
"JSON.ARRTRIM": self._decode,
"JSON.OBJLEN": self._decode,
"JSON.CLEAR": int,
"JSON.DEL": int,
"JSON.FORGET": int,
"JSON.NUMINCRBY": self._decode,
"JSON.NUMMULTBY": self._decode,
"JSON.OBJKEYS": self._decode,
"JSON.RESP": self._decode,
"JSON.DEBUG": self._decode,
"JSON.GET": self._decode,
}

RESP3_MODULE_CALLBACKS = {
"JSON.GET": lambda response: [
[self._decode(r) for r in res] for res in response
]
if response
else response
}

self.client = client
self.execute_command = client.execute_command
self.MODULE_VERSION = version

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
self.MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
self.MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for key, value in self.MODULE_CALLBACKS.items():
self.client.set_response_callback(key, value)

Expand Down
21 changes: 20 additions & 1 deletion redis/commands/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import redis

from ...asyncio.client import Pipeline as AsyncioPipeline
from .commands import AsyncSearchCommands, SearchCommands
from .commands import (
AGGREGATE_CMD,
CONFIG_CMD,
INFO_CMD,
PROFILE_CMD,
SEARCH_CMD,
SPELLCHECK_CMD,
SYNDUMP_CMD,
AsyncSearchCommands,
SearchCommands,
)


class Search(SearchCommands):
Expand Down Expand Up @@ -90,6 +100,15 @@ def __init__(self, client, index_name="idx"):
self.index_name = index_name
self.execute_command = client.execute_command
self._pipeline = client.pipeline
self.RESP2_MODULE_CALLBACKS = {
INFO_CMD: self._parse_info,
SEARCH_CMD: self._parse_search,
AGGREGATE_CMD: self._parse_aggregate,
PROFILE_CMD: self._parse_profile,
SPELLCHECK_CMD: self._parse_spellcheck,
CONFIG_CMD: self._parse_config_get,
SYNDUMP_CMD: self._parse_syndump,
}

def pipeline(self, transaction=True, shard_hint=None):
"""Creates a pipeline for the SEARCH module, that can be used for executing
Expand Down
Loading

0 comments on commit e4faf3a

Please sign in to comment.