Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make some errors identifiable to which cluster related #401

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Command
)

class << self
def load(nodes, slow_command_timeout: -1)
def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize
cmd = errors = nil

nodes&.each do |node|
Expand All @@ -43,7 +43,7 @@ def load(nodes, slow_command_timeout: -1)

return cmd unless cmd.nil?

raise ::RedisClient::Cluster::InitialSetupError, errors
raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors)
end

private
Expand Down
45 changes: 27 additions & 18 deletions lib/redis_client/cluster/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,60 @@

class RedisClient
class Cluster
class Error < ::RedisClient::Error
def with_config(config)
@config = config
self
end
end

ERR_ARG_NORMALIZATION = ->(arg) { Array[arg].flatten.reject { |e| e.nil? || (e.respond_to?(:empty?) && e.empty?) } }

private_constant :ERR_ARG_NORMALIZATION

class InitialSetupError < ::RedisClient::Error
def initialize(errors)
class InitialSetupError < Error
def self.from_errors(errors)
msg = ERR_ARG_NORMALIZATION.call(errors).map(&:message).uniq.join(',')
super("Redis client could not fetch cluster information: #{msg}")
new("Redis client could not fetch cluster information: #{msg}")
end
end

class OrchestrationCommandNotSupported < ::RedisClient::Error
def initialize(command)
class OrchestrationCommandNotSupported < Error
def self.from_command(command)
str = ERR_ARG_NORMALIZATION.call(command).map(&:to_s).join(' ').upcase
msg = "#{str} command should be used with care " \
'only by applications orchestrating Redis Cluster, like redis-cli, ' \
'and the command if used out of the right context can leave the cluster ' \
'in a wrong state or cause data loss.'
super(msg)
new(msg)
end
end

class ErrorCollection < ::RedisClient::Error
class ErrorCollection < Error
attr_reader :errors

def initialize(errors)
@errors = {}
def self.with_errors(errors)
if !errors.is_a?(Hash) || errors.empty?
super(errors.to_s)
return
new(errors.to_s).with_errors({})
else
messages = errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
new(messages.join(', ')).with_errors(errors)
end
end

@errors = errors
messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
super(messages.join(', '))
def with_errors(errors)
@errors = errors if @errors.nil?
self
end
end

class AmbiguousNodeError < ::RedisClient::Error
def initialize(command)
super("Cluster client doesn't know which node the #{command} command should be sent to.")
class AmbiguousNodeError < Error
def self.from_command(command)
new("Cluster client doesn't know which node the #{command} command should be sent to.")
end
end

class NodeMightBeDown < ::RedisClient::Error
class NodeMightBeDown < Error
def initialize(_ = '')
super(
'The client is trying to fetch the latest cluster state ' \
Expand Down
8 changes: 4 additions & 4 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Node
private_constant :USE_CHAR_ARRAY_SLOT, :SLOT_SIZE, :MIN_SLOT, :MAX_SLOT,
:DEAD_FLAGS, :ROLE_FLAGS, :EMPTY_ARRAY, :EMPTY_HASH

ReloadNeeded = Class.new(::RedisClient::Error)
ReloadNeeded = Class.new(::RedisClient::Cluster::Error)

Info = Struct.new(
'RedisClusterNode',
Expand Down Expand Up @@ -148,7 +148,7 @@ def send_ping(method, command, args, &block)

raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)

raise ::RedisClient::Cluster::ErrorCollection, errors
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors)
end

def clients_for_scanning(seed: nil)
Expand Down Expand Up @@ -267,7 +267,7 @@ def call_multiple_nodes!(clients, method, command, args, &block)
result_values, errors = call_multiple_nodes(clients, method, command, args, &block)
return result_values if errors.nil? || errors.empty?

raise ::RedisClient::Cluster::ErrorCollection, errors
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors)
end

def try_map(clients, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
Expand Down Expand Up @@ -334,7 +334,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M

work_group.close

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?
raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors) if node_info_list.nil?

grouped = node_info_list.compact.group_by do |info_list|
info_list.sort_by!(&:id)
Expand Down
8 changes: 4 additions & 4 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ def ensure_connected_cluster_scoped(retryable: true, &block)
end
end

ReplySizeError = Class.new(::RedisClient::Error)
ReplySizeError = Class.new(::RedisClient::Cluster::Error)

class StaleClusterState < ::RedisClient::Error
class StaleClusterState < ::RedisClient::Cluster::Error
attr_accessor :replies, :first_exception
end

class RedirectionNeeded < ::RedisClient::Error
class RedirectionNeeded < ::RedisClient::Cluster::Error
attr_accessor :replies, :indices, :first_exception
end

Expand Down Expand Up @@ -204,7 +204,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met

work_group.close
@router.renew_cluster_state if cluster_state_errors
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
raise ::RedisClient::Cluster::ErrorCollection.with_errors(errors).with_config(@router.config) unless errors.nil?

required_redirections&.each do |node_key, v|
raise v.first_exception if v.first_exception
Expand Down
27 changes: 17 additions & 10 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ class Router

private_constant :ZERO_CURSOR_FOR_SCAN, :TSF

attr_reader :config

def initialize(config, concurrent_worker, pool: nil, **kwargs)
@config = config.dup
@original_config = config.dup if config.connect_with_original_config
@connect_with_original_config = config.connect_with_original_config
@config = config
@concurrent_worker = concurrent_worker
@pool = pool
@client_kwargs = kwargs
@node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs)
@node.reload!
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@command_builder = @config.command_builder
rescue ::RedisClient::Cluster::InitialSetupError => e
e.with_config(config)
raise
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand All @@ -58,9 +61,9 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
when 'flushall', 'flushdb'
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
when 'readonly', 'readwrite', 'shutdown'
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(cmd).with_config(@config)
when 'discard', 'exec', 'multi', 'unwatch'
raise ::RedisClient::Cluster::AmbiguousNodeError, cmd
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(cmd).with_config(@config)
else
node = assign_node(command)
try_send(node, method, command, args, &block)
Expand All @@ -69,14 +72,15 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
raise
rescue ::RedisClient::Cluster::Node::ReloadNeeded
renew_cluster_state
raise ::RedisClient::Cluster::NodeMightBeDown
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config)
rescue ::RedisClient::ConnectionError
renew_cluster_state
raise
rescue ::RedisClient::CommandError => e
renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
rescue ::RedisClient::Cluster::ErrorCollection => e
e.with_config(@config)
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)

renew_cluster_state if e.errors.values.any? do |err|
Expand Down Expand Up @@ -189,7 +193,7 @@ def find_node_key_by_key(key, seed: nil, primary: false)
node_key = primary ? @node.find_node_key_of_primary(slot) : @node.find_node_key_of_replica(slot)
if node_key.nil?
renew_cluster_state
raise ::RedisClient::Cluster::NodeMightBeDown
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config)
end
node_key
else
Expand Down Expand Up @@ -303,7 +307,7 @@ def send_cluster_command(method, command, args, &block) # rubocop:disable Metric
case subcommand = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
'reset', 'set-config-epoch', 'setslot'
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', subcommand]).with_config(@config)
when 'saveconfig' then @node.call_all(method, command, args).first.then(&TSF.call(block))
when 'getkeysinslot'
raise ArgumentError, command.join(' ') if command.size != 4
Expand Down Expand Up @@ -347,7 +351,10 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
end

def send_watch_command(command)
raise ::RedisClient::Cluster::Transaction::ConsistencyError, 'A block required. And you need to use the block argument as a client for the transaction.' unless block_given?
unless block_given?
msg = 'A block required. And you need to use the block argument as a client for the transaction.'
raise ::RedisClient::Cluster::Transaction::ConsistencyError.new(msg).with_config(@config)
end

::RedisClient::Cluster::OptimisticLocking.new(self).watch(command[1..]) do |c, slot, asking|
transaction = ::RedisClient::Cluster::Transaction.new(
Expand Down Expand Up @@ -401,7 +408,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis
def handle_node_reload_error(retry_count: 1)
yield
rescue ::RedisClient::Cluster::Node::ReloadNeeded
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0
raise ::RedisClient::Cluster::NodeMightBeDown.new.with_config(@config) if retry_count <= 0

retry_count -= 1
renew_cluster_state
Expand Down
9 changes: 5 additions & 4 deletions lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/pipeline'

class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Error)
ConsistencyError = Class.new(::RedisClient::Cluster::Error)

MAX_REDIRECTION = 2
EMPTY_ARRAY = [].freeze
Expand Down Expand Up @@ -67,7 +68,7 @@ def execute
@pending_commands.each(&:call)

return EMPTY_ARRAY if @pipeline._empty?
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
raise ConsistencyError.new("couldn't determine the node: #{@pipeline._commands}").with_config(@router.config) if @node.nil?

commit
end
Expand Down Expand Up @@ -163,7 +164,7 @@ def coerce_results!(results, offset: 1)

def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError, "#{err.message}: #{err.command}"
raise ConsistencyError.new("#{err.message}: #{err.command}").with_config(@router.config)
elsif err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
Expand All @@ -183,7 +184,7 @@ def ensure_the_same_slot!(commands)
return if slots.size == 1 && @watching_slot.nil?
return if slots.size == 1 && @watching_slot == slots.first

raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}")
raise ConsistencyError.new("the transaction should be executed to a slot in a node: #{commands}").with_config(@router.config)
end

def try_asking(node)
Expand Down
15 changes: 14 additions & 1 deletion lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'uri'
require 'redis_client'
require 'redis_client/cluster'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node_key'
require 'redis_client/command_builder'

Expand All @@ -27,7 +28,7 @@ class ClusterConfig
:VALID_SCHEMES, :VALID_NODES_KEYS, :MERGE_CONFIG_KEYS, :IGNORE_GENERIC_CONFIG_KEYS,
:MAX_WORKERS, :SLOW_COMMAND_TIMEOUT, :MAX_STARTUP_SAMPLE

InvalidClientConfigError = Class.new(::RedisClient::Error)
InvalidClientConfigError = Class.new(::RedisClient::Cluster::Error)

attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout,
:connect_with_original_config, :startup_nodes, :max_startup_sample
Expand Down Expand Up @@ -92,6 +93,18 @@ def client_config_for_node(node_key)
augment_client_config(config)
end

def resolved?
true
end

def sentinel?
false
end

def server_url
nil
end

private

def merge_concurrency_option(option)
Expand Down
8 changes: 4 additions & 4 deletions test/redis_client/cluster/test_errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_initial_setup_error
{ errors: '', want: 'Redis client could not fetch cluster information: ' },
{ errors: nil, want: 'Redis client could not fetch cluster information: ' }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::InitialSetupError, c[:errors]
raise ::RedisClient::Cluster::InitialSetupError.from_errors(c[:errors])
rescue StandardError => e
assert_equal(c[:want], e.message, "Case: #{idx}")
end
Expand All @@ -34,7 +34,7 @@ def test_orchestration_command_not_supported_error
{ command: '', want: ' command should be' },
{ command: nil, want: ' command should be' }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, c[:command]
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(c[:command])
rescue StandardError => e
assert(e.message.start_with?(c[:want]), "Case: #{idx}")
end
Expand All @@ -55,7 +55,7 @@ def test_error_collection_error
{ errors: '', want: { msg: '', size: 0 } },
{ errors: nil, want: { msg: '', size: 0 } }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::ErrorCollection, c[:errors]
raise ::RedisClient::Cluster::ErrorCollection.with_errors(c[:errors])
rescue StandardError => e
assert_equal(c[:want][:msg], e.message, "Case: #{idx}")
assert_equal(c[:want][:size], e.errors.size, "Case: #{idx}")
Expand All @@ -67,7 +67,7 @@ def test_ambiguous_node_error
{ command: 'MULTI', want: "Cluster client doesn't know which node the MULTI command should be sent to." },
{ command: nil, want: "Cluster client doesn't know which node the command should be sent to." }
].each_with_index do |c, idx|
raise ::RedisClient::Cluster::AmbiguousNodeError, c[:command]
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(c[:command])
rescue StandardError => e
assert_equal(e.message, c[:want], "Case: #{idx}")
end
Expand Down