Skip to content

Commit

Permalink
fix: make some errors identifiable to which cluster related
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Nov 14, 2024
1 parent 972f03e commit 9da11d0
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 47 deletions.
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Command
)

class << self
def load(nodes, slow_command_timeout: -1)
def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize
cmd = errors = nil

nodes&.each do |node|
Expand All @@ -43,7 +43,7 @@ def load(nodes, slow_command_timeout: -1)

return cmd unless cmd.nil?

raise ::RedisClient::Cluster::InitialSetupError, errors
raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors)
end

private
Expand Down
45 changes: 27 additions & 18 deletions lib/redis_client/cluster/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,60 @@

class RedisClient
class Cluster
class Error < ::RedisClient::Error
def with_config(config)
@config = config
self
end
end

ERR_ARG_NORMALIZATION = ->(arg) { Array[arg].flatten.reject { |e| e.nil? || (e.respond_to?(:empty?) && e.empty?) } }

private_constant :ERR_ARG_NORMALIZATION

class InitialSetupError < ::RedisClient::Error
def initialize(errors)
class InitialSetupError < Error
def self.from_errors(errors)
msg = ERR_ARG_NORMALIZATION.call(errors).map(&:message).uniq.join(',')
super("Redis client could not fetch cluster information: #{msg}")
new("Redis client could not fetch cluster information: #{msg}")
end
end

class OrchestrationCommandNotSupported < ::RedisClient::Error
def initialize(command)
class OrchestrationCommandNotSupported < Error
def self.from_command(command)
str = ERR_ARG_NORMALIZATION.call(command).map(&:to_s).join(' ').upcase
msg = "#{str} command should be used with care " \
'only by applications orchestrating Redis Cluster, like redis-cli, ' \
'and the command if used out of the right context can leave the cluster ' \
'in a wrong state or cause data loss.'
super(msg)
new(msg)
end
end

class ErrorCollection < ::RedisClient::Error
class ErrorCollection < Error
attr_reader :errors

def initialize(errors)
@errors = {}
def self.with_errors(errors)
if !errors.is_a?(Hash) || errors.empty?
super(errors.to_s)
return
new(errors.to_s).with_errors({})
else
messages = errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
new(messages.join(', ')).with_errors(errors)
end
end

@errors = errors
messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
super(messages.join(', '))
def with_errors(errors)
@errors = errors if @errors.nil?
self
end
end

class AmbiguousNodeError < ::RedisClient::Error
def initialize(command)
super("Cluster client doesn't know which node the #{command} command should be sent to.")
class AmbiguousNodeError < Error
def self.from_command(command)
new("Cluster client doesn't know which node the #{command} command should be sent to.")
end
end

class NodeMightBeDown < ::RedisClient::Error
class NodeMightBeDown < Error
def initialize(_ = '')
super(
'The client is trying to fetch the latest cluster state ' \
Expand Down
8 changes: 4 additions & 4 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Node
private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT,
:DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH

ReloadNeeded = Class.new(::RedisClient::Error)
ReloadNeeded = Class.new(::RedisClient::Cluster::Error)

Info = Struct.new(
'RedisClusterNode',
Expand Down Expand Up @@ -148,7 +148,7 @@ def send_ping(method, command, args, &block)

raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)

raise ::RedisClient::Cluster::ErrorCollection, errors
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors)
end

def clients_for_scanning(seed: nil)
Expand Down Expand Up @@ -267,7 +267,7 @@ def call_multiple_nodes!(clients, method, command, args, &block)
result_values, errors = call_multiple_nodes(clients, method, command, args, &block)
return result_values if errors.nil? || errors.empty?

raise ::RedisClient::Cluster::ErrorCollection, errors
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors)
end

def try_map(clients, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
Expand Down Expand Up @@ -334,7 +334,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M

work_group.close

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?
raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors) if node_info_list.nil?

