Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP3 modules support #2803

Merged
merged 13 commits into from
Jun 18, 2023
55 changes: 49 additions & 6 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,22 @@ def __init__(self, client, **kwargs):
# CMS_INCRBY: spaceHolder,
# CMS_QUERY: spaceHolder,
CMS_MERGE: bool_ok,
}

RESP2_MODULE_CALLBACKS = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion it would be better to follow Strategy pattern here to re-organise code a little bit.

Instead of maintaining all RESP2, RESP3 stuff in one file and call if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3] condition along all code, it's better to keep it in separate objects f.e Resp2Strategy and Resp3Strategy.

With this approach we will keep this file as simple as possible and protocol independent. Also we avoid additional code changes in this file in case if protocol will be changed or new RESP4 will be introduced in future.

From code perspective it should looks something like this:

self.protocolStrategy = strategyResolver.resolve(
   self.client.connection_pool.connection_kwargs.get("protocol")
)

And now this:

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
        else:
            MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

Turns into one liner like this:

MODULE_CALLBACKS.update(protocolStrategy.getCallbacks())

Let me know if it's makes sense for you and you wan't to discuss it or you have any questions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vladvildanov correct -this is in the when ready cleanup list - I'll share it your way.

CMS_INFO: CMSInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = CMSCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -114,18 +123,27 @@ def __init__(self, client, **kwargs):
# Set the module commands' callbacks
MODULE_CALLBACKS = {
TOPK_RESERVE: bool_ok,
TOPK_ADD: parse_to_list,
TOPK_INCRBY: parse_to_list,
# TOPK_QUERY: spaceHolder,
# TOPK_COUNT: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
TOPK_ADD: parse_to_list,
TOPK_INCRBY: parse_to_list,
TOPK_LIST: parse_to_list,
TOPK_INFO: TopKInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = TOPKCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -145,13 +163,22 @@ def __init__(self, client, **kwargs):
# CF_COUNT: spaceHolder,
# CF_SCANDUMP: spaceHolder,
# CF_LOADCHUNK: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
CF_INFO: CFInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = CFCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -165,22 +192,29 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = TDigestCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)

Expand All @@ -199,12 +233,21 @@ def __init__(self, client, **kwargs):
# BF_SCANDUMP: spaceHolder,
# BF_LOADCHUNK: spaceHolder,
# BF_CARD: spaceHolder,
}

RESP2_MODULE_CALLBACKS = {
BF_INFO: BFInfo,
}
RESP3_MODULE_CALLBACKS = {}

self.client = client
self.commandmixin = BFCommands
self.execute_command = client.execute_command

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for k, v in MODULE_CALLBACKS.items():
self.client.set_response_callback(k, v)
3 changes: 0 additions & 3 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
class BFCommands:
"""Bloom Filter commands."""

# region Bloom Filter Functions
def create(self, key, errorRate, capacity, expansion=None, noScale=None):
"""
Create a new Bloom Filter `key` with desired probability of false positives
Expand Down Expand Up @@ -178,7 +177,6 @@ def card(self, key):
class CFCommands:
"""Cuckoo Filter commands."""

# region Cuckoo Filter Functions
def create(
self, key, capacity, expansion=None, bucket_size=None, max_iterations=None
):
Expand Down Expand Up @@ -488,7 +486,6 @@ def byrevrank(self, key, rank, *ranks):
class CMSCommands:
"""Count-Min Sketch Commands"""

# region Count-Min Sketch Functions
def initbydim(self, key, width, depth):
"""
Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user.
Expand Down
33 changes: 33 additions & 0 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def __init__(self, args):
self.insertedNum = response["Number of items inserted"]
self.expansionRate = response["Expansion rate"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)


class CFInfo(object):
size = None
Expand All @@ -38,6 +47,15 @@ def __init__(self, args):
self.expansionRate = response["Expansion rate"]
self.maxIteration = response["Max iterations"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)


class CMSInfo(object):
width = None
Expand All @@ -50,6 +68,9 @@ def __init__(self, args):
self.depth = response["depth"]
self.count = response["count"]

