Skip to content

Commit

Permalink
Toggle whether errors should be reraised
Browse files Browse the repository at this point in the history
  • Loading branch information
npezza93 committed Oct 6, 2024
1 parent 4fb5b9c commit 1fe831c
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 19 deletions.
10 changes: 5 additions & 5 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def release_all
end
end

def fail_all_with(error)
def fail_all_with(error, reraise:)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
executions.each { |execution| execution.failed_with(error) }
executions.each { |execution| execution.failed_with(error, reraise: reraise) }

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq
Expand All @@ -63,7 +63,7 @@ def perform
if result.success?
finished
else
failed_with(result.error)
failed_with(result.error, reraise: true)
end
ensure
job.unblock_next_blocked_job
Expand All @@ -82,12 +82,12 @@ def discard
raise UndiscardableError, "Can't discard a job in progress"
end

def failed_with(error)
def failed_with(error, reraise:)
transaction do
job.failed_with(error)
destroy!
end
raise error
raise error if reraise
end

private
Expand Down
4 changes: 2 additions & 2 deletions app/models/solid_queue/process/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ module Executor
after_destroy :release_all_claimed_executions
end

def fail_all_claimed_executions_with(error)
def fail_all_claimed_executions_with(error, reraise:)
if claims_executions?
claimed_executions.fail_all_with(error)
claimed_executions.fail_all_with(error, reraise: reraise)
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/process/prunable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def prune(excluding: nil)

def prune
error = Processes::ProcessPrunedError.new(last_heartbeat_at)
fail_all_claimed_executions_with(error)
fail_all_claimed_executions_with(error, reraise: false)

deregister(pruned: true)
end
Expand Down
12 changes: 6 additions & 6 deletions lib/solid_queue/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ def post(execution)
available_threads.decrement

future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
begin
wrap_in_app_executor do
thread_execution.perform
end
rescue Exception
nil
wrap_in_app_executor do
thread_execution.perform
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
end
end

future.add_observer do |_, _, error|
handle_thread_error(error) if error
end

future.execute
end

Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def replace_fork(pid, status)
def handle_claimed_jobs_by(terminated_fork, status)
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
error = Processes::ProcessExitError.new(status)
registered_process.fail_all_claimed_executions_with(error)
registered_process.fail_all_claimed_executions_with(error, reraise: false)
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def prune_dead_processes

def fail_orphaned_executions
wrap_in_app_executor do
ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new)
ClaimedExecution.orphaned.
fail_all_with(Processes::ProcessMissingError.new, reraise: false)
end
end
end
Expand Down
4 changes: 1 addition & 3 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42)
job = claimed_execution.job
assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
assert_raises RuntimeError do
claimed_execution.failed_with(RuntimeError.new)
end
claimed_execution.failed_with(RuntimeError.new, reraise: false)
end

assert job.reload.failed?
Expand Down

0 comments on commit 1fe831c

Please sign in to comment.