diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 540e915277..0c546e3e8a 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -44,6 +44,7 @@ RedisJSON RedisTimeSeries SHA SearchCommands +SentinelBlockingConnectionPool SentinelCommands SentinelConnectionPool Sharded diff --git a/CHANGES b/CHANGES index 3204d6df3b..361be24544 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Add redis.asyncio.SentinelBlockingConnectionPool * Use SentinelConnectionPoolProxy in asyncio.sentinel module * Move doctests (doc code examples) to main branch * Update `ResponseT` type hint diff --git a/docs/connections.rst b/docs/connections.rst index 1c826a0cf8..d5542ec41b 100644 --- a/docs/connections.rst +++ b/docs/connections.rst @@ -63,6 +63,24 @@ This client is used for communicating with Redis, asynchronously. :members: +Async Sentinel Client +********************* + +Sentinel (Async) +======== +.. autoclass:: redis.asyncio.sentinel.Sentinel + :members: + +SentinelConnectionPool (Async) +============================== +.. autoclass:: redis.asyncio.sentinel.SentinelConnectionPool + :members: + +SentinelBlockingConnectionPool (Async) +====================================== +.. autoclass:: redis.asyncio.sentinel.SentinelBlockingConnectionPool + :members: + Async Cluster Client ******************** diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 3545ab44c2..651472a8e3 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -9,6 +9,7 @@ ) from redis.asyncio.sentinel import ( Sentinel, + SentinelBlockingConnectionPool, SentinelConnectionPool, SentinelManagedConnection, SentinelManagedSSLConnection, @@ -53,6 +54,7 @@ "RedisError", "ResponseError", "Sentinel", + "SentinelBlockingConnectionPool", "SentinelConnectionPool", "SentinelManagedConnection", "SentinelManagedSSLConnection", diff --git a/redis/asyncio/sentinel.py b/redis/asyncio/sentinel.py index 2ba275d451..407b28e06a 100644 --- a/redis/asyncio/sentinel.py +++ b/redis/asyncio/sentinel.py @@ -1,10 +1,20 @@ import asyncio import random import weakref -from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type +from typing import ( + AsyncIterator, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) from redis.asyncio.client import Redis from redis.asyncio.connection import ( + BlockingConnectionPool, Connection, ConnectionPool, EncodableT, @@ -203,12 +213,38 @@ async def get_master_address(self): def rotate_slaves(self) -> AsyncIterator: """Round-robin slave balancer""" return self.proxy.rotate_slaves() + + +class SentinelBlockingConnectionPool(BlockingConnectionPool): + """ + Sentinel blocking connection pool. + + If ``check_connection`` flag is set to True, SentinelManagedConnection + sends a PING command right after establishing the connection. + """ + + def __init__(self, service_name, sentinel_manager, **kwargs): + kwargs["connection_class"] = kwargs.get( + "connection_class", + ( + SentinelManagedSSLConnection + if kwargs.pop("ssl", False) + else SentinelManagedConnection + ), + ) + self.is_master = kwargs.pop("is_master", True) + self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) super().__init__(**kwargs) - self.connection_kwargs["connection_pool"] = weakref.proxy(self) + self.connection_kwargs["connection_pool"] = self.proxy self.service_name = service_name self.sentinel_manager = sentinel_manager - self.master_address = None - self.slave_rr_counter = None def __repr__(self): return ( @@ -218,8 +254,11 @@ def __repr__(self): def reset(self): super().reset() - self.master_address = None - self.slave_rr_counter = None + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address def owns_connection(self, connection: Connection): check = not self.is_master or ( @@ -228,31 +267,11 @@ def owns_connection(self, connection: Connection): return check and super().owns_connection(connection) async def get_master_address(self): - master_address = await self.sentinel_manager.discover_master(self.service_name) - if self.is_master: - if self.master_address != master_address: - self.master_address = master_address - # disconnect any idle connections so that they reconnect - # to the new master the next time that they are used. - await self.disconnect(inuse_connections=False) - return master_address + return await self.proxy.get_master_address() - async def rotate_slaves(self) -> AsyncIterator: + def rotate_slaves(self) -> AsyncIterator: """Round-robin slave balancer""" - slaves = await self.sentinel_manager.discover_slaves(self.service_name) - if slaves: - if self.slave_rr_counter is None: - self.slave_rr_counter = random.randint(0, len(slaves) - 1) - for _ in range(len(slaves)): - self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) - slave = slaves[self.slave_rr_counter] - yield slave - # Fallback to the master connection - try: - yield await self.get_master_address() - except MasterNotFoundError: - pass - raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + return self.proxy.rotate_slaves() class Sentinel(AsyncSentinelCommands): @@ -405,7 +424,10 @@ def master_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ @@ -442,7 +464,10 @@ def slave_for( self, service_name: str, redis_class: Type[Redis] = Redis, - connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + connection_pool_class: Union[ + Type[SentinelConnectionPool], + Type[SentinelBlockingConnectionPool], + ] = SentinelConnectionPool, **kwargs, ): """ diff --git a/script.py b/script.py new file mode 100644 index 0000000000..12bbcd23c1 --- /dev/null +++ b/script.py @@ -0,0 +1,14 @@ +import asyncio + +from redis.asyncio import Sentinel, SentinelBlockingConnectionPool + + +async def main() -> None: + sentinel = Sentinel([('localhost', 26379)], socket_timeout=5.0, check_connection=True) + conn = await sentinel.master_for('mymaster', connection_pool_class=SentinelBlockingConnectionPool, max_connections=5, timeout=None) + print(await conn.ping()) + # await conn.aclose() + + +if __name__ == '__main__': + asyncio.run(main())