Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple of race-conditions on shutdown #119

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ def perform
else
failed_with(result.error)
end
ensure

job.unblock_next_blocked_job
end

def release
transaction do
job.prepare_for_execution
job.dispatch_bypassing_concurrency_limits
destroy!
end
end
Expand Down
4 changes: 4 additions & 0 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def dispatch
end
end

def dispatch_bypassing_concurrency_limits
ready
end

def finished!
if preserve_finished_jobs?
touch(:finished_at)
Expand Down
3 changes: 0 additions & 3 deletions app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,5 @@ def heartbeat

def deregister
destroy!
rescue Exception
SolidQueue.logger.error("[SolidQueue] Error deregistering process #{id} - #{metadata}")
raise
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddMissingIndexToBlockedExecutions < ActiveRecord::Migration[7.1]
def change
add_index :solid_queue_blocked_executions, [ :concurrency_key, :priority, :job_id ], name: "index_solid_queue_blocked_executions_for_release"
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def do_start_loop
run
end
end
ensure

run_callbacks(:shutdown) { shutdown }
end

Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/processes/supervised.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def register_signal_handlers
end

trap(:QUIT) do
exit!
exit
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def wait_until(timeout, condition, &block)
if timeout > 0
deadline = monotonic_time_now + timeout

while monotonic_time_now < deadline && !condition.call
while monotonic_time_now <= deadline && !condition.call
sleep 0.1
block.call
end
Expand Down
3 changes: 2 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2023_12_11_200639) do
ActiveRecord::Schema[7.1].define(version: 2024_01_10_143450) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand All @@ -26,6 +26,7 @@
t.string "concurrency_key", null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end
Expand Down