Skip to content

Commit

Permalink
Merge pull request #231 from collectiveidea/fixup-skip-locked
Browse files Browse the repository at this point in the history
Fixup reserve_with_scope_using_optimized_postgres
  • Loading branch information
albus522 committed Aug 15, 2024
2 parents a50c45b + 507e553 commit c68cc56
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ jobs:
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
cache-version: 2
- name: Run tests
env:
BUNDLE_GEMFILE: ${{ matrix.gemfile }}
Expand Down
18 changes: 11 additions & 7 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,23 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
subquery = ready_scope.limit(1).lock(true).select("id").to_sql

# On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other
# when attempting to get the next available job
# https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
skip_locked = ""
if connection.respond_to?(:postgresql_version) && connection.postgresql_version >= 90500
skip_locked = " SKIP LOCKED"
if connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
subquery += " SKIP LOCKED"
end

quoted_name = connection.quote_table_name(table_name)
subquery = ready_scope.limit(1).lock(true).select("id").to_sql
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}#{skip_locked}) RETURNING *"
reserved = find_by_sql([sql, now, worker.name])
reserved[0]
find_by_sql(
[
"UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *",
now,
worker.name
]
).first
end

def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
Expand Down

0 comments on commit c68cc56

Please sign in to comment.