Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove verbose logging from redis-py/redis/cluster.py #2238

Merged
merged 19 commits into from
Jul 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

* Remove verbose logging from cluster.py
* Add retry mechanism to async version of Connection
* Compare commands case-insensitively in the asyncio command parser
* Allow negative `retries` for `Retry` class to retry forever
Expand Down
63 changes: 7 additions & 56 deletions redis/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import copy
import logging
import random
import socket
import sys
Expand All @@ -15,7 +14,6 @@
from redis.exceptions import (
AskError,
AuthenticationError,
BusyLoadingError,
ClusterCrossSlotError,
ClusterDownError,
ClusterError,
Expand All @@ -39,8 +37,6 @@
str_if_bytes,
)

log = logging.getLogger(__name__)


def get_node_name(host: str, port: int) -> str:
return f"{host}:{port}"
Expand Down Expand Up @@ -535,7 +531,6 @@ def __init__(
" RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
" ClusterNode('localhost', 6378)])"
)
log.debug(f"startup_nodes : {startup_nodes}")
# Update the connection arguments
# Whenever a new connection is established, RedisCluster's on_connect
# method should be run
Expand Down Expand Up @@ -666,13 +661,8 @@ def set_default_node(self, node):
:return True if the default node was set, else False
"""
if node is None or self.get_node(node_name=node.name) is None:
log.info(
"The requested node does not exist in the cluster, so "
"the default node was not changed."
)
return False
self.nodes_manager.default_node = node
log.info(f"Changed the default cluster node to {node}")
return True

def monitor(self, target_node=None):
Expand Down Expand Up @@ -816,8 +806,6 @@ def _determine_nodes(self, *args, **kwargs):
else:
# get the nodes group for this command if it was predefined
command_flag = self.command_flags.get(command)
if command_flag:
log.debug(f"Target node/s for {command}: {command_flag}")
if command_flag == self.__class__.RANDOM:
# return a random node
return [self.get_random_node()]
Expand All @@ -841,7 +829,6 @@ def _determine_nodes(self, *args, **kwargs):
node = self.nodes_manager.get_node_from_slot(
slot, self.read_from_replicas and command in READ_COMMANDS
)
log.debug(f"Target for {args}: slot {slot}")
return [node]

def _should_reinitialized(self):
Expand Down Expand Up @@ -1019,7 +1006,7 @@ def execute_command(self, *args, **kwargs):
res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except BaseException as e:
except Exception as e:
if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
Expand Down Expand Up @@ -1059,10 +1046,6 @@ def _execute_command(self, target_node, *args, **kwargs):
)
moved = False

log.debug(
f"Executing command {command} on target node: "
f"{target_node.server_type} {target_node.name}"
)
redis_node = self.get_redis_connection(target_node)
connection = get_connection(redis_node, *args, **kwargs)
if asking:
Expand All @@ -1077,12 +1060,9 @@ def _execute_command(self, target_node, *args, **kwargs):
response, **kwargs
)
return response

except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
log.exception(type(e))
except AuthenticationError:
raise
except (ConnectionError, TimeoutError) as e:
log.exception(type(e))
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
Expand All @@ -1101,7 +1081,7 @@ def _execute_command(self, target_node, *args, **kwargs):
# and try again with the new setup
target_node.redis_connection = None
self.nodes_manager.initialize()
raise
raise e
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
Expand All @@ -1111,7 +1091,6 @@ def _execute_command(self, target_node, *args, **kwargs):
# the same client object is shared between multiple threads. To
# reduce the frequency you can set this variable in the
# RedisCluster constructor.
log.exception("MovedError")
self.reinitialize_counter += 1
if self._should_reinitialized():
self.nodes_manager.initialize()
Expand All @@ -1121,29 +1100,21 @@ def _execute_command(self, target_node, *args, **kwargs):
self.nodes_manager.update_moved_exception(e)
moved = True
except TryAgainError:
log.exception("TryAgainError")

if ttl < self.RedisClusterRequestTTL / 2:
time.sleep(0.05)
except AskError as e:
log.exception("AskError")

redirect_addr = get_node_name(host=e.host, port=e.port)
asking = True
except ClusterDownError as e:
log.exception("ClusterDownError")
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
# and retry executing the command
time.sleep(0.25)
self.nodes_manager.initialize()
raise e
except ResponseError as e:
message = e.__str__()
log.exception(f"ResponseError: {message}")
raise e
except BaseException as e:
log.exception("BaseException")
except ResponseError:
raise
except Exception as e:
if connection:
connection.disconnect()
raise e
Expand Down Expand Up @@ -1280,11 +1251,6 @@ def get_node(self, host=None, port=None, node_name=None):
elif node_name:
return self.nodes_cache.get(node_name)
else:
log.error(
"get_node requires one of the following: "
"1. node name "
"2. host and port"
)
return None

def update_moved_exception(self, exception):
Expand Down Expand Up @@ -1432,7 +1398,6 @@ def initialize(self):
:startup_nodes:
Responsible for discovering other nodes in the cluster
"""
log.debug("Initializing the nodes' topology of the cluster")
self.reset()
tmp_nodes_cache = {}
tmp_slots = {}
Expand Down Expand Up @@ -1460,17 +1425,9 @@ def initialize(self):
)
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
msg = e.__str__
log.exception(
"An exception occurred while trying to"
" initialize the cluster using the seed node"
f" {startup_node.name}:\n{msg}"
)
except (ConnectionError, TimeoutError):
continue
except ResponseError as e:
log.exception('ReseponseError sending "cluster slots" to redis server')

# Isn't a cluster connection, so it won't parse these
# exceptions automatically
message = e.__str__()
Expand Down Expand Up @@ -2042,12 +1999,6 @@ def _send_cluster_commands(
# If a lot of commands have failed, we'll be setting the
# flag to rebuild the slots table from scratch.
# So MOVED errors should correct themselves fairly quickly.
log.exception(
f"An exception occurred during pipeline execution. "
f"args: {attempt[-1].args}, "
f"error: {type(attempt[-1].result).__name__} "
f"{str(attempt[-1].result)}"
)
self.reinitialize_counter += 1
if self._should_reinitialized():
self.nodes_manager.initialize()
Expand Down