Skip to content

Commit

Permalink
Merge pull request #231 from ikyn-inc/postgres_transaction_issue
Browse files Browse the repository at this point in the history
Fixes #224.  I believe this is a Postgres only issue
  • Loading branch information
rosa authored Aug 6, 2024
2 parents b7d4e1c + 7d36912 commit addd870
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x86_64-darwin-21
x86_64-darwin-23
x86_64-linux
Expand Down
22 changes: 17 additions & 5 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ def signal(job)
def signal_all(jobs)
Proxy.signal_all(jobs)
end

# Requires a unique index on key
def create_unique_by(attributes)
if connection.supports_insert_conflict_target?
insert({ **attributes }, unique_by: :key).any?
else
create!(**attributes)
end
rescue ActiveRecord::RecordNotUnique
false
end
end

class Proxy
Expand All @@ -41,18 +52,19 @@ def signal
end

private

attr_accessor :job

def attempt_creation
Semaphore.create!(key: key, value: limit - 1, expires_at: expires_at)
true
rescue ActiveRecord::RecordNotUnique
if limit == 1 then false
if Semaphore.create_unique_by(key: key, value: limit - 1, expires_at: expires_at)
true
else
attempt_decrement
check_limit_or_decrement
end
end

def check_limit_or_decrement = limit == 1 ? false : attempt_decrement

def attempt_decrement
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
end
Expand Down
15 changes: 15 additions & 0 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,22 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert job.reload.ready?
end

test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do
ActiveRecord::Base.transaction do
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
SequentialUpdateResultJob.perform_later(@result, name: "B")

begin
assert_equal 2, SolidQueue::Job.count
assert true, "Transaction state valid"
rescue ActiveRecord::StatementInvalid
assert false, "Transaction state unexpectedly invalid"
end
end
end

private

def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
skip_active_record_query_cache do
Expand Down

0 comments on commit addd870

Please sign in to comment.