def __getitem__(self, item):
return getattr(self, item)


class TopKInfo(object):
k = None
Expand All @@ -64,6 +85,9 @@ def __init__(self, args):
self.depth = response["depth"]
self.decay = response["decay"]

def __getitem__(self, item):
return getattr(self, item)


class TDigestInfo(object):
compression = None
Expand All @@ -85,3 +109,12 @@ def __init__(self, args):
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]

def get(self, item):
try:
return self.__getitem__(item)
except AttributeError:
return None

def __getitem__(self, item):
return getattr(self, item)
43 changes: 30 additions & 13 deletions redis/commands/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,50 @@ def __init__(
"""
# Set the module commands' callbacks
self.MODULE_CALLBACKS = {
"JSON.CLEAR": int,
"JSON.DEL": int,
"JSON.FORGET": int,
"JSON.GET": self._decode,
"JSON.ARRPOP": self._decode,
"JSON.MGET": bulk_of_jsons(self._decode),
"JSON.SET": lambda r: r and nativestr(r) == "OK",
"JSON.NUMINCRBY": self._decode,
"JSON.NUMMULTBY": self._decode,
"JSON.DEBUG": self._decode,
"JSON.TOGGLE": self._decode,
"JSON.STRAPPEND": self._decode,
"JSON.STRLEN": self._decode,
"JSON.RESP": self._decode,
}

RESP2_MODULE_CALLBACKS = {
"JSON.ARRTRIM": self._decode,
"JSON.OBJLEN": self._decode,
"JSON.ARRAPPEND": self._decode,
"JSON.ARRINDEX": self._decode,
"JSON.ARRINSERT": self._decode,
"JSON.TOGGLE": self._decode,
"JSON.STRAPPEND": self._decode,
"JSON.STRLEN": self._decode,
"JSON.ARRLEN": self._decode,
"JSON.ARRPOP": self._decode,
"JSON.ARRTRIM": self._decode,
"JSON.OBJLEN": self._decode,
"JSON.CLEAR": int,
"JSON.DEL": int,
"JSON.FORGET": int,
"JSON.NUMINCRBY": self._decode,
"JSON.NUMMULTBY": self._decode,
"JSON.OBJKEYS": self._decode,
"JSON.RESP": self._decode,
"JSON.DEBUG": self._decode,
"JSON.GET": self._decode,
}

RESP3_MODULE_CALLBACKS = {
"JSON.GET": lambda response: [
[self._decode(r) for r in res] for res in response
]
if response
else response
}

self.client = client
self.execute_command = client.execute_command
self.MODULE_VERSION = version

if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
self.MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
else:
self.MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)

for key, value in self.MODULE_CALLBACKS.items():
self.client.set_response_callback(key, value)

Expand Down
21 changes: 20 additions & 1 deletion redis/commands/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import redis

from ...asyncio.client import Pipeline as AsyncioPipeline
from .commands import AsyncSearchCommands, SearchCommands
from .commands import (
AGGREGATE_CMD,
CONFIG_CMD,
INFO_CMD,
PROFILE_CMD,
SEARCH_CMD,
SPELLCHECK_CMD,
SYNDUMP_CMD,
AsyncSearchCommands,
SearchCommands,
)


class Search(SearchCommands):
Expand Down Expand Up @@ -90,6 +100,15 @@ def __init__(self, client, index_name="idx"):
self.index_name = index_name
self.execute_command = client.execute_command
self._pipeline = client.pipeline
self.RESP2_MODULE_CALLBACKS = {
INFO_CMD: self._parse_info,
SEARCH_CMD: self._parse_search,
AGGREGATE_CMD: self._parse_aggregate,
PROFILE_CMD: self._parse_profile,
SPELLCHECK_CMD: self._parse_spellcheck,
CONFIG_CMD: self._parse_config_get,
SYNDUMP_CMD: self._parse_syndump,
}

def pipeline(self, transaction=True, shard_hint=None):
"""Creates a pipeline for the SEARCH module, that can be used for executing
Expand Down
Loading