diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index b34cdd81..515c4f08 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -17,6 +17,17 @@ def signal(job) def signal_all(jobs) Proxy.signal_all(jobs) end + + # Requires a unique index on key + def create_unique_by(attributes) + if connection.supports_insert_conflict_target? + insert({ **attributes }, unique_by: :key).any? + else + create!(**attributes) + end + rescue ActiveRecord::RecordNotUnique + false + end end class Proxy @@ -44,35 +55,22 @@ def signal attr_accessor :job - def attempt_creation_with_insert_on_conflict - results = Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key) - - if results.length.zero? - limit == 1 ? false : attempt_decrement - else + def attempt_creation + if Semaphore.create_unique_by(key: key, value: limit - 1, expires_at: expires_at) true + else + check_limit_or_decrement end end - def attempt_creation_with_create_and_exception_handling - Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at) - true - rescue ActiveRecord::RecordNotUnique - limit == 1 ? false : attempt_decrement - end - - if ActiveRecord::Base.connection.adapter_name == "PostgreSQL" - alias attempt_creation attempt_creation_with_insert_on_conflict - else - alias attempt_creation attempt_creation_with_create_and_exception_handling - end + def check_limit_or_decrement = limit == 1 ? false : attempt_decrement def attempt_decrement - Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 + Semaphore.available.where(key: key).update_all(["value = value - 1, expires_at = ?", expires_at]) > 0 end def attempt_increment - Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0 + Semaphore.where(key: key, value: ...limit).update_all(["value = value + 1, expires_at = ?", expires_at]) > 0 end def key diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 98c317a6..5e696faa 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -83,7 +83,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # C would have started in the beginning, seeing the status empty, and would finish after # all other jobs, so it'll do the last update with only itself - assert_stored_sequence(@result, [ "C" ]) + assert_stored_sequence(@result, ["C"]) end test "run several jobs over the same record sequentially, with some of them failing" do @@ -99,7 +99,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(3.seconds) assert_equal 3, SolidQueue::FailedExecution.count - assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a + assert_stored_sequence @result, ["B", "D", "F"] + ("G".."K").to_a end test "rely on dispatcher to unblock blocked executions with an available semaphore" do @@ -133,7 +133,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a, ["A", "C", "B"] + ("D".."K").to_a end test "rely on dispatcher to unblock blocked executions with an expired semaphore" do @@ -165,7 +165,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a, ["A", "C", "B"] + ("D".."K").to_a end test "don't block claimed executions that get released" do @@ -183,44 +183,20 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert job.reload.ready? end - if ActiveRecord::Base.connection.adapter_name == "PostgreSQL" - test "insert_with_unique_by_has_same_database_results_as_create!_with_exception_handling" do - key = "key", limit = 1, expires_at = 1.minute.from_now + test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do + ActiveRecord::Base.transaction do + SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) + SequentialUpdateResultJob.perform_later(@result, name: "B") - assert SolidQueue::Semaphore.count == 0 - - SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key) - assert SolidQueue::Semaphore.count == 1 - - SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key) - assert SolidQueue::Semaphore.count == 1 - - SolidQueue::Semaphore.delete_all - assert SolidQueue::Semaphore.count == 0 - - SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at) - assert SolidQueue::Semaphore.count == 1 - - assert_raises ActiveRecord::RecordNotUnique do - SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at) + begin + assert_equal 2, SolidQueue::Job.count + assert true, "Transaction state valid" + rescue ActiveRecord::StatementInvalid + assert false, "Transaction state unexpectedly invalid" end end end - test "confirm correct version of attempt_creation by database adaptor" do - proxy = SolidQueue::Semaphore::Proxy.new(true) - - aliased_method = proxy.method(:attempt_creation) - - if ActiveRecord::Base.connection.adapter_name == "PostgreSQL" - original_method = proxy.method(:attempt_creation_with_insert_on_conflict) - else - original_method = proxy.method(:attempt_creation_with_create_and_exception_handling) - end - - assert_equal original_method.name, aliased_method.original_name, "The alias maps to the correct original method" - end - private def assert_stored_sequence(result, *sequences)