Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement locks for RedisCluster #2013

Merged
merged 9 commits into from
Mar 1, 2022
20 changes: 11 additions & 9 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
* Create codeql-analysis.yml (#1988). Thanks @chayim
* Create codeql-analysis.yml (#1988). Thanks @chayim
* Add limited support for Lua scripting with RedisCluster
* Implement `.lock()` method on RedisCluster
* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
* Change json.clear test multi to be up to date with redisjson (#1922)
* Fixing volume for unstable_cluster docker (#1914)
* Update changes file with changes since 4.0.0-beta2 (#1915)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
* Change json.clear test multi to be up to date with redisjson (#1922)
* Fixing volume for unstable_cluster docker (#1914)
* Update changes file with changes since 4.0.0-beta2 (#1915)
* 4.1.2 (Jan 27, 2022)
* Invalid OCSP certificates should raise ConnectionError on failed validation (#1907)
* Added retry mechanism on socket timeouts when connecting to the server (#1895)
Expand Down Expand Up @@ -94,10 +96,10 @@
* Removing command on initial connections (#1722)
* Removing hiredis warning when not installed (#1721)
* 4.0.0 (Nov 15, 2021)
* FT.EXPLAINCLI intentionally raising NotImplementedError
* FT.EXPLAINCLI intentionally raising NotImplementedError
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for these whitespace trims -- my IDE does it 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flake8 actually enforces it, but only on python files ;)

* Restoring ZRANGE desc for Redis < 6.2.0 (#1697)
* Response parsing occasionally fails to parse floats (#1692)
* Re-enabling read-the-docs (#1707)
* Re-enabling read-the-docs (#1707)
* Call HSET after FT.CREATE to avoid keyspace scan (#1706)
* Unit tests fixes for compatibility (#1703)
* Improve documentation about Locks (#1701)
Expand All @@ -117,7 +119,7 @@
* Sleep for flaky search test (#1680)
* Test function renames, to match standards (#1679)
* Docstring improvements for Redis class (#1675)
* Fix georadius tests (#1672)
* Fix georadius tests (#1672)
* Improvements to JSON coverage (#1666)
* Add python_requires setuptools check for python > 3.6 (#1656)
* SMISMEMBER support (#1667)
Expand Down
5 changes: 4 additions & 1 deletion redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ def lock(
continue trying forever. ``blocking_timeout`` can be specified as a
float or integer, both representing the number of seconds to wait.

``lock_class`` forces the specified lock implementation.
``lock_class`` forces the specified lock implementation. Note that as
of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
a Lua-based lock). So, it's unlikely you'll need this parameter, unless
you have created your own custom lock class.

``thread_local`` indicates whether the lock token is placed in
thread-local storage. By default, the token is placed in thread local
Expand Down
5 changes: 4 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,10 @@ def lock(
continue trying forever. ``blocking_timeout`` can be specified as a
float or integer, both representing the number of seconds to wait.

``lock_class`` forces the specified lock implementation.
``lock_class`` forces the specified lock implementation. Note that as
of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
a Lua-based lock). So, it's unlikely you'll need this parameter, unless
you have created your own custom lock class.

``thread_local`` indicates whether the lock token is placed in
thread-local storage. By default, the token is placed in thread local
Expand Down
67 changes: 67 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
TimeoutError,
TryAgainError,
)
from redis.lock import Lock
from redis.utils import (
dict_merge,
list_keys_to_dict,
Expand Down Expand Up @@ -742,6 +743,72 @@ def pipeline(self, transaction=None, shard_hint=None):
reinitialize_steps=self.reinitialize_steps,
)

def lock(
self,
name,
timeout=None,
sleep=0.1,
blocking_timeout=None,
lock_class=None,
thread_local=True,
):
"""
Return a new Lock object using key ``name`` that mimics
the behavior of threading.Lock.

If specified, ``timeout`` indicates a maximum life for the lock.
By default, it will remain locked until release() is called.

``sleep`` indicates the amount of time to sleep per loop iteration
when the lock is in blocking mode and another client is currently
holding the lock.

``blocking_timeout`` indicates the maximum amount of time in seconds to
spend trying to acquire the lock. A value of ``None`` indicates
continue trying forever. ``blocking_timeout`` can be specified as a
float or integer, both representing the number of seconds to wait.

``lock_class`` forces the specified lock implementation. Note that as
of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
a Lua-based lock). So, it's unlikely you'll need this parameter, unless
you have created your own custom lock class.

``thread_local`` indicates whether the lock token is placed in
thread-local storage. By default, the token is placed in thread local
storage so that a thread only sees its token, not a token set by
another thread. Consider the following timeline:

time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
thread-1 sets the token to "abc"
time: 1, thread-2 blocks trying to acquire `my-lock` using the
Lock instance.
time: 5, thread-1 has not yet completed. redis expires the lock
key.
time: 5, thread-2 acquired `my-lock` now that it's available.
thread-2 sets the token to "xyz"
time: 6, thread-1 finishes its work and calls release(). if the
token is *not* stored in thread local storage, then
thread-1 would see the token value as "xyz" and would be
able to successfully release the thread-2's lock.

In some use cases it's necessary to disable thread local storage. For
example, if you have code where one thread acquires a lock and passes
that lock instance to a worker thread to release later. If thread
local storage isn't disabled in this case, the worker thread won't see
the token set by the thread that acquired the lock. Our assumption
is that these cases aren't common and as such default to using
thread local storage."""
if lock_class is None:
lock_class = Lock
return lock_class(
self,
name,
timeout=timeout,
sleep=sleep,
blocking_timeout=blocking_timeout,
thread_local=thread_local,
)

def _determine_nodes(self, *args, **kwargs):
command = args[0]
nodes_flag = kwargs.pop("nodes_flag", None)
Expand Down
4 changes: 2 additions & 2 deletions redis/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def acquire(self, blocking=None, blocking_timeout=None, token=None):
if token is None:
token = uuid.uuid1().hex.encode()
else:
encoder = self.redis.connection_pool.get_encoder()
encoder = self.redis.get_encoder()
token = encoder.encode(token)
if blocking is None:
blocking = self.blocking
Expand Down Expand Up @@ -224,7 +224,7 @@ def owned(self):
# need to always compare bytes to bytes
# TODO: this can be simplified when the context manager is finished
if stored_token and not isinstance(stored_token, bytes):
encoder = self.redis.connection_pool.get_encoder()
encoder = self.redis.get_encoder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm beginning to confuse myself a bit -- I think this is a safe change since both redis classes have the .get_encoder() method defined on them already? But I'm not positive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. Since merging in cluster and asyncio, we actually need to start cleaning and unifying these interfaces. There's a lot of duplication - but the philosophy was first get it in, then clean it up.
@dvora-h concur?

stored_token = encoder.encode(stored_token)
return self.local.token is not None and stored_token == self.local.token

Expand Down
2 changes: 0 additions & 2 deletions tests/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from .conftest import _get_client


@pytest.mark.onlynoncluster
class TestLock:
@pytest.fixture()
def r_decoded(self, request):
Expand Down Expand Up @@ -223,7 +222,6 @@ def test_reacquiring_lock_no_longer_owned_raises_error(self, r):
lock.reacquire()


@pytest.mark.onlynoncluster
class TestLockClassSelection:
def test_lock_class_argument(self, r):
class MyLock:
Expand Down