Skip to content

Commit

Permalink
Merge branch 'master' into command-list
Browse files Browse the repository at this point in the history
  • Loading branch information
dvora-h committed May 3, 2022
2 parents 43ed1fa + fa7b3f6 commit a830157
Show file tree
Hide file tree
Showing 25 changed files with 7,058 additions and 47 deletions.
15 changes: 3 additions & 12 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,21 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: 3.9
cache: 'pip'
- name: run code linters
run: |
pip install -r dev_requirements.txt
invoke linters
run-tests:
runs-on: ubuntu-latest
continue-on-error: ${{ matrix.experimental }}
timeout-minutes: 30
strategy:
max-parallel: 15
matrix:
python-version: ['3.6', '3.7', '3.8', '3.9', '3.10', 'pypy-3.7']
test-type: ['standalone', 'cluster']
connection-type: ['hiredis', 'plain']
experimental: [false]
include:
- python-version: 3.11.0-alpha.6
experimental: true
test-type: standalone
connection-type: plain
env:
ACTIONS_ALLOW_UNSECURE_COMMANDS: true
name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests
Expand All @@ -55,6 +49,7 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: run tests
run: |
pip install -U setuptools wheel
Expand Down Expand Up @@ -83,22 +78,18 @@ jobs:
bash .github/workflows/install_and_test.sh ${{ matrix.extension }}
install_package_from_commit:
continue-on-error: ${{ matrix.experimental }}
name: Install package from commit hash
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.6', '3.7', '3.8', '3.9', '3.10', 'pypy-3.7']
experimental: [false]
include:
- python-version: 3.11.0-alpha.5
- experimental: true
steps:
- uses: actions/checkout@v2
- name: install python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: install from pip
run: |
pip install --quiet git+${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git@${GITHUB_SHA}
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

* Allow negative `retries` for `Retry` class to retry forever
* Add `items` parameter to `hset` signature
* Create codeql-analysis.yml (#1988). Thanks @chayim
* Add limited support for Lua scripting with RedisCluster
Expand All @@ -7,6 +8,7 @@
* Fix scan_iter for RedisCluster
* Remove verbose logging when initializing ClusterPubSub, ClusterPipeline or RedisCluster
* Fix broken connection writer lock-up for asyncio (#2065)
* Fix auth bug when provided with no username (#2086)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
4 changes: 4 additions & 0 deletions docker/redis7/master/redis.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
port 6379
save ""
enable-debug-command yes
enable-module-command yes
19 changes: 14 additions & 5 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
)
from redis.commands import (
AsyncCoreCommands,
AsyncRedisModuleCommands,
AsyncSentinelCommands,
RedisModuleCommands,
list_or_args,
)
from redis.compat import Protocol, TypedDict
Expand Down Expand Up @@ -81,7 +81,7 @@ async def __call__(self, response: Any, **kwargs):


class Redis(
AbstractRedis, RedisModuleCommands, AsyncCoreCommands, AsyncSentinelCommands
AbstractRedis, AsyncRedisModuleCommands, AsyncCoreCommands, AsyncSentinelCommands
):
"""
Implementation of the Redis protocol.
Expand Down Expand Up @@ -693,16 +693,24 @@ async def execute_command(self, *args: EncodableT):
# legitimate message off the stack if the connection is already
# subscribed to one or more channels

await self.connect()
connection = self.connection
kwargs = {"check_health": not self.subscribed}
await self._execute(connection, connection.send_command, *args, **kwargs)

async def connect(self):
"""
Ensure that the PubSub is connected
"""
if self.connection is None:
self.connection = await self.connection_pool.get_connection(
"pubsub", self.shard_hint
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection.register_connect_callback(self.on_connect)
connection = self.connection
kwargs = {"check_health": not self.subscribed}
await self._execute(connection, connection.send_command, *args, **kwargs)
else:
await self.connection.connect()

async def _disconnect_raise_connect(self, conn, error):
"""
Expand Down Expand Up @@ -962,6 +970,7 @@ async def run(
if handler is None:
raise PubSubError(f"Pattern: '{pattern}' has no handler registered")

await self.connect()
while True:
try:
await self.get_message(
Expand Down
6 changes: 3 additions & 3 deletions redis/asyncio/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import threading
import uuid
from types import SimpleNamespace
from typing import TYPE_CHECKING, Awaitable, NoReturn, Optional, Union
from typing import TYPE_CHECKING, Awaitable, Optional, Union

from redis.exceptions import LockError, LockNotOwnedError

Expand Down Expand Up @@ -243,15 +243,15 @@ async def owned(self) -> bool:
stored_token = encoder.encode(stored_token)
return self.local.token is not None and stored_token == self.local.token

def release(self) -> Awaitable[NoReturn]:
def release(self) -> Awaitable[None]:
"""Releases the already acquired lock"""
expected_token = self.local.token
if expected_token is None:
raise LockError("Cannot release an unlocked lock")
self.local.token = None
return self.do_release(expected_token)

async def do_release(self, expected_token: bytes):
async def do_release(self, expected_token: bytes) -> None:
if not bool(
await self.lua_release(
keys=[self.name], args=[expected_token], client=self.redis
Expand Down
3 changes: 2 additions & 1 deletion redis/asyncio/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
"""
Initialize a `Retry` object with a `Backoff` object
that retries a maximum of `retries` times.
`retries` can be negative to retry forever.
You can specify the types of supported errors which trigger
a retry with the `supported_errors` parameter.
"""
Expand All @@ -51,7 +52,7 @@ async def call_with_retry(
except self._supported_errors as error:
failures += 1
await fail(error)
if failures > self._retries:
if self._retries >= 0 and failures > self._retries:
raise error
backoff = self._backoff.compute(failures)
if backoff > 0:
Expand Down
5 changes: 4 additions & 1 deletion redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,10 @@ def set_response_callback(self, command, callback):
self.cluster_response_callbacks[command] = callback

def _determine_nodes(self, *args, **kwargs):
command = args[0]
command = args[0].upper()
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
command = f"{args[0]} {args[1]}".upper()

nodes_flag = kwargs.pop("nodes_flag", None)
if nodes_flag is not None:
# nodes flag passed by the user
Expand Down
3 changes: 2 additions & 1 deletion redis/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .core import AsyncCoreCommands, CoreCommands
from .helpers import list_or_args
from .parser import CommandsParser
from .redismodules import RedisModuleCommands
from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
from .sentinel import AsyncSentinelCommands, SentinelCommands

__all__ = [
Expand All @@ -11,6 +11,7 @@
"AsyncCoreCommands",
"CoreCommands",
"list_or_args",
"AsyncRedisModuleCommands",
"RedisModuleCommands",
"AsyncSentinelCommands",
"SentinelCommands",
Expand Down
7 changes: 3 additions & 4 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,15 @@ def replicaof(self, *args, **kwargs):
For more information see https://redis.io/commands/replicaof
"""
raise RedisClusterException("REPLICAOF is not supported in cluster" " mode")
raise RedisClusterException("REPLICAOF is not supported in cluster mode")

def swapdb(self, *args, **kwargs):
"""
Swaps two Redis databases.
For more information see https://redis.io/commands/swapdb
"""
raise RedisClusterException("SWAPDB is not supported in cluster" " mode")
raise RedisClusterException("SWAPDB is not supported in cluster mode")


class ClusterDataAccessCommands(DataAccessCommands):
Expand Down Expand Up @@ -310,7 +310,6 @@ class RedisClusterCommands(
target specific nodes. By default, if target_nodes is not specified, the
command will be executed on the default cluster node.
:param :target_nodes: type can be one of the followings:
- nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
- 'ClusterNode'
Expand All @@ -323,7 +322,7 @@ class RedisClusterCommands(

def cluster_myid(self, target_node):
"""
Returns the nodes id.
Returns the node's id.
:target_node: 'ClusterNode'
The node to execute the command on
Expand Down
29 changes: 24 additions & 5 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,11 @@ def auth(self, password, username=None, **kwargs):
authenticate for the given user.
For more information see https://redis.io/commands/auth
"""
if username:
return self.execute_command("AUTH", username, password, **kwargs)
return self.execute_command
pieces = []
if username is not None:
pieces.append(username)
pieces.append(password)
return self.execute_command("AUTH", *pieces, **kwargs)

def bgrewriteaof(self, **kwargs):
"""Tell the Redis server to rewrite the AOF file from data in memory.
Expand Down Expand Up @@ -1090,6 +1092,15 @@ def memory_purge(self, **kwargs) -> ResponseT:
"""
return self.execute_command("MEMORY PURGE", **kwargs)

def latency_histogram(self, *args):
"""
This function throws a NotImplementedError since it is intentionally
not supported.
"""
raise NotImplementedError(
"LATENCY HISTOGRAM is intentionally not implemented in the client."
)

def ping(self, **kwargs) -> ResponseT:
"""
Ping the Redis server
Expand Down Expand Up @@ -3522,6 +3533,7 @@ def xgroup_create(
groupname: GroupT,
id: StreamIdT = "$",
mkstream: bool = False,
entries_read: Optional[int] = None,
) -> ResponseT:
"""
Create a new consumer group associated with a stream.
Expand All @@ -3534,6 +3546,9 @@ def xgroup_create(
pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id]
if mkstream:
pieces.append(b"MKSTREAM")
if entries_read is not None:
pieces.extend(["ENTRIESREAD", entries_read])

return self.execute_command(*pieces)

def xgroup_delconsumer(
Expand Down Expand Up @@ -3589,6 +3604,7 @@ def xgroup_setid(
name: KeyT,
groupname: GroupT,
id: StreamIdT,
entries_read: Optional[int] = None,
) -> ResponseT:
"""
Set the consumer group last delivered ID to something else.
Expand All @@ -3598,7 +3614,10 @@ def xgroup_setid(
For more information see https://redis.io/commands/xgroup-setid
"""
return self.execute_command("XGROUP SETID", name, groupname, id)
pieces = [name, groupname, id]
if entries_read is not None:
pieces.extend(["ENTRIESREAD", entries_read])
return self.execute_command("XGROUP SETID", *pieces)

def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT:
"""
Expand Down Expand Up @@ -3845,7 +3864,7 @@ def xrevrange(
def xtrim(
self,
name: KeyT,
maxlen: int,
maxlen: Union[int, None],
approximate: bool = True,
minid: Union[StreamIdT, None] = None,
limit: Union[int, None] = None,
Expand Down
10 changes: 10 additions & 0 deletions redis/commands/redismodules.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,13 @@ def graph(self, index_name="idx"):

g = Graph(client=self, name=index_name)
return g


class AsyncRedisModuleCommands(RedisModuleCommands):
def ft(self, index_name="idx"):
"""Access the search namespace, providing support for redis search."""

from .search import AsyncSearch

s = AsyncSearch(client=self, index_name=index_name)
return s
Loading

0 comments on commit a830157

Please sign in to comment.