From 04894951a633fa3957aa5c5cb44690ba556133f5 Mon Sep 17 00:00:00 2001 From: Mikael Henriksson Date: Sat, 21 Jul 2018 17:33:28 +0200 Subject: [PATCH 1/2] Adds documentation --- lib/sidekiq_unique_jobs.rb | 21 ++++++++++ lib/sidekiq_unique_jobs/client/middleware.rb | 13 +++++- lib/sidekiq_unique_jobs/connection.rb | 6 ++- lib/sidekiq_unique_jobs/exceptions.rb | 17 ++++++-- lib/sidekiq_unique_jobs/lock/base_lock.rb | 37 +++++++++++++++- .../lock/until_and_while_executing.rb | 12 ++++++ .../lock/until_executed.rb | 7 ++++ .../lock/until_executing.rb | 7 ++++ lib/sidekiq_unique_jobs/lock/until_expired.rb | 12 ++++++ .../lock/while_executing.rb | 20 +++++++-- .../lock/while_executing_reject.rb | 12 ++++++ lib/sidekiq_unique_jobs/locksmith.rb | 32 ++++++++++++++ lib/sidekiq_unique_jobs/logging.rb | 25 ++++++++++- lib/sidekiq_unique_jobs/normalizer.rb | 6 +++ .../options_with_fallback.rb | 15 ++++++- lib/sidekiq_unique_jobs/scripts.rb | 31 ++++++++++++++ lib/sidekiq_unique_jobs/server/middleware.rb | 10 +++++ .../sidekiq_worker_methods.rb | 16 ++++++- lib/sidekiq_unique_jobs/timeout/calculator.rb | 17 ++++++++ lib/sidekiq_unique_jobs/unique_args.rb | 42 ++++++++++++++++++- lib/sidekiq_unique_jobs/unlockable.rb | 10 +++++ lib/sidekiq_unique_jobs/util.rb | 16 +++++-- 22 files changed, 364 insertions(+), 20 deletions(-) diff --git a/lib/sidekiq_unique_jobs.rb b/lib/sidekiq_unique_jobs.rb index 210bc91df..686255ade 100644 --- a/lib/sidekiq_unique_jobs.rb +++ b/lib/sidekiq_unique_jobs.rb @@ -31,6 +31,11 @@ require 'sidekiq_unique_jobs/sidekiq_unique_ext' require 'sidekiq_unique_jobs/on_conflict' +# Namespace for this gem +# +# Contains configuration and utility methods that belongs top level +# +# @author Mikael Henriksson module SidekiqUniqueJobs include SidekiqUniqueJobs::Connection @@ -44,6 +49,7 @@ module SidekiqUniqueJobs :logger, ) + # The current configuration (See: {.configure} on how to configure) def config # Arguments here need to match the definition of the new class (see above) @config ||= Concurrent::MutableStruct::Config.new( @@ -54,14 +60,20 @@ def config ) end + # The current logger + # @return [Logger] the configured logger def logger config.logger end + # Set a new logger + # @param [Logger] other a new logger def logger=(other) config.logger = other end + # Change global configuration while yielding + # @yield control to the caller def use_config(tmp_config) fail ::ArgumentError, "#{name}.#{__method__} needs a block" unless block_given? @@ -71,6 +83,15 @@ def use_config(tmp_config) configure(old_config) end + # Configure the gem + # + # This is usually called once at startup of an application + # @param [Hash] options global gem options + # @option options [Integer] :default_lock_timeout (default is 0) + # @option options [true,false] :enabled (default is true) + # @option options [String] :unique_prefix (default is 'uniquejobs') + # @option options [Logger] :logger (default is Sidekiq.logger) + # @yield control to the caller when given block def configure(options = {}) if block_given? yield config diff --git a/lib/sidekiq_unique_jobs/client/middleware.rb b/lib/sidekiq_unique_jobs/client/middleware.rb index df54c9545..e2e3c6164 100644 --- a/lib/sidekiq_unique_jobs/client/middleware.rb +++ b/lib/sidekiq_unique_jobs/client/middleware.rb @@ -4,11 +4,20 @@ module SidekiqUniqueJobs module Client + # The unique sidekiq middleware for the client push + # + # @author Mikael Henriksson class Middleware include SidekiqUniqueJobs::Logging include OptionsWithFallback - # :reek:LongParameterList { max_params: 4 } + # Calls this client middleware + # Used from Sidekiq.process_single + # @param [String] worker_class name of the sidekiq worker class + # @param [Hash] item a sidekiq job hash + # @param [String] queue name of the queue + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection + # @yield when uniqueness is disable or lock successful def call(worker_class, item, queue, redis_pool = nil) @worker_class = worker_class @item = item @@ -20,6 +29,8 @@ def call(worker_class, item, queue, redis_pool = nil) private + # The sidekiq job hash + # @return [Hash] the Sidekiq job hash attr_reader :item def success? diff --git a/lib/sidekiq_unique_jobs/connection.rb b/lib/sidekiq_unique_jobs/connection.rb index be68225f0..8f3cd7ff2 100644 --- a/lib/sidekiq_unique_jobs/connection.rb +++ b/lib/sidekiq_unique_jobs/connection.rb @@ -1,12 +1,16 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Shared module for dealing with redis connections + # + # @author Mikael Henriksson module Connection def self.included(base) base.send(:extend, self) end - # :reek:UtilityFunction { enabled: false } + # Creates a connection to redis + # @return [Sidekiq::RedisConnection, ConnectionPool] a connection to redis def redis(redis_pool = nil) if redis_pool redis_pool.with { |conn| yield conn } diff --git a/lib/sidekiq_unique_jobs/exceptions.rb b/lib/sidekiq_unique_jobs/exceptions.rb index 4c63b536d..3b64c318b 100644 --- a/lib/sidekiq_unique_jobs/exceptions.rb +++ b/lib/sidekiq_unique_jobs/exceptions.rb @@ -1,18 +1,29 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Error raised when a Lua script fails to execute + # + # @author Mikael Henriksson class Conflict < StandardError def initialize(item) super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing") end end - class ScriptError < StandardError - def initialize(file_name:, source_exception:) - super("Problem compiling #{file_name}. Message: #{source_exception.message}") + # Error raised from {OnConflict::Raise} + # + # @author Mikael Henriksson + class Conflict < StandardError + # @param [Hash] item the Sidekiq job hash + # @option item [String] :unique_digest the unique digest (See: {UniqueArgs#unique_digest}) + def initialize(item) + super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing") end end + # Error raised from {OptionsWithFallback#lock_class} + # + # @author Mikael Henriksson class UnknownLock < StandardError end end diff --git a/lib/sidekiq_unique_jobs/lock/base_lock.rb b/lib/sidekiq_unique_jobs/lock/base_lock.rb index b21997c21..5a02a5c94 100644 --- a/lib/sidekiq_unique_jobs/lock/base_lock.rb +++ b/lib/sidekiq_unique_jobs/lock/base_lock.rb @@ -2,15 +2,25 @@ module SidekiqUniqueJobs class Lock + # Abstract base class for locks + # + # @abstract + # @author Mikael Henriksson class BaseLock include SidekiqUniqueJobs::Logging + # @param [Hash] item the Sidekiq job hash + # @param [Proc] callback the callback to use after unlock + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection def initialize(item, callback, redis_pool = nil) @item = prepare_item(item) @callback = callback @redis_pool = redis_pool end + # Handles locking of sidekiq jobs. + # Will call a conflict strategy if lock can't be achieved. + # @return [String] the sidekiq job id def lock if (token = locksmith.lock(item[LOCK_TIMEOUT_KEY])) token @@ -19,30 +29,53 @@ def lock end end + # Execute the job in the Sidekiq server processor + # @raise [NotImplementedError] needs to be implemented in child class def execute raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}" end + # Unlocks the job from redis + # @return [String] sidekiq job id when successful + # @return [false] when unsuccessful def unlock locksmith.signal(item[JID_KEY]) # Only signal to release the lock end + # Deletes the job from redis if it is locked. def delete locksmith.delete # Soft delete (don't forcefully remove when expiration is set) end + # Forcefully deletes the job from redis. + # This is good for jobs when a previous lock was not unlocked def delete! locksmith.delete! # Force delete the lock end + # Checks if the item has achieved a lock + # @return [true] when this jid has locked the job + # @return [false] when this jid has not locked the job def locked? locksmith.locked?(item[JID_KEY]) end private - attr_reader :item, :redis_pool, :callback + # The sidekiq job hash + # @return [Hash] the Sidekiq job hash + attr_reader :item + # The sidekiq redis pool + # @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection + attr_reader :redis_pool + + # The sidekiq job hash + # @return [Proc] the callback to use after unlock + attr_reader :callback + + # The interface to the locking mechanism + # @return [SidekiqUniqueJobs::Locksmith] def locksmith @locksmith ||= SidekiqUniqueJobs::Locksmith.new(item, redis_pool) end @@ -84,7 +117,7 @@ def callback_safely end def strategy - OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item) + @strategy ||= OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item) end end end diff --git a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb index e4bc3d248..3c1c559a9 100644 --- a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb @@ -2,7 +2,19 @@ module SidekiqUniqueJobs class Lock + # Locks jobs while the job is executing in the server process + # - Locks on perform_in or perform_async (see {UntilExecuting}) + # - Unlocks before yielding to the worker's perform method (see {UntilExecuting}) + # - Locks before yielding to the worker's perform method (see {WhileExecuting}) + # - Unlocks after yielding to the worker's perform method (see {WhileExecuting}) + # + # See {#lock} for more information about the client. + # See {#execute} for more information about the server + # + # @author Mikael Henriksson class UntilAndWhileExecuting < BaseLock + # Executes in the Sidekiq server process + # @yield to the worker class perform method def execute return unless locked? unlock diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb index 542383f8c..818e017cf 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executed.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -2,9 +2,16 @@ module SidekiqUniqueJobs class Lock + # Locks jobs until the server is done executing the job + # - Locks on perform_in or perform_async + # - Unlocks after yielding to the worker's perform method + # + # @author Mikael Henriksson class UntilExecuted < BaseLock OK ||= 'OK' + # Executes in the Sidekiq server process + # @yield to the worker class perform method def execute return unless locked? with_cleanup { yield } diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb index a895798b6..2b4f30c1d 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -2,7 +2,14 @@ module SidekiqUniqueJobs class Lock + # Locks jobs until {#execute} starts + # - Locks on perform_in or perform_async + # - Unlocks after yielding to the worker's perform method + # + # @author Mikael Henriksson class UntilExecuting < BaseLock + # Executes in the Sidekiq server process + # @yield to the worker class perform method def execute unlock_with_callback yield diff --git a/lib/sidekiq_unique_jobs/lock/until_expired.rb b/lib/sidekiq_unique_jobs/lock/until_expired.rb index 219332ffb..701992fee 100644 --- a/lib/sidekiq_unique_jobs/lock/until_expired.rb +++ b/lib/sidekiq_unique_jobs/lock/until_expired.rb @@ -2,11 +2,23 @@ module SidekiqUniqueJobs class Lock + # Locks jobs until the lock has expired + # - Locks on perform_in or perform_async + # - Unlocks when the expiration is hit + # + # See {#lock} for more information about the client. + # See {#execute} for more information about the server + # + # @author Mikael Henriksson class UntilExpired < BaseLock + # Prevents these locks from being unlocked + # @return [true] always returns true def unlock true end + # Executes in the Sidekiq server process + # @yield to the worker class perform method def execute return unless locked? yield diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb index 16cfd2434..ef10eb7ec 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -2,21 +2,35 @@ module SidekiqUniqueJobs class Lock + # Locks jobs while the job is executing in the server process + # - Locks before yielding to the worker's perform method + # - Unlocks after yielding to the worker's perform method + # + # See {#lock} for more information about the client. + # See {#execute} for more information about the server + # + # @author Mikael Henriksson class WhileExecuting < BaseLock RUN_SUFFIX ||= ':RUN' + # @param [Hash] item the Sidekiq job hash + # @param [Proc] callback callback to call after unlock + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection def initialize(item, callback, redis_pool = nil) super(item, callback, redis_pool) append_unique_key_suffix end - # Returning true makes sure the client - # can push the job on the queue + # Simulate that a client lock was achieved. + # These locks should only ever be created in the server process. + # @return [true] always returns true def lock true end - # Locks the job with the RUN_SUFFIX appended + # Executes in the Sidekiq server process. + # These jobs are locked in the server process not from the client + # @yield to the worker class perform method def execute return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) with_cleanup { yield } diff --git a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb index aa5e42ae2..95fa880d8 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb @@ -2,13 +2,25 @@ module SidekiqUniqueJobs class Lock + # Locks jobs while executing + # Locks from the server process + # Unlocks after the server is done processing + # + # See {#lock} for more information about the client. + # See {#execute} for more information about the server + # + # @author Mikael Henriksson class WhileExecutingReject < WhileExecuting + # Executes in the Sidekiq server process + # @yield to the worker class perform method def execute return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) with_cleanup { yield } end + # Overridden with a forced {OnConflict::Reject} strategy + # @return [OnConflict::Reject] a reject strategy def strategy @strategy ||= OnConflict.find_strategy(:reject).new(item) end diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 89c80ea2c..a9b11d625 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -1,12 +1,20 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Lock manager class that handles all the various locks + # + # @author Mikael Henriksson class Locksmith # rubocop:disable ClassLength API_VERSION = '1' EXPIRES_IN = 10 include SidekiqUniqueJobs::Connection + # @param [Hash] item a Sidekiq job hash + # @option item [Integer] :lock_expiration the configured expiration + # @option item [String] :jid the sidekiq job id + # @option item [String] :unique_digest the unique digest (See: {UniqueArgs#unique_digest}) + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection def initialize(item, redis_pool = nil) @concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6 @expiration = item[LOCK_EXPIRATION_KEY] @@ -15,6 +23,8 @@ def initialize(item, redis_pool = nil) @redis_pool = redis_pool end + # Creates the necessary keys in redis to attempt a lock + # @return [String] the Sidekiq job_id def create Scripts.call( :create, @@ -24,21 +34,27 @@ def create ) end + # Checks if the exists key is created in redis + # @return [true, false] def exists? redis(redis_pool) { |conn| conn.exists(exists_key) } end + # The number of available resourced for this lock + # @return [Integer] the number of available resources def available_count return concurrency unless exists? redis(redis_pool) { |conn| conn.llen(available_key) } end + # Deletes the lock unless it has an expiration set def delete return if expiration delete! end + # Deletes the lock regardless of if it has an expiration set def delete! Scripts.call( :delete, @@ -47,6 +63,11 @@ def delete! ) end + # Create a lock for the item + # @param [Integer] timeout the number of seconds to wait for a lock. + # nil means wait indefinitely + # @yield the block to execute if a lock is successful + # @return the Sidekiq job_id (jid) def lock(timeout = nil, &block) create @@ -57,16 +78,27 @@ def lock(timeout = nil, &block) end alias wait lock + # Removes the lock keys from Redis + # @return [false] unless locked? + # @return [String] Sidekiq job_id (jid) if successful def unlock return false unless locked? signal(jid) end + # Removes the lock keys from Redis + # @param [String] token the unique token to check for a lock. + # nil will default to the jid provided in the initializer + # @return [true, false] def locked?(token = nil) token ||= jid redis(redis_pool) { |conn| conn.hexists(grabbed_key, token) } end + # Signal that the token should be released + # @param [String] token the unique token to check for a lockk. + # nil will default to the jid provided in the initializer. + # @return [Integer] the number of available lock resources def signal(token = nil) token ||= jid diff --git a/lib/sidekiq_unique_jobs/logging.rb b/lib/sidekiq_unique_jobs/logging.rb index 37d72e2d3..85d764976 100644 --- a/lib/sidekiq_unique_jobs/logging.rb +++ b/lib/sidekiq_unique_jobs/logging.rb @@ -1,28 +1,51 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Utility module for reducing the number of uses of logger. + # + # @author Mikael Henriksson module Logging - # :reek:UtilityFunction { enabled: false } + # A convenience method for using the configured logger def logger SidekiqUniqueJobs.logger end + # Logs a message at debug level + # @param message_or_exception [String, Exception] the message or exception to log + # @yield the message or exception to use for log message + # Used for compatibility with logger def log_debug(message_or_exception = nil, &block) logger.debug(message_or_exception, &block) end + # Logs a message at info level + # @param message_or_exception [String, Exception] the message or exception to log + # @yield the message or exception to use for log message + # Used for compatibility with logger def log_info(message_or_exception = nil, &block) logger.info(message_or_exception, &block) end + # Logs a message at warn level + # @param message_or_exception [String, Exception] the message or exception to log + # @yield the message or exception to use for log message + # Used for compatibility with logger def log_warn(message_or_exception = nil, &block) logger.warn(message_or_exception, &block) end + # Logs a message at error level + # @param message_or_exception [String, Exception] the message or exception to log + # @yield the message or exception to use for log message + # Used for compatibility with logger def log_error(message_or_exception = nil, &block) logger.error(message_or_exception, &block) end + # Logs a message at fatal level + # @param message_or_exception [String, Exception] the message or exception to log + # @yield the message or exception to use for log message + # Used for compatibility with logger def log_fatal(message_or_exception = nil, &block) logger.fatal(message_or_exception, &block) end diff --git a/lib/sidekiq_unique_jobs/normalizer.rb b/lib/sidekiq_unique_jobs/normalizer.rb index 3e18097e7..d91f34159 100644 --- a/lib/sidekiq_unique_jobs/normalizer.rb +++ b/lib/sidekiq_unique_jobs/normalizer.rb @@ -3,7 +3,13 @@ require 'json' module SidekiqUniqueJobs + # Normalizes hashes by dumping them to json and loading them from json + # + # @author Mikael Henriksson module Normalizer + # Changes hash to a json compatible hash + # @param [Hash] args + # @return [Hash] a json compatible hash def self.jsonify(args) Sidekiq.load_json(Sidekiq.dump_json(args)) end diff --git a/lib/sidekiq_unique_jobs/options_with_fallback.rb b/lib/sidekiq_unique_jobs/options_with_fallback.rb index 8c9df4325..4386c3e7c 100644 --- a/lib/sidekiq_unique_jobs/options_with_fallback.rb +++ b/lib/sidekiq_unique_jobs/options_with_fallback.rb @@ -1,11 +1,13 @@ # frozen_string_literal: true module SidekiqUniqueJobs - # Shared logic for dealing with options - # This class requires 3 things to be defined in the class including it + # Module containing methods shared between client and server middleware + # + # Requires the following methods to be defined in the including class # 1. item (required) # 2. options (can be nil) # 3. worker_class (required, can be anything) + # @author Mikael Henriksson module OptionsWithFallback LOCKS = { until_and_while_executing: SidekiqUniqueJobs::Lock::UntilAndWhileExecuting, @@ -21,22 +23,30 @@ def self.included(base) base.send(:include, SidekiqUniqueJobs::SidekiqWorkerMethods) end + # Check if unique has been enabled + # @return [true, false] indicate if the gem has been enabled def unique_enabled? SidekiqUniqueJobs.config.enabled && lock_type end + # Check if unique has been disabled def unique_disabled? !unique_enabled? end + # Check if we should log duplicate payloads def log_duplicate_payload? options[LOG_DUPLICATE_KEY] || item[LOG_DUPLICATE_KEY] end + # Check if we should log duplicate payloads + # @return [SidekiqUniqueJobs::Lock::BaseLock] an instance of a child class def lock @lock ||= lock_class.new(item, after_unlock_hook, @redis_pool) end + # Check if we should log duplicate payloads + # @return [SidekiqUniqueJobs::Lock::BaseLock] an instance of a child class def lock_class @lock_class ||= begin LOCKS.fetch(lock_type.to_sym) do @@ -45,6 +55,7 @@ def lock_class end end + # @return [Symbol] def lock_type @lock_type ||= options[LOCK_KEY] || item[LOCK_KEY] || unique_type end diff --git a/lib/sidekiq_unique_jobs/scripts.rb b/lib/sidekiq_unique_jobs/scripts.rb index cffab0bb1..c6f2726e1 100644 --- a/lib/sidekiq_unique_jobs/scripts.rb +++ b/lib/sidekiq_unique_jobs/scripts.rb @@ -5,6 +5,9 @@ require 'concurrent/map' module SidekiqUniqueJobs + # Interface to dealing with .lua files + # + # @author Mikael Henriksson module Scripts LUA_PATHNAME ||= Pathname.new(__FILE__).dirname.join('../../redis').freeze SCRIPT_SHAS ||= Concurrent::Map.new @@ -13,6 +16,14 @@ module Scripts module_function + # Call a lua script with the provided file_name + # @param [Symbol] file_name the name of the lua script + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection + # @param [Hash] options arguments to pass to the script file + # @option options [Array] :keys the array of keys to pass to the script + # @option options [Array] :argv the array of arguments to pass to the script + # @note this method is recursive if we need to load a lua script + # that wasn't previously loaded. def call(file_name, redis_pool, options = {}) execute_script(file_name, redis_pool, options) rescue Redis::CommandError => ex @@ -21,6 +32,12 @@ def call(file_name, redis_pool, options = {}) end end + # Execute the script file + # @param [Symbol] file_name the name of the lua script + # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection + # @param [Hash] options arguments to pass to the script file + # @option options [Array] :keys the array of keys to pass to the script + # @option options [Array] :argv the array of arguments to pass to the script def execute_script(file_name, redis_pool, options = {}) redis(redis_pool) do |conn| sha = script_sha(conn, file_name) @@ -28,6 +45,10 @@ def execute_script(file_name, redis_pool, options = {}) end end + # Return sha of already loaded lua script or load it and return the sha + # @param [Sidekiq::RedisConnection] conn the redis connection + # @param [Symbol] file_name the name of the lua script + # @return [String] sha of the script file def script_sha(conn, file_name) if (sha = SCRIPT_SHAS.get(file_name)) return sha @@ -38,6 +59,10 @@ def script_sha(conn, file_name) sha end + # Return sha of already loaded lua script or load it and return the sha + # @param [Redis::CommandError] ex exception to handle + # @param [Symbol] file_name the name of the lua script + # @raise [ScriptError] when the error isn't handled def handle_error(ex, file_name) if ex.message == 'NOSCRIPT No matching script. Please use EVAL.' SCRIPT_SHAS.delete(file_name) @@ -47,10 +72,16 @@ def handle_error(ex, file_name) raise ScriptError, file_name: file_name, source_exception: ex end + # Reads the lua file from disk + # @param [Symbol] file_name the name of the lua script + # @return [String] the content of the lua file def script_source(file_name) script_path(file_name).read end + # Construct a Pathname to a lua script + # @param [Symbol] file_name the name of the lua script + # @return [Pathname] the full path to the gems lua script def script_path(file_name) LUA_PATHNAME.join("#{file_name}.lua") end diff --git a/lib/sidekiq_unique_jobs/server/middleware.rb b/lib/sidekiq_unique_jobs/server/middleware.rb index 8b397fe95..bcade621d 100644 --- a/lib/sidekiq_unique_jobs/server/middleware.rb +++ b/lib/sidekiq_unique_jobs/server/middleware.rb @@ -2,9 +2,19 @@ module SidekiqUniqueJobs module Server + # The unique sidekiq middleware for the server processor + # + # @author Mikael Henriksson class Middleware include OptionsWithFallback + # Runs the server middleware + # Used from Sidekiq::Processor#process + # @param [Sidekiq::Worker] worker_class + # @param [Hash] item a sidekiq job hash + # @param [String] queue name of the queue + # @yield when uniqueness is disabled + # @yield when the lock class executes successfully def call(worker_class, item, queue) @worker_class = worker_class @item = item diff --git a/lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb b/lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb index be0cd795a..06b22e355 100644 --- a/lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb +++ b/lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb @@ -1,24 +1,38 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Module with convenience methods for the Sidekiq::Worker class + # + # @author Mikael Henriksson module SidekiqWorkerMethods + # Avoids duplicating worker_class.respond_to? in multiple places + # @return [true, false] def worker_method_defined?(method_sym) worker_class.respond_to?(method_sym) end + # Wraps #get_sidekiq_options to always work with a hash + # @return [Hash] of the worker class sidekiq options def worker_options return {} unless sidekiq_worker_class? worker_class.get_sidekiq_options.stringify_keys end + # Tests that the + # @return [true] if worker_class responds to get_sidekiq_options + # @return [false] if worker_class does not respond to get_sidekiq_options def sidekiq_worker_class? worker_method_defined?(:get_sidekiq_options) end + # The Sidekiq::Worker implementation + # @return [Sidekiq::Worker] def worker_class @_worker_class ||= worker_class_constantize # rubocop:disable Naming/MemoizedInstanceVariableName end + # The hook to call after a successful unlock + # @return [Proc] def after_unlock_hook -> { worker_class.after_unlock if worker_method_defined?(:after_unlock) } end @@ -26,7 +40,7 @@ def after_unlock_hook # Attempt to constantize a string worker_class argument, always # failing back to the original argument when the constant can't be found # - # raises an error for other errors + # @return [Sidekiq::Worker] def worker_class_constantize(klazz = @worker_class) return klazz unless klazz.is_a?(String) Object.const_get(klazz) diff --git a/lib/sidekiq_unique_jobs/timeout/calculator.rb b/lib/sidekiq_unique_jobs/timeout/calculator.rb index fb7905d12..1d73ee547 100644 --- a/lib/sidekiq_unique_jobs/timeout/calculator.rb +++ b/lib/sidekiq_unique_jobs/timeout/calculator.rb @@ -2,24 +2,39 @@ module SidekiqUniqueJobs module Timeout + # Calculates timeout and expiration + # + # @author Mikael Henriksson class Calculator include SidekiqUniqueJobs::SidekiqWorkerMethods + + # @attr [Hash] item the Sidekiq job hash attr_reader :item + # @param [Hash] item the Sidekiq job hash + # @option item [Integer, nil] :lock_expiration the configured lock expiration + # @option item [Integer, nil] :lock_timeout the configured lock timeout + # @option item [String] :class the class of the sidekiq worker + # @option item [Float] :at the unix time the job is scheduled at def initialize(item) @item = item @worker_class = item[CLASS_KEY] end + # The time until a job is scheduled + # @return [Integer] the number of seconds until job is scheduled def time_until_scheduled return 0 unless scheduled_at scheduled_at.to_i - Time.now.utc.to_i end + # The time a job is scheduled + # @return [Float] the exact unix time the job is scheduled at def scheduled_at @scheduled_at ||= item[AT_KEY] end + # The configured lock_expiration def lock_expiration @lock_expiration ||= begin expiration = item[LOCK_EXPIRATION_KEY] @@ -28,6 +43,7 @@ def lock_expiration end end + # The configured lock_timeout def lock_timeout @lock_timeout = begin timeout = default_worker_options[LOCK_TIMEOUT_KEY] @@ -37,6 +53,7 @@ def lock_timeout end end + # The default lock_timeout of this gem def default_lock_timeout SidekiqUniqueJobs.config.default_lock_timeout end diff --git a/lib/sidekiq_unique_jobs/unique_args.rb b/lib/sidekiq_unique_jobs/unique_args.rb index f875d1a56..d121bb77c 100644 --- a/lib/sidekiq_unique_jobs/unique_args.rb +++ b/lib/sidekiq_unique_jobs/unique_args.rb @@ -4,17 +4,25 @@ require 'sidekiq_unique_jobs/normalizer' module SidekiqUniqueJobs - # This class exists to be testable and the entire api should be considered private + # Handles uniqueness of sidekiq arguments + # + # @author Mikael Henriksson class UniqueArgs include SidekiqUniqueJobs::Logging include SidekiqUniqueJobs::SidekiqWorkerMethods + # Convenience method for returning a digest + # @param [Hash] item a Sidekiq job hash + # @return [String] a unique digest def self.digest(item) new(item).unique_digest end + # The sidekiq job hash + # @return [Hash] the Sidekiq job hash attr_reader :item + # @param [Hash] item a Sidekiq job hash def initialize(item) @item = item @worker_class = item[CLASS_KEY] @@ -22,25 +30,35 @@ def initialize(item) add_uniqueness_to_item end + # Appends the keys unique_prefix, unique_args and {#unique_digest} to the sidekiq job hash {#item} + # @return [void] def add_uniqueness_to_item item[UNIQUE_PREFIX_KEY] ||= unique_prefix item[UNIQUE_ARGS_KEY] = unique_args(item[ARGS_KEY]) item[UNIQUE_DIGEST_KEY] = unique_digest end + # Memoized unique_digest + # @return [String] a unique digest def unique_digest @unique_digest ||= create_digest end + # Creates a namespaced unique digest based on the {#digestable_hash} and the {#unique_prefix} + # @return [String] a unique digest def create_digest digest = Digest::MD5.hexdigest(Sidekiq.dump_json(digestable_hash)) "#{unique_prefix}:#{digest}" end + # A prefix to use as namespace for the {#unique_digest} + # @return [String] a unique digest def unique_prefix worker_options[UNIQUE_PREFIX_KEY] || SidekiqUniqueJobs.config.unique_prefix end + # Filter a hash to use for digest + # @return [Hash] to use for digest def digestable_hash @item.slice(CLASS_KEY, QUEUE_KEY, UNIQUE_ARGS_KEY).tap do |hash| hash.delete(QUEUE_KEY) if unique_across_queues? @@ -48,26 +66,37 @@ def digestable_hash end end + # The unique arguments to use for creating a lock + # @return [Array] the arguments filters by the {#filtered_args} method if {#unique_args_enabled?} def unique_args(args) return filtered_args(args) if unique_args_enabled? args end + # Checks if we should disregard the queue when creating the unique digest + # @return [true, false] def unique_across_queues? item[UNIQUE_ACROSS_QUEUES_KEY] || worker_options[UNIQUE_ACROSS_QUEUES_KEY] || item[UNIQUE_ON_ALL_QUEUES_KEY] || worker_options[UNIQUE_ON_ALL_QUEUES_KEY] # TODO: Remove in v 6.1 end + # Checks if we should disregard the worker when creating the unique digest + # @return [true, false] def unique_across_workers? item[UNIQUE_ACROSS_WORKERS_KEY] || worker_options[UNIQUE_ACROSS_WORKERS_KEY] end + # Checks if the worker class has been enabled for unique_args? + # @return [true, false] def unique_args_enabled? unique_args_method # && !unique_args_method.is_a?(Boolean) end # Filters unique arguments by proc or symbol - # returns provided arguments for other configurations + # @param [Array] args the arguments passed to the sidekiq worker + # @return [Array] {#filter_by_proc} when {#unique_args_method} is a Proc + # @return [Array] {#filter_by_symbol} when {#unique_args_method} is a Symbol + # @return [Array] args unfiltered when neither of the above def filtered_args(args) return args if args.empty? json_args = Normalizer.jsonify(args) @@ -83,10 +112,17 @@ def filtered_args(args) end end + # Filters unique arguments by proc configured in the sidekiq worker + # @param [Array] args the arguments passed to the sidekiq worker + # @return [Array] with the filtered arguments def filter_by_proc(args) unique_args_method.call(args) end + # Filters unique arguments by method configured in the sidekiq worker + # @param [Array] args the arguments passed to the sidekiq worker + # @return [Array] unfiltered unless {#worker_method_defined?} + # @return [Array] with the filtered arguments def filter_by_symbol(args) return args unless worker_method_defined?(unique_args_method) @@ -96,12 +132,14 @@ def filter_by_symbol(args) args end + # The method to use for filtering unique arguments def unique_args_method @unique_args_method ||= worker_options[UNIQUE_ARGS_KEY] @unique_args_method ||= :unique_args if worker_method_defined?(:unique_args) @unique_args_method ||= default_unique_args_method end + # The global worker options defined in Sidekiq directly def default_unique_args_method Sidekiq.default_worker_options.stringify_keys[UNIQUE_ARGS_KEY] end diff --git a/lib/sidekiq_unique_jobs/unlockable.rb b/lib/sidekiq_unique_jobs/unlockable.rb index 11d0bc3e7..e4465f4bd 100644 --- a/lib/sidekiq_unique_jobs/unlockable.rb +++ b/lib/sidekiq_unique_jobs/unlockable.rb @@ -1,14 +1,24 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Utility module to help manage unique keys in redis. + # Useful for deleting keys that for whatever reason wasn't deleted + # + # @author Mikael Henriksson module Unlockable module_function + # Unlocks a job. + # @param [Hash] item a Sidekiq job hash def unlock(item) SidekiqUniqueJobs::UniqueArgs.digest(item) SidekiqUniqueJobs::Locksmith.new(item).unlock end + # Deletes a lock regardless of if it was locked or not. + # + # This is good for situations when a job is locked by another item + # @param [Hash] item a Sidekiq job hash def delete(item) SidekiqUniqueJobs::UniqueArgs.digest(item) SidekiqUniqueJobs::Locksmith.new(item).delete! diff --git a/lib/sidekiq_unique_jobs/util.rb b/lib/sidekiq_unique_jobs/util.rb index 366022690..bb40398cf 100644 --- a/lib/sidekiq_unique_jobs/util.rb +++ b/lib/sidekiq_unique_jobs/util.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true module SidekiqUniqueJobs + # Utility module to help manage unique keys in redis. + # Useful for deleting keys that for whatever reason wasn't deleted + # + # @author Mikael Henriksson module Util COUNT = 'COUNT' DEFAULT_COUNT = 1_000 @@ -12,6 +16,11 @@ module Util include SidekiqUniqueJobs::Connection extend self # rubocop:disable Style/ModuleFunction + # Find unique keys in redis + # + # @param [String] pattern a pattern to scan for in redis + # @param [Integer] count the maximum number of keys to delete + # @return [Array] an array with active unique keys def keys(pattern = SCAN_PATTERN, count = DEFAULT_COUNT) return redis(&:keys) if pattern.nil? redis { |conn| conn.scan_each(match: prefix(pattern), count: count).to_a } @@ -19,10 +28,9 @@ def keys(pattern = SCAN_PATTERN, count = DEFAULT_COUNT) # Deletes unique keys from redis # - # - # @param pattern [String] a pattern to scan for in redis - # @param count [Integer] the maximum number of keys to delete - # @return [Boolean] report success + # @param [String] pattern a pattern to scan for in redis + # @param [Integer] count the maximum number of keys to delete + # @return [Integer] the number of keys deleted def del(pattern = SCAN_PATTERN, count = 0) raise ArgumentError, 'Please provide a number of keys to delete greater than zero' if count.zero? pattern = "#{pattern}:*" unless pattern.end_with?(':*') From 36afac3855e0179c5daf228e47999095ccd09931 Mon Sep 17 00:00:00 2001 From: Mikael Henriksson Date: Sat, 21 Jul 2018 19:23:15 +0200 Subject: [PATCH 2/2] Fix broken build --- lib/sidekiq_unique_jobs/exceptions.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/sidekiq_unique_jobs/exceptions.rb b/lib/sidekiq_unique_jobs/exceptions.rb index 3b64c318b..9bea66dce 100644 --- a/lib/sidekiq_unique_jobs/exceptions.rb +++ b/lib/sidekiq_unique_jobs/exceptions.rb @@ -13,11 +13,11 @@ def initialize(item) # Error raised from {OnConflict::Raise} # # @author Mikael Henriksson - class Conflict < StandardError - # @param [Hash] item the Sidekiq job hash - # @option item [String] :unique_digest the unique digest (See: {UniqueArgs#unique_digest}) - def initialize(item) - super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing") + class ScriptError < StandardError + # @param [Symbol] file_name the name of the lua script + # @param [Redis::CommandError] ex exception to handle + def initialize(file_name:, source_exception:) + super("Problem compiling #{file_name}. Message: #{source_exception.message}") end end