diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 76035d61e..9fe566492 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -4,6 +4,7 @@ module SidekiqUniqueJobs # Lock manager class that handles all the various locks # # @author Mikael Henriksson + # rubocop:disable ClassLength class Locksmith include SidekiqUniqueJobs::Connection @@ -14,9 +15,11 @@ class Locksmith # @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection def initialize(item, redis_pool = nil) @concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6 - @expiration = item[LOCK_EXPIRATION_KEY] + @ttl = item[LOCK_EXPIRATION_KEY] @jid = item[JID_KEY] @unique_digest = item[UNIQUE_DIGEST_KEY] + @lock_type = item[LOCK_KEY] + @lock_type &&= @lock_type.to_sym @redis_pool = redis_pool end @@ -34,14 +37,14 @@ def available_count redis(redis_pool) { |conn| conn.llen(available_key) } end - # Deletes the lock unless it has an expiration set + # Deletes the lock unless it has a ttl set def delete - return if expiration + return if ttl delete! end - # Deletes the lock regardless of if it has an expiration set + # Deletes the lock regardless of if it has a ttl set def delete! Scripts.call( :delete, @@ -58,7 +61,7 @@ def delete! def lock(timeout = nil, &block) Scripts.call(:lock, redis_pool, keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest], - argv: [jid, expiration]) + argv: [jid, ttl, lock_type]) grab_token(timeout) do |token| touch_grabbed_token(token) @@ -87,7 +90,7 @@ def unlock!(token = nil) :unlock, redis_pool, keys: [exists_key, grabbed_key, available_key, version_key, UNIQUE_SET, unique_digest], - argv: [token, expiration], + argv: [token, ttl, lock_type], ) end @@ -102,7 +105,7 @@ def locked?(token = nil) private - attr_reader :concurrency, :unique_digest, :expiration, :jid, :redis_pool + attr_reader :concurrency, :unique_digest, :ttl, :jid, :redis_pool, :lock_type def grab_token(timeout = nil) redis(redis_pool) do |conn| @@ -118,7 +121,10 @@ def grab_token(timeout = nil) end def touch_grabbed_token(token) - redis(redis_pool) { |conn| conn.hset(grabbed_key, token, current_time.to_f) } + redis(redis_pool) do |conn| + conn.hset(grabbed_key, token, current_time.to_f) + conn.expire(grabbed_key, ttl) if ttl && lock_type == :until_expired + end end def return_token_or_block_value(token) @@ -161,4 +167,5 @@ def redis_time redis(&:time) end end + # rubocop:enable ClassLength end diff --git a/lib/sidekiq_unique_jobs/logging.rb b/lib/sidekiq_unique_jobs/logging.rb index 037cc2402..c5f4eb4bf 100644 --- a/lib/sidekiq_unique_jobs/logging.rb +++ b/lib/sidekiq_unique_jobs/logging.rb @@ -51,11 +51,11 @@ def log_fatal(message_or_exception = nil, &block) end def logging_context(middleware_class, job_hash) + digest = job_hash["unique_digest"] if defined?(Sidekiq::Logging) - digest = job_hash["unique_digest"] "#{middleware_class} #{"DIG-#{digest}" if digest}" else - { middleware: middleware_class, unique_digest: job_hash["unique_digest"] } + { middleware: middleware_class, unique_digest: digest } end end end diff --git a/lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb b/lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb index 9d531be6b..751867ae7 100644 --- a/lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb +++ b/lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb @@ -33,6 +33,7 @@ def logger config.logger end + # :reek:ManualDispatch def with_context(context, &block) if logger.respond_to?(:with_context) logger.with_context(context, &block) diff --git a/redis/lock.lua b/redis/lock.lua index 6593c9ffb..45e7757f7 100644 --- a/redis/lock.lua +++ b/redis/lock.lua @@ -6,8 +6,9 @@ local available_key = KEYS[3] local unique_keys = KEYS[4] local unique_digest = KEYS[5] -local job_id = ARGV[1] -local expiration = tonumber(ARGV[2]) +local job_id = ARGV[1] +local ttl = tonumber(ARGV[2]) +local lock = ARGV[3] local function current_time() local time = redis.call('time') @@ -49,10 +50,15 @@ redis.call('DEL', grabbed_key) redis.call('DEL', available_key) redis.call('RPUSH', available_key, job_id) -if expiration then - redis.call('EXPIRE', available_key, expiration) - redis.call('EXPIRE', exists_key, expiration) - redis.call('EXPIRE', grabbed_key, expiration) +-- The client should only set ttl for until_expired +-- The server should set ttl for all other jobs +if lock == "until_expired" and ttl then + -- We can't keep the key here because it will otherwise never be deleted + redis.call('SREM', unique_keys, unique_digest) + + redis.call('EXPIRE', available_key, ttl) + redis.call('EXPIRE', exists_key, ttl) + redis.call('EXPIRE', unique_digest, ttl) end return job_id diff --git a/redis/unlock.lua b/redis/unlock.lua index 253bbeaca..cafe15258 100644 --- a/redis/unlock.lua +++ b/redis/unlock.lua @@ -5,19 +5,19 @@ local version_key = KEYS[4] local unique_keys = KEYS[5] local unique_digest = KEYS[6] -- TODO: Legacy support (Remove in v6.1) -local token = ARGV[1] -local expiration = tonumber(ARGV[2]) +local token = ARGV[1] +local ttl = tonumber(ARGV[2]) +local lock = ARGV[3] -redis.call('HDEL', grabbed_key, token) redis.call('SREM', unique_keys, unique_digest) -if expiration then - redis.log(redis.LOG_DEBUG, "signal_locks.lua - expiring stale locks") +if ttl then redis.call('SREM', unique_keys, unique_digest) - redis.call('EXPIRE', exists_key, expiration) - redis.call('EXPIRE', available_key, expiration) - redis.call('EXPIRE', version_key, expiration) -- TODO: Legacy support (Remove in v6.1) - redis.call('EXPIRE', unique_digest, expiration) -- TODO: Legacy support (Remove in v6.1) + redis.call('EXPIRE', exists_key, ttl) + redis.call('EXPIRE', grabbed_key, ttl) + redis.call('EXPIRE', available_key, ttl) + redis.call('EXPIRE', version_key, ttl) -- TODO: Legacy support (Remove in v6.1) + redis.call('EXPIRE', unique_digest, ttl) -- TODO: Legacy support (Remove in v6.1) else redis.call('DEL', exists_key) redis.call('SREM', unique_keys, unique_digest) @@ -28,6 +28,7 @@ else redis.call('DEL', unique_digest) -- TODO: Legacy support (Remove in v6.1) end +redis.call('HDEL', grabbed_key, token) local count = redis.call('LPUSH', available_key, token) redis.call('EXPIRE', available_key, 5) return count diff --git a/spec/examples/expiring_job_spec.rb b/spec/examples/expiring_job_spec.rb index fdd65e1e0..4748e73b2 100644 --- a/spec/examples/expiring_job_spec.rb +++ b/spec/examples/expiring_job_spec.rb @@ -39,15 +39,6 @@ expect(1).to be_enqueued_in("customqueue2") end end - - it "sets keys to expire as per configuration" do - lock_expiration = described_class.get_sidekiq_options["lock_expiration"] - unique_keys.each do |key| - next if key.end_with?(":GRABBED") - - expect(ttl(key)).to be_within(1).of(lock_expiration + 60) - end - end end context "when job is pushed" do @@ -74,15 +65,6 @@ expect(1).to be_enqueued_in("customqueue2") end end - - it "sets keys to expire as per configuration" do - lock_expiration = described_class.get_sidekiq_options["lock_expiration"] - unique_keys.each do |key| - next if key.end_with?(":GRABBED") - - expect(ttl(key)).to be_within(1).of(lock_expiration) - end - end end end end diff --git a/spec/examples/until_expired_job_spec.rb b/spec/examples/until_expired_job_spec.rb index 2ff813ba0..7b729ff90 100644 --- a/spec/examples/until_expired_job_spec.rb +++ b/spec/examples/until_expired_job_spec.rb @@ -16,4 +16,74 @@ it_behaves_like "a performing worker" do let(:args) { "one" } end + + describe "client middleware" do + context "when job is delayed" do + before { described_class.perform_in(60, 1, 2) } + + it "rejects new scheduled jobs" do + expect(1).to be_enqueued_in("customqueue") + described_class.perform_in(3600, 1, 2) + expect(1).to be_enqueued_in("customqueue") + expect(1).to be_scheduled_at(Time.now.to_f + 2 * 60) + end + + it "rejects new jobs" do + described_class.perform_async(1, 2) + expect(1).to be_enqueued_in("customqueue") + end + + it "allows duplicate messages to different queues" do + expect(1).to be_enqueued_in("customqueue2") + with_sidekiq_options_for(described_class, queue: "customqueue2") do + described_class.perform_async(1, 2) + expect(1).to be_enqueued_in("customqueue2") + end + end + + it "sets keys to expire as per configuration" do + lock_expiration = described_class.get_sidekiq_options["lock_expiration"] + unique_keys.each do |key| + next if key.include?(":GRABBED") + + expect(ttl(key)).to be_within(1).of(lock_expiration + 60) + end + end + end + + context "when job is pushed" do + before { described_class.perform_async(1, 2) } + + it "rejects new scheduled jobs" do + expect(1).to be_enqueued_in("customqueue") + described_class.perform_in(60, 1, 2) + expect(1).to be_enqueued_in("customqueue") + expect(0).to be_scheduled_at(Time.now.to_f + 2 * 60) + end + + it "rejects new jobs" do + expect(1).to be_enqueued_in("customqueue") + described_class.perform_async(1, 2) + expect(1).to be_enqueued_in("customqueue") + end + + it "allows duplicate messages to different queues" do + expect(1).to be_enqueued_in("customqueue") + expect(0).to be_enqueued_in("customqueue2") + with_sidekiq_options_for(described_class, queue: "customqueue2") do + described_class.perform_async(1, 2) + expect(1).to be_enqueued_in("customqueue2") + end + end + + it "sets keys to expire as per configuration" do + lock_expiration = described_class.get_sidekiq_options["lock_expiration"] + unique_keys.each do |key| + next if key.include?(":GRABBED") + + expect(ttl(key)).to be_within(1).of(lock_expiration) + end + end + end + end end diff --git a/spec/integration/sidekiq/retry_set_spec.rb b/spec/integration/sidekiq/retry_set_spec.rb index 2e2a26c69..17f267e24 100644 --- a/spec/integration/sidekiq/retry_set_spec.rb +++ b/spec/integration/sidekiq/retry_set_spec.rb @@ -47,7 +47,7 @@ ]) end - specify { expect(ttl("#{unique_digest}:EXISTS")).to eq(lock_expiration) } + specify { expect(ttl("#{unique_digest}:EXISTS")).to eq(-1) } specify { expect(ttl("#{unique_digest}:GRABBED")).to eq(-1) } it "can be put back on queue" do diff --git a/spec/integration/sidekiq_unique_jobs/client/middleware_spec.rb b/spec/integration/sidekiq_unique_jobs/client/middleware_spec.rb index 2d11b620e..bb0389462 100644 --- a/spec/integration/sidekiq_unique_jobs/client/middleware_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/client/middleware_spec.rb @@ -173,9 +173,7 @@ def self.do_it(_one) MyUniqueJob.perform_in(expected_expires_at, "mika", "hel") unique_keys.each do |key| - next if key.end_with?(":GRABBED") - - expect(ttl(key)).to be_within(10).of(8_099) + expect(ttl(key)).to eq(-1) end end diff --git a/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb b/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb index 263db6cf1..5775b69ef 100644 --- a/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb +++ b/spec/integration/sidekiq_unique_jobs/locksmith_spec.rb @@ -8,12 +8,14 @@ let(:jid_one) { "maaaahjid" } let(:jid_two) { "jidmayhem" } let(:lock_expiration) { nil } + let(:lock_type) { "until_executed" } let(:unique_digest) { "uniquejobs:randomvalue" } let(:item_one) do { "jid" => jid_one, "unique_digest" => unique_digest, "lock_expiration" => lock_expiration, + "lock" => lock_type, } end let(:item_two) { item_one.merge("jid" => jid_two) } @@ -111,40 +113,64 @@ describe "lock with expiration" do let(:lock_expiration) { 3 } + let(:lock_type) { :while_executing } it_behaves_like "a lock" - it "creates the expected keys" do - locksmith_one.lock + context "when lock_type is until_expired" do + let(:lock_type) { :until_expired } - expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(3) + it "prevents other processes from locking" do + locksmith_one.lock - # PLEASE keep this spec. It verifies that the next lock - # doesn't persist the exist_key of another lock - sleep 1 + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(3) - expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(2) - expect(locksmith_two.lock(0)).to eq(nil) - expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(2) + # PLEASE keep this spec. It verifies that the next lock + # doesn't persist the exist_key of another lock + sleep 1 - expect(unique_digests).to match_array(["uniquejobs:randomvalue"]) - expect(unique_keys).to match_array(%w[ - uniquejobs:randomvalue:EXISTS - uniquejobs:randomvalue:GRABBED - ]) + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(2) + expect(locksmith_two.lock(0)).to eq(nil) + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(2) + + expect(unique_digests).to match_array([]) + expect(unique_keys).to match_array(%w[ + uniquejobs:randomvalue:EXISTS + uniquejobs:randomvalue:GRABBED + ]) + end + + it "expires the expected keys" do + locksmith_one.lock + expect(unique_digests).to match_array([]) + expect(unique_keys).to match_array(%w[ + uniquejobs:randomvalue:EXISTS + uniquejobs:randomvalue:GRABBED + ]) + + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(3) + expect(ttl("uniquejobs:randomvalue:GRABBED")).to eq(3) + end end - it "expires the expected keys" do - locksmith_one.lock - expect(unique_digests).to match_array(["uniquejobs:randomvalue"]) - expect(unique_keys).to match_array(%w[ - uniquejobs:randomvalue:EXISTS - uniquejobs:randomvalue:GRABBED - ]) - locksmith_one.unlock + context "when lock_type is anything else than until_expired" do + let(:lock_type) { :until_executed } - expect(unique_digests).to match_array([]) - expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(3) + it "expires the expected keys" do + locksmith_one.lock + expect(unique_digests).to match_array(["uniquejobs:randomvalue"]) + expect(unique_keys).to match_array(%w[ + uniquejobs:randomvalue:EXISTS + uniquejobs:randomvalue:GRABBED + ]) + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(-1) + expect(ttl("uniquejobs:randomvalue:GRABBED")).to eq(-1) + + locksmith_one.unlock + + expect(ttl("uniquejobs:randomvalue:EXISTS")).to eq(3) + expect(ttl("uniquejobs:randomvalue:GRABBED")).to eq(-2) + end end it "deletes the expected keys" do