From 926601f89802c7f11297966ccd69751d53dfdc1c Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 10 Jan 2024 20:07:59 +0100 Subject: [PATCH] Don't go through the general dispatch flow when releasing claimed executions That's it, don't try to gain the concurrency lock, because claimed executions with concurrency limits that are released would most likely be holding the semaphore themselves, as it's released after completing. This means these claimed executions would go to blocked upon release, leaving the semaphore busy. Just assume that if a job has a claimed execution, it's because it already gained the lock when going to ready. --- app/models/solid_queue/claimed_execution.rb | 2 +- app/models/solid_queue/job/executable.rb | 4 ++++ test/dummy/config/environments/test.rb | 2 ++ test/integration/concurrency_controls_test.rb | 19 +++++++++++++++++-- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 9af5fe38..81c12f09 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -39,7 +39,7 @@ def perform def release transaction do - job.prepare_for_execution + job.dispatch_bypassing_concurrency_limits destroy! end end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index b9205486..4dac9d0b 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -73,6 +73,10 @@ def dispatch end end + def dispatch_bypassing_concurrency_limits + ready + end + def finished! if preserve_finished_jobs? touch(:finished_at) diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index 1fc968f6..cdf9ddba 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -51,4 +51,6 @@ logger = ActiveSupport::Logger.new(STDOUT) config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") } config.solid_queue.logger = ActiveSupport::Logger.new(nil) + + config.solid_queue.shutdown_timeout = 2.seconds end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index c1ad240e..a7780647 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -7,8 +7,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase setup do @result = JobResult.create!(queue_name: "default", status: "seq: ") - default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } - dispatcher = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 } + default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } + dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] }) @@ -167,6 +167,21 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a end + test "don't block claimed executions that get released" do + SequentialUpdateResultJob.perform_later(@result, name: name, pause: SolidQueue.shutdown_timeout + 3.seconds) + job = SolidQueue::Job.last + + sleep(0.2) + assert job.claimed? + + # This won't leave time to the job to finish + signal_process(@pid, :TERM, wait: 0.1.second) + sleep(SolidQueue.shutdown_timeout + 0.2.seconds) + + assert_not job.reload.finished? + assert job.reload.ready? + end + private def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join }