Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue when pruning a supervisor and its supervisees via callbacks #306

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ def claiming(job_ids, process_id, &block)
# Releases every claimed execution in the receiver's scope (calling `release`
# on each) inside a :release_many_claimed instrumentation event, and reports
# how many there were through payload[:size].
#
# NOTE(review): this span is a rendered diff without +/- markers (the file
# header reports "5 additions & 4 deletions"), so the `payload[:size]`
# assignment shows up both before and after the iteration. Presumably only
# one of the two exists in either version of the file — confirm against the
# commit.
def release_all
SolidQueue.instrument(:release_many_claimed) do |payload|
# `includes(:job)` requests the job association together with the executions;
# `tap` yields that collection to the block below.
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each(&:release)

payload[:size] = executions.size
end
end
end

# Fails every claimed execution in the receiver's scope with `error` (each
# one through `failed_with`) inside a :fail_many_claimed instrumentation
# event. The payload receives :size, :process_ids and :job_ids (the latter
# two de-duplicated with `uniq`).
#
# error - the error object handed to `failed_with` for each execution.
#
# NOTE(review): this span is a rendered diff without +/- markers (the file
# header reports "5 additions & 4 deletions"). The `payload[:size]`
# assignment and the `failed_with` iteration each appear twice because the
# removed and the added position are both shown; presumably the executions
# are failed only once in either version — confirm against the commit.
def fail_all_with(error)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each { |execution| execution.failed_with(error) }

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq

executions.each { |execution| execution.failed_with(error) }
payload[:size] = executions.size
end
end
end
Expand Down
11 changes: 10 additions & 1 deletion app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class SolidQueue::Process < SolidQueue::Record
include Executor, Prunable

belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id

store :metadata, coder: JSON

Expand All @@ -26,9 +26,18 @@ def heartbeat
# Destroys this process record inside a :deregister_process instrumentation
# event whose payload carries the process itself and the `pruned` flag.
#
# pruned - forwarded to the instrumentation payload; when true, the
#          supervisees are not deregistered from here (default: false).
#
# Any exception raised in the block is stored in payload[:error] and then
# re-raised to the caller.
def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
destroy!

# Cascade to supervisees only when this process has no supervisor of its
# own and it is not being pruned.
unless supervised? || pruned
supervisees.each(&:deregister)
end
# Exception (not just StandardError) is caught solely to record it in the
# payload; the bare `raise` below always propagates it.
rescue Exception => error
payload[:error] = error
raise
end
end

private
# True when this process record has a supervisor_id set, i.e. it is
# registered under another process.
def supervised?
supervisor_id.present?
end
end
8 changes: 7 additions & 1 deletion app/models/solid_queue/process/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Executor
included do
has_many :claimed_executions

after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
after_destroy :release_all_claimed_executions
end

def fail_all_claimed_executions_with(error)
Expand All @@ -17,6 +17,12 @@ def fail_all_claimed_executions_with(error)
end
end

# Releases all of this process's claimed executions, but only when
# `claims_executions?` is true; otherwise it does nothing.
# Named as an `after_destroy` callback in the `included` block of this
# concern.
def release_all_claimed_executions
if claims_executions?
claimed_executions.release_all
end
end

private
def claims_executions?
kind == "Worker"
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def prune_dead_processes

# Fails every claimed execution in the `orphaned` scope with a new
# ProcessMissingError, running inside `wrap_in_app_executor`.
#
# NOTE(review): this span is a rendered diff without +/- markers (the file
# header reports "1 addition & 1 deletion"). The two calls below are
# presumably the same statement before and after the change, differing only
# in the `SolidQueue::` qualifier — not two successive calls. Confirm against
# the commit.
def fail_orphaned_executions
wrap_in_app_executor do
SolidQueue::ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
end
end
end
Expand Down
16 changes: 9 additions & 7 deletions test/integration/forked_processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
@pid = run_supervisor_as_fork(load_configuration_from: config_as_hash)

wait_for_registered_processes(3, timeout: 3.second)
assert_registered_workers_for(:background, :default)
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
end

teardown do
Expand Down Expand Up @@ -49,7 +49,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
assert_job_status(pause, :finished)

# Termination is almost clean, but the supervisor remains
assert_registered_supervisor
assert_registered_supervisor_with(@pid)
assert_no_registered_workers
assert_no_claimed_jobs
end
Expand Down Expand Up @@ -217,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

# And there's a new worker that has been registered for that queue:
wait_for_registered_processes(3, timeout: 3.second)
assert_registered_workers_for(:background, :default)
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)

# And they can process jobs just fine
enqueue_store_result_job("no_pause")
Expand Down Expand Up @@ -272,17 +272,19 @@ def assert_clean_termination
assert_not process_exists?(@pid)
end

# Test helper: asserts that the processes registered as "Worker" cover
# exactly the given queues (both sides compared as sorted strings), and can
# additionally check the pid of the workers' supervisor.
#
# NOTE(review): this span is a rendered diff without +/- markers. The two
# `def` lines are presumably the old and the new signature, and the
# unconditional `@pid` assertion is presumably the line replaced by the
# `if supervisor_pid` guard — confirm against the commit.
def assert_registered_workers_for(*queues)
def assert_registered_workers_for(*queues, supervisor_pid: nil)
workers = find_processes_registered_as("Worker")
# Workers without a "queues" entry in their metadata are dropped by `compact`.
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
assert_equal queues.map(&:to_s).sort, registered_queues.sort
assert_equal [ @pid ], workers.map { |process| process.supervisor.pid }.uniq
# Only checked when a supervisor pid is given: every worker must report that one pid.
if supervisor_pid
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
end
end

# Test helper: asserts that exactly one "Supervisor(fork)" process is
# registered and that its pid is the expected one.
#
# NOTE(review): this span is a rendered diff without +/- markers. The two
# `def` lines and the two pid assertions are presumably the old (`@pid`) and
# new (`pid` argument) versions of the same lines — confirm against the
# commit.
def assert_registered_supervisor
def assert_registered_supervisor_with(pid)
processes = find_processes_registered_as("Supervisor(fork)")
assert_equal 1, processes.count
assert_equal @pid, processes.first.pid
assert_equal pid, processes.first.pid
end

def assert_no_registered_workers
Expand Down
22 changes: 22 additions & 0 deletions test/models/solid_queue/process_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase
assert jobs.all?(&:failed?)
end

# Covers pruning when a worker is registered under a supervisor: after time
# has moved forward, `SolidQueue::Process.prune` must remove both process
# records and turn the worker's claimed executions into failed ones.
test "prune processes including their supervisor with expired heartbeats and fail claimed executions" do
# A supervisor process and a worker registered with that supervisor's id.
supervisor = SolidQueue::Process.register(kind: "Supervisor(fork)", pid: 42, name: "supervisor-42")
process = SolidQueue::Process.register(kind: "Worker", pid: 43, name: "worker-43", supervisor_id: supervisor.id)
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
jobs = SolidQueue::Job.last(3)

# Claim ready executions on behalf of the worker; the assertions below
# expect three claimed rows to exist afterwards.
SolidQueue::ReadyExecution.claim("*", 5, process.id)

# Presumably far enough ahead for both heartbeats to count as expired —
# confirm against the prune threshold.
travel_to 10.minutes.from_now

# One prune call must remove 2 processes, add 3 failed executions and
# remove 3 claimed executions.
assert_difference -> { SolidQueue::Process.count }, -2 do
assert_difference -> { SolidQueue::FailedExecution.count }, 3 do
assert_difference -> { SolidQueue::ClaimedExecution.count }, -3 do
SolidQueue::Process.prune
end
end
end

# Re-read the jobs so `failed?` reflects their state after pruning.
jobs.each(&:reload)
assert jobs.all?(&:failed?)
end

test "hostname's with special characters are properly loaded" do
worker = SolidQueue::Worker.new(queues: "*", threads: 3, polling_interval: 0.2)
hostname = "Basecamp’s-Computer"
Expand Down
Loading