grouped = node_info_list.compact.group_by do |info_list|
info_list.sort_by!(&:id)
Expand Down
8 changes: 4 additions & 4 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ def ensure_connected_cluster_scoped(retryable: true, &block)
end
end

ReplySizeError = Class.new(::RedisClient::Error)
ReplySizeError = Class.new(::RedisClient::Cluster::Error)

class StaleClusterState < ::RedisClient::Error
class StaleClusterState < ::RedisClient::Cluster::Error
attr_accessor :replies, :first_exception
end

class RedirectionNeeded < ::RedisClient::Error
class RedirectionNeeded < ::RedisClient::Cluster::Error
attr_accessor :replies, :indices, :first_exception
end

Expand Down Expand Up @@ -204,7 +204,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met

work_group.close
@router.renew_cluster_state if cluster_state_errors
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors).with_config(@router.config) unless errors.nil?

required_redirections&.each do |node_key, v|
raise v.first_exception if v.first_exception
Expand Down
27 changes: 17 additions & 10 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ class Router

private_constant :ZERO_CURSOR_FOR_SCAN, :TSF

attr_reader :config

def initialize(config, concurrent_worker, pool: nil, **kwargs)
@config = config.dup
@original_config = config.dup if config.connect_with_original_config
@connect_with_original_config = config.connect_with_original_config
@config = config
@concurrent_worker = concurrent_worker
@pool = pool
@client_kwargs = kwargs
@node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs)
@node.reload!
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@command_builder = @config.command_builder
rescue ::RedisClient::Cluster::InitialSetupError => e
e.with_config(config)
raise
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand All @@ -58,9 +61,9 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
when 'flushall', 'flushdb'
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
when 'readonly', 'readwrite', 'shutdown'
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(cmd).with_config(@config)
when 'discard', 'exec', 'multi', 'unwatch'
raise ::RedisClient::Cluster::AmbiguousNodeError, cmd
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(cmd).with_config(@config)
else
node = assign_node(command)
try_send(node, method, command, args, &block)
Expand All @@ -69,14 +72,15 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
raise
rescue ::RedisClient::Cluster::Node::ReloadNeeded
renew_cluster_state
raise ::RedisClient::Cluster::NodeMightBeDown
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config)
rescue ::RedisClient::ConnectionError
renew_cluster_state
raise
rescue ::RedisClient::CommandError => e
renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
rescue ::RedisClient::Cluster::ErrorCollection => e
e.with_config(@config)
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)

renew_cluster_state if e.errors.values.any? do |err|
Expand Down Expand Up @@ -189,7 +193,7 @@ def find_node_key_by_key(key, seed: nil, primary: false)
node_key = primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
if node_key.nil?
renew_cluster_state
raise ::RedisClient::Cluster::NodeMightBeDown
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config)
end
node_key
else
Expand Down Expand Up @@ -303,7 +307,7 @@ def send_cluster_command(method, command, args, &block) # rubocop:disable Metric
case subcommand = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
'reset', 'set-config-epoch', 'setslot'
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', subcommand]).with_config(@config)
when 'saveconfig' then @node.call_all(method, command, args).first.then(&TSF.call(block))
when 'getkeysinslot'
raise ArgumentError, command.join(' ') if command.size != 4
Expand Down Expand Up @@ -347,7 +351,10 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
end

def send_watch_command(command)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?
unless block_given?
msg = 'A block required. And you need to use the block argument as a client for the transaction.'
raise ::RedisClient::Cluster::Transaction::ConsistencyError.new(msg).with_config(@config)
end

::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
Expand Down Expand Up @@ -401,7 +408,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis
def handle_node_reload_error(retry_count: 1)
yield
rescue ::RedisClient::Cluster::Node::ReloadNeeded
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config) if retry_count <= 0

retry_count -= 1
renew_cluster_state
Expand Down
9 changes: 5 additions & 4 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/pipeline'

class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
ConsistencyError = Class.new(::RedisClient::Cluster::Error)

MAX_REDIRECTION = 2
EMPTY_ARRAY = [].freeze
Expand Down Expand Up @@ -67,7 +68,7 @@ def execute
@pending_commands.each(&:call)

return EMPTY_ARRAY if @pipeline._empty?
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
raise ConsistencyError.new("couldn't determine the node: #{@pipeline._commands}").with_config(@router.config) if @node.nil?

commit
end
Expand Down Expand Up @@ -163,7 +164,7 @@ def coerce_results!(results, offset: 1)

def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
raise ConsistencyError.new("#{err.message}: #{err.command}").with_config(@router.config)
elsif err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
Expand All @@ -183,7 +184,7 @@ def ensure_the_same_slot!(commands)
return if slots.size == 1 && @watching_slot.nil?
return if slots.size == 1 && @watching_slot == slots.first

raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}")
raise ConsistencyError.new("the transaction should be executed to a slot in a node: #{commands}").with_config(@router.config)
end

def try_asking(node)
Expand Down
15 changes: 14 additions & 1 deletion lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'uri'
require 'redis_client'
require 'redis_client/cluster'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node_key'
require 'redis_client/command_builder'

Expand All @@ -27,7 +28,7 @@ class ClusterConfig
:VALID_SCHEMES, :VALID_NODES_KEYS, :MERGE_CONFIG_KEYS, :IGNORE_GENERIC_CONFIG_KEYS,
:MAX_WORKERS, :SLOW_COMMAND_TIMEOUT, :MAX_STARTUP_SAMPLE

InvalidClientConfigError = Class.new(::RedisClient::Error)
InvalidClientConfigError = Class.new(::RedisClient::Cluster::Error)

attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout,
:connect_with_original_config, :startup_nodes, :max_startup_sample
Expand Down Expand Up @@ -92,6 +93,18 @@ def client_config_for_node(node_key)
augment_client_config(config)
end

def resolved?
true
end

def sentinel?
false
end

def server_url
nil
end

private

def merge_concurrency_option(option)
Expand Down
8 changes: 4 additions & 4 deletions test/redis_client/cluster/test_errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_initial_setup_error
{ errors: '', want: 'Redis client could not fetch cluster information: ' },
{ errors: nil, want: 'Redis client could not fetch cluster information: ' }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::InitialSetupError, c[:errors]
raise ::RedisClient::Cluster::InitialSetupError.from_errors(c[:errors])
rescue StandardError => e
assert_equal(c[:want], e.message, "Case: #{idx}")
end
Expand All @@ -34,7 +34,7 @@ def test_orchestration_command_not_supported_error
{ command: '', want: ' command should be' },
{ command: nil, want: ' command should be' }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, c[:command]
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(c[:command])
rescue StandardError => e
assert(e.message.start_with?(c[:want]), "Case: #{idx}")
end
Expand All @@ -55,7 +55,7 @@ def test_error_collection_error
{ errors: '', want: { msg: '', size: 0 } },
{ errors: nil, want: { msg: '', size: 0 } }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::ErrorCollection, c[:errors]
raise ::RedisClient::Cluster::ErrorCollection.with_errors(c[:errors])
rescue StandardError => e
assert_equal(c[:want][:msg], e.message, "Case: #{idx}")
assert_equal(c[:want][:size], e.errors.size, "Case: #{idx}")
Expand All @@ -67,7 +67,7 @@ def test_ambiguous_node_error
{ command: 'MULTI', want: "Cluster client doesn't know which node the MULTI command should be sent to." },
{ command: nil, want: "Cluster client doesn't know which node the command should be sent to." }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::AmbiguousNodeError, c[:command]
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(c[:command])
rescue StandardError => e
assert_equal(e.message, c[:want], "Case: #{idx}")
end
Expand Down

0 comments on commit 9da11d0

Please sign in to comment.