Skip to content

Commit

Permalink
Use a concurrent timer task to expire semaphores and unblock executions
Browse files Browse the repository at this point in the history
This task is managed by the scheduler and runs at the scheduler's own
polling interval.
  • Loading branch information
rosa committed Nov 16, 2023
1 parent 6b11253 commit 1964198
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 35 deletions.
2 changes: 1 addition & 1 deletion app/models/solid_queue/blocked_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution

has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key

scope :releasable, -> { left_outer_joins(:execution_semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
scope :ordered, -> { order(priority: :asc) }

class << self
Expand Down
1 change: 1 addition & 0 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
class SolidQueue::Semaphore < SolidQueue::Record
scope :available, -> { where("value > 0") }
scope :locked, -> { where(value: 0) }
scope :expired, -> { where(expires_at: ...Time.current)}

class << self
def wait_for(concurrency_key, limit, duration)
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/process_registration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module ProcessRegistration
define_callbacks :start, :run, :shutdown

set_callback :start, :before, :register
set_callback :start, :before, :start_heartbeat
set_callback :start, :before, :launch_heartbeat

set_callback :run, :after, -> { stop unless registered? }

Expand Down Expand Up @@ -43,7 +43,7 @@ def registered?
process.persisted?
end

def start_heartbeat
def launch_heartbeat
@heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat }
@heartbeat_task.execute
end
Expand Down
65 changes: 43 additions & 22 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,34 +1,55 @@
# frozen_string_literal: true

class SolidQueue::Scheduler
include SolidQueue::Runner
module SolidQueue
class Scheduler
include Runner

attr_accessor :batch_size, :polling_interval
attr_accessor :batch_size, :polling_interval

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
set_callback :start, :before, :launch_concurrency_maintenance

@batch_size = options[:batch_size]
@polling_interval = options[:polling_interval]
end
# Builds a scheduler from keyword options.
#
# Works on a copy of the given options (the caller's hash is not mutated) and
# fills in any missing keys from SolidQueue::Configuration::SCHEDULER_DEFAULTS.
# Only :batch_size and :polling_interval are read from the result.
def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)

@batch_size = options[:batch_size]
@polling_interval = options[:polling_interval]
end

private
def run
with_polling_volume do
batch = SolidQueue::ScheduledExecution.next_batch(batch_size)
private
def run
with_polling_volume do
batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load)

if batch.size > 0
procline "preparing #{batch.size} jobs for execution"
if batch.size > 0
procline "preparing #{batch.size} jobs for execution"

SolidQueue::ScheduledExecution.prepare_batch(batch)
else
procline "waiting"
interruptible_sleep(polling_interval)
SolidQueue::ScheduledExecution.prepare_batch(batch)
else
procline "waiting"
interruptible_sleep(polling_interval)
end
end
end
end

def metadata
super.merge(batch_size: batch_size, polling_interval: polling_interval)
end
# Creates and starts the background timer task that performs concurrency
# maintenance: on each tick it calls expire_semaphores and then
# unblock_blocked_executions.
#
# The task is stored in @concurrency_maintenance_task and uses
# polling_interval as its execution interval; run_now: true asks the timer to
# fire the first run right away rather than after one full interval.
#
# NOTE(review): nothing in this method stops the task — confirm the shutdown
# path cancels @concurrency_maintenance_task.
# NOTE(review): exceptions raised inside a TimerTask block are presumably not
# re-raised on the calling thread — verify how failures here get reported.
def launch_concurrency_maintenance
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do
expire_semaphores
unblock_blocked_executions
end

@concurrency_maintenance_task.execute
end

# Removes the semaphores selected by the Semaphore.expired scope, walking them
# in batches of batch_size and calling delete_all on each batch.
def expire_semaphores
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
end

# Delegates to BlockedExecution.unblock, passing batch_size.
# NOTE(review): presumably batch_size caps how many blocked executions are
# released per call — confirm against BlockedExecution.unblock.
def unblock_blocked_executions
BlockedExecution.unblock(batch_size)
end

# Returns the metadata hash provided by super, extended with this scheduler's
# batch_size and polling_interval.
def metadata
super.merge(batch_size: batch_size, polling_interval: polling_interval)
end
end
end
18 changes: 8 additions & 10 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
@result = JobResult.create!(queue_name: "default", status: "seq: ")

default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
@pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] })
scheduler = { polling_interval: 1, batch_size: 200 }

@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })

wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
end
Expand Down Expand Up @@ -80,9 +82,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
end

test "rely on worker to unblock blocked executions with an available semaphore" do
skip "Moving this task to the supervisor"

test "rely on scheduler to unblock blocked executions with an available semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")

Expand Down Expand Up @@ -115,9 +115,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
end

test "rely on worker to unblock blocked executions with a missing semaphore" do
skip "Moving this task to the supervisor"

test "rely on scheduler to unblock blocked executions with an expired semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
wait_for_jobs_to_finish_for(2.seconds)
Expand All @@ -135,10 +133,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end
end

# Then delete the semaphore, as if we had cleared it
SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).destroy!
# Simulate semaphore expiration
SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago)

# And wait for workers to release the jobs
# And wait for scheduler to release the jobs
wait_for_jobs_to_finish_for(2.seconds)
assert_no_pending_jobs

Expand Down

0 comments on commit 1964198

Please sign in to comment.