diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index a1060d2cbb..68b66fa2de 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: # Sum up the reply from each command return sum(await self._execute_pipeline_by_slot(command, slots_to_keys)) + async def _execute_pipeline_by_slot( + self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]] + ) -> List[Any]: + if self._initialize: + await self.initialize() + read_from_replicas = self.read_from_replicas and command in READ_COMMANDS + pipe = self.pipeline() + [ + pipe.execute_command( + command, + *slot_args, + target_nodes=[ + self.nodes_manager.get_node_from_slot(slot, read_from_replicas) + ], + ) + for slot, slot_args in slots_to_args.items() + ] + return await pipe.execute() + class ClusterManagementCommands(ManagementCommands): """ diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 4a001bfc11..7e1049ecd1 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -812,6 +812,15 @@ async def test_unlink(self, r: RedisCluster) -> None: await asyncio.sleep(0.1) assert await r.unlink(*d.keys()) == 0 + async def test_initialize_before_execute_multi_key_command( + self, request: FixtureRequest + ) -> None: + # Test for issue https://github.com/redis/redis-py/issues/2437 + url = request.config.getoption("--redis-url") + r = RedisCluster.from_url(url) + assert 0 == await r.exists("a", "b", "c") + await r.close() + @skip_if_redis_enterprise() async def test_cluster_myid(self, r: RedisCluster) -> None: node = r.get_random_node()