Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use hiredis::pack_command to serialize the commands. #2570

Merged
merged 12 commits into from
Feb 6, 2023
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Use hiredis-py pack_command if available.
* Support `.unlink()` in ClusterPipeline
* Simplify synchronous SocketBuffer state management
* Fix string cleanse in Redis Graph
Expand Down
135 changes: 90 additions & 45 deletions redis/connection.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import os
import socket
import sys
import threading
import weakref
from io import SEEK_END
Expand Down Expand Up @@ -32,7 +33,12 @@
TimeoutError,
)
from redis.retry import Retry
from redis.utils import CRYPTOGRAPHY_AVAILABLE, HIREDIS_AVAILABLE, str_if_bytes
from redis.utils import (
CRYPTOGRAPHY_AVAILABLE,
HIREDIS_AVAILABLE,
HIREDIS_PACK_AVAILABLE,
str_if_bytes,
)

try:
import ssl
Expand Down Expand Up @@ -509,6 +515,75 @@ def read_response(self, disable_decoding=False):
DefaultParser = PythonParser


class HiredisRespSerializer:
    """Command packer that hands serialization off to ``hiredis.pack_command``."""

    def pack(self, *args):
        """Pack a series of arguments into the Redis protocol.

        The first argument may carry several space-separated words (for
        example ``'CONFIG GET'``); it is split so that every word is passed
        to hiredis as its own argument. A ``str`` first argument is encoded
        to bytes before splitting.

        Returns a list holding the single value produced by
        ``hiredis.pack_command``. A ``TypeError`` raised by hiredis is
        re-raised as ``DataError`` carrying the original traceback.
        """
        head, tail = args[0], args[1:]
        if isinstance(head, str):
            args = tuple(head.encode().split()) + tail
        elif b" " in head:
            args = tuple(head.split()) + tail

        try:
            packed = hiredis.pack_command(args)
        except TypeError as exc:
            # Surface unsupported argument types as the client's own error
            # type while keeping the traceback of the original failure.
            raise DataError(exc).with_traceback(exc.__traceback__)

        return [packed]


class PythonRespSerializer:
    """Pure-Python command packer for the Redis protocol."""

    def __init__(self, buffer_cutoff, encode) -> None:
        # ``encode`` is applied to every argument before it is framed;
        # ``buffer_cutoff`` is the length threshold above which data is
        # emitted as a separate chunk instead of being joined in place.
        self.encode = encode
        self._buffer_cutoff = buffer_cutoff

    def pack(self, *args):
        """Pack a series of arguments into the Redis protocol.

        Returns a list of chunks. Small arguments are joined into a shared
        buffer, while large arguments and memoryviews are appended to the
        list untouched so they are not copied into a bigger string.
        """
        # The command name may contain literal sub-arguments, e.g.
        # 'CONFIG GET'. The server expects them as separate arguments, so
        # the first argument is split here. The pieces are bytestrings so
        # that they are not encoded again.
        command = args[0]
        if isinstance(command, str):
            args = tuple(command.encode().split()) + args[1:]
        elif b" " in command:
            args = tuple(command.split()) + args[1:]

        cutoff = self._buffer_cutoff
        chunks = []
        pending = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))

        for encoded in map(self.encode, args):
            size = len(encoded)
            header = (pending, SYM_DOLLAR, str(size).encode(), SYM_CRLF)
            keep_inline = not (
                len(pending) > cutoff
                or size > cutoff
                or isinstance(encoded, memoryview)
            )
            if keep_inline:
                # Small value: fold header, payload and terminator into the
                # running buffer.
                pending = SYM_EMPTY.join(header + (encoded, SYM_CRLF))
                continue

            # Large value or memoryview: flush the buffer with the length
            # header, emit the payload as its own chunk, and start the next
            # buffer with the terminator that follows the payload.
            chunks.append(SYM_EMPTY.join(header))
            chunks.append(encoded)
            pending = SYM_CRLF

        chunks.append(pending)
        return chunks


class Connection:
"Manages TCP communication to and from a Redis server"

Expand Down Expand Up @@ -536,6 +611,7 @@ def __init__(
retry=None,
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
command_packer=None,
):
"""
Initialize a new Connection.
Expand Down Expand Up @@ -590,6 +666,7 @@ def __init__(
self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
self._command_packer = self._construct_command_packer(command_packer)

def __repr__(self):
repr_args = ",".join([f"{k}={v}" for k, v in self.repr_pieces()])
Expand All @@ -607,6 +684,14 @@ def __del__(self):
except Exception:
pass

def _construct_command_packer(self, packer):
    """Choose the object used to serialize outgoing commands.

    An explicitly supplied ``packer`` is returned unchanged. Otherwise a
    ``HiredisRespSerializer`` is created when ``HIREDIS_PACK_AVAILABLE`` is
    truthy, and a ``PythonRespSerializer`` built from this connection's
    buffer cutoff and encoder is the fallback.
    """
    if packer is not None:
        return packer

    if HIREDIS_PACK_AVAILABLE:
        return HiredisRespSerializer()

    return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode)
Comment on lines +688 to +693

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the elif and else are not needed:

if packer is not None:
    return packer

if HIREDIS_PACK_AVAILABLE:
    return HiredisRespSerializer()

return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, yes, we can get rid of the else, but my understanding of the general sentiment regarding elif in Python is that it is a cleaner way to show that the conditions are intended to be mutually exclusive.


def register_connect_callback(self, callback):
    """Add ``callback`` to this connection's list of connect callbacks.

    The callback is stored wrapped in ``weakref.WeakMethod``, so the list
    holds only a weak reference to it.
    """
    registry = self._connect_callbacks
    registry.append(weakref.WeakMethod(callback))

Expand Down Expand Up @@ -827,7 +912,8 @@ def send_packed_command(self, command, check_health=True):
def send_command(self, *args, **kwargs):
"""Pack and send a command to the Redis server"""
self.send_packed_command(
self.pack_command(*args), check_health=kwargs.get("check_health", True)
self._command_packer.pack(*args),
check_health=kwargs.get("check_health", True),
)

def can_read(self, timeout=0):
Expand Down Expand Up @@ -872,48 +958,7 @@ def read_response(self, disable_decoding=False):

def pack_command(self, *args):
"""Pack a series of arguments into the Redis protocol"""
output = []
# the client might have included 1 or more literal arguments in
# the command name, e.g., 'CONFIG GET'. The Redis server expects these
# arguments to be sent separately, so split the first argument
# manually. These arguments should be bytestrings so that they are
# not encoded.
if isinstance(args[0], str):
args = tuple(args[0].encode().split()) + args[1:]
elif b" " in args[0]:
args = tuple(args[0].split()) + args[1:]

buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))

buffer_cutoff = self._buffer_cutoff
for arg in map(self.encoder.encode, args):
# to avoid large string mallocs, chunk the command into the
# output list if we're sending large values or memoryviews
arg_length = len(arg)
if (
len(buff) > buffer_cutoff
or arg_length > buffer_cutoff
or isinstance(arg, memoryview)
):
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF)
)
output.append(buff)
output.append(arg)
buff = SYM_CRLF
else:
buff = SYM_EMPTY.join(
(
buff,
SYM_DOLLAR,
str(arg_length).encode(),
SYM_CRLF,
arg,
SYM_CRLF,
)
)
output.append(buff)
return output
return self._command_packer.pack(*args)

def pack_commands(self, commands):
"""Pack multiple commands into the Redis protocol"""
Expand All @@ -923,7 +968,7 @@ def pack_commands(self, commands):
buffer_cutoff = self._buffer_cutoff

for cmd in commands:
for chunk in self.pack_command(*cmd):
for chunk in self._command_packer.pack(*cmd):
chunklen = len(chunk)
if (
buffer_length > buffer_cutoff
Expand Down
2 changes: 2 additions & 0 deletions redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

# Only support Hiredis >= 1.0:
HIREDIS_AVAILABLE = not hiredis.__version__.startswith("0.")
HIREDIS_PACK_AVAILABLE = hasattr(hiredis, "pack_command")
except ImportError:
HIREDIS_AVAILABLE = False
HIREDIS_PACK_AVAILABLE = False

try:
import cryptography # noqa
Expand Down
5 changes: 5 additions & 0 deletions tests/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import redis
from redis.connection import Connection
from redis.utils import HIREDIS_PACK_AVAILABLE

from .conftest import _get_client

Expand Down Expand Up @@ -75,6 +76,10 @@ def test_replace(self, request):
assert r.get("a") == "foo\ufffd"


@pytest.mark.skipif(
HIREDIS_PACK_AVAILABLE,
reason="Packing via hiredis does not preserve memoryviews",
)
class TestMemoryviewsAreNotPacked:
def test_memoryviews_are_not_packed(self):
c = Connection()
Expand Down