Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document code #296

Merged
merged 2 commits into from
Jul 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
require 'sidekiq_unique_jobs/sidekiq_unique_ext'
require 'sidekiq_unique_jobs/on_conflict'

# Namespace for this gem
#
# Contains configuration and utility methods that belongs top level
#
# @author Mikael Henriksson <mikael@zoolutions.se>
module SidekiqUniqueJobs
include SidekiqUniqueJobs::Connection

Expand All @@ -44,6 +49,7 @@ module SidekiqUniqueJobs
:logger,
)

# The current configuration (See: {.configure} on how to configure)
def config
# Arguments here need to match the definition of the new class (see above)
@config ||= Concurrent::MutableStruct::Config.new(
Expand All @@ -54,14 +60,20 @@ def config
)
end

# The current logger
# @return [Logger] the configured logger
def logger
config.logger
end

# Set a new logger
# @param [Logger] other a new logger
def logger=(other)
config.logger = other
end

# Change global configuration while yielding
# @yield control to the caller
def use_config(tmp_config)
fail ::ArgumentError, "#{name}.#{__method__} needs a block" unless block_given?

Expand All @@ -71,6 +83,15 @@ def use_config(tmp_config)
configure(old_config)
end

# Configure the gem
#
# This is usually called once at startup of an application
# @param [Hash] options global gem options
# @option options [Integer] :default_lock_timeout (default is 0)
# @option options [true,false] :enabled (default is true)
# @option options [String] :unique_prefix (default is 'uniquejobs')
# @option options [Logger] :logger (default is Sidekiq.logger)
# @yield control to the caller when given block
def configure(options = {})
if block_given?
yield config
Expand Down
13 changes: 12 additions & 1 deletion lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@

module SidekiqUniqueJobs
module Client
# The unique sidekiq middleware for the client push
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class Middleware
include SidekiqUniqueJobs::Logging
include OptionsWithFallback

# :reek:LongParameterList { max_params: 4 }
# Calls this client middleware
# Used from Sidekiq.process_single
# @param [String] worker_class name of the sidekiq worker class
# @param [Hash] item a sidekiq job hash
# @param [String] queue name of the queue
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
# @yield when uniqueness is disable or lock successful
def call(worker_class, item, queue, redis_pool = nil)
@worker_class = worker_class
@item = item
Expand All @@ -20,6 +29,8 @@ def call(worker_class, item, queue, redis_pool = nil)

private

# The sidekiq job hash
# @return [Hash] the Sidekiq job hash
attr_reader :item

def success?
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq_unique_jobs/connection.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Shared module for dealing with redis connections
#
# @author Mikael Henriksson <mikael@zoolutions.se>
module Connection
def self.included(base)
base.send(:extend, self)
end

# :reek:UtilityFunction { enabled: false }
# Creates a connection to redis
# @return [Sidekiq::RedisConnection, ConnectionPool] a connection to redis
def redis(redis_pool = nil)
if redis_pool
redis_pool.with { |conn| yield conn }
Expand Down
11 changes: 11 additions & 0 deletions lib/sidekiq_unique_jobs/exceptions.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Error raised when a Lua script fails to execute
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class Conflict < StandardError
def initialize(item)
super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing")
end
end

# Error raised from {OnConflict::Raise}
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class ScriptError < StandardError
# @param [Symbol] file_name the name of the lua script
# @param [Redis::CommandError] ex exception to handle
def initialize(file_name:, source_exception:)
super("Problem compiling #{file_name}. Message: #{source_exception.message}")
end
end

# Error raised from {OptionsWithFallback#lock_class}
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class UnknownLock < StandardError
end
end
37 changes: 35 additions & 2 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@

module SidekiqUniqueJobs
class Lock
# Abstract base class for locks
#
# @abstract
# @author Mikael Henriksson <mikael@zoolutions.se>
class BaseLock
include SidekiqUniqueJobs::Logging

# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback the callback to use after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, callback, redis_pool = nil)
@item = prepare_item(item)
@callback = callback
@redis_pool = redis_pool
end

# Handles locking of sidekiq jobs.
# Will call a conflict strategy if lock can't be achieved.
# @return [String] the sidekiq job id
def lock
if (token = locksmith.lock(item[LOCK_TIMEOUT_KEY]))
token
Expand All @@ -19,30 +29,53 @@ def lock
end
end

# Execute the job in the Sidekiq server processor
# @raise [NotImplementedError] needs to be implemented in child class
def execute
raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}"
end

# Unlocks the job from redis
# @return [String] sidekiq job id when successful
# @return [false] when unsuccessful
def unlock
locksmith.signal(item[JID_KEY]) # Only signal to release the lock
end

# Deletes the job from redis if it is locked.
def delete
locksmith.delete # Soft delete (don't forcefully remove when expiration is set)
end

# Forcefully deletes the job from redis.
# This is good for jobs when a previous lock was not unlocked
def delete!
locksmith.delete! # Force delete the lock
end

# Checks if the item has achieved a lock
# @return [true] when this jid has locked the job
# @return [false] when this jid has not locked the job
def locked?
locksmith.locked?(item[JID_KEY])
end

private

attr_reader :item, :redis_pool, :callback
# The sidekiq job hash
# @return [Hash] the Sidekiq job hash
attr_reader :item

# The sidekiq redis pool
# @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection
attr_reader :redis_pool

# The sidekiq job hash
# @return [Proc] the callback to use after unlock
attr_reader :callback

# The interface to the locking mechanism
# @return [SidekiqUniqueJobs::Locksmith]
def locksmith
@locksmith ||= SidekiqUniqueJobs::Locksmith.new(item, redis_pool)
end
Expand Down Expand Up @@ -84,7 +117,7 @@ def callback_safely
end

def strategy
OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item)
@strategy ||= OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while the job is executing in the server process
# - Locks on perform_in or perform_async (see {UntilExecuting})
# - Unlocks before yielding to the worker's perform method (see {UntilExecuting})
# - Locks before yielding to the worker's perform method (see {WhileExecuting})
# - Unlocks after yielding to the worker's perform method (see {WhileExecuting})
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class UntilAndWhileExecuting < BaseLock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
unlock
Expand Down
7 changes: 7 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until the server is done executing the job
# - Locks on perform_in or perform_async
# - Unlocks after yielding to the worker's perform method
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class UntilExecuted < BaseLock
OK ||= 'OK'

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
with_cleanup { yield }
Expand Down
7 changes: 7 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until {#execute} starts
# - Locks on perform_in or perform_async
# - Unlocks after yielding to the worker's perform method
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class UntilExecuting < BaseLock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
unlock_with_callback
yield
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until the lock has expired
# - Locks on perform_in or perform_async
# - Unlocks when the expiration is hit
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class UntilExpired < BaseLock
# Prevents these locks from being unlocked
# @return [true] always returns true
def unlock
true
end

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
yield
Expand Down
20 changes: 17 additions & 3 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,35 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while the job is executing in the server process
# - Locks before yielding to the worker's perform method
# - Unlocks after yielding to the worker's perform method
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class WhileExecuting < BaseLock
RUN_SUFFIX ||= ':RUN'

# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback callback to call after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, callback, redis_pool = nil)
super(item, callback, redis_pool)
append_unique_key_suffix
end

# Returning true makes sure the client
# can push the job on the queue
# Simulate that a client lock was achieved.
# These locks should only ever be created in the server process.
# @return [true] always returns true
def lock
true
end

# Locks the job with the RUN_SUFFIX appended
# Executes in the Sidekiq server process.
# These jobs are locked in the server process not from the client
# @yield to the worker class perform method
def execute
return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY])
with_cleanup { yield }
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/while_executing_reject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while executing
# Locks from the server process
# Unlocks after the server is done processing
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <mikael@zoolutions.se>
class WhileExecutingReject < WhileExecuting
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY])

with_cleanup { yield }
end

# Overridden with a forced {OnConflict::Reject} strategy
# @return [OnConflict::Reject] a reject strategy
def strategy
@strategy ||= OnConflict.find_strategy(:reject).new(item)
end
Expand Down
Loading