Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Until and while executing reschedule #324

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
require 'sidekiq_unique_jobs/lock/until_expired'
require 'sidekiq_unique_jobs/lock/while_executing'
require 'sidekiq_unique_jobs/lock/while_executing_reject'
require 'sidekiq_unique_jobs/lock/while_executing_reschedule'
require 'sidekiq_unique_jobs/lock/until_and_while_executing'
require 'sidekiq_unique_jobs/lock/until_and_while_executing_reschedule'
require 'sidekiq_unique_jobs/options_with_fallback'
require 'sidekiq_unique_jobs/middleware'
require 'sidekiq_unique_jobs/sidekiq_unique_ext'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
class Lock
# Locks jobs while the job is executing in the server process
# - Locks on perform_in or perform_async (see {UntilExecuting})
# - Unlocks before yielding to the worker's perform method (see {UntilExecuting})
# - Locks before yielding to the worker's perform method (see {WhileExecuting})
# - Unlocks after yielding to the worker's perform method (see {WhileExecuting})
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Maciej Mucha <maciej@northpass.com>
class UntilAndWhileExecutingReschedule < BaseLock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfly I assume this is here because it's in UntilAndWhileExecuting. @marclennox could you provide any additional context for why this line is needed? I looked at #277 which added it but don't really understand.

I'm asking because this line appears to break Sidekiq Pro's super_fetch reliability feature. When I start a job using this lock strategy and then kill the worker suddenly with kill -9, super_fetch will see that the job died and try to re-enqueue it but won't be able to because of this line. When I remove this return unless locked? line it works as I expect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I haven't worked with this library in many years and have lost any context in the deep recesses of my brain... apologies.

Copy link

@JacobEvelyn JacobEvelyn May 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries! I actually think I meant to tag @mhenrixon for that question. Will something break if I remove this line?

unlock

runtime_lock.execute { yield }
end

def runtime_lock
@runtime_lock ||= SidekiqUniqueJobs::Lock::WhileExecutingReschedule.new(item, callback, redis_pool)
end
end
end
end
29 changes: 29 additions & 0 deletions lib/sidekiq_unique_jobs/lock/while_executing_reschedule.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
class Lock
# Locks jobs while executing
# Locks from the server process
# Unlocks after the server is done processing
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Maciej Mucha <maciej@northpass.com>
class WhileExecutingReschedule < WhileExecuting
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY])

with_cleanup { yield }
end

# Overridden with a forced {OnConflict::Reschedule} strategy
# @return [OnConflict::Reschedule] a reschedule strategy
def strategy
@strategy ||= OnConflict.find_strategy(:reschedule).new(item)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/sidekiq_unique_jobs/options_with_fallback.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ module SidekiqUniqueJobs
# @author Mikael Henriksson <mikael@zoolutions.se>
module OptionsWithFallback
LOCKS = {
until_and_while_executing_reschedule: SidekiqUniqueJobs::Lock::UntilAndWhileExecutingReschedule,
until_and_while_executing: SidekiqUniqueJobs::Lock::UntilAndWhileExecuting,
until_executed: SidekiqUniqueJobs::Lock::UntilExecuted,
until_executing: SidekiqUniqueJobs::Lock::UntilExecuting,
until_expired: SidekiqUniqueJobs::Lock::UntilExpired,
until_timeout: SidekiqUniqueJobs::Lock::UntilExpired,
while_executing: SidekiqUniqueJobs::Lock::WhileExecuting,
while_executing_reject: SidekiqUniqueJobs::Lock::WhileExecutingReject,
while_executing_reschedule: SidekiqUniqueJobs::Lock::WhileExecutingReschedule,
}.freeze

def self.included(base)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe SidekiqUniqueJobs::Lock::UntilAndWhileExecutingReschedule do
include_context 'with a stubbed locksmith'
let(:lock) { described_class.new(item, callback) }
let(:callback) { -> {} }
let(:item) do
{
'jid' => 'maaaahjid',
'class' => 'UntilAndWhileExecutingJob',
'lock' => 'until_and_while_executing',
'args' => ['one'],
}
end

describe '#execute' do
let(:runtime_lock) { instance_spy(SidekiqUniqueJobs::Lock::WhileExecutingReschedule) }

before do
allow(lock).to receive(:locked?).and_return(locked?)
allow(lock).to receive(:unlock).and_return(true)
allow(lock).to receive(:runtime_lock).and_return(runtime_lock)
allow(runtime_lock).to receive(:execute).and_yield
end

context 'when locked?' do
let(:locked?) { true }

it 'unlocks the unique key before yielding' do
inside_block_value = false

lock.execute { inside_block_value = true }
expect(inside_block_value).to eq(true)

expect(lock).to have_received(:locked?)
expect(lock).to have_received(:unlock)
expect(runtime_lock).to have_received(:execute)
end
end

context 'when not locked?' do
let(:locked?) { false }

it 'unlocks the unique key before yielding' do
inside_block_value = false
lock.execute { inside_block_value = true }
expect(inside_block_value).to eq(false)

expect(lock).to have_received(:locked?)
expect(lock).not_to have_received(:unlock)
expect(runtime_lock).not_to have_received(:execute)
end
end
end

describe '#runtime_lock' do
subject(:runtime_lock) { lock.runtime_lock }

it { is_expected.to be_a(SidekiqUniqueJobs::Lock::WhileExecutingReschedule) }

it 'initializes with the right arguments' do
allow(SidekiqUniqueJobs::Lock::WhileExecutingReschedule).to receive(:new)
runtime_lock

expect(SidekiqUniqueJobs::Lock::WhileExecutingReschedule)
.to have_received(:new)
.with(item, callback, redis_pool)
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe SidekiqUniqueJobs::Lock::WhileExecutingReschedule do
include_context 'with a stubbed locksmith'
let(:lock) { described_class.new(item, callback) }
let(:callback) { -> {} }
let(:item) do
{ 'jid' => 'maaaahjid',
'class' => 'WhileExecutingRejectJob',
'lock' => 'while_executing_reject',
'args' => [%w[array of arguments]] }
end

before do
allow(lock).to receive(:unlock)
end

describe '#lock' do
subject { lock.lock }

it { is_expected.to eq(true) }
end

describe '#execute' do
subject(:execute) { lock.execute {} }

let(:token) { nil }

before do
allow(locksmith).to receive(:lock).with(0).and_return(token)
allow(lock).to receive(:with_cleanup).and_yield
end

context 'when lock succeeds' do
let(:token) { 'a token' }

it 'processes the job' do
execute
expect(lock).to have_received(:with_cleanup)
end
end

context 'when lock fails' do
let(:token) { nil }

it 'reschedules the job' do
expect { execute }.to change { schedule_count }.by(1)

expect(lock).not_to have_received(:with_cleanup)
end
end
end
end