From 22cfbc8cdc178d00b669601bc157c665d12714c6 Mon Sep 17 00:00:00 2001 From: Mika Hel Date: Sat, 21 Jul 2018 16:58:42 +0200 Subject: [PATCH] Rename to `unique:` to `lock:` (#295) * Update reek and rubocop configuration * Rename unique: to lock: To better match the lock_expiration and lock_timeout options * Use a strategy for conflict resolution This should scale better and be easier to understand/maintain * Updates README with the latest changes * Add missing exception and strategy method --- .codeclimate.yml | 13 +- .github/ISSUE_TEMPLATE/bug_report.md | 2 +- .reek.yml | 58 ++----- .rubocop.yml | 3 + README.md | 59 ++++++-- examples/another_unique_job.rb | 6 +- .../custom_queue_job_with_filter_method.rb | 2 +- examples/custom_queue_job_with_filter_proc.rb | 2 +- examples/expiring_job.rb | 2 +- examples/inline_worker.rb | 2 +- examples/just_a_worker.rb | 2 +- examples/long_running_job.rb | 6 +- examples/main_job.rb | 5 +- examples/my_unique_job.rb | 9 +- examples/my_unique_job_with_filter_method.rb | 6 +- examples/my_unique_job_with_filter_proc.rb | 6 +- examples/notify_worker.rb | 4 +- examples/simple_worker.rb | 4 +- examples/unique_across_workers_job.rb | 2 +- examples/unique_job_on_conflict_raise.rb | 14 ++ examples/unique_job_on_conflict_reject.rb | 14 ++ examples/unique_job_on_conflict_reschedule.rb | 14 ++ .../unique_job_with_conditional_parameter.rb | 6 +- examples/unique_job_with_filter_method.rb | 7 +- examples/unique_job_with_nil_unique_args.rb | 6 +- .../unique_job_with_no_unique_args_method.rb | 6 +- ...que_job_withthout_unique_args_parameter.rb | 6 +- examples/unique_on_all_queues_job.rb | 2 +- examples/until_and_while_executing_job.rb | 5 +- examples/until_executed_2_job.rb | 10 +- examples/until_executed_job.rb | 10 +- examples/until_executing_job.rb | 2 +- examples/until_expired_job.rb | 2 +- examples/until_global_expired_job.rb | 2 +- examples/while_executing_job.rb | 4 +- examples/while_executing_reject_job.rb | 4 +- examples/without_argument_job.rb | 2 +- lib/sidekiq_unique_jobs.rb | 8 + lib/sidekiq_unique_jobs/constants.rb | 2 + lib/sidekiq_unique_jobs/exceptions.rb | 20 +-- lib/sidekiq_unique_jobs/lock/base_lock.rb | 12 +- .../lock/until_and_while_executing.rb | 4 +- .../lock/until_executed.rb | 2 +- .../lock/until_executing.rb | 2 +- lib/sidekiq_unique_jobs/lock/until_expired.rb | 4 +- .../lock/while_executing.rb | 4 +- .../lock/while_executing_reject.rb | 67 +------- .../lock/while_executing_requeue.rb | 21 --- lib/sidekiq_unique_jobs/locksmith.rb | 11 +- lib/sidekiq_unique_jobs/on_conflict.rb | 24 +++ lib/sidekiq_unique_jobs/on_conflict/log.rb | 20 +++ .../on_conflict/null_strategy.rb | 16 ++ lib/sidekiq_unique_jobs/on_conflict/raise.rb | 17 +++ lib/sidekiq_unique_jobs/on_conflict/reject.rb | 72 +++++++++ .../on_conflict/reschedule.rb | 24 +++ .../on_conflict/strategy.rb | 28 ++++ .../options_with_fallback.rb | 8 +- lib/sidekiq_unique_jobs/timeout/calculator.rb | 4 - lib/sidekiq_unique_jobs/unique_args.rb | 10 +- rails_example/app/workers/simple_worker.rb | 2 +- .../workers/slow_until_executing_worker.rb | 2 +- spec/examples/another_unique_job_spec.rb | 2 +- ...ustom_queue_job_with_filter_method_spec.rb | 2 +- .../custom_queue_job_with_filter_proc_spec.rb | 2 +- spec/examples/expiring_job_spec.rb | 2 +- spec/examples/inline_worker_spec.rb | 2 +- spec/examples/just_a_worker_spec.rb | 2 +- spec/examples/long_running_job_spec.rb | 5 +- spec/examples/main_job_spec.rb | 2 +- spec/examples/my_unique_job_spec.rb | 7 +- .../my_unique_job_with_filter_method_spec.rb | 2 +- .../my_unique_job_with_filter_proc_spec.rb | 2 +- spec/examples/notify_worker_spec.rb | 2 +- spec/examples/simple_worker_spec.rb | 2 +- .../unique_across_workers_job_spec.rb | 4 +- .../unique_job_on_conflict_raise_spec.rb | 20 +++ .../unique_job_on_conflict_reject_spec.rb | 20 +++ .../unique_job_on_conflict_reschedule_spec.rb | 20 +++ .../unique_job_with_nil_unique_args_spec.rb | 2 +- ...que_job_with_no_unique_args_method_spec.rb | 2 +- ..._job_without_unique_args_parameter_spec.rb | 2 +- .../examples/unique_on_all_queues_job_spec.rb | 4 +- .../until_and_while_executing_job_spec.rb | 2 +- spec/examples/until_executed2_job_spec.rb | 6 +- spec/examples/until_executed_job_spec.rb | 2 +- spec/examples/until_executing_job_spec.rb | 2 +- spec/examples/until_expired_job_spec.rb | 4 +- .../examples/until_global_expired_job_spec.rb | 4 +- spec/examples/while_executing_job_spec.rb | 2 +- spec/examples/without_argument_job_spec.rb | 2 +- .../sidekiq_unique_jobs/legacy_lock_spec.rb | 2 +- .../lock/until_and_while_executing_spec.rb | 143 +++--------------- .../lock/until_executed_spec.rb | 73 +++++---- .../lock/until_expired_spec.rb | 57 ++----- .../lock/while_executing_reject_spec.rb | 10 +- .../lock/while_executing_spec.rb | 20 +-- .../until_and_while_executing_spec.rb | 2 +- .../shared_examples/a_lockable_lock.rb | 65 ++++++++ .../lock/base_lock_spec.rb | 10 +- .../lock/until_and_while_executing_spec.rb | 2 +- .../lock/until_executed_spec.rb | 6 +- .../lock/until_executing_spec.rb | 2 +- .../lock/until_expired_spec.rb | 2 +- .../lock/while_executing_reject_spec.rb | 71 +-------- .../lock/while_executing_spec.rb | 2 +- .../on_conflict/log_spec.rb | 22 +++ .../on_conflict/raise_spec.rb | 22 +++ .../on_conflict/reject_spec.rb | 83 ++++++++++ .../on_conflict/reschedule_spec.rb | 21 +++ .../options_with_fallback_spec.rb | 32 ++-- .../sidekiq_unique_ext_spec.rb | 2 +- .../timeout/calculator_spec.rb | 7 - .../sidekiq_unique_jobs/unique_args_spec.rb | 4 +- 113 files changed, 876 insertions(+), 609 deletions(-) create mode 100644 examples/unique_job_on_conflict_raise.rb create mode 100644 examples/unique_job_on_conflict_reject.rb create mode 100644 examples/unique_job_on_conflict_reschedule.rb delete mode 100644 lib/sidekiq_unique_jobs/lock/while_executing_requeue.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/log.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/null_strategy.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/raise.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/reject.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/reschedule.rb create mode 100644 lib/sidekiq_unique_jobs/on_conflict/strategy.rb create mode 100644 spec/examples/unique_job_on_conflict_raise_spec.rb create mode 100644 spec/examples/unique_job_on_conflict_reject_spec.rb create mode 100644 spec/examples/unique_job_on_conflict_reschedule_spec.rb create mode 100644 spec/support/shared_examples/a_lockable_lock.rb create mode 100644 spec/unit/sidekiq_unique_jobs/on_conflict/log_spec.rb create mode 100644 spec/unit/sidekiq_unique_jobs/on_conflict/raise_spec.rb create mode 100644 spec/unit/sidekiq_unique_jobs/on_conflict/reject_spec.rb create mode 100644 spec/unit/sidekiq_unique_jobs/on_conflict/reschedule_spec.rb diff --git a/.codeclimate.yml b/.codeclimate.yml index d6b3ba05b..0d4f28a2a 100644 --- a/.codeclimate.yml +++ b/.codeclimate.yml @@ -8,22 +8,21 @@ plugins: config: languages: - ruby - # editorconfig: - # enabled: true - # channel: beta - # config: - # editorconfig: .editorconfig flog: enabled: true fixme: enabled: true markdownlint: - enabled: true + enabled: true reek: enabled: true + config: + file: .reek.yml rubocop: enabled: true - channel: rubocop-0-54 + channel: rubocop-0-57 + config: + file: .rubocop.yml exclude_patterns: - Gemfile - "*.gemspec" diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index d6d0bf549..c482ff2ce 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -18,7 +18,7 @@ What happens instead of the expected behavior? ```ruby class MyWorker include Sidekiq::Worker - sidekiq_options unique: :until_executed, queue: :undefault + sidekiq_options lock: :until_executed, queue: :undefault def perform(args); end def self.unique_args(args) diff --git a/.reek.yml b/.reek.yml index 16314a9f4..43323b0a6 100644 --- a/.reek.yml +++ b/.reek.yml @@ -11,53 +11,15 @@ detectors: exclude: - Sidekiq#self.use_options IrresponsibleModule: + enabled: false + LongParameterList: + enabled: true exclude: - - Sidekiq::Simulator - - SidekiqUniqueJobs::Cli - - SidekiqUniqueJobs::Client::Middleware - - SidekiqUniqueJobs - - Array - - Hash - - SidekiqUniqueJobs::JidMissing - - SidekiqUniqueJobs::LockTimeout - - SidekiqUniqueJobs::MaxLockTimeMissing - - SidekiqUniqueJobs::RunLockFailed - - SidekiqUniqueJobs::ScriptError - - SidekiqUniqueJobs::UnexpectedValue - - SidekiqUniqueJobs::UniqueKeyMissing - - SidekiqUniqueJobs::UnknownLock - - SidekiqUniqueJobs::Lock::BaseLock - - SidekiqUniqueJobs::Lock::UntilAndWhileExecuting - - SidekiqUniqueJobs::Lock::UntilExecuted - - SidekiqUniqueJobs::Lock::UntilExecuting - - SidekiqUniqueJobs::Lock::UntilExpired - - SidekiqUniqueJobs::Lock::WhileExecuting - - SidekiqUniqueJobs::Lock::WhileExecutingReject - - SidekiqUniqueJobs::Lock::WhileExecutingRequeue - - SidekiqUniqueJobs::Locksmith - - SidekiqUniqueJobs::Middleware - - SidekiqUniqueJobs::Normalizer - - SidekiqUniqueJobs::Scripts - - SidekiqUniqueJobs::Server::Middleware - - Sidekiq::Job - - Sidekiq::Job::UniqueExtension - - Sidekiq::JobSet - - Sidekiq::JobSet::UniqueExtension - - Sidekiq::Queue - - Sidekiq::Queue::UniqueExtension - - Sidekiq::ScheduledSet - - Sidekiq::ScheduledSet::UniqueExtension - - Sidekiq::SortedEntry - - Sidekiq::SortedEntry::UniqueExtension - - Sidekiq - - Sidekiq::Worker - - Sidekiq::Worker::ClassMethods - - Sidekiq::Worker::Overrides - - Sidekiq::Worker::Overrides::Testing - - SidekiqUniqueJobs::Timeout::Calculator - - SidekiqUniqueJobs::Timeout - - SidekiqUniqueJobs::Unlockable - - SidekiqUniqueJobs::Util + - initialize + - Hash#slice + - SidekiqUniqueJobs::Locksmith#create_lock + - SidekiqUniqueJobs::Locksmith#expire_when_necessary + - SidekiqUniqueJobs::Client::Middleware#call TooManyStatements: exclude: - initialize @@ -84,7 +46,7 @@ detectors: exclude: - Hash#slice - Hash#slice! - - SidekiqUniqueJobs::Lock::WhileExecutingReject#deadset_kill? + - SidekiqUniqueJobs::OnConflict::Reject#deadset_kill? - SidekiqUniqueJobs::SidekiqWorkerMethods#worker_method_defined? MissingSafeMethod: exclude: @@ -93,7 +55,7 @@ detectors: enabled: false FeatureEnvy: exclude: - - SidekiqUniqueJobs::Lock::WhileExecutingReject#push_to_deadset + - SidekiqUniqueJobs::OnConflict::Reject#push_to_deadset - SidekiqUniqueJobs::Logging#debug_item - SidekiqUniqueJobs::Util#batch_delete NestedIterators: diff --git a/.rubocop.yml b/.rubocop.yml index b236f2359..e38253fd7 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -64,6 +64,9 @@ RSpec/ExampleLength: RSpec/ExpectActual: Enabled: false +RSpec/ExpectChange: + Enabled: false + RSpec/ExpectInHook: Enabled: false diff --git a/README.md b/README.md index 68ac3388c..ab0799f40 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ * [Documentation](#documentation) * [Requirements](#requirements) * [Installation](#installation) +* [Support Me](#support-me) * [General Information](#general-information) * [Options](#options) * [Lock Expiration](#lock-expiration) @@ -18,6 +19,7 @@ * [Until Timeout](#until-timeout) * [Unique Until And While Executing](#unique-until-and-while-executing) * [While Executing](#while-executing) +* [Conflict Strategy](#conflict-strategy) * [Usage](#usage) * [Finer Control over Uniqueness](#finer-control-over-uniqueness) * [After Unlock Callback](#after-unlock-callback) @@ -65,6 +67,11 @@ Or install it yourself as: $ gem install sidekiq-unique-jobs + +## Support Me + +Want to show me some ❤️ for the hard work I do on this gem? You can use the following PayPal link https://paypal.me/mhenrixon. Any amount is welcome and let me tell you it feels good to be appreciated. Even a dollar makes me super excited about all of this. + ## General Information See [Interaction w/ Sidekiq](https://github.com/mhenrixon/sidekiq-unique-jobs/wiki/How-this-gem-interacts-with-Sidekiq) on how the gem interacts with Sidekiq. @@ -150,7 +157,7 @@ Locks from when the client pushes the job to the queue. Will be unlocked before **NOTE** this is probably not so good for jobs that shouldn't be running simultaneously (aka slow jobs). ```ruby -sidekiq_options unique: :until_executing +sidekiq_options lock: :until_executing ``` ### Until Executed @@ -158,7 +165,7 @@ sidekiq_options unique: :until_executing Locks from when the client pushes the job to the queue. Will be unlocked when the server has successfully processed the job. ```ruby -sidekiq_options unique: :until_executed +sidekiq_options lock: :until_executed ``` ### Until Timeout @@ -166,7 +173,7 @@ sidekiq_options unique: :until_executed Locks from when the client pushes the job to the queue. Will be unlocked when the specified timeout has been reached. ```ruby -sidekiq_options unique: :until_expired +sidekiq_options lock: :until_expired ``` ### Unique Until And While Executing @@ -174,7 +181,7 @@ sidekiq_options unique: :until_expired Locks when the client pushes the job to the queue. The queue will be unlocked when the server starts processing the job. The server then goes on to creating a runtime lock for the job to prevent simultaneous jobs from being executed. As soon as the server starts processing a job, the client can push the same job to the queue. ```ruby -sidekiq_options unique: :until_and_while_executing +sidekiq_options lock: :until_and_while_executing ``` ### While Executing @@ -184,7 +191,7 @@ With this lock type it is possible to put any number of these jobs on the queue, **NOTE** Unless this job is configured with a `lock_timeout: nil` or `lock_timeout: > 0` then all jobs that are attempted to be executed will just be dropped without waiting. ```ruby -sidekiq_options unique: :while_executing, lock_timeout: nil +sidekiq_options lock: :while_executing, lock_timeout: nil ``` There is an example of this to try it out in the `rails_example` application. Run `foreman start` in the root of the directory and open the url: `localhost:5000/work/duplicate_while_executing`. @@ -206,12 +213,42 @@ In the console you should see something like: 10:33:04 worker.1 | 2017-04-23T08:33:04.973Z 84404 TID-ougq8cs8s WhileExecutingWorker JID-9e197460c067b22eb1b5d07f INFO: done: 40.014 sec ``` +## Conflict Strategy + +Decides how we handle conflict. We can either reject the job to the dead queue or reschedule it. Both are useful for jobs that absolutely need to run and have been configured to use the lock `WhileExecuting` that is used only by the sidekiq server process. + +The last one is log which can be be used with the lock `UntilExecuted` and `UntilExpired`. Now we write a log entry saying the job could not be pushed because it is a duplicate of another job with the same arguments + +### Raise + +This strategy is intended to be used with `WhileExecuting`. Basically it will allow us to let the server process crash with a specific error message and be retried without messing up the Sidekiq stats. + +`sidekiq_options lock: :while_executing, on_conflict: :raise, retry: 10` + +### Reject + +This strategy is intended to be used with `WhileExecuting` and will push the job to the dead queue on conflict. + +`sidekiq_options lock: :while_executing, on_conflict: :reject` + +### Reschedule + +This strategy is intended to be used with `WhileExecuting` and will delay the job to be tried again in 5 seconds. This will mess up the sidekiq stats but will prevent exceptions from being logged and confuse your sysadmins. + +`sidekiq_options lock: :while_executing, on_conflict: :reschedule` + +### Log + +This strategy is intended to be used with `UntilExecuted` and `UntilExpired`. It will log a line about that this is job is a duplicate of another. + +`sidekiq_options lock: :until_executed, on_conflict: :log` + ## Usage All that is required is that you specifically set the sidekiq option for _unique_ to a valid value like below: ```ruby -sidekiq_options unique: :while_executing +sidekiq_options lock: :while_executing ``` Requiring the gem in your gemfile should be sufficient to enable unique jobs. @@ -227,7 +264,7 @@ The method or the proc can return a modified version of args without the transie ```ruby class UniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options unique: :until_and_while_executing, + sidekiq_options lock: :until_and_while_executing, unique_args: :unique_args # this is default and will be used if such a method is defined def self.unique_args(args) @@ -240,7 +277,7 @@ end class UniqueJobWithFilterProc include Sidekiq::Worker - sidekiq_options unique: :until_executed, + sidekiq_options lock: :until_executed, unique_args: ->(args) { [ args.first ] } ... @@ -253,7 +290,7 @@ It is also quite possible to ensure different types of unique args based on cont ```ruby class UniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options unique: :until_and_while_executing, unique_args: :unique_args + sidekiq_options lock: :until_and_while_executing, unique_args: :unique_args def self.unique_args(args) if Sidekiq::ProcessSet.new.size > 1 @@ -277,7 +314,7 @@ If you need to perform any additional work after the lock has been released you ```ruby class UniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options unique: :while_executing, + sidekiq_options lock: :while_executing, def after_unlock # block has yielded and lock is released @@ -293,7 +330,7 @@ To see logging in sidekiq when duplicate payload has been filtered out you can e ```ruby class UniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options unique: :while_executing, + sidekiq_options lock: :while_executing, log_duplicate_payload: true ... diff --git a/examples/another_unique_job.rb b/examples/another_unique_job.rb index 36b017acc..64f4a44a9 100644 --- a/examples/another_unique_job.rb +++ b/examples/another_unique_job.rb @@ -4,8 +4,10 @@ class AnotherUniqueJob include Sidekiq::Worker - sidekiq_options queue: :working2, retry: 1, backtrace: 10, - unique: :until_executed + sidekiq_options backtrace: 10, + lock: :until_executed, + queue: :working2, + retry: 1 def perform(args) args diff --git a/examples/custom_queue_job_with_filter_method.rb b/examples/custom_queue_job_with_filter_method.rb index 79dc78dfd..d7b9d0e13 100644 --- a/examples/custom_queue_job_with_filter_method.rb +++ b/examples/custom_queue_job_with_filter_method.rb @@ -5,7 +5,7 @@ require_relative 'custom_queue_job' class CustomQueueJobWithFilterMethod < CustomQueueJob - sidekiq_options unique: :until_executed, unique_args: :args_filter + sidekiq_options lock: :until_executed, unique_args: :args_filter def self.args_filter(args) args.first diff --git a/examples/custom_queue_job_with_filter_proc.rb b/examples/custom_queue_job_with_filter_proc.rb index 36cbd2bdb..01d13b761 100644 --- a/examples/custom_queue_job_with_filter_proc.rb +++ b/examples/custom_queue_job_with_filter_proc.rb @@ -7,7 +7,7 @@ class CustomQueueJobWithFilterProc < CustomQueueJob # slightly contrived example of munging args to the # worker and removing a random bit. - sidekiq_options unique: :until_expired, + sidekiq_options lock: :until_expired, unique_args: (lambda do |args| options = args.extract_options! options.delete('random') diff --git a/examples/expiring_job.rb b/examples/expiring_job.rb index 7218f49db..24d907428 100644 --- a/examples/expiring_job.rb +++ b/examples/expiring_job.rb @@ -4,7 +4,7 @@ class ExpiringJob include Sidekiq::Worker - sidekiq_options unique: :until_executed, lock_expiration: 10 * 60 + sidekiq_options lock: :until_executed, lock_expiration: 10 * 60 def perform(one, two) [one, two] diff --git a/examples/inline_worker.rb b/examples/inline_worker.rb index c801d2ba7..9f4385ec6 100644 --- a/examples/inline_worker.rb +++ b/examples/inline_worker.rb @@ -4,7 +4,7 @@ class InlineWorker include Sidekiq::Worker - sidekiq_options unique: :while_executing, lock_timeout: 5 + sidekiq_options lock: :while_executing, lock_timeout: 5 def perform(one) TestClass.run(one) diff --git a/examples/just_a_worker.rb b/examples/just_a_worker.rb index 36fd63137..c6da08b7f 100644 --- a/examples/just_a_worker.rb +++ b/examples/just_a_worker.rb @@ -5,7 +5,7 @@ class JustAWorker include Sidekiq::Worker - sidekiq_options unique: :until_executed, queue: :testqueue + sidekiq_options lock: :until_executed, queue: :testqueue def perform(options = {}) options diff --git a/examples/long_running_job.rb b/examples/long_running_job.rb index feb741fcf..991cc838f 100644 --- a/examples/long_running_job.rb +++ b/examples/long_running_job.rb @@ -4,8 +4,10 @@ class LongRunningJob include Sidekiq::Worker - sidekiq_options queue: :customqueue, retry: true, unique: :until_and_while_executing, - lock_expiration: 7_200, retry_count: 10 + sidekiq_options lock: :until_and_while_executing, + lock_expiration: 7_200, + queue: :customqueue, + retry: 10 def perform(one, two) [one, two] end diff --git a/examples/main_job.rb b/examples/main_job.rb index 16beb7dc1..d4f0026a1 100644 --- a/examples/main_job.rb +++ b/examples/main_job.rb @@ -4,8 +4,9 @@ class MainJob include Sidekiq::Worker - sidekiq_options queue: :customqueue, unique: :until_executed, - log_duplicate_payload: true + sidekiq_options lock: :until_executed, + log_duplicate_payload: true, + queue: :customqueue def perform(arg) [arg] diff --git a/examples/my_unique_job.rb b/examples/my_unique_job.rb index 2061b8218..002aa2c45 100644 --- a/examples/my_unique_job.rb +++ b/examples/my_unique_job.rb @@ -4,11 +4,10 @@ class MyUniqueJob include Sidekiq::Worker - sidekiq_options queue: :customqueue, - retry: true, - retry_count: 10, - unique: :until_executed, - lock_expiration: 7_200 + sidekiq_options lock: :until_executed, + lock_expiration: 7_200, + queue: :customqueue, + retry: 10 def perform(one, two) [one, two] diff --git a/examples/my_unique_job_with_filter_method.rb b/examples/my_unique_job_with_filter_method.rb index 1d14e85cb..1822475d5 100644 --- a/examples/my_unique_job_with_filter_method.rb +++ b/examples/my_unique_job_with_filter_method.rb @@ -4,10 +4,10 @@ class MyUniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: :filtered_args def perform(*) diff --git a/examples/my_unique_job_with_filter_proc.rb b/examples/my_unique_job_with_filter_proc.rb index 97ff8b3b9..a424330ac 100644 --- a/examples/my_unique_job_with_filter_proc.rb +++ b/examples/my_unique_job_with_filter_proc.rb @@ -4,10 +4,10 @@ class MyUniqueJobWithFilterProc include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: (lambda do |args| options = args.extract_options! [args.first, options['type']] diff --git a/examples/notify_worker.rb b/examples/notify_worker.rb index 00164697f..8ed8f65a0 100644 --- a/examples/notify_worker.rb +++ b/examples/notify_worker.rb @@ -5,8 +5,8 @@ class NotifyWorker include Sidekiq::Worker - sidekiq_options queue: :notify_worker, - unique: :until_executed + sidekiq_options lock: :until_executed, + queue: :notify_worker def perform(pid, blob) [pid, blob] diff --git a/examples/simple_worker.rb b/examples/simple_worker.rb index 465368ee0..8f0db96a0 100644 --- a/examples/simple_worker.rb +++ b/examples/simple_worker.rb @@ -4,8 +4,8 @@ class SimpleWorker include Sidekiq::Worker - sidekiq_options queue: :default, - unique: :until_executed, + sidekiq_options lock: :until_executed, + queue: :default, unique_args: ->(args) { [args.first] } def perform(args) diff --git a/examples/unique_across_workers_job.rb b/examples/unique_across_workers_job.rb index 674de6edc..b81e5e53d 100644 --- a/examples/unique_across_workers_job.rb +++ b/examples/unique_across_workers_job.rb @@ -12,7 +12,7 @@ # - https://github.com/mhenrixon/sidekiq-unique-jobs/blob/master/spec/lib/sidekiq_unique_jobs/unique_args_spec.rb class UniqueAcrossWorkersJob include Sidekiq::Worker - sidekiq_options unique: :until_executed, unique_across_workers: true + sidekiq_options lock: :until_executed, unique_across_workers: true def perform(one, two) [one, two] diff --git a/examples/unique_job_on_conflict_raise.rb b/examples/unique_job_on_conflict_raise.rb new file mode 100644 index 000000000..a23993d0d --- /dev/null +++ b/examples/unique_job_on_conflict_raise.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# :nocov: + +class UniqueJobOnConflictRaise + include Sidekiq::Worker + sidekiq_options lock: :while_executing, + queue: :customqueue, + on_conflict: :raise + + def perform(one, two) + [one, two] + end +end diff --git a/examples/unique_job_on_conflict_reject.rb b/examples/unique_job_on_conflict_reject.rb new file mode 100644 index 000000000..2bdf4d686 --- /dev/null +++ b/examples/unique_job_on_conflict_reject.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# :nocov: + +class UniqueJobOnConflictReject + include Sidekiq::Worker + sidekiq_options lock: :while_executing, + queue: :customqueue, + on_conflict: :reject + + def perform(one, two) + [one, two] + end +end diff --git a/examples/unique_job_on_conflict_reschedule.rb b/examples/unique_job_on_conflict_reschedule.rb new file mode 100644 index 000000000..afe54da60 --- /dev/null +++ b/examples/unique_job_on_conflict_reschedule.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# :nocov: + +class UniqueJobOnConflictReschedule + include Sidekiq::Worker + sidekiq_options lock: :while_executing, + queue: :customqueue, + on_conflict: :reschedule + + def perform(one, two) + [one, two] + end +end diff --git a/examples/unique_job_with_conditional_parameter.rb b/examples/unique_job_with_conditional_parameter.rb index 11a15da93..becbfda7d 100644 --- a/examples/unique_job_with_conditional_parameter.rb +++ b/examples/unique_job_with_conditional_parameter.rb @@ -4,10 +4,10 @@ class UniqueJobWithoutUniqueArgsParameter include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: :unique_args def perform(conditional = nil) diff --git a/examples/unique_job_with_filter_method.rb b/examples/unique_job_with_filter_method.rb index 8ece805dd..af2e8a16e 100644 --- a/examples/unique_job_with_filter_method.rb +++ b/examples/unique_job_with_filter_method.rb @@ -4,8 +4,11 @@ class UniqueJobWithFilterMethod include Sidekiq::Worker - sidekiq_options queue: :customqueue, retry: 1, backtrace: 10, - unique: :while_executing, unique_args: :filtered_args + sidekiq_options backtrace: 10, + lock: :while_executing, + queue: :customqueue, + retry: 1, + unique_args: :filtered_args def perform(*) # NO-OP diff --git a/examples/unique_job_with_nil_unique_args.rb b/examples/unique_job_with_nil_unique_args.rb index c1f1842ad..2f92b5968 100644 --- a/examples/unique_job_with_nil_unique_args.rb +++ b/examples/unique_job_with_nil_unique_args.rb @@ -4,10 +4,10 @@ class UniqueJobWithNilUniqueArgs include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: :unique_args def perform(args) diff --git a/examples/unique_job_with_no_unique_args_method.rb b/examples/unique_job_with_no_unique_args_method.rb index 3b71d6918..77109efef 100644 --- a/examples/unique_job_with_no_unique_args_method.rb +++ b/examples/unique_job_with_no_unique_args_method.rb @@ -4,10 +4,10 @@ class UniqueJobWithNoUniqueArgsMethod include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: :filtered_args def perform(one, two) diff --git a/examples/unique_job_withthout_unique_args_parameter.rb b/examples/unique_job_withthout_unique_args_parameter.rb index 08651e32a..ad670b263 100644 --- a/examples/unique_job_withthout_unique_args_parameter.rb +++ b/examples/unique_job_withthout_unique_args_parameter.rb @@ -4,10 +4,10 @@ class UniqueJobWithoutUniqueArgsParameter include Sidekiq::Worker - sidekiq_options queue: :customqueue, + sidekiq_options backtrace: true, + lock: :until_executed, + queue: :customqueue, retry: true, - backtrace: true, - unique: :until_executed, unique_args: :unique_args def perform(optional = true) diff --git a/examples/unique_on_all_queues_job.rb b/examples/unique_on_all_queues_job.rb index 60a09eeb8..dc86dd1c7 100644 --- a/examples/unique_on_all_queues_job.rb +++ b/examples/unique_on_all_queues_job.rb @@ -8,7 +8,7 @@ # queue removed it won't work. class UniqueOnAllQueuesJob include Sidekiq::Worker - sidekiq_options unique: :until_executed, unique_on_all_queues: true + sidekiq_options lock: :until_executed, unique_on_all_queues: true def perform(one, two, three = nil) [one, two, three] diff --git a/examples/until_and_while_executing_job.rb b/examples/until_and_while_executing_job.rb index 7917e389a..5b3d4ddb0 100644 --- a/examples/until_and_while_executing_job.rb +++ b/examples/until_and_while_executing_job.rb @@ -5,7 +5,10 @@ class UntilAndWhileExecutingJob include Sidekiq::Worker - sidekiq_options queue: :working, unique: :until_and_while_executing, lock_timeout: 0, lock_expiration: nil + sidekiq_options lock: :until_and_while_executing, + lock_expiration: nil, + lock_timeout: 0, + queue: :working def perform(sleepy_time) sleep(sleepy_time) diff --git a/examples/until_executed_2_job.rb b/examples/until_executed_2_job.rb index 85c60d6f9..289bc5484 100644 --- a/examples/until_executed_2_job.rb +++ b/examples/until_executed_2_job.rb @@ -8,11 +8,11 @@ # class UntilExecuted2Job include Sidekiq::Worker - sidekiq_options queue: :working, - retry: 1, - backtrace: 10, - unique: :until_executed, - lock_timeout: 0 + sidekiq_options backtrace: 10, + lock: :until_executed, + lock_timeout: 0, + queue: :working, + retry: 1 def perform(one, two) [one, two] diff --git a/examples/until_executed_job.rb b/examples/until_executed_job.rb index 32ec988f8..93249862d 100644 --- a/examples/until_executed_job.rb +++ b/examples/until_executed_job.rb @@ -8,12 +8,12 @@ # class UntilExecutedJob include Sidekiq::Worker - sidekiq_options queue: :working, - retry: 1, - backtrace: 10, - unique: :until_executed, + sidekiq_options backtrace: 10, + lock: :until_executed, + lock_expiration: 5_000, lock_timeout: 0, - lock_expiration: 5000 + queue: :working, + retry: 1 def perform(one, two = nil) [one, two] diff --git a/examples/until_executing_job.rb b/examples/until_executing_job.rb index 72eabddb0..9eb163456 100644 --- a/examples/until_executing_job.rb +++ b/examples/until_executing_job.rb @@ -5,7 +5,7 @@ class UntilExecutingJob include Sidekiq::Worker - sidekiq_options queue: :working, unique: :until_executing + sidekiq_options lock: :until_executing, queue: :working def perform; end end diff --git a/examples/until_expired_job.rb b/examples/until_expired_job.rb index 85b32b34a..61baa8468 100644 --- a/examples/until_expired_job.rb +++ b/examples/until_expired_job.rb @@ -4,7 +4,7 @@ class UntilExpiredJob include Sidekiq::Worker - sidekiq_options unique: :until_expired, lock_expiration: 1, lock_timeout: 0 + sidekiq_options lock: :until_expired, lock_expiration: 1, lock_timeout: 0 def perform(one) TestClass.run(one) diff --git a/examples/until_global_expired_job.rb b/examples/until_global_expired_job.rb index c7f2aa106..d388764e5 100644 --- a/examples/until_global_expired_job.rb +++ b/examples/until_global_expired_job.rb @@ -4,7 +4,7 @@ class UntilGlobalExpiredJob include Sidekiq::Worker - sidekiq_options unique: :until_expired + sidekiq_options lock: :until_expired def perform(arg) TestClass.run(arg) diff --git a/examples/while_executing_job.rb b/examples/while_executing_job.rb index 9dbee4296..896527b13 100644 --- a/examples/while_executing_job.rb +++ b/examples/while_executing_job.rb @@ -5,9 +5,9 @@ class WhileExecutingJob include Sidekiq::Worker sidekiq_options backtrace: 10, + lock: :while_executing, queue: :working, - retry: 1, - unique: :while_executing + retry: 1 def perform(args) [args] diff --git a/examples/while_executing_reject_job.rb b/examples/while_executing_reject_job.rb index 7b246b56a..d36610176 100644 --- a/examples/while_executing_reject_job.rb +++ b/examples/while_executing_reject_job.rb @@ -4,8 +4,8 @@ class WhileExecutingRejectJob include Sidekiq::Worker - sidekiq_options queue: :rejecting, - unique: :while_executing_reject + sidekiq_options lock: :while_executing_reject, + queue: :rejecting def perform(args) sleep 5 diff --git a/examples/without_argument_job.rb b/examples/without_argument_job.rb index 8bde947f5..5adfa98cc 100644 --- a/examples/without_argument_job.rb +++ b/examples/without_argument_job.rb @@ -4,7 +4,7 @@ class WithoutArgumentJob include Sidekiq::Worker - sidekiq_options unique: :until_executed, + sidekiq_options lock: :until_executed, log_duplicate_payload: true def perform diff --git a/lib/sidekiq_unique_jobs.rb b/lib/sidekiq_unique_jobs.rb index 8d9fdd08f..210bc91df 100644 --- a/lib/sidekiq_unique_jobs.rb +++ b/lib/sidekiq_unique_jobs.rb @@ -19,9 +19,17 @@ require 'sidekiq_unique_jobs/unique_args' require 'sidekiq_unique_jobs/unlockable' require 'sidekiq_unique_jobs/locksmith' +require 'sidekiq_unique_jobs/lock/base_lock' +require 'sidekiq_unique_jobs/lock/until_executed' +require 'sidekiq_unique_jobs/lock/until_executing' +require 'sidekiq_unique_jobs/lock/until_expired' +require 'sidekiq_unique_jobs/lock/while_executing' +require 'sidekiq_unique_jobs/lock/while_executing_reject' +require 'sidekiq_unique_jobs/lock/until_and_while_executing' require 'sidekiq_unique_jobs/options_with_fallback' require 'sidekiq_unique_jobs/middleware' require 'sidekiq_unique_jobs/sidekiq_unique_ext' +require 'sidekiq_unique_jobs/on_conflict' module SidekiqUniqueJobs include SidekiqUniqueJobs::Connection diff --git a/lib/sidekiq_unique_jobs/constants.rb b/lib/sidekiq_unique_jobs/constants.rb index b312f3000..7dbb7b22b 100644 --- a/lib/sidekiq_unique_jobs/constants.rb +++ b/lib/sidekiq_unique_jobs/constants.rb @@ -14,6 +14,8 @@ module SidekiqUniqueJobs UNIQUE_ARGS_KEY ||= 'unique_args' UNIQUE_DIGEST_KEY ||= 'unique_digest' UNIQUE_KEY ||= 'unique' + LOCK_KEY ||= 'lock' + ON_CONFLICT_KEY ||= 'on_conflict' UNIQUE_ON_ALL_QUEUES_KEY ||= 'unique_on_all_queues' # TODO: Remove in v6.1 UNIQUE_PREFIX_KEY ||= 'unique_prefix' end diff --git a/lib/sidekiq_unique_jobs/exceptions.rb b/lib/sidekiq_unique_jobs/exceptions.rb index 8aec94777..4c63b536d 100644 --- a/lib/sidekiq_unique_jobs/exceptions.rb +++ b/lib/sidekiq_unique_jobs/exceptions.rb @@ -1,10 +1,10 @@ # frozen_string_literal: true module SidekiqUniqueJobs - class LockTimeout < StandardError - end - - class RunLockFailed < StandardError + class Conflict < StandardError + def initialize(item) + super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing") + end end class ScriptError < StandardError @@ -13,18 +13,6 @@ def initialize(file_name:, source_exception:) end end - class UniqueKeyMissing < ArgumentError - end - - class JidMissing < ArgumentError - end - - class MaxLockTimeMissing < ArgumentError - end - - class UnexpectedValue < StandardError - end - class UnknownLock < StandardError end end diff --git a/lib/sidekiq_unique_jobs/lock/base_lock.rb b/lib/sidekiq_unique_jobs/lock/base_lock.rb index 3d678efed..b21997c21 100644 --- a/lib/sidekiq_unique_jobs/lock/base_lock.rb +++ b/lib/sidekiq_unique_jobs/lock/base_lock.rb @@ -12,7 +12,11 @@ def initialize(item, callback, redis_pool = nil) end def lock - locksmith.lock(item[LOCK_TIMEOUT_KEY]) + if (token = locksmith.lock(item[LOCK_TIMEOUT_KEY])) + token + else + strategy.call + end end def execute @@ -75,9 +79,13 @@ def unlock_with_callback def callback_safely callback&.call rescue StandardError - log_warn("The lock for #{item[UNIQUE_DIGEST_KEY]} has been released but the #after_unlock callback failed!") + log_warn("The unique_key: #{item[UNIQUE_DIGEST_KEY]} has been unlocked but the #after_unlock callback failed!") raise end + + def strategy + OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item) + end end end end diff --git a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb index 8c1388646..e4bc3d248 100644 --- a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb @@ -7,9 +7,7 @@ def execute return unless locked? unlock - runtime_lock.execute do - yield if block_given? - end + runtime_lock.execute { yield } end def runtime_lock diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb index 878f032a3..542383f8c 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executed.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -7,7 +7,7 @@ class UntilExecuted < BaseLock def execute return unless locked? - with_cleanup { yield if block_given? } + with_cleanup { yield } end end end diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb index 9aec613b1..a895798b6 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -5,7 +5,7 @@ class Lock class UntilExecuting < BaseLock def execute unlock_with_callback - yield if block_given? + yield end end end diff --git a/lib/sidekiq_unique_jobs/lock/until_expired.rb b/lib/sidekiq_unique_jobs/lock/until_expired.rb index 26fee2a78..219332ffb 100644 --- a/lib/sidekiq_unique_jobs/lock/until_expired.rb +++ b/lib/sidekiq_unique_jobs/lock/until_expired.rb @@ -9,8 +9,8 @@ def unlock def execute return unless locked? - yield if block_given? - # this lock does not handle after_unlock since we don't know when that would + yield + # this lock does not handle after_unlock since we don't know when that would happen end end end diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb index 0e18267d8..16cfd2434 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -18,8 +18,8 @@ def lock # Locks the job with the RUN_SUFFIX appended def execute - return unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) - with_cleanup { yield if block_given? } + return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) + with_cleanup { yield } end private diff --git a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb index 4cf273809..aa5e42ae2 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing_reject.rb @@ -4,72 +4,13 @@ module SidekiqUniqueJobs class Lock class WhileExecutingReject < WhileExecuting def execute - return reject unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) + return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY]) - with_cleanup { yield if block_given? } + with_cleanup { yield } end - # Private below here, keeping public due to testing reasons - - def reject - log_debug { "Rejecting job with jid: #{item[JID_KEY]} already running" } - send_to_deadset - end - - def send_to_deadset - log_info { "Adding dead #{item[CLASS_KEY]} job #{item[JID_KEY]}" } - - if deadset_kill? - deadset_kill - else - push_to_deadset - end - end - - def deadset_kill? - deadset.respond_to?(:kill) - end - - def deadset_kill - if kill_with_options? - kill_job_with_options - else - kill_job_without_options - end - end - - def kill_with_options? - Sidekiq::DeadSet.instance_method(:kill).arity > 1 - end - - def kill_job_without_options - deadset.kill(payload) - end - - def kill_job_with_options - deadset.kill(payload, notify_failure: false) - end - - def deadset - @deadset ||= Sidekiq::DeadSet.new - end - - def push_to_deadset - Sidekiq.redis do |conn| - conn.multi do - conn.zadd('dead', current_time, payload) - conn.zremrangebyscore('dead', '-inf', current_time - Sidekiq::DeadSet.timeout) - conn.zremrangebyrank('dead', 0, -Sidekiq::DeadSet.max_jobs) - end - end - end - - def current_time - @current_time ||= Time.now.to_f - end - - def payload - @payload ||= Sidekiq.dump_json(item) + def strategy + @strategy ||= OnConflict.find_strategy(:reject).new(item) end end end diff --git a/lib/sidekiq_unique_jobs/lock/while_executing_requeue.rb b/lib/sidekiq_unique_jobs/lock/while_executing_requeue.rb deleted file mode 100644 index 93a7162bc..000000000 --- a/lib/sidekiq_unique_jobs/lock/while_executing_requeue.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -module SidekiqUniqueJobs - class Lock - class WhileExecutingRequeue < WhileExecuting - def lock - true - end - - def execute - locksmith.lock(item[LOCK_TIMEOUT_KEY], raise: true) do - yield if block_given? - end - - unlock - - Sidekiq::Client.push(item) unless locksmith.locked? - end - end - end -end diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 21b90189a..89c80ea2c 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -35,7 +35,7 @@ def available_count end def delete - return unless expiration.nil? + return if expiration delete! end @@ -140,12 +140,3 @@ def redis_time end end end - -require 'sidekiq_unique_jobs/lock/base_lock' -require 'sidekiq_unique_jobs/lock/until_executed' -require 'sidekiq_unique_jobs/lock/until_executing' -require 'sidekiq_unique_jobs/lock/until_expired' -require 'sidekiq_unique_jobs/lock/while_executing' -require 'sidekiq_unique_jobs/lock/while_executing_reject' -require 'sidekiq_unique_jobs/lock/while_executing_requeue' -require 'sidekiq_unique_jobs/lock/until_and_while_executing' diff --git a/lib/sidekiq_unique_jobs/on_conflict.rb b/lib/sidekiq_unique_jobs/on_conflict.rb new file mode 100644 index 000000000..83b0f3c1b --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require_relative 'on_conflict/strategy' +require_relative 'on_conflict/null_strategy' +require_relative 'on_conflict/log' +require_relative 'on_conflict/raise' +require_relative 'on_conflict/reject' +require_relative 'on_conflict/reschedule' + +module SidekiqUniqueJobs + module OnConflict + STRATEGIES = { + log: OnConflict::Log, + raise: OnConflict::Raise, + reject: OnConflict::Reject, + reschedule: OnConflict::Reschedule, + }.freeze + + # returns OnConflict::NullStrategy when no other could be found + def self.find_strategy(strategy) + STRATEGIES.fetch(strategy.to_s.to_sym) { OnConflict::NullStrategy } + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/log.rb b/lib/sidekiq_unique_jobs/on_conflict/log.rb new file mode 100644 index 000000000..d45a99e42 --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/log.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Strategy to log information about conflict + # + # @author Mikael Henriksson + class Log < OnConflict::Strategy + include SidekiqUniqueJobs::Logging + + # Logs an informational message about that the job was not unique + def call + log_info( + "skipping job with id (#{item[JID_KEY]}) " \ + "because unique_digest: (#{item[UNIQUE_DIGEST_KEY]}) already exists", + ) + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/null_strategy.rb b/lib/sidekiq_unique_jobs/on_conflict/null_strategy.rb new file mode 100644 index 000000000..ec7d9b415 --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/null_strategy.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Default conflict strategy class that does nothing + # + # @author Mikael Henriksson + class NullStrategy < OnConflict::Strategy + # Do nothing on conflict + # @return [nil] + def call + # NOOP + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/raise.rb b/lib/sidekiq_unique_jobs/on_conflict/raise.rb new file mode 100644 index 000000000..549e6b24e --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/raise.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Strategy to raise an error on conflict + # + # @author Mikael Henriksson + class Raise < OnConflict::Strategy + # Raise an error on conflict. + # This will cause Sidekiq to retry the job + # @raise [SidekiqUniqueJobs::Conflict] + def call + fail SidekiqUniqueJobs::Conflict, item + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/reject.rb b/lib/sidekiq_unique_jobs/on_conflict/reject.rb new file mode 100644 index 000000000..eca10e889 --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/reject.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Strategy to send jobs to dead queue + # + # @author Mikael Henriksson + class Reject < OnConflict::Strategy + # Send jobs to dead queue + def call + log_debug { "Rejecting job with jid: #{item[JID_KEY]}" } + send_to_deadset + end + + def send_to_deadset + log_info { "Adding dead #{item[CLASS_KEY]} job #{item[JID_KEY]}" } + + if deadset_kill? + deadset_kill + else + push_to_deadset + end + end + + def deadset_kill? + deadset.respond_to?(:kill) + end + + def deadset_kill + if kill_with_options? + kill_job_with_options + else + kill_job_without_options + end + end + + def kill_with_options? + Sidekiq::DeadSet.instance_method(:kill).arity > 1 + end + + def kill_job_without_options + deadset.kill(payload) + end + + def kill_job_with_options + deadset.kill(payload, notify_failure: false) + end + + def deadset + @deadset ||= Sidekiq::DeadSet.new + end + + def push_to_deadset + Sidekiq.redis do |conn| + conn.multi do + conn.zadd('dead', current_time, payload) + conn.zremrangebyscore('dead', '-inf', current_time - Sidekiq::DeadSet.timeout) + conn.zremrangebyrank('dead', 0, -Sidekiq::DeadSet.max_jobs) + end + end + end + + def current_time + @current_time ||= Time.now.to_f + end + + def payload + @payload ||= Sidekiq.dump_json(item) + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/reschedule.rb b/lib/sidekiq_unique_jobs/on_conflict/reschedule.rb new file mode 100644 index 000000000..d24ab0f67 --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/reschedule.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Strategy to reschedule job on conflict + # + # @author Mikael Henriksson + class Reschedule < OnConflict::Strategy + include SidekiqUniqueJobs::SidekiqWorkerMethods + + # @param [Hash] item sidekiq job hash + def initialize(item) + super + @worker_class = item[CLASS_KEY] + end + + # Create a new job from the current one. + # This will mess up sidekiq stats because a new job is created + def call + worker_class&.perform_in(5, *item[ARGS_KEY]) if sidekiq_worker_class? + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/on_conflict/strategy.rb b/lib/sidekiq_unique_jobs/on_conflict/strategy.rb new file mode 100644 index 000000000..bb648be2d --- /dev/null +++ b/lib/sidekiq_unique_jobs/on_conflict/strategy.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module SidekiqUniqueJobs + module OnConflict + # Abstract conflict strategy class + # + # @abstract + # @author Mikael Henriksson + class Strategy + include SidekiqUniqueJobs::Logging + + # The sidekiq job hash + # @return [Hash] the Sidekiq job hash + attr_reader :item + + # @param [Hash] item the Sidekiq job hash + def initialize(item) + @item = item + end + + # Use strategy on conflict + # @raise [NotImplementedError] needs to be implemented in child class + def call + fail NotImplementedError, 'needs to be implemented in child class' + end + end + end +end diff --git a/lib/sidekiq_unique_jobs/options_with_fallback.rb b/lib/sidekiq_unique_jobs/options_with_fallback.rb index f74058513..8c9df4325 100644 --- a/lib/sidekiq_unique_jobs/options_with_fallback.rb +++ b/lib/sidekiq_unique_jobs/options_with_fallback.rb @@ -40,13 +40,17 @@ def lock def lock_class @lock_class ||= begin LOCKS.fetch(lock_type.to_sym) do - fail UnknownLock, "No implementation for `unique: :#{lock_type}`" + fail UnknownLock, "No implementation for `lock: :#{lock_type}`" end end end def lock_type - @lock_type ||= options[UNIQUE_KEY] || item[UNIQUE_KEY] + @lock_type ||= options[LOCK_KEY] || item[LOCK_KEY] || unique_type + end + + def unique_type + options[UNIQUE_KEY] || item[UNIQUE_KEY] end def options diff --git a/lib/sidekiq_unique_jobs/timeout/calculator.rb b/lib/sidekiq_unique_jobs/timeout/calculator.rb index 875612d6e..fb7905d12 100644 --- a/lib/sidekiq_unique_jobs/timeout/calculator.rb +++ b/lib/sidekiq_unique_jobs/timeout/calculator.rb @@ -20,10 +20,6 @@ def scheduled_at @scheduled_at ||= item[AT_KEY] end - def seconds - raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}" - end - def lock_expiration @lock_expiration ||= begin expiration = item[LOCK_EXPIRATION_KEY] diff --git a/lib/sidekiq_unique_jobs/unique_args.rb b/lib/sidekiq_unique_jobs/unique_args.rb index 32712e0e8..f875d1a56 100644 --- a/lib/sidekiq_unique_jobs/unique_args.rb +++ b/lib/sidekiq_unique_jobs/unique_args.rb @@ -43,7 +43,7 @@ def unique_prefix def digestable_hash @item.slice(CLASS_KEY, QUEUE_KEY, UNIQUE_ARGS_KEY).tap do |hash| - hash.delete(QUEUE_KEY) if unique_on_all_queues? + hash.delete(QUEUE_KEY) if unique_across_queues? hash.delete(CLASS_KEY) if unique_across_workers? end end @@ -53,7 +53,7 @@ def unique_args(args) args end - def unique_on_all_queues? + def unique_across_queues? item[UNIQUE_ACROSS_QUEUES_KEY] || worker_options[UNIQUE_ACROSS_QUEUES_KEY] || item[UNIQUE_ON_ALL_QUEUES_KEY] || worker_options[UNIQUE_ON_ALL_QUEUES_KEY] # TODO: Remove in v 6.1 end @@ -99,7 +99,11 @@ def filter_by_symbol(args) def unique_args_method @unique_args_method ||= worker_options[UNIQUE_ARGS_KEY] @unique_args_method ||= :unique_args if worker_method_defined?(:unique_args) - @unique_args_method ||= Sidekiq.default_worker_options.stringify_keys[UNIQUE_ARGS_KEY] + @unique_args_method ||= default_unique_args_method + end + + def default_unique_args_method + Sidekiq.default_worker_options.stringify_keys[UNIQUE_ARGS_KEY] end end end diff --git a/rails_example/app/workers/simple_worker.rb b/rails_example/app/workers/simple_worker.rb index 68edfe321..b7df430ce 100644 --- a/rails_example/app/workers/simple_worker.rb +++ b/rails_example/app/workers/simple_worker.rb @@ -2,7 +2,7 @@ class SimpleWorker include Sidekiq::Worker - sidekiq_options unique: :until_executed, + sidekiq_options lock: :until_executed, queue: :default, unique_args: (lambda do |args| [args.first] diff --git a/rails_example/app/workers/slow_until_executing_worker.rb b/rails_example/app/workers/slow_until_executing_worker.rb index e24d71245..02ac27cdc 100644 --- a/rails_example/app/workers/slow_until_executing_worker.rb +++ b/rails_example/app/workers/slow_until_executing_worker.rb @@ -2,7 +2,7 @@ class SlowUntilExecutingWorker include Sidekiq::Worker - sidekiq_options unique: :until_executing, + sidekiq_options lock: :until_executing, queue: :default, unique_args: (lambda do |args| [args.first] diff --git a/spec/examples/another_unique_job_spec.rb b/spec/examples/another_unique_job_spec.rb index c07363d95..8873750c9 100644 --- a/spec/examples/another_unique_job_spec.rb +++ b/spec/examples/another_unique_job_spec.rb @@ -9,7 +9,7 @@ 'queue' => :working2, 'retry' => 1, 'backtrace' => 10, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/custom_queue_job_with_filter_method_spec.rb b/spec/examples/custom_queue_job_with_filter_method_spec.rb index afd8a119b..b0172db91 100644 --- a/spec/examples/custom_queue_job_with_filter_method_spec.rb +++ b/spec/examples/custom_queue_job_with_filter_method_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, 'unique_args' => :args_filter, } end diff --git a/spec/examples/custom_queue_job_with_filter_proc_spec.rb b/spec/examples/custom_queue_job_with_filter_proc_spec.rb index 8b3cb7450..a2af87fd9 100644 --- a/spec/examples/custom_queue_job_with_filter_proc_spec.rb +++ b/spec/examples/custom_queue_job_with_filter_proc_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_expired, + 'lock' => :until_expired, 'unique_args' => a_kind_of(Proc), } end diff --git a/spec/examples/expiring_job_spec.rb b/spec/examples/expiring_job_spec.rb index 5898a358d..e884beae4 100644 --- a/spec/examples/expiring_job_spec.rb +++ b/spec/examples/expiring_job_spec.rb @@ -8,7 +8,7 @@ { 'lock_expiration' => 600, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/inline_worker_spec.rb b/spec/examples/inline_worker_spec.rb index 36efea70b..64be1cd83 100644 --- a/spec/examples/inline_worker_spec.rb +++ b/spec/examples/inline_worker_spec.rb @@ -8,7 +8,7 @@ { 'lock_timeout' => 5, 'retry' => true, - 'unique' => :while_executing, + 'lock' => :while_executing, } end end diff --git a/spec/examples/just_a_worker_spec.rb b/spec/examples/just_a_worker_spec.rb index 867ae21ae..b43c0a7da 100644 --- a/spec/examples/just_a_worker_spec.rb +++ b/spec/examples/just_a_worker_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :testqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/long_running_job_spec.rb b/spec/examples/long_running_job_spec.rb index 4d49dbdfd..859c331f6 100644 --- a/spec/examples/long_running_job_spec.rb +++ b/spec/examples/long_running_job_spec.rb @@ -7,10 +7,9 @@ let(:options) do { 'queue' => :customqueue, - 'retry' => true, - 'unique' => :until_and_while_executing, + 'retry' => 10, + 'lock' => :until_and_while_executing, 'lock_expiration' => 7_200, - 'retry_count' => 10, } end end diff --git a/spec/examples/main_job_spec.rb b/spec/examples/main_job_spec.rb index 64ba7b772..deddf986a 100644 --- a/spec/examples/main_job_spec.rb +++ b/spec/examples/main_job_spec.rb @@ -9,7 +9,7 @@ 'log_duplicate_payload' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/my_unique_job_spec.rb b/spec/examples/my_unique_job_spec.rb index d0e13022b..d4a490f48 100644 --- a/spec/examples/my_unique_job_spec.rb +++ b/spec/examples/my_unique_job_spec.rb @@ -6,11 +6,10 @@ it_behaves_like 'sidekiq with options' do let(:options) do { - 'queue' => :customqueue, - 'retry' => true, - 'retry_count' => 10, + 'lock' => :until_executed, 'lock_expiration' => 7_200, - 'unique' => :until_executed, + 'queue' => :customqueue, + 'retry' => 10, } end end diff --git a/spec/examples/my_unique_job_with_filter_method_spec.rb b/spec/examples/my_unique_job_with_filter_method_spec.rb index 122533394..e69cf3141 100644 --- a/spec/examples/my_unique_job_with_filter_method_spec.rb +++ b/spec/examples/my_unique_job_with_filter_method_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, 'unique_args' => :filtered_args, } end diff --git a/spec/examples/my_unique_job_with_filter_proc_spec.rb b/spec/examples/my_unique_job_with_filter_proc_spec.rb index 48dee0110..04b61eee4 100644 --- a/spec/examples/my_unique_job_with_filter_proc_spec.rb +++ b/spec/examples/my_unique_job_with_filter_proc_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/notify_worker_spec.rb b/spec/examples/notify_worker_spec.rb index 0be7fc916..df2db4c36 100644 --- a/spec/examples/notify_worker_spec.rb +++ b/spec/examples/notify_worker_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :notify_worker, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/simple_worker_spec.rb b/spec/examples/simple_worker_spec.rb index 5fb9f1314..104df0dc8 100644 --- a/spec/examples/simple_worker_spec.rb +++ b/spec/examples/simple_worker_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :default, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/unique_across_workers_job_spec.rb b/spec/examples/unique_across_workers_job_spec.rb index bf28212ea..fb877269d 100644 --- a/spec/examples/unique_across_workers_job_spec.rb +++ b/spec/examples/unique_across_workers_job_spec.rb @@ -6,8 +6,8 @@ it_behaves_like 'sidekiq with options' do let(:options) do { - 'retry' => true, - 'unique' => :until_executed, + 'retry' => true, + 'lock' => :until_executed, 'unique_across_workers' => true, } end diff --git a/spec/examples/unique_job_on_conflict_raise_spec.rb b/spec/examples/unique_job_on_conflict_raise_spec.rb new file mode 100644 index 000000000..0880b3495 --- /dev/null +++ b/spec/examples/unique_job_on_conflict_raise_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe UniqueJobOnConflictRaise do + it_behaves_like 'sidekiq with options' do + let(:options) do + { + 'lock' => :while_executing, + 'on_conflict' => :raise, + 'queue' => :customqueue, + 'retry' => true, + } + end + end + + it_behaves_like 'a performing worker' do + let(:args) { ['hundred', 'type' => 'extremely unique', 'id' => 44] } + end +end diff --git a/spec/examples/unique_job_on_conflict_reject_spec.rb b/spec/examples/unique_job_on_conflict_reject_spec.rb new file mode 100644 index 000000000..70445fd75 --- /dev/null +++ b/spec/examples/unique_job_on_conflict_reject_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe UniqueJobOnConflictReject do + it_behaves_like 'sidekiq with options' do + let(:options) do + { + 'lock' => :while_executing, + 'on_conflict' => :reject, + 'queue' => :customqueue, + 'retry' => true, + } + end + end + + it_behaves_like 'a performing worker' do + let(:args) { ['hundred', 'type' => 'extremely unique', 'id' => 44] } + end +end diff --git a/spec/examples/unique_job_on_conflict_reschedule_spec.rb b/spec/examples/unique_job_on_conflict_reschedule_spec.rb new file mode 100644 index 000000000..53495b90f --- /dev/null +++ b/spec/examples/unique_job_on_conflict_reschedule_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe UniqueJobOnConflictReschedule do + it_behaves_like 'sidekiq with options' do + let(:options) do + { + 'lock' => :while_executing, + 'on_conflict' => :reschedule, + 'queue' => :customqueue, + 'retry' => true, + } + end + end + + it_behaves_like 'a performing worker' do + let(:args) { ['hundred', 'type' => 'extremely unique', 'id' => 44] } + end +end diff --git a/spec/examples/unique_job_with_nil_unique_args_spec.rb b/spec/examples/unique_job_with_nil_unique_args_spec.rb index d7555c7ad..27f484a2e 100644 --- a/spec/examples/unique_job_with_nil_unique_args_spec.rb +++ b/spec/examples/unique_job_with_nil_unique_args_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, 'unique_args' => :unique_args, } end diff --git a/spec/examples/unique_job_with_no_unique_args_method_spec.rb b/spec/examples/unique_job_with_no_unique_args_method_spec.rb index 5c024e8a0..6c583a5f7 100644 --- a/spec/examples/unique_job_with_no_unique_args_method_spec.rb +++ b/spec/examples/unique_job_with_no_unique_args_method_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, 'unique_args' => :filtered_args, } end diff --git a/spec/examples/unique_job_without_unique_args_parameter_spec.rb b/spec/examples/unique_job_without_unique_args_parameter_spec.rb index f81aa19ac..726d58680 100644 --- a/spec/examples/unique_job_without_unique_args_parameter_spec.rb +++ b/spec/examples/unique_job_without_unique_args_parameter_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => true, 'queue' => :customqueue, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, 'unique_args' => :unique_args, } end diff --git a/spec/examples/unique_on_all_queues_job_spec.rb b/spec/examples/unique_on_all_queues_job_spec.rb index 70cc5af06..513550878 100644 --- a/spec/examples/unique_on_all_queues_job_spec.rb +++ b/spec/examples/unique_on_all_queues_job_spec.rb @@ -6,8 +6,8 @@ it_behaves_like 'sidekiq with options' do let(:options) do { - 'retry' => true, - 'unique' => :until_executed, + 'retry' => true, + 'lock' => :until_executed, 'unique_on_all_queues' => true, } end diff --git a/spec/examples/until_and_while_executing_job_spec.rb b/spec/examples/until_and_while_executing_job_spec.rb index 26345ff22..57fc0717d 100644 --- a/spec/examples/until_and_while_executing_job_spec.rb +++ b/spec/examples/until_and_while_executing_job_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :working, 'retry' => true, - 'unique' => :until_and_while_executing, + 'lock' => :until_and_while_executing, } end end diff --git a/spec/examples/until_executed2_job_spec.rb b/spec/examples/until_executed2_job_spec.rb index 310733b98..d10457fa4 100644 --- a/spec/examples/until_executed2_job_spec.rb +++ b/spec/examples/until_executed2_job_spec.rb @@ -6,11 +6,11 @@ it_behaves_like 'sidekiq with options' do let(:options) do { - 'backtrace' => 10, + 'backtrace' => 10, + 'lock' => :until_executed, + 'lock_timeout' => 0, 'queue' => :working, 'retry' => 1, - 'lock_timeout' => 0, - 'unique' => :until_executed, } end end diff --git a/spec/examples/until_executed_job_spec.rb b/spec/examples/until_executed_job_spec.rb index 9853846d0..c82416655 100644 --- a/spec/examples/until_executed_job_spec.rb +++ b/spec/examples/until_executed_job_spec.rb @@ -11,7 +11,7 @@ 'lock_expiration' => 5000, 'queue' => :working, 'retry' => 1, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/examples/until_executing_job_spec.rb b/spec/examples/until_executing_job_spec.rb index b84bbf17f..f9437a9a3 100644 --- a/spec/examples/until_executing_job_spec.rb +++ b/spec/examples/until_executing_job_spec.rb @@ -8,7 +8,7 @@ { 'queue' => :working, 'retry' => true, - 'unique' => :until_executing, + 'lock' => :until_executing, } end end diff --git a/spec/examples/until_expired_job_spec.rb b/spec/examples/until_expired_job_spec.rb index c1b8a79a9..90e88d56d 100644 --- a/spec/examples/until_expired_job_spec.rb +++ b/spec/examples/until_expired_job_spec.rb @@ -8,8 +8,8 @@ { 'lock_expiration' => 1, 'lock_timeout' => 0, - 'retry' => true, - 'unique' => :until_expired, + 'retry' => true, + 'lock' => :until_expired, } end end diff --git a/spec/examples/until_global_expired_job_spec.rb b/spec/examples/until_global_expired_job_spec.rb index 665cdfee9..a2a578602 100644 --- a/spec/examples/until_global_expired_job_spec.rb +++ b/spec/examples/until_global_expired_job_spec.rb @@ -6,8 +6,8 @@ it_behaves_like 'sidekiq with options' do let(:options) do { - 'retry' => true, - 'unique' => :until_expired, + 'retry' => true, + 'lock' => :until_expired, } end end diff --git a/spec/examples/while_executing_job_spec.rb b/spec/examples/while_executing_job_spec.rb index a1bc3b9cf..c4c2fb19e 100644 --- a/spec/examples/while_executing_job_spec.rb +++ b/spec/examples/while_executing_job_spec.rb @@ -9,7 +9,7 @@ 'backtrace' => 10, 'queue' => :working, 'retry' => 1, - 'unique' => :while_executing, + 'lock' => :while_executing, } end end diff --git a/spec/examples/without_argument_job_spec.rb b/spec/examples/without_argument_job_spec.rb index d1c880ba6..a75804719 100644 --- a/spec/examples/without_argument_job_spec.rb +++ b/spec/examples/without_argument_job_spec.rb @@ -8,7 +8,7 @@ { 'log_duplicate_payload' => true, 'retry' => true, - 'unique' => :until_executed, + 'lock' => :until_executed, } end end diff --git a/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb b/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb index 2955b425d..4744febae 100644 --- a/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/legacy_lock_spec.rb @@ -20,7 +20,7 @@ 'jid' => jid_one, 'lock_expiration' => lock_expiration, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'unique_digest' => unique_digest, } end diff --git a/spec/integration/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb b/spec/integration/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb index 57eb5f029..edf19dce8 100644 --- a/spec/integration/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb @@ -24,7 +24,7 @@ { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args, 'lock_timeout' => lock_timeout } end @@ -37,136 +37,39 @@ allow(process_two).to receive(:runtime_lock).and_return(runtime_two) end - describe '#execute' do - before do - expect(process_one.lock).to eq(jid_one) - expect(process_one.locked?).to eq(true) + it_behaves_like 'a lock implementation' - expect(runtime_one.locked?).to eq(false) - expect(runtime_two.locked?).to eq(false) - end - - it 'process two cannot lock the job' do - expect(process_two.lock).to eq(nil) - expect(process_two.execute).to eq(nil) - expect(process_two.locked?).to eq(false) - end - - context 'when timeout is 0' do - let(:lock_timeout) { 0 } - - context 'when process_one executes the job in 0 seconds' do - context 'when process_one executes the job' do # rubocop:disable RSpec/NestedGroups - it 'process two can lock the job' do - process_one.execute do - expect(process_one.locked?).to eq(false) - expect(runtime_one.locked?).to eq(true) - expect(process_two.lock).to eq(jid_two) - process_two.delete! - end - end - - it 'process two cannot execute the job' do - process_one.execute do - unset = true - expect(process_two.lock).to eq(jid_two) - process_two.execute do - unset = false - end - - expect(unset).to eq(true) - process_two.delete! - end - end - end - end - - context 'when process_one executes the job in 1 seconds' do - let(:sleepy_time) { 1 } - - it 'process two can lock the job' do - process_one.execute do - expect(process_one.locked?).to eq(false) - expect(runtime_one.locked?).to eq(true) - expect(process_two.lock).to eq(jid_two) - process_two.delete! - end - end - - it 'process two cannot execute the job' do - process_one.execute do - unset = true - expect(process_two.lock).to eq(jid_two) - process_two.execute do - unset = false - end + it 'has not locked runtime_one' do + process_one.lock + expect(runtime_one.locked?).to eq(false) + end - expect(unset).to eq(true) - process_two.delete! - end - end + context 'when process_one executes the job' do + it 'releases the lock for process_one' do + process_one.execute do + expect(process_one.locked?).to eq(false) end end - context 'when timeout is 1' do - let(:lock_timeout) { 1 } - - context 'when process_one executes the job' do - it 'process two can lock the job' do - process_one.execute do - expect(process_one.locked?).to eq(false) - expect(runtime_one.locked?).to eq(true) - expect(process_two.lock).to eq(jid_two) - process_two.delete! - end - end - - it 'process two cannot execute the job' do - process_one.execute do - unset = true - expect(process_two.lock).to eq(jid_two) - process_two.execute do - unset = false - end - - expect(unset).to eq(true) - process_two.delete! - end - end + it 'is locked by runtime_one' do + process_one.execute do + expect(runtime_one.locked?).to eq(true) end end - context 'when process_one executes the job in 1 seconds' do - let(:sleepy_time) { 1 } - - it 'process two can lock the job' do - process_one.execute do - expect(process_one.locked?).to eq(false) - expect(runtime_one.locked?).to eq(true) - expect(process_two.lock).to eq(jid_two) - process_two.delete! - end + it 'allows process_two to lock' do + process_one.execute do + expect(process_two.lock).to eq(jid_two) end + end - it 'process two cannot execute the job' do - process_one.execute do - unset = true - expect(process_two.lock).to eq(jid_two) - process_two.execute do - unset = false - end - - expect(unset).to eq(true) - process_two.delete! - end + it 'process two cannot execute the job' do + process_one.execute do + process_two.lock + unset = true + process_two.execute { unset = false } + expect(unset).to eq(true) end end - - # after do - # expect(process_one.locked?).to eq(false) - # expect(process_two.locked?).to eq(false) - # expect(runtime_one.locked?).to eq(false) - # expect(runtime_two.locked?).to eq(false) - # end end end diff --git a/spec/integration/sidekiq_unique_jobs/lock/until_executed_spec.rb b/spec/integration/sidekiq_unique_jobs/lock/until_executed_spec.rb index e14c650fc..e7cedf4de 100644 --- a/spec/integration/sidekiq_unique_jobs/lock/until_executed_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/lock/until_executed_spec.rb @@ -19,55 +19,68 @@ { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end let(:item_two) do { 'jid' => jid_two, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end + before do + allow(callback).to receive(:call).and_call_original + end + + describe '#lock' do + it_behaves_like 'a lock implementation' + end + describe '#execute' do - context 'when process one has locked the job' do - before do - expect(process_one.lock).to eq(jid_one) - expect(process_one.locked?).to eq(true) - end + it_behaves_like 'an executing lock implementation' - it 'process two cannot lock the job' do - expect(process_two.lock).to eq(nil) - expect(process_two.execute).to eq(nil) - expect(process_two.locked?).to eq(false) - end + it 'unlocks after executing' do + process_one.lock + process_one.execute {} + expect(process_one.locked?).to eq(false) + end + end - context 'when worker raises an error' do - it 'keeps the lock' do - expect { process_one.execute { raise 'Hell' } } - .to raise_error('Hell') + describe '#delete' do + subject(:delete) { process_one.delete } - expect(process_one.locked?).to eq(true) + context 'when locked' do + context 'when expiration is not negative' do + it 'deletes the lock without fuss' do + worker_class.use_options(lock_expiration: nil) do + process_one.lock + expect { delete }.to change { unique_keys.size }.from(3).to(0) + end end end - context 'when process_one executes the job' do - it 'the first client process should be unlocked' do - process_one.execute do - expect(process_one.locked?).to eq(true) - expect(process_two.lock).to eq(nil) - expect(process_two.locked?).to eq(false) - - unset = true - process_two.execute do - unset = false - end - - expect(unset).to eq(true) + context 'when expiration is positive' do + it 'does not delete the lock' do + worker_class.use_options(lock_expiration: 100) do + process_one.lock + expect { delete }.not_to change(unique_keys, :size) end end end end end + + describe '#delete!' do + subject(:delete!) { process_one.delete! } + + context 'when locked' do + before { process_one.lock } + + it 'deletes the lock without fuss' do + expect { delete! }.to change { unique_keys.size }.from(3).to(0) + end + end + end end diff --git a/spec/integration/sidekiq_unique_jobs/lock/until_expired_spec.rb b/spec/integration/sidekiq_unique_jobs/lock/until_expired_spec.rb index 2a08a36d0..b89708622 100644 --- a/spec/integration/sidekiq_unique_jobs/lock/until_expired_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/lock/until_expired_spec.rb @@ -16,62 +16,35 @@ let(:args) { %w[array of arguments] } let(:callback) { -> {} } let(:item_one) do - { 'jid' => jid_one, + { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, - 'args' => args } + 'lock' => unique, + 'args' => args } end let(:item_two) do - { 'jid' => jid_two, + { 'jid' => jid_two, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, - 'args' => args } + 'lock' => unique, + 'args' => args } end before do allow(callback).to receive(:call).and_call_original end - describe '#execute' do - it 'process one can be locked' do - expect(process_one.lock).to eq(jid_one) - expect(process_one.locked?).to eq(true) - end - - context 'when process one has locked the job' do - before do - process_one.lock - end - - it 'process two cannot achieve a lock' do - expect(process_two.lock).to eq(nil) - end - - it 'process two cannot execute the lock' do - unset = true - process_two.execute do - unset = false - end - - expect(unset).to eq(true) - end - - it 'process one can execute the job' do - set = false - process_one.execute do - set = true - end - - expect(set).to eq(true) - end + describe '#lock' do + it_behaves_like 'a lock implementation' + end - it 'the job is still locked after executing' do - process_one.execute {} + describe '#execute' do + it_behaves_like 'an executing lock implementation' - expect(process_one.locked?).to eq(true) - end + it 'keeps lock after executing' do + process_one.lock + process_one.execute {} + expect(process_one.locked?).to eq(true) end end diff --git a/spec/integration/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb b/spec/integration/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb index 9956dba1b..944dcaa98 100644 --- a/spec/integration/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb @@ -19,14 +19,14 @@ { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end let(:item_two) do { 'jid' => jid_two, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end @@ -55,14 +55,18 @@ end end end + it_behaves_like 'rejects job to deadset' context 'when Sidekiq::DeadSet respond to kill' do it_behaves_like 'rejects job to deadset' end context 'when Sidekiq::DeadSet does not respond to kill' do + let(:strategy) { SidekiqUniqueJobs::OnConflict::Reject.new(item_two) } + before do - allow(process_two).to receive(:deadset_kill?).and_return(false) + allow(strategy).to receive(:deadset_kill?).and_return(false) + allow(process_two).to receive(:strategy).and_return(strategy) end it_behaves_like 'rejects job to deadset' diff --git a/spec/integration/sidekiq_unique_jobs/lock/while_executing_spec.rb b/spec/integration/sidekiq_unique_jobs/lock/while_executing_spec.rb index ad2b6e440..c49667962 100644 --- a/spec/integration/sidekiq_unique_jobs/lock/while_executing_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/lock/while_executing_spec.rb @@ -19,14 +19,14 @@ { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end let(:item_two) do { 'jid' => jid_two, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args } end @@ -34,7 +34,7 @@ allow(callback).to receive(:call).and_call_original end - describe '#execute' do + describe '#lock' do it 'does not lock jobs' do expect(process_one.lock).to eq(true) expect(process_one.locked?).to eq(false) @@ -42,8 +42,10 @@ expect(process_two.lock).to eq(true) expect(process_two.locked?).to eq(false) end + end - context 'when job is executing' do + describe '#execute' do + context 'when executing' do it 'locks the process' do process_one.execute do expect(process_one.locked?).to eq(true) @@ -51,20 +53,14 @@ end it 'calls back' do - process_one.execute do - # NO OP - end + process_one.execute {} expect(callback).to have_received(:call) end it 'prevents other processes from executing' do process_one.execute do - expect(process_two.lock).to eq(true) - expect(process_two.locked?).to eq(false) unset = true - process_two.execute do - unset = false - end + process_two.execute { unset = false } expect(unset).to eq(true) end diff --git a/spec/integration/sidekiq_unique_jobs/server/middleware/until_and_while_executing_spec.rb b/spec/integration/sidekiq_unique_jobs/server/middleware/until_and_while_executing_spec.rb index 843049280..a940d28a4 100644 --- a/spec/integration/sidekiq_unique_jobs/server/middleware/until_and_while_executing_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/server/middleware/until_and_while_executing_spec.rb @@ -19,7 +19,7 @@ { 'jid' => jid_one, 'class' => worker_class.to_s, 'queue' => queue, - 'unique' => unique, + 'lock' => unique, 'args' => args, 'lock_timeout' => lock_timeout } end diff --git a/spec/support/shared_examples/a_lockable_lock.rb b/spec/support/shared_examples/a_lockable_lock.rb new file mode 100644 index 000000000..dc3f0af4c --- /dev/null +++ b/spec/support/shared_examples/a_lockable_lock.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'a lock implementation' do + it 'can be locked' do + expect(process_one.lock).to eq(jid_one) + end + + context 'when process one has locked the job' do + before { process_one.lock } + + it 'has locked process_one' do + expect(process_one.locked?).to eq(true) + end + + it 'prevents process_two from locking' do + expect(process_two.lock).to eq(nil) + end + + it 'prevents process_two from executing' do + expect(process_two.execute {}).to eq(nil) + end + end +end + +RSpec.shared_examples 'an executing lock implementation' do + context 'when job has not been locked' do + it 'does not execute' do + unset = true + process_one.execute { unset = false } + expect(unset).to eq(true) + end + end + + context 'when process_one executes the job' do + before { process_one.lock } + + it 'keeps being locked while executing' do + process_one.execute do + expect(process_one.locked?).to eq(true) + end + end + + it 'keeps being locked when an error is raised' do + expect { process_one.execute { raise 'Hell' } } + .to raise_error('Hell') + + expect(process_one.locked?).to eq(true) + end + + it 'prevents process_two from locking' do + process_one.execute do + expect(process_two.lock).to eq(nil) + expect(process_two.locked?).to eq(false) + end + end + + it 'prevents process_two from executing' do + process_one.execute do + unset = true + process_two.execute { unset = false } + expect(unset).to eq(true) + end + end + end +end diff --git a/spec/unit/sidekiq_unique_jobs/lock/base_lock_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/base_lock_spec.rb index f7a62132d..da88ff1fb 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/base_lock_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/base_lock_spec.rb @@ -11,7 +11,7 @@ 'jid' => 'maaaahjid', 'queue' => 'default', 'class' => 'UntilExecutedJob', - 'unique' => :until_executed, + 'lock' => :until_executed, 'args' => [1], } end @@ -61,6 +61,14 @@ it { is_expected.to eq('deleted') } end + describe '#delete!' do + subject { lock.delete! } + + before { allow(locksmith).to receive(:delete!).and_return('deleted') } + + it { is_expected.to eq('deleted') } + end + describe '#locked?' do it do allow(locksmith).to receive(:locked?).and_return(true) diff --git a/spec/unit/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb index b12db5b37..7d9768445 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/until_and_while_executing_spec.rb @@ -10,7 +10,7 @@ { 'jid' => 'maaaahjid', 'class' => 'UntilAndWhileExecutingJob', - 'unique' => 'until_and_while_executing', + 'lock' => 'until_and_while_executing', 'args' => ['one'], } end diff --git a/spec/unit/sidekiq_unique_jobs/lock/until_executed_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/until_executed_spec.rb index df1929756..7885edbca 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/until_executed_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/until_executed_spec.rb @@ -10,7 +10,7 @@ { 'jid' => 'maaaahjid', 'class' => 'UntilExecutedJob', - 'unique' => 'until_executed', + 'lock' => 'until_executed', 'args' => %w[one two], } end @@ -48,8 +48,8 @@ expect(lock).to have_received(:log_warn) .with( - 'The lock for uniquejobs:1b9f2f0624489ccf4e07ac88beae6ce0' \ - ' has been released but the #after_unlock callback failed!', + 'The unique_key: uniquejobs:1b9f2f0624489ccf4e07ac88beae6ce0' \ + ' has been unlocked but the #after_unlock callback failed!', ) end end diff --git a/spec/unit/sidekiq_unique_jobs/lock/until_executing_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/until_executing_spec.rb index 565d5af15..6ce207c3a 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/until_executing_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/until_executing_spec.rb @@ -9,7 +9,7 @@ let(:item) do { 'jid' => 'maaaahjid', 'class' => 'UntilExpiredJob', - 'unique' => 'until_timeout' } + 'lock' => 'until_timeout' } end describe '#execute' do diff --git a/spec/unit/sidekiq_unique_jobs/lock/until_expired_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/until_expired_spec.rb index 2ab222847..736991d94 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/until_expired_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/until_expired_spec.rb @@ -9,7 +9,7 @@ let(:item) do { 'jid' => 'maaaahjid', 'class' => 'UntilExpiredJob', - 'unique' => 'until_timeout' } + 'lock' => 'until_timeout' } end describe '#unlock' do diff --git a/spec/unit/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb index cac1d7f64..0fb93fec0 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/while_executing_reject_spec.rb @@ -6,19 +6,15 @@ include_context 'with a stubbed locksmith' let(:lock) { described_class.new(item, callback) } let(:callback) { -> {} } - let(:deadset) { instance_spy(Sidekiq::DeadSet) } - let(:payload) { instance_spy('payload') } let(:item) do { 'jid' => 'maaaahjid', 'class' => 'WhileExecutingRejectJob', - 'unique' => 'while_executing_reject', + 'lock' => 'while_executing_reject', 'args' => [%w[array of arguments]] } end before do - allow(lock).to receive(:deadset).and_return(deadset) allow(lock).to receive(:unlock) - allow(lock).to receive(:payload).and_return(payload) end describe '#lock' do @@ -28,7 +24,7 @@ end describe '#execute' do - subject(:execute) { lock.execute } + subject(:execute) { lock.execute {} } let(:token) { nil } @@ -50,73 +46,10 @@ let(:token) { nil } it 'rejects the job' do - expect(lock).to receive(:reject) execute expect(lock).not_to have_received(:with_cleanup) end end end - - describe '#send_to_deadset' do - subject(:send_to_deadset) { lock.send_to_deadset } - - context 'when deadset_kill?' do - before { allow(lock).to receive(:deadset_kill?).and_return(true) } - - it 'calls deadset_kill' do - expect(lock).to receive(:deadset_kill) - send_to_deadset - end - end - - context 'when not deadset_kill?' do - before { allow(lock).to receive(:deadset_kill?).and_return(false) } - - it 'calls push_to_deadset' do - expect(lock).to receive(:push_to_deadset) - send_to_deadset - end - end - end - - describe '#deadset_kill' do - subject(:deadset_kill) { lock.deadset_kill } - - context 'when kill_with_options?' do - before { allow(lock).to receive(:kill_with_options?).and_return(true) } - - it 'calls kill_job_with_options' do - expect(lock).to receive(:kill_job_with_options) - deadset_kill - end - end - - context 'when not kill_with_options?' do - before { allow(lock).to receive(:kill_with_options?).and_return(false) } - - it 'calls kill_job_without_options' do - expect(lock).to receive(:kill_job_without_options) - deadset_kill - end - end - end - - describe '#kill_job_with_options' do - subject(:kill_job_with_options) { lock.kill_job_with_options } - - it 'calls deadset.kill with options hash', sidekiq_ver: '>= 5.1.0' do - expect(deadset).to receive(:kill).with(payload, notify_failure: false) - kill_job_with_options - end - end - - describe '#kill_job_without_options' do - subject(:kill_job_without_options) { lock.kill_job_without_options } - - it 'calls deadset.kill without options hash', sidekiq_ver: '>= 5.0.0 && < 5.1.0' do - expect(deadset).to receive(:kill).with(payload) - kill_job_without_options - end - end end diff --git a/spec/unit/sidekiq_unique_jobs/lock/while_executing_spec.rb b/spec/unit/sidekiq_unique_jobs/lock/while_executing_spec.rb index ba73f6a54..4246d14fb 100644 --- a/spec/unit/sidekiq_unique_jobs/lock/while_executing_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/lock/while_executing_spec.rb @@ -10,7 +10,7 @@ let(:item) do { 'jid' => 'maaaahjid', 'class' => 'WhileExecutingJob', - 'unique' => 'while_executing', + 'lock' => 'while_executing', 'args' => [%w[array of arguments]] } end diff --git a/spec/unit/sidekiq_unique_jobs/on_conflict/log_spec.rb b/spec/unit/sidekiq_unique_jobs/on_conflict/log_spec.rb new file mode 100644 index 000000000..7e0434953 --- /dev/null +++ b/spec/unit/sidekiq_unique_jobs/on_conflict/log_spec.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::OnConflict::Log do + let(:strategy) { described_class.new(item) } + let(:unique_digest) { 'uniquejobs:random-digest-value' } + let(:jid) { 'arandomjid' } + let(:item) do + { 'unique_digest' => unique_digest, 'jid' => jid } + end + + describe '#call' do + it do + allow(strategy).to receive(:log_info) + strategy.call + expect(strategy).to have_received(:log_info).with( + "skipping job with id (#{jid}) because unique_digest: (#{unique_digest}) already exists", + ) + end + end +end diff --git a/spec/unit/sidekiq_unique_jobs/on_conflict/raise_spec.rb b/spec/unit/sidekiq_unique_jobs/on_conflict/raise_spec.rb new file mode 100644 index 000000000..2094f19cc --- /dev/null +++ b/spec/unit/sidekiq_unique_jobs/on_conflict/raise_spec.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::OnConflict::Raise do + let(:strategy) { described_class.new(item) } + let(:unique_digest) { 'uniquejobs:random-digest-value' } + let(:item) do + { 'unique_digest' => unique_digest } + end + + describe '#call' do + let(:call) { strategy.call } + + it do + expect { call }.to raise_error( + SidekiqUniqueJobs::Conflict, + 'Item with the key: uniquejobs:random-digest-value is already scheduled or processing', + ) + end + end +end diff --git a/spec/unit/sidekiq_unique_jobs/on_conflict/reject_spec.rb b/spec/unit/sidekiq_unique_jobs/on_conflict/reject_spec.rb new file mode 100644 index 000000000..8d1e91d87 --- /dev/null +++ b/spec/unit/sidekiq_unique_jobs/on_conflict/reject_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::OnConflict::Reject do + include_context 'with a stubbed locksmith' + let(:strategy) { described_class.new(item) } + let(:deadset) { instance_spy(Sidekiq::DeadSet) } + let(:payload) { instance_spy('payload') } + let(:item) do + { 'jid' => 'maaaahjid', + 'class' => 'WhileExecutingRejectJob', + 'lock' => 'while_executing_reject', + 'args' => [%w[array of arguments]] } + end + + before do + allow(strategy).to receive(:deadset).and_return(deadset) + allow(strategy).to receive(:payload).and_return(payload) + end + + describe '#send_to_deadset' do + subject(:send_to_deadset) { strategy.send_to_deadset } + + context 'when deadset_kill?' do + before { allow(strategy).to receive(:deadset_kill?).and_return(true) } + + it 'calls deadset_kill' do + expect(strategy).to receive(:deadset_kill) + send_to_deadset + end + end + + context 'when not deadset_kill?' do + before { allow(strategy).to receive(:deadset_kill?).and_return(false) } + + it 'calls push_to_deadset' do + expect(strategy).to receive(:push_to_deadset) + send_to_deadset + end + end + end + + describe '#deadset_kill' do + subject(:deadset_kill) { strategy.deadset_kill } + + context 'when kill_with_options?' do + before { allow(strategy).to receive(:kill_with_options?).and_return(true) } + + it 'calls kill_job_with_options' do + expect(strategy).to receive(:kill_job_with_options) + deadset_kill + end + end + + context 'when not kill_with_options?' do + before { allow(strategy).to receive(:kill_with_options?).and_return(false) } + + it 'calls kill_job_without_options' do + expect(strategy).to receive(:kill_job_without_options) + deadset_kill + end + end + end + + describe '#kill_job_with_options' do + subject(:kill_job_with_options) { strategy.kill_job_with_options } + + it 'calls deadset.kill with options hash', sidekiq_ver: '>= 5.1.0' do + expect(deadset).to receive(:kill).with(payload, notify_failure: false) + kill_job_with_options + end + end + + describe '#kill_job_without_options' do + subject(:kill_job_without_options) { strategy.kill_job_without_options } + + it 'calls deadset.kill without options hash', sidekiq_ver: '>= 5.0.0 && < 5.1.0' do + expect(deadset).to receive(:kill).with(payload) + kill_job_without_options + end + end +end diff --git a/spec/unit/sidekiq_unique_jobs/on_conflict/reschedule_spec.rb b/spec/unit/sidekiq_unique_jobs/on_conflict/reschedule_spec.rb new file mode 100644 index 000000000..e6543ca7c --- /dev/null +++ b/spec/unit/sidekiq_unique_jobs/on_conflict/reschedule_spec.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe SidekiqUniqueJobs::OnConflict::Reschedule do + let(:strategy) { described_class.new(item) } + let(:unique_digest) { 'uniquejobs:random-digest-value' } + let(:item) do + { 'class' => UniqueJobOnConflictReschedule, + 'unique_digest' => unique_digest, + 'args' => [1, 2] } + end + + describe '#call' do + let(:call) { strategy.call } + + it do + expect { call }.to change { schedule_count }.by(1) + end + end +end diff --git a/spec/unit/sidekiq_unique_jobs/options_with_fallback_spec.rb b/spec/unit/sidekiq_unique_jobs/options_with_fallback_spec.rb index 47f8db5e3..51dc377cd 100644 --- a/spec/unit/sidekiq_unique_jobs/options_with_fallback_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/options_with_fallback_spec.rb @@ -27,7 +27,7 @@ def initialize(item, options, worker_class = nil) 'jid' => jid, 'queue' => queue, 'class' => worker_class, - 'unique' => unique, + 'lock' => unique, 'args' => args, 'log_duplicate_payload' => log_duplicate_payload, } @@ -42,8 +42,8 @@ def initialize(item, options, worker_class = nil) it { is_expected.to eq(nil) } context 'when options["unique"] is present' do - let(:options) { { 'unique' => 'while_executing' } } - let(:item) { { 'unique' => 'until_executed' } } + let(:options) { { 'lock' => 'while_executing' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to eq('while_executing') } @@ -57,7 +57,7 @@ def initialize(item, options, worker_class = nil) end context 'when item["unique"] is present' do - let(:item) { { 'unique' => 'until_executed' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to eq('until_executed') } @@ -80,15 +80,15 @@ def initialize(item, options, worker_class = nil) it { is_expected.to be_truthy } context 'when options["unique"] is present' do - let(:options) { { 'unique' => 'while_executing' } } - let(:item) { { 'unique' => 'until_executed' } } + let(:options) { { 'lock' => 'while_executing' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to be_falsey } end context 'when item["unique"] is present' do let(:options) { {} } - let(:item) { { 'unique' => 'until_executed' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to be_falsey } end @@ -119,7 +119,7 @@ def initialize(item, options, worker_class = nil) it { is_expected.to be_a(SidekiqUniqueJobs::Lock::UntilExecuted) } context 'when options["unique"] is present' do - let(:options) { { 'unique' => :while_executing } } + let(:options) { { 'lock' => :while_executing } } it { is_expected.to be_a(SidekiqUniqueJobs::Lock::WhileExecuting) } end @@ -130,24 +130,24 @@ def initialize(item, options, worker_class = nil) subject(:lock_class) { options_with_fallback.lock_class } context 'when item["unique"] is present' do - let(:item) { { 'unique' => :until_executed } } + let(:item) { { 'lock' => :until_executed } } it { is_expected.to eq(SidekiqUniqueJobs::Lock::UntilExecuted) } context 'when options["unique"] is present' do - let(:options) { { 'unique' => :while_executing } } + let(:options) { { 'lock' => :while_executing } } it { is_expected.to eq(SidekiqUniqueJobs::Lock::WhileExecuting) } end end context 'without matching class in LOCKS' do - let(:item) { { 'unique' => :until_unknown } } + let(:item) { { 'lock' => :until_unknown } } it do expect { lock_class } .to raise_error(SidekiqUniqueJobs::UnknownLock, - 'No implementation for `unique: :until_unknown`') + 'No implementation for `lock: :until_unknown`') end end end @@ -156,15 +156,15 @@ def initialize(item, options, worker_class = nil) subject { options_with_fallback.lock_type } context 'when options["unique"] is while_executing' do - let(:options) { { 'unique' => 'while_executing' } } - let(:item) { { 'unique' => 'until_executed' } } + let(:options) { { 'lock' => 'while_executing' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to eq('while_executing') } end context 'when item["unique"] is until_executed' do let(:options) { {} } - let(:item) { { 'unique' => 'until_executed' } } + let(:item) { { 'lock' => 'until_executed' } } it { is_expected.to eq('until_executed') } end @@ -181,7 +181,7 @@ def initialize(item, options, worker_class = nil) context 'when default_worker_options has been configured' do let(:worker_class) { PlainClass } - let(:default_worker_options) { { 'unique' => :while_executing } } + let(:default_worker_options) { { 'lock' => :while_executing } } it do with_default_worker_options(default_worker_options) do diff --git a/spec/unit/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb b/spec/unit/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb index 23c3544b6..a9aefa814 100644 --- a/spec/unit/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/sidekiq_unique_ext_spec.rb @@ -44,7 +44,7 @@ 'lock_timeout' => 0, 'queue' => 'testqueue', 'retry' => true, - 'unique' => 'until_executed', + 'lock' => 'until_executed', 'unique_args' => [{ 'foo' => 'bar' }], 'unique_digest' => 'uniquejobs:863b7cb639bd71c828459b97788b2ada', 'unique_prefix' => 'uniquejobs', diff --git a/spec/unit/sidekiq_unique_jobs/timeout/calculator_spec.rb b/spec/unit/sidekiq_unique_jobs/timeout/calculator_spec.rb index 7818342eb..70134542f 100644 --- a/spec/unit/sidekiq_unique_jobs/timeout/calculator_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/timeout/calculator_spec.rb @@ -12,19 +12,12 @@ it { is_expected.to respond_to(:time_until_scheduled) } it { is_expected.to respond_to(:worker_class) } - it { is_expected.to respond_to(:seconds) } it { is_expected.to respond_to(:lock_expiration) } it { is_expected.to respond_to(:lock_timeout) } it { is_expected.to respond_to(:worker_options) } it { is_expected.to respond_to(:default_worker_options) } end - describe '#seconds' do - subject { -> { calculator.seconds } } - - it { is_expected.to raise_error(NotImplementedError, "#seconds needs to be implemented in #{described_class}") } - end - describe '#time_until_scheduled' do subject { calculator.time_until_scheduled } diff --git a/spec/unit/sidekiq_unique_jobs/unique_args_spec.rb b/spec/unit/sidekiq_unique_jobs/unique_args_spec.rb index 75343c2bf..49f36e73f 100644 --- a/spec/unit/sidekiq_unique_jobs/unique_args_spec.rb +++ b/spec/unit/sidekiq_unique_jobs/unique_args_spec.rb @@ -121,8 +121,8 @@ end end - describe '#unique_on_all_queues?' do - subject(:unique_on_all_queues?) { unique_args.unique_on_all_queues? } + describe '#unique_across_queues?' do + subject(:unique_across_queues?) { unique_args.unique_across_queues? } let(:worker_class) { UntilExecutedJob }