Skip to content

Commit

Permalink
Fixes #224.
Browse files Browse the repository at this point in the history
Postgres only Issue:
Jobs that meet all of the following conditions will result in the database connection entering an invalid and
unrecoverable state, requiring a rollback before any further database requests (reads or writes) can be made:

1. Use SolidQueue's 'limits_concurrency' macro
2. Are enqueued inside an application-level transaction
3. The limits_concurrency key already exists in the Semaphore table (i.e., this is job 2 - N for a given key)

SolidQueue uses the following design pattern to implement the conflict detection of the limits_concurrency
macro which works 100% correctly, 100% of the time, with MySQL and SQLite:
  begin
    Semaphore.create!(...)
      no_conflict_path()
    rescue ActiveRecord::RecordNotUnique
      handle_concurrency_conflict()
    end

The problem is Postgres:
It's not possible to rescue and recover from an insert failing due to an conflict on unique index (or any other
database constraint). Until a rollback is executed, the database connection is in an invalid state and unusable.

Possible Solutions:
1. Nested transactions
   + Easiest to implement
   + Portable across all three standard Rails databases
   - Has significant performance issues, especially for databases under load or replicated databases with
     long-running queries (Postgres specific)
   - Requires using Raise and Rescue for flow of control (performance, less than ideal coding practice)
   - Requires issuing a rollback (performance, additional command that must round trip from the client to
     database and back)

2. Insert into the Semaphore table using 'INSERT INTO table (..) VALUES (...) ON CONFLICT DO NOTHING' syntax
   + ANSI standard syntax (not a Postgres 'one off' solution)
   + Rails natively supports identifying database adaptors that support this syntax, making the implementation
     portable and maintainable
   + Supported by Postgres and allows this issue to be addressed
   + Does not require Raise and Rescue for flow of control
   + Performant (native, fast path database functionality)

Solution:
Leverage Rails connection adaptors allowing code to easily identifying supported database features/functionality to
implement strategy #2 (INSERT ON CONFLICT) for those databases that support it (i.e., Postgres) and leave the
original implementation of rescuing RecordNotUnique for those that do not.

Note: Although I'd love to take credit for the "quality" of this implementation, all that credit belongs to @rosa who's
excellent feedback on an earlier implementation resulted in this significantly better implementation.
  • Loading branch information
hms committed Aug 5, 2024
1 parent 1d115b6 commit c817349
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 57 deletions.
38 changes: 18 additions & 20 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ def signal(job)
def signal_all(jobs)
Proxy.signal_all(jobs)
end

# Requires a unique index on key
def create_unique_by(attributes)
if connection.supports_insert_conflict_target?
insert({ **attributes }, unique_by: :key).any?
else
create!(**attributes)
end
rescue ActiveRecord::RecordNotUnique
false
end
end

class Proxy
Expand Down Expand Up @@ -44,35 +55,22 @@ def signal

attr_accessor :job

def attempt_creation_with_insert_on_conflict
results = Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)

if results.length.zero?
limit == 1 ? false : attempt_decrement
else
def attempt_creation
if Semaphore.create_unique_by(key: key, value: limit - 1, expires_at: expires_at)
true
else
check_limit_or_decrement
end
end

def attempt_creation_with_create_and_exception_handling
Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
true
rescue ActiveRecord::RecordNotUnique
limit == 1 ? false : attempt_decrement
end

if ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
alias attempt_creation attempt_creation_with_insert_on_conflict
else
alias attempt_creation attempt_creation_with_create_and_exception_handling
end
def check_limit_or_decrement = limit == 1 ? false : attempt_decrement

def attempt_decrement
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
Semaphore.available.where(key: key).update_all(["value = value - 1, expires_at = ?", expires_at]) > 0
end

def attempt_increment
Semaphore.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
Semaphore.where(key: key, value: ...limit).update_all(["value = value + 1, expires_at = ?", expires_at]) > 0
end

def key
Expand Down
50 changes: 13 additions & 37 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

# C would have started in the beginning, seeing the status empty, and would finish after
# all other jobs, so it'll do the last update with only itself
assert_stored_sequence(@result, [ "C" ])
assert_stored_sequence(@result, ["C"])
end

test "run several jobs over the same record sequentially, with some of them failing" do
Expand All @@ -99,7 +99,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
wait_for_jobs_to_finish_for(3.seconds)
assert_equal 3, SolidQueue::FailedExecution.count

assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
assert_stored_sequence @result, ["B", "D", "F"] + ("G".."K").to_a
end

test "rely on dispatcher to unblock blocked executions with an available semaphore" do
Expand Down Expand Up @@ -133,7 +133,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
assert_stored_sequence @result, ("A".."K").to_a, ["A", "C", "B"] + ("D".."K").to_a
end

test "rely on dispatcher to unblock blocked executions with an expired semaphore" do
Expand Down Expand Up @@ -165,7 +165,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
assert_stored_sequence @result, ("A".."K").to_a, ["A", "C", "B"] + ("D".."K").to_a
end

test "don't block claimed executions that get released" do
Expand All @@ -183,44 +183,20 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert job.reload.ready?
end

if ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
test "insert_with_unique_by_has_same_database_results_as_create!_with_exception_handling" do
key = "key", limit = 1, expires_at = 1.minute.from_now
test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do
ActiveRecord::Base.transaction do
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
SequentialUpdateResultJob.perform_later(@result, name: "B")

assert SolidQueue::Semaphore.count == 0

SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)
assert SolidQueue::Semaphore.count == 1

SolidQueue::Semaphore.insert({ key: key, value: limit - 1, expires_at: expires_at }, unique_by: :key)
assert SolidQueue::Semaphore.count == 1

SolidQueue::Semaphore.delete_all
assert SolidQueue::Semaphore.count == 0

SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
assert SolidQueue::Semaphore.count == 1

assert_raises ActiveRecord::RecordNotUnique do
SolidQueue::Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
begin
assert_equal 2, SolidQueue::Job.count
assert true, "Transaction state valid"
rescue ActiveRecord::StatementInvalid
assert false, "Transaction state unexpectedly invalid"
end
end
end

test "confirm correct version of attempt_creation by database adaptor" do
proxy = SolidQueue::Semaphore::Proxy.new(true)

aliased_method = proxy.method(:attempt_creation)

if ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
original_method = proxy.method(:attempt_creation_with_insert_on_conflict)
else
original_method = proxy.method(:attempt_creation_with_create_and_exception_handling)
end

assert_equal original_method.name, aliased_method.original_name, "The alias maps to the correct original method"
end

private

def assert_stored_sequence(result, *sequences)
Expand Down

0 comments on commit c817349

Please sign in to comment.