diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index 0a6f390..3664300 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -25,7 +25,7 @@ class Command ) class << self - def load(nodes, slow_command_timeout: -1) + def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize cmd = errors = nil nodes&.each do |node| @@ -43,7 +43,7 @@ def load(nodes, slow_command_timeout: -1) return cmd unless cmd.nil? - raise ::RedisClient::Cluster::InitialSetupError, errors + raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors) end private diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb index ca2717d..72c79b9 100644 --- a/lib/redis_client/cluster/errors.rb +++ b/lib/redis_client/cluster/errors.rb @@ -4,51 +4,60 @@ class RedisClient class Cluster + class Error < ::RedisClient::Error + def with_config(config) + @config = config + self + end + end + ERR_ARG_NORMALIZATION = ->(arg) { Array[arg].flatten.reject { |e| e.nil? || (e.respond_to?(:empty?) && e.empty?) } } private_constant :ERR_ARG_NORMALIZATION - class InitialSetupError < ::RedisClient::Error - def initialize(errors) + class InitialSetupError < Error + def self.from_errors(errors) msg = ERR_ARG_NORMALIZATION.call(errors).map(&:message).uniq.join(',') - super("Redis client could not fetch cluster information: #{msg}") + new("Redis client could not fetch cluster information: #{msg}") end end - class OrchestrationCommandNotSupported < ::RedisClient::Error - def initialize(command) + class OrchestrationCommandNotSupported < Error + def self.from_command(command) str = ERR_ARG_NORMALIZATION.call(command).map(&:to_s).join(' ').upcase msg = "#{str} command should be used with care " \ 'only by applications orchestrating Redis Cluster, like redis-cli, ' \ 'and the command if used out of the right context can leave the cluster ' \ 'in a wrong state or cause data loss.' - super(msg) + new(msg) end end - class ErrorCollection < ::RedisClient::Error + class ErrorCollection < Error attr_reader :errors - def initialize(errors) - @errors = {} + def self.with_errors(errors) if !errors.is_a?(Hash) || errors.empty? - super(errors.to_s) - return + new(errors.to_s).with_errors({}) + else + messages = errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" } + new(messages.join(', ')).with_errors(errors) end + end - @errors = errors - messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" } - super(messages.join(', ')) + def with_errors(errors) + @errors = errors if @errors.nil? + self end end - class AmbiguousNodeError < ::RedisClient::Error - def initialize(command) - super("Cluster client doesn't know which node the #{command} command should be sent to.") + class AmbiguousNodeError < Error + def self.from_command(command) + new("Cluster client doesn't know which node the #{command} command should be sent to.") end end - class NodeMightBeDown < ::RedisClient::Error + class NodeMightBeDown < Error def initialize(_ = '') super( 'The client is trying to fetch the latest cluster state ' \ diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index 80a4008..230995e 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -28,7 +28,7 @@ class Node private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT, :DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH - ReloadNeeded = Class.new(::RedisClient::Error) + ReloadNeeded = Class.new(::RedisClient::Cluster::Error) Info = Struct.new( 'RedisClusterNode', @@ -148,7 +148,7 @@ def send_ping(method, command, args, &block) raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError) - raise ::RedisClient::Cluster::ErrorCollection, errors + raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors) end def clients_for_scanning(seed: nil) @@ -267,7 +267,7 @@ def call_multiple_nodes!(clients, method, command, args, &block) result_values, errors = call_multiple_nodes(clients, method, command, args, &block) return result_values if errors.nil? || errors.empty? - raise ::RedisClient::Cluster::ErrorCollection, errors + raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors) end def try_map(clients, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity @@ -334,7 +334,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M work_group.close - raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil? + raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors) if node_info_list.nil? grouped = node_info_list.compact.group_by do |info_list| info_list.sort_by!(&:id) diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index c30055d..ebe9c6e 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -108,13 +108,13 @@ def ensure_connected_cluster_scoped(retryable: true, &block) end end - ReplySizeError = Class.new(::RedisClient::Error) + ReplySizeError = Class.new(::RedisClient::Cluster::Error) - class StaleClusterState < ::RedisClient::Error + class StaleClusterState < ::RedisClient::Cluster::Error attr_accessor :replies, :first_exception end - class RedirectionNeeded < ::RedisClient::Error + class RedirectionNeeded < ::RedisClient::Cluster::Error attr_accessor :replies, :indices, :first_exception end @@ -204,7 +204,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met work_group.close @router.renew_cluster_state if cluster_state_errors - raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil? + raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors).with_config(@router.config) unless errors.nil? required_redirections&.each do |node_key, v| raise v.first_exception if v.first_exception diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 71082a0..dad104c 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -21,10 +21,10 @@ class Router private_constant :ZERO_CURSOR_FOR_SCAN, :TSF + attr_reader :config + def initialize(config, concurrent_worker, pool: nil, **kwargs) - @config = config.dup - @original_config = config.dup if config.connect_with_original_config - @connect_with_original_config = config.connect_with_original_config + @config = config @concurrent_worker = concurrent_worker @pool = pool @client_kwargs = kwargs @@ -32,6 +32,9 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @node.reload! @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder + rescue ::RedisClient::Cluster::InitialSetupError => e + e.with_config(config) + raise end def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity @@ -58,9 +61,9 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi when 'flushall', 'flushdb' @node.call_primaries(method, command, args).first.then(&TSF.call(block)) when 'readonly', 'readwrite', 'shutdown' - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd + raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(cmd).with_config(@config) when 'discard', 'exec', 'multi', 'unwatch' - raise ::RedisClient::Cluster::AmbiguousNodeError, cmd + raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(cmd).with_config(@config) else node = assign_node(command) try_send(node, method, command, args, &block) @@ -69,7 +72,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi raise rescue ::RedisClient::Cluster::Node::ReloadNeeded renew_cluster_state - raise ::RedisClient::Cluster::NodeMightBeDown + raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config) rescue ::RedisClient::ConnectionError renew_cluster_state raise @@ -77,6 +80,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi renew_cluster_state if e.message.start_with?('CLUSTERDOWN') raise rescue ::RedisClient::Cluster::ErrorCollection => e + e.with_config(@config) raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError) renew_cluster_state if e.errors.values.any? do |err| @@ -189,7 +193,7 @@ def find_node_key_by_key(key, seed: nil, primary: false) node_key = primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot) if node_key.nil? renew_cluster_state - raise ::RedisClient::Cluster::NodeMightBeDown + raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config) end node_key else @@ -303,7 +307,7 @@ def send_cluster_command(method, command, args, &block) # rubocop:disable Metric case subcommand = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command) when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate', 'reset', 'set-config-epoch', 'setslot' - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand] + raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', subcommand]).with_config(@config) when 'saveconfig' then @node.call_all(method, command, args).first.then(&TSF.call(block)) when 'getkeysinslot' raise ArgumentError, command.join(' ') if command.size != 4 @@ -347,7 +351,10 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics end def send_watch_command(command) - raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given? + unless block_given? + msg = 'A block required. And you need to use the block argument as a client for the transaction.' + raise ::RedisClient::Cluster::Transaction::ConsistencyError.new(msg).with_config(@config) + end ::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot, asking| transaction = ::RedisClient::Cluster::Transaction.new( @@ -401,7 +408,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis def handle_node_reload_error(retry_count: 1) yield rescue ::RedisClient::Cluster::Node::ReloadNeeded - raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0 + raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config) if retry_count <= 0 retry_count -= 1 renew_cluster_state diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb index 0782b65..c35b52c 100644 --- a/lib/redis_client/cluster/transaction.rb +++ b/lib/redis_client/cluster/transaction.rb @@ -1,12 +1,13 @@ # frozen_string_literal: true require 'redis_client' +require 'redis_client/cluster/errors' require 'redis_client/cluster/pipeline' class RedisClient class Cluster class Transaction - ConsistencyError = Class.new(::RedisClient::Error) + ConsistencyError = Class.new(::RedisClient::Cluster::Error) MAX_REDIRECTION = 2 EMPTY_ARRAY = [].freeze @@ -67,7 +68,7 @@ def execute @pending_commands.each(&:call) return EMPTY_ARRAY if @pipeline._empty? - raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil? + raise ConsistencyError.new("couldn't determine the node: #{@pipeline._commands}").with_config(@router.config) if @node.nil? commit end @@ -163,7 +164,7 @@ def coerce_results!(results, offset: 1) def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize if err.message.start_with?('CROSSSLOT') - raise ConsistencyError, "#{err.message}: #{err.command}" + raise ConsistencyError.new("#{err.message}: #{err.command}").with_config(@router.config) elsif err.message.start_with?('MOVED') node = @router.assign_redirection_node(err.message) send_transaction(node, redirect: redirect - 1) @@ -183,7 +184,7 @@ def ensure_the_same_slot!(commands) return if slots.size == 1 && @watching_slot.nil? return if slots.size == 1 && @watching_slot == slots.first - raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}") + raise ConsistencyError.new("the transaction should be executed to a slot in a node: #{commands}").with_config(@router.config) end def try_asking(node) diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 9fbd76d..b3834bf 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -3,6 +3,7 @@ require 'uri' require 'redis_client' require 'redis_client/cluster' +require 'redis_client/cluster/errors' require 'redis_client/cluster/node_key' require 'redis_client/command_builder' @@ -27,7 +28,7 @@ class ClusterConfig :VALID_SCHEMES, :VALID_NODES_KEYS, :MERGE_CONFIG_KEYS, :IGNORE_GENERIC_CONFIG_KEYS, :MAX_WORKERS, :SLOW_COMMAND_TIMEOUT, :MAX_STARTUP_SAMPLE - InvalidClientConfigError = Class.new(::RedisClient::Error) + InvalidClientConfigError = Class.new(::RedisClient::Cluster::Error) attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout, :connect_with_original_config, :startup_nodes, :max_startup_sample @@ -92,6 +93,18 @@ def client_config_for_node(node_key) augment_client_config(config) end + def resolved? + true + end + + def sentinel? + false + end + + def server_url + nil + end + private def merge_concurrency_option(option) diff --git a/test/redis_client/cluster/test_errors.rb b/test/redis_client/cluster/test_errors.rb index 5ceac2a..c9e4c49 100644 --- a/test/redis_client/cluster/test_errors.rb +++ b/test/redis_client/cluster/test_errors.rb @@ -21,7 +21,7 @@ def test_initial_setup_error { errors: '', want: 'Redis client could not fetch cluster information: ' }, { errors: nil, want: 'Redis client could not fetch cluster information: ' } ].each_with_index do |c, idx| - raise ::RedisClient::Cluster::InitialSetupError, c[:errors] + raise ::RedisClient::Cluster::InitialSetupError.from_errors(c[:errors]) rescue StandardError => e assert_equal(c[:want], e.message, "Case: #{idx}") end @@ -34,7 +34,7 @@ def test_orchestration_command_not_supported_error { command: '', want: ' command should be' }, { command: nil, want: ' command should be' } ].each_with_index do |c, idx| - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, c[:command] + raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(c[:command]) rescue StandardError => e assert(e.message.start_with?(c[:want]), "Case: #{idx}") end @@ -55,7 +55,7 @@ def test_error_collection_error { errors: '', want: { msg: '', size: 0 } }, { errors: nil, want: { msg: '', size: 0 } } ].each_with_index do |c, idx| - raise ::RedisClient::Cluster::ErrorCollection, c[:errors] + raise ::RedisClient::Cluster::ErrorCollection.with_errors(c[:errors]) rescue StandardError => e assert_equal(c[:want][:msg], e.message, "Case: #{idx}") assert_equal(c[:want][:size], e.errors.size, "Case: #{idx}") @@ -67,7 +67,7 @@ def test_ambiguous_node_error { command: 'MULTI', want: "Cluster client doesn't know which node the MULTI command should be sent to." }, { command: nil, want: "Cluster client doesn't know which node the command should be sent to." } ].each_with_index do |c, idx| - raise ::RedisClient::Cluster::AmbiguousNodeError, c[:command] + raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(c[:command]) rescue StandardError => e assert_equal(e.message, c[:want], "Case: #{idx}") end