Skip to content

Commit

Permalink
Don't go through the general dispatch flow when releasing claimed exe…
Browse files Browse the repository at this point in the history
…cutions

That's it, don't try to gain the concurrency lock, because claimed executions with
concurrency limits that are released would most likely be holding the semaphore
themselves, as it's released after completing. This means these claimed executions
would go to blocked upon release, leaving the semaphore busy. Just assume that if
a job has a claimed execution, it's because it already gained the lock when going
to ready.
  • Loading branch information
rosa committed Jan 11, 2024
1 parent 3c70451 commit 926601f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
2 changes: 1 addition & 1 deletion app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def perform

def release
transaction do
job.prepare_for_execution
job.dispatch_bypassing_concurrency_limits
destroy!
end
end
Expand Down
4 changes: 4 additions & 0 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def dispatch
end
end

def dispatch_bypassing_concurrency_limits
ready
end

def finished!
if preserve_finished_jobs?
touch(:finished_at)
Expand Down
2 changes: 2 additions & 0 deletions test/dummy/config/environments/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@
logger = ActiveSupport::Logger.new(STDOUT)
config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") }
config.solid_queue.logger = ActiveSupport::Logger.new(nil)

config.solid_queue.shutdown_timeout = 2.seconds
end
19 changes: 17 additions & 2 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
setup do
@result = JobResult.create!(queue_name: "default", status: "seq: ")

default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
dispatcher = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 }
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }

@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] })

Expand Down Expand Up @@ -167,6 +167,21 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
end

test "don't block claimed executions that get released" do
SequentialUpdateResultJob.perform_later(@result, name: name, pause: SolidQueue.shutdown_timeout + 3.seconds)
job = SolidQueue::Job.last

sleep(0.2)
assert job.claimed?

# This won't leave time to the job to finish
signal_process(@pid, :TERM, wait: 0.1.second)
sleep(SolidQueue.shutdown_timeout + 0.2.seconds)

assert_not job.reload.finished?
assert job.reload.ready?
end

private
def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join }
Expand Down

0 comments on commit 926601f

Please sign in to comment.