Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
Merged new sentinel commands from redis/redis-py#834 (redis/redis-py#1550)

Signed-off-by: Andrew-Chen-Wang <acwangpython@gmail.com>
  • Loading branch information
Andrew-Chen-Wang committed Oct 8, 2021
1 parent d11ecd9 commit a83c83e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 18 deletions.
4 changes: 4 additions & 0 deletions aioredis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,14 @@ class Redis(Commands):
"SCRIPT FLUSH": bool_ok,
"SCRIPT KILL": bool_ok,
"SCRIPT LOAD": str_if_bytes,
"SENTINEL CKQUORUM": bool_ok,
"SENTINEL FAILOVER": bool_ok,
"SENTINEL FLUSHCONFIG": bool_ok,
"SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
"SENTINEL MASTER": parse_sentinel_master,
"SENTINEL MASTERS": parse_sentinel_masters,
"SENTINEL MONITOR": bool_ok,
"SENTINEL RESET": bool_ok,
"SENTINEL REMOVE": bool_ok,
"SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
"SENTINEL SET": bool_ok,
Expand Down
73 changes: 55 additions & 18 deletions aioredis/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3543,26 +3543,24 @@ def execute(self):


class SentinelCommands:
_SELF_ANNOTATION = Union[CommandsProtocol, "SentinelCommands"]

def sentinel_get_master_addr_by_name(
self: Union[CommandsProtocol, "SentinelCommands"], service_name: str
self: _SELF_ANNOTATION, service_name: str
) -> Awaitable:
"""Returns a (host, port) pair for the given ``service_name``"""
return self.execute_command("SENTINEL GET-MASTER-ADDR-BY-NAME", service_name)

def sentinel_master(
self: Union[CommandsProtocol, "SentinelCommands"], service_name: str
) -> Awaitable:
def sentinel_master(self: _SELF_ANNOTATION, service_name: str) -> Awaitable:
"""Returns a dictionary containing the specified masters state."""
return self.execute_command("SENTINEL MASTER", service_name)

def sentinel_masters(
self: Union[CommandsProtocol, "SentinelCommands"]
) -> Awaitable:
def sentinel_masters(self: _SELF_ANNOTATION) -> Awaitable:
"""Returns a list of dictionaries containing each master's state."""
return self.execute_command("SENTINEL MASTERS")

def sentinel_monitor(
self: Union[CommandsProtocol, "SentinelCommands"],
self: _SELF_ANNOTATION,
name: str,
ip: str,
port: int,
Expand All @@ -3571,29 +3569,68 @@ def sentinel_monitor(
"""Add a new master to Sentinel to be monitored"""
return self.execute_command("SENTINEL MONITOR", name, ip, port, quorum)

def sentinel_remove(
self: Union[CommandsProtocol, "SentinelCommands"], name: str
) -> Awaitable:
def sentinel_remove(self: _SELF_ANNOTATION, name: str) -> Awaitable:
"""Remove a master from Sentinel's monitoring"""
return self.execute_command("SENTINEL REMOVE", name)

def sentinel_sentinels(
self: Union[CommandsProtocol, "SentinelCommands"], service_name: str
) -> Awaitable:
def sentinel_sentinels(self: _SELF_ANNOTATION, service_name: str) -> Awaitable:
"""Returns a list of sentinels for ``service_name``"""
return self.execute_command("SENTINEL SENTINELS", service_name)

def sentinel_set(
self: Union[CommandsProtocol, "SentinelCommands"],
self: _SELF_ANNOTATION,
name: str,
option: str,
value: EncodableT,
) -> Awaitable:
"""Set Sentinel monitoring parameters for a given master"""
return self.execute_command("SENTINEL SET", name, option, value)

def sentinel_slaves(
self: Union[CommandsProtocol, "SentinelCommands"], service_name: str
) -> Awaitable:
def sentinel_slaves(self: _SELF_ANNOTATION, service_name: str) -> Awaitable:
"""Returns a list of slaves for ``service_name``"""
return self.execute_command("SENTINEL SLAVES", service_name)

def sentinel_reset(self: _SELF_ANNOTATION, pattern: PatternT) -> Awaitable:
"""
This command will reset all the masters with matching name.
The pattern argument is a glob-style pattern.
The reset process clears any previous state in a master (including a
failover in progress), and removes every slave and sentinel already
discovered and associated with the master.
"""
return self.execute_command("SENTINEL RESET", pattern, once=True)

def sentinel_failover(self: _SELF_ANNOTATION, new_master_name: str) -> Awaitable:
"""
Force a failover as if the master was not reachable, and without
asking for agreement to other Sentinels (however a new version of the
configuration will be published so that the other Sentinels will
update their configurations).
"""
return self.execute_command("SENTINEL FAILOVER", new_master_name)

def sentinel_ckquorum(self: _SELF_ANNOTATION, new_master_name: str) -> Awaitable:
"""
Check if the current Sentinel configuration is able to reach the
quorum needed to failover a master, and the majority needed to
authorize the failover.
This command should be used in monitoring systems to check if a
Sentinel deployment is ok.
"""
return self.execute_command("SENTINEL CKQUORUM", new_master_name, once=True)

def sentinel_flushconfig(self: _SELF_ANNOTATION) -> Awaitable:
"""
Force Sentinel to rewrite its configuration on disk, including the
current Sentinel state.
Normally Sentinel rewrites the configuration every time something
changes in its state (in the context of the subset of the state which
is persisted on disk across restart).
However sometimes it is possible that the configuration file is lost
because of operation errors, disk failures, package upgrade scripts or
configuration managers. In those cases a way to to force Sentinel to
rewrite the configuration file is handy.
This command works even if the previous configuration file is
completely missing.
"""
return self.execute_command("SENTINEL FLUSHCONFIG")
17 changes: 17 additions & 0 deletions aioredis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ def __init__(
self.min_other_sentinels = min_other_sentinels
self.connection_kwargs = connection_kwargs

def execute_command(self, *args, **kwargs):
"""
Execute Sentinel command in sentinel nodes.
once - If set to True, then execute the resulting command on a single
node at random, rather than across the entire sentinel cluster.
"""
once = bool(kwargs.get("once", False))
if "once" in kwargs.keys():
kwargs.pop("once")

if once:
for sentinel in self.sentinels:
sentinel.execute_command(*args, **kwargs)
else:
random.choice(self.sentinels).execute_command(*args, **kwargs)
return True

def __repr__(self):
sentinel_addresses = []
for sentinel in self.sentinels:
Expand Down
19 changes: 19 additions & 0 deletions tests/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ async def sentinel_slaves(self, master_name):
return []
return self.cluster.slaves

def execute_command(self, *args, **kwargs):
# wrapper purely to validate the calls don't explode
from aioredis.client import bool_ok

return bool_ok


class SentinelTestCluster:
def __init__(self, service_name="mymaster", ip="127.0.0.1", port=6379):
Expand Down Expand Up @@ -207,3 +213,16 @@ async def test_slave_round_robin(cluster, sentinel, master_ip):
assert await rotator.__anext__() == (master_ip, 6379)
with pytest.raises(SlaveNotFoundError):
await rotator.__anext__()


async def test_ckquorum(cluster, sentinel):
assert await sentinel.sentinel_ckquorum("mymaster")


async def test_flushconfig(cluster, sentinel):
assert await sentinel.sentinel_flushconfig()


async def test_reset(cluster, sentinel):
cluster.master["is_odown"] = True
assert await sentinel.sentinel_reset("mymaster")

0 comments on commit a83c83e

Please sign in to comment.