diff --git a/Gemfile.lock b/Gemfile.lock index df034d83..86f1b490 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -171,6 +171,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-darwin-21 x86_64-darwin-23 x86_64-linux diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 34bf8dc0..80f53fd9 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -17,6 +17,17 @@ def signal(job) def signal_all(jobs) Proxy.signal_all(jobs) end + + # Requires a unique index on key + def create_unique_by(attributes) + if connection.supports_insert_conflict_target? + insert({ **attributes }, unique_by: :key).any? + else + create!(**attributes) + end + rescue ActiveRecord::RecordNotUnique + false + end end class Proxy @@ -41,18 +52,19 @@ def signal end private + attr_accessor :job def attempt_creation - Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at) - true - rescue ActiveRecord::RecordNotUnique - if limit == 1 then false + if Semaphore.create_unique_by(key: key, value: limit - 1, expires_at: expires_at) + true else - attempt_decrement + check_limit_or_decrement end end + def check_limit_or_decrement = limit == 1 ? false : attempt_decrement + def attempt_decrement Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 11ddcbe2..da8d5e38 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -183,7 +183,22 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert job.reload.ready? end + test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do + ActiveRecord::Base.transaction do + SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) + SequentialUpdateResultJob.perform_later(@result, name: "B") + + begin + assert_equal 2, SolidQueue::Job.count + assert true, "Transaction state valid" + rescue ActiveRecord::StatementInvalid + assert false, "Transaction state unexpectedly invalid" + end + end + end + private + def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } skip_active_record_query_cache do