Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce leftover keys #374

Merged
merged 3 commits into from
Feb 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module SidekiqUniqueJobs
# Lock manager class that handles all the various locks
#
# @author Mikael Henriksson <mikael@zoolutions.se>
# rubocop:disable ClassLength
class Locksmith
include SidekiqUniqueJobs::Connection

Expand All @@ -14,9 +15,11 @@ class Locksmith
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, redis_pool = nil)
@concurrency = 1 # removed in a0cff5bc42edbe7190d6ede7e7f845074d2d7af6
@expiration = item[LOCK_EXPIRATION_KEY]
@ttl = item[LOCK_EXPIRATION_KEY]
@jid = item[JID_KEY]
@unique_digest = item[UNIQUE_DIGEST_KEY]
@lock_type = item[LOCK_KEY]
@lock_type &&= @lock_type.to_sym
@redis_pool = redis_pool
end

Expand All @@ -34,14 +37,14 @@ def available_count
redis(redis_pool) { |conn| conn.llen(available_key) }
end

# Deletes the lock unless it has an expiration set
# Deletes the lock unless it has a ttl set
def delete
return if expiration
return if ttl

delete!
end

# Deletes the lock regardless of if it has an expiration set
# Deletes the lock regardless of if it has a ttl set
def delete!
Scripts.call(
:delete,
Expand All @@ -58,7 +61,7 @@ def delete!
def lock(timeout = nil, &block)
Scripts.call(:lock, redis_pool,
keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest],
argv: [jid, expiration])
argv: [jid, ttl, lock_type])

grab_token(timeout) do |token|
touch_grabbed_token(token)
Expand Down Expand Up @@ -87,7 +90,7 @@ def unlock!(token = nil)
:unlock,
redis_pool,
keys: [exists_key, grabbed_key, available_key, version_key, UNIQUE_SET, unique_digest],
argv: [token, expiration],
argv: [token, ttl, lock_type],
)
end

Expand All @@ -102,7 +105,7 @@ def locked?(token = nil)

private

attr_reader :concurrency, :unique_digest, :expiration, :jid, :redis_pool
attr_reader :concurrency, :unique_digest, :ttl, :jid, :redis_pool, :lock_type

def grab_token(timeout = nil)
redis(redis_pool) do |conn|
Expand All @@ -118,7 +121,10 @@ def grab_token(timeout = nil)
end

def touch_grabbed_token(token)
redis(redis_pool) { |conn| conn.hset(grabbed_key, token, current_time.to_f) }
redis(redis_pool) do |conn|
conn.hset(grabbed_key, token, current_time.to_f)
conn.expire(grabbed_key, ttl) if ttl && lock_type == :until_expired
end
end

def return_token_or_block_value(token)
Expand Down Expand Up @@ -161,4 +167,5 @@ def redis_time
redis(&:time)
end
end
# rubocop:enable ClassLength
end
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ def log_fatal(message_or_exception = nil, &block)
end

def logging_context(middleware_class, job_hash)
digest = job_hash["unique_digest"]
if defined?(Sidekiq::Logging)
digest = job_hash["unique_digest"]
"#{middleware_class} #{"DIG-#{digest}" if digest}"
else
{ middleware: middleware_class, unique_digest: job_hash["unique_digest"] }
{ middleware: middleware_class, unique_digest: digest }
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def logger
config.logger
end

# :reek:ManualDispatch
def with_context(context, &block)
if logger.respond_to?(:with_context)
logger.with_context(context, &block)
Expand Down
18 changes: 12 additions & 6 deletions redis/lock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ local available_key = KEYS[3]
local unique_keys = KEYS[4]
local unique_digest = KEYS[5]

local job_id = ARGV[1]
local expiration = tonumber(ARGV[2])
local job_id = ARGV[1]
local ttl = tonumber(ARGV[2])
local lock = ARGV[3]

local function current_time()
local time = redis.call('time')
Expand Down Expand Up @@ -49,10 +50,15 @@ redis.call('DEL', grabbed_key)
redis.call('DEL', available_key)
redis.call('RPUSH', available_key, job_id)

if expiration then
redis.call('EXPIRE', available_key, expiration)
redis.call('EXPIRE', exists_key, expiration)
redis.call('EXPIRE', grabbed_key, expiration)
-- The client should only set ttl for until_expired
-- The server should set ttl for all other jobs
if lock == "until_expired" and ttl then
-- We can't keep the key here because it will otherwise never be deleted
redis.call('SREM', unique_keys, unique_digest)

redis.call('EXPIRE', available_key, ttl)
redis.call('EXPIRE', exists_key, ttl)
redis.call('EXPIRE', unique_digest, ttl)
end

return job_id
19 changes: 10 additions & 9 deletions redis/unlock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ local version_key = KEYS[4]
local unique_keys = KEYS[5]
local unique_digest = KEYS[6] -- TODO: Legacy support (Remove in v6.1)

local token = ARGV[1]
local expiration = tonumber(ARGV[2])
local token = ARGV[1]
local ttl = tonumber(ARGV[2])
local lock = ARGV[3]

redis.call('HDEL', grabbed_key, token)
redis.call('SREM', unique_keys, unique_digest)

if expiration then
redis.log(redis.LOG_DEBUG, "signal_locks.lua - expiring stale locks")
if ttl then
redis.call('SREM', unique_keys, unique_digest)
redis.call('EXPIRE', exists_key, expiration)
redis.call('EXPIRE', available_key, expiration)
redis.call('EXPIRE', version_key, expiration) -- TODO: Legacy support (Remove in v6.1)
redis.call('EXPIRE', unique_digest, expiration) -- TODO: Legacy support (Remove in v6.1)
redis.call('EXPIRE', exists_key, ttl)
redis.call('EXPIRE', grabbed_key, ttl)
redis.call('EXPIRE', available_key, ttl)
redis.call('EXPIRE', version_key, ttl) -- TODO: Legacy support (Remove in v6.1)
redis.call('EXPIRE', unique_digest, ttl) -- TODO: Legacy support (Remove in v6.1)
else
redis.call('DEL', exists_key)
redis.call('SREM', unique_keys, unique_digest)
Expand All @@ -28,6 +28,7 @@ else
redis.call('DEL', unique_digest) -- TODO: Legacy support (Remove in v6.1)
end

redis.call('HDEL', grabbed_key, token)
local count = redis.call('LPUSH', available_key, token)
redis.call('EXPIRE', available_key, 5)
return count
Expand Down
18 changes: 0 additions & 18 deletions spec/examples/expiring_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@
expect(1).to be_enqueued_in("customqueue2")
end
end

it "sets keys to expire as per configuration" do
lock_expiration = described_class.get_sidekiq_options["lock_expiration"]
unique_keys.each do |key|
next if key.end_with?(":GRABBED")

expect(ttl(key)).to be_within(1).of(lock_expiration + 60)
end
end
end

context "when job is pushed" do
Expand All @@ -74,15 +65,6 @@
expect(1).to be_enqueued_in("customqueue2")
end
end

it "sets keys to expire as per configuration" do
lock_expiration = described_class.get_sidekiq_options["lock_expiration"]
unique_keys.each do |key|
next if key.end_with?(":GRABBED")

expect(ttl(key)).to be_within(1).of(lock_expiration)
end
end
end
end
end
70 changes: 70 additions & 0 deletions spec/examples/until_expired_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,74 @@
it_behaves_like "a performing worker" do
let(:args) { "one" }
end

describe "client middleware" do
context "when job is delayed" do
before { described_class.perform_in(60, 1, 2) }

it "rejects new scheduled jobs" do
expect(1).to be_enqueued_in("customqueue")
described_class.perform_in(3600, 1, 2)
expect(1).to be_enqueued_in("customqueue")
expect(1).to be_scheduled_at(Time.now.to_f + 2 * 60)
end

it "rejects new jobs" do
described_class.perform_async(1, 2)
expect(1).to be_enqueued_in("customqueue")
end

it "allows duplicate messages to different queues" do
expect(1).to be_enqueued_in("customqueue2")
with_sidekiq_options_for(described_class, queue: "customqueue2") do
described_class.perform_async(1, 2)
expect(1).to be_enqueued_in("customqueue2")
end
end

it "sets keys to expire as per configuration" do
lock_expiration = described_class.get_sidekiq_options["lock_expiration"]
unique_keys.each do |key|
next if key.include?(":GRABBED")

expect(ttl(key)).to be_within(1).of(lock_expiration + 60)
end
end
end

context "when job is pushed" do
before { described_class.perform_async(1, 2) }

it "rejects new scheduled jobs" do
expect(1).to be_enqueued_in("customqueue")
described_class.perform_in(60, 1, 2)
expect(1).to be_enqueued_in("customqueue")
expect(0).to be_scheduled_at(Time.now.to_f + 2 * 60)
end

it "rejects new jobs" do
expect(1).to be_enqueued_in("customqueue")
described_class.perform_async(1, 2)
expect(1).to be_enqueued_in("customqueue")
end

it "allows duplicate messages to different queues" do
expect(1).to be_enqueued_in("customqueue")
expect(0).to be_enqueued_in("customqueue2")
with_sidekiq_options_for(described_class, queue: "customqueue2") do
described_class.perform_async(1, 2)
expect(1).to be_enqueued_in("customqueue2")
end
end

it "sets keys to expire as per configuration" do
lock_expiration = described_class.get_sidekiq_options["lock_expiration"]
unique_keys.each do |key|
next if key.include?(":GRABBED")

expect(ttl(key)).to be_within(1).of(lock_expiration)
end
end
end
end
end
2 changes: 1 addition & 1 deletion spec/integration/sidekiq/retry_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
])
end

specify { expect(ttl("#{unique_digest}:EXISTS")).to eq(lock_expiration) }
specify { expect(ttl("#{unique_digest}:EXISTS")).to eq(-1) }
specify { expect(ttl("#{unique_digest}:GRABBED")).to eq(-1) }

it "can be put back on queue" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ def self.do_it(_one)
MyUniqueJob.perform_in(expected_expires_at, "mika", "hel")

unique_keys.each do |key|
next if key.end_with?(":GRABBED")

expect(ttl(key)).to be_within(10).of(8_099)
expect(ttl(key)).to eq(-1)
end
end

Expand Down
Loading