From d7955dadba0eb5c9f28bfa0389a8625b8d6b1fd7 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 4 Sep 2024 15:26:07 +0200 Subject: [PATCH 01/11] Simplify passing options or file to configuration --- lib/solid_queue/cli.rb | 2 +- lib/solid_queue/configuration.rb | 4 +- lib/solid_queue/supervisor.rb | 4 +- test/integration/concurrency_controls_test.rb | 2 +- test/integration/lifecycle_hooks_test.rb | 2 +- test/integration/processes_lifecycle_test.rb | 2 +- test/unit/configuration_test.rb | 41 +++++++++++-------- test/unit/supervisor_test.rb | 6 +-- 8 files changed, 36 insertions(+), 27 deletions(-) diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 631f9c1c..26e4973d 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -14,7 +14,7 @@ def self.exit_on_failure? default_command :start def start - SolidQueue::Supervisor.start(load_configuration_from: options["config_file"]) + SolidQueue::Supervisor.start(config_file: options["config_file"]) end end end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 654ebf92..86d6b589 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -28,8 +28,8 @@ def instantiate dispatchers: [ DISPATCHER_DEFAULTS ] } - def initialize(load_from: nil) - @raw_config = config_from(load_from) + def initialize(config_file: nil, **options) + @raw_config = config_from(config_file || options.presence) end def configured_processes diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 99cb0cc0..3b492831 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -6,9 +6,9 @@ class Supervisor < Processes::Base include Maintenance, Signals, Pidfiled class << self - def start(load_configuration_from: nil) + def start(**options) SolidQueue.supervisor = true - configuration = Configuration.new(load_from: load_configuration_from) + configuration = Configuration.new(**options) if configuration.configured_processes.any? new(configuration).tap(&:start) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index beab23eb..e181e4ca 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -11,7 +11,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } - @pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] }) + @pid = run_supervisor_as_fork(workers: [ default_worker ], dispatchers: [ dispatcher ]) wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + dispatcher + supervisor end diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index 4b2218f7..f82fa8e8 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -12,7 +12,7 @@ class LifecycleHooksTest < ActiveSupport::TestCase SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } - pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] }) + pid = run_supervisor_as_fork(workers: [ { queues: "*" } ]) wait_for_registered_processes(4) terminate_process(pid) diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 58a80e59..444e721a 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -7,7 +7,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase setup do config_as_hash = { workers: [ { queues: :background }, { queues: :default, threads: 5 } ], dispatchers: [] } - @pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) + @pid = run_supervisor_as_fork(config_as_hash) wait_for_registered_processes(3, timeout: 3.second) assert_registered_workers_for(:background, :default, supervisor_pid: @pid) diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 0861405f..8da2268e 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -7,23 +7,23 @@ class ConfigurationTest < ActiveSupport::TestCase end assert_equal 2, configuration.configured_processes.count - assert_processes configuration, :worker, 1, queues: [ "*" ] + assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end test "default configuration when config given doesn't include any configuration" do - configuration = SolidQueue::Configuration.new(load_from: { random_wrong_key: :random_value }) + configuration = SolidQueue::Configuration.new(random_wrong_key: :random_value) assert_equal 2, configuration.configured_processes.count - assert_processes configuration, :worker, 1, queues: [ "*" ] + assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end test "default configuration when config given is empty" do - configuration = SolidQueue::Configuration.new(load_from: {}) + configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/empty_configuration.yml")) assert_equal 2, configuration.configured_processes.count - assert_processes configuration, :worker, 1, queues: [ "*" ] + assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end @@ -34,17 +34,22 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1 end + test "read configuration from provided file" do + configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/alternative_configuration.yml")) + + assert 3, configuration.configured_processes.count + assert_processes configuration, :worker, 3, processes: 1, polling_interval: 0.1, queues: %w[ queue_1 queue_2 queue_3 ], threads: [ 1, 2, 3 ] + end + test "provide configuration as a hash and fill defaults" do background_worker = { queues: "background", polling_interval: 10 } dispatcher = { batch_size: 100 } - config_as_hash = { workers: [ background_worker, background_worker ], dispatchers: [ dispatcher ] } - configuration = SolidQueue::Configuration.new(load_from: config_as_hash) + configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ], dispatchers: [ dispatcher ]) assert_processes configuration, :dispatcher, 1, polling_interval: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], batch_size: 100 - assert_processes configuration, :worker, 2, queues: [ "background" ], polling_interval: 10 + assert_processes configuration, :worker, 2, queues: "background", polling_interval: 10 - config_as_hash = { workers: [ background_worker, background_worker ] } - configuration = SolidQueue::Configuration.new(load_from: config_as_hash) + configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ]) assert_processes configuration, :dispatcher, 0 assert_processes configuration, :worker, 2 @@ -57,20 +62,24 @@ class ConfigurationTest < ActiveSupport::TestCase test "mulitple workers with the same configuration" do background_worker = { queues: "background", polling_interval: 10, processes: 3 } - config_as_hash = { workers: [ background_worker ] } - configuration = SolidQueue::Configuration.new(load_from: config_as_hash) + configuration = SolidQueue::Configuration.new(workers: [ background_worker ]) assert_equal 3, configuration.configured_processes.count - assert_processes configuration, :worker, 3, queues: [ "background" ], polling_interval: 10 + assert_processes configuration, :worker, 3, queues: "background", polling_interval: 10 end private def assert_processes(configuration, kind, count, **attributes) - processes = configuration.configured_processes.select { |p| p.kind == kind }.map(&:instantiate) + processes = configuration.configured_processes.select { |p| p.kind == kind } assert_equal count, processes.size - attributes.each do |attr, value| - assert_equal value, processes.map { |p| p.public_send(attr) }.first + attributes.each do |attr, expected_value| + value = processes.map { |p| p.attributes.fetch(attr) } + unless expected_value.is_a?(Array) + value = value.first + end + + assert_equal expected_value, value end end end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 69d8b0a9..081f0136 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -29,7 +29,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start with provided configuration" do config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] } - pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) + pid = run_supervisor_as_fork(config_as_hash) wait_for_registered_processes(2, timeout: 2) # supervisor + dispatcher assert_registered_supervisor(pid) @@ -44,7 +44,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start with empty configuration" do config_as_hash = { workers: [], dispatchers: [] } - pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) + pid = run_supervisor_as_fork(config_as_hash) sleep(0.5) assert_no_registered_processes @@ -116,7 +116,7 @@ class SupervisorTest < ActiveSupport::TestCase workers: [ { queues: "background", polling_interval: 10, processes: 2 } ], dispatchers: [] } - pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) + pid = run_supervisor_as_fork(config_as_hash) wait_for_registered_processes(3) assert_registered_supervisor(pid) From 0aa1644e38dd8c8cfcaf6d0ce3cd9e22ebd1a90b Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 5 Sep 2024 19:35:44 +0200 Subject: [PATCH 02/11] Separate recurring tasks configuration into their own file `config/recurring.yml`. Run them using the least busy dispatcher (largest polling interval) or a new default dispatcher if none. This also introduces some new options for the CLI, to allow for `--dispatch_only` and `--work_only`, and to skip recurring tasks. --- lib/solid_queue/cli.rb | 16 ++- lib/solid_queue/configuration.rb | 121 +++++++++++------- .../config/alternative_configuration.yml | 12 ++ test/dummy/config/empty_configuration.yml | 7 + test/dummy/config/invalid_configuration.yml | 1 + test/dummy/config/recurring.yml | 4 + test/dummy/config/solid_queue.yml | 5 - test/integration/processes_lifecycle_test.rb | 3 +- test/unit/configuration_test.rb | 69 ++++++++-- test/unit/supervisor_test.rb | 13 +- 10 files changed, 179 insertions(+), 72 deletions(-) create mode 100644 test/dummy/config/alternative_configuration.yml create mode 100644 test/dummy/config/empty_configuration.yml create mode 100644 test/dummy/config/invalid_configuration.yml create mode 100644 test/dummy/config/recurring.yml diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 26e4973d..aa8d1878 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -4,7 +4,19 @@ module SolidQueue class Cli < Thor - class_option :config_file, type: :string, aliases: "-c", default: Configuration::DEFAULT_CONFIG_FILE_PATH, desc: "Path to config file" + class_option :config_file, type: :string, aliases: "-c", + default: Configuration::DEFAULT_CONFIG_FILE_PATH, + desc: "Path to config file", + banner: "SOLID_QUEUE_CONFIG" + + class_option :recurring_schedule_file, type: :string, + default: Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH, + desc: "Path to recurring schedule definition", + banner: "SOLID_QUEUE_RECURRING_SCHEDULE" + + class_option :dispatch_only, type: :boolean, default: false + class_option :work_only, type: :boolean, default: false + class_option :skip_recurring, type: :boolean, default: false def self.exit_on_failure? true @@ -14,7 +26,7 @@ def self.exit_on_failure? default_command :start def start - SolidQueue::Supervisor.start(config_file: options["config_file"]) + SolidQueue::Supervisor.start(**options.symbolize_keys) end end end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 86d6b589..a2ebaf5f 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -19,21 +19,23 @@ def instantiate batch_size: 500, polling_interval: 1, concurrency_maintenance: true, - concurrency_maintenance_interval: 600, - recurring_tasks: [] + concurrency_maintenance_interval: 600 } - DEFAULT_CONFIG = { - workers: [ WORKER_DEFAULTS ], - dispatchers: [ DISPATCHER_DEFAULTS ] - } + DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml" + DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" - def initialize(config_file: nil, **options) - @raw_config = config_from(config_file || options.presence) + def initialize(**options) + @options = options.with_defaults(default_options) end def configured_processes - dispatchers + workers + case + when only_work? then workers + when only_dispatch? then dispatchers + else + dispatchers + workers + end end def max_number_of_threads @@ -42,9 +44,29 @@ def max_number_of_threads end private - attr_reader :raw_config + attr_reader :options + + def default_options + { + config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), + recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), + only_work: false, + only_dispatch: false, + skip_recurring: false + } + end + + def only_work? + options[:only_work] + end - DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml" + def only_dispatch? + options[:only_dispatch] + end + + def skip_recurring_tasks? + options[:skip_recurring] || only_work? + end def workers workers_options.flat_map do |worker_options| @@ -55,41 +77,66 @@ def workers def dispatchers dispatchers_options.map do |dispatcher_options| - recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks] - Process.new :dispatcher, dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS) - end - end - - def config_from(file_or_hash, env: Rails.env) - load_config_from(file_or_hash).then do |config| - config = config[env.to_sym] ? config[env.to_sym] : config - if (config.keys & DEFAULT_CONFIG.keys).any? then config - else - DEFAULT_CONFIG - end + Process.new :dispatcher, dispatcher_options.with_defaults(DISPATCHER_DEFAULTS) end end def workers_options - @workers_options ||= options_from_raw_config(:workers) + @workers_options ||= processes_config.fetch(:workers, []) .map { |options| options.dup.symbolize_keys } end def dispatchers_options - @dispatchers_options ||= options_from_raw_config(:dispatchers) + @dispatchers_options ||= processes_config.fetch(:dispatchers, []) .map { |options| options.dup.symbolize_keys } + .then { |options| with_recurring_tasks(options) } end - def options_from_raw_config(key) - Array(raw_config[key]) + def with_recurring_tasks(options) + if !skip_recurring_tasks? && recurring_tasks.any? + options.sort_by! { |attrs| attrs[:polling_interval] } + + if least_busy_dispatcher = options.pop + least_busy_dispatcher[:recurring_tasks] = recurring_tasks + options.push(least_busy_dispatcher) + else + [ DISPATCHER_DEFAULTS.merge(recurring_tasks: recurring_tasks) ] + end + else + options + end end - def parse_recurring_tasks(tasks) - Array(tasks).map do |id, options| + def recurring_tasks + @recurring_tasks ||= recurring_tasks_config.map do |id, options| RecurringTask.from_configuration(id, **options) end.select(&:valid?) end + def processes_config + @processes_config ||= config_from \ + options.slice(:workers, :dispatchers).presence || options[:config_file], + keys: [ :workers, :dispatchers ], + fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] } + end + + def recurring_tasks_config + @recurring_tasks ||= config_from options[:recurring_schedule_file] + end + + + def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env) + load_config_from(file_or_hash).then do |config| + config = config[env.to_sym] ? config[env.to_sym] : config + config = config.slice(*keys) if keys.any? + + if config.empty? then fallback + else + config + end + end + end + def load_config_from(file_or_hash) case file_or_hash when Hash @@ -97,29 +144,17 @@ def load_config_from(file_or_hash) when Pathname, String load_config_from_file Pathname.new(file_or_hash) when NilClass - load_config_from_env_location || load_config_from_default_location + {} else raise "Solid Queue cannot be initialized with #{file_or_hash.inspect}" end end - def load_config_from_env_location - if ENV["SOLID_QUEUE_CONFIG"].present? - load_config_from_file Rails.root.join(ENV["SOLID_QUEUE_CONFIG"]) - end - end - - def load_config_from_default_location - Rails.root.join(DEFAULT_CONFIG_FILE_PATH).then do |config_file| - config_file.exist? ? load_config_from_file(config_file) : {} - end - end - def load_config_from_file(file) if file.exist? ActiveSupport::ConfigurationFile.parse(file).deep_symbolize_keys else - raise "Configuration file for Solid Queue not found in #{file}" + {} end end end diff --git a/test/dummy/config/alternative_configuration.yml b/test/dummy/config/alternative_configuration.yml new file mode 100644 index 00000000..0d586867 --- /dev/null +++ b/test/dummy/config/alternative_configuration.yml @@ -0,0 +1,12 @@ +default: &default + workers: + <% 3.times do |i| %> + - queues: queue_<%= i + 1 %> + threads: <%= i + 1 %> + <% end %> + +development: + <<: *default + +test: + <<: *default diff --git a/test/dummy/config/empty_configuration.yml b/test/dummy/config/empty_configuration.yml new file mode 100644 index 00000000..caf23fee --- /dev/null +++ b/test/dummy/config/empty_configuration.yml @@ -0,0 +1,7 @@ +default: &default + +development: + <<: *default + +test: + <<: *default diff --git a/test/dummy/config/invalid_configuration.yml b/test/dummy/config/invalid_configuration.yml new file mode 100644 index 00000000..debf7603 --- /dev/null +++ b/test/dummy/config/invalid_configuration.yml @@ -0,0 +1 @@ +random_wrong_key: random_value diff --git a/test/dummy/config/recurring.yml b/test/dummy/config/recurring.yml new file mode 100644 index 00000000..c2ce7437 --- /dev/null +++ b/test/dummy/config/recurring.yml @@ -0,0 +1,4 @@ +periodic_store_result: + class: StoreResultJob + args: [ 42, { status: "custom_status" } ] + schedule: every second diff --git a/test/dummy/config/solid_queue.yml b/test/dummy/config/solid_queue.yml index efa9a9e0..cce3e9cd 100644 --- a/test/dummy/config/solid_queue.yml +++ b/test/dummy/config/solid_queue.yml @@ -7,11 +7,6 @@ default: &default dispatchers: - polling_interval: 1 batch_size: 500 - recurring_tasks: - periodic_store_result: - class: StoreResultJob - args: [ 42, { status: "custom_status" } ] - schedule: every second development: <<: *default diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 444e721a..38204caf 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -6,8 +6,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do - config_as_hash = { workers: [ { queues: :background }, { queues: :default, threads: 5 } ], dispatchers: [] } - @pid = run_supervisor_as_fork(config_as_hash) + @pid = run_supervisor_as_fork(workers: [ { queues: :background }, { queues: :default, threads: 5 } ]) wait_for_registered_processes(3, timeout: 3.second) assert_registered_workers_for(:background, :default, supervisor_pid: @pid) diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 8da2268e..247a5f01 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -2,9 +2,7 @@ class ConfigurationTest < ActiveSupport::TestCase test "default configuration to process all queues and dispatch" do - configuration = stub_const(SolidQueue::Configuration, :DEFAULT_CONFIG_FILE_PATH, "non/existent/path") do - SolidQueue::Configuration.new - end + configuration = SolidQueue::Configuration.new(config_file: nil) assert_equal 2, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" @@ -12,7 +10,7 @@ class ConfigurationTest < ActiveSupport::TestCase end test "default configuration when config given doesn't include any configuration" do - configuration = SolidQueue::Configuration.new(random_wrong_key: :random_value) + configuration = SolidQueue::Configuration.new(config_file: config_file_path(:invalid_configuration)) assert_equal 2, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" @@ -20,7 +18,7 @@ class ConfigurationTest < ActiveSupport::TestCase end test "default configuration when config given is empty" do - configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/empty_configuration.yml")) + configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration)) assert_equal 2, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" @@ -35,7 +33,7 @@ class ConfigurationTest < ActiveSupport::TestCase end test "read configuration from provided file" do - configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/alternative_configuration.yml")) + configuration = SolidQueue::Configuration.new(config_file: config_file_path(:alternative_configuration), only_work: true) assert 3, configuration.configured_processes.count assert_processes configuration, :worker, 3, processes: 1, polling_interval: 0.1, queues: %w[ queue_1 queue_2 queue_3 ], threads: [ 1, 2, 3 ] @@ -49,7 +47,7 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1, polling_interval: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], batch_size: 100 assert_processes configuration, :worker, 2, queues: "background", polling_interval: 10 - configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ]) + configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ], skip_recurring: true) assert_processes configuration, :dispatcher, 0 assert_processes configuration, :worker, 2 @@ -64,22 +62,73 @@ class ConfigurationTest < ActiveSupport::TestCase background_worker = { queues: "background", polling_interval: 10, processes: 3 } configuration = SolidQueue::Configuration.new(workers: [ background_worker ]) - assert_equal 3, configuration.configured_processes.count assert_processes configuration, :worker, 3, queues: "background", polling_interval: 10 end + test "recurring tasks configuration with one dispatcher" do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ]) + + assert_processes configuration, :dispatcher, 1, polling_interval: 0.1 + + dispatcher = configuration.configured_processes.first.instantiate + assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" + end + + test "recurring tasks configuration with no dispatchers uses a default dispatcher" do + configuration = SolidQueue::Configuration.new(dispatchers: []) + + assert_processes configuration, :dispatcher, 1, polling_interval: 1 + + dispatcher = configuration.configured_processes.first.instantiate + assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" + end + + test "recurring tasks configuration with multiple dispatchers uses the least busy one" do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 }, { polling_interval: 0.4 }, { polling_interval: 0.2 } ]) + + assert_processes configuration, :dispatcher, 3, polling_interval: [ 0.1, 0.2, 0.4 ] # sorted by polling interval + + dispatcher = configuration.configured_processes.last.instantiate + assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" + + dispatchers_without_recurring_tasks = configuration.configured_processes.first(2) + assert_nil dispatchers_without_recurring_tasks.map { |d| d.attributes[:recurring_tasks] }.uniq.first + end + + test "no recurring tasks configuration when explicitly excluded" do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ], skip_recurring: true) + assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil + end + private def assert_processes(configuration, kind, count, **attributes) processes = configuration.configured_processes.select { |p| p.kind == kind } assert_equal count, processes.size attributes.each do |attr, expected_value| - value = processes.map { |p| p.attributes.fetch(attr) } + value = processes.map { |p| p.attributes[attr] } unless expected_value.is_a?(Array) value = value.first end - assert_equal expected_value, value + if expected_value.nil? + assert_nil value + else + assert_equal expected_value, value + end end end + + def assert_has_recurring_task(dispatcher, key:, **attributes) + assert_equal 1, dispatcher.recurring_schedule.configured_tasks.count + task = dispatcher.recurring_schedule.configured_tasks.detect { |t| t.key == key } + + attributes.each do |attr, value| + assert_equal value, task.public_send(attr) + end + end + + def config_file_path(name) + Rails.root.join("config/#{name}.yml") + end end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 081f0136..28288717 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -28,8 +28,7 @@ class SupervisorTest < ActiveSupport::TestCase end test "start with provided configuration" do - config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] } - pid = run_supervisor_as_fork(config_as_hash) + pid = run_supervisor_as_fork(dispatchers: [ { batch_size: 100 } ]) wait_for_registered_processes(2, timeout: 2) # supervisor + dispatcher assert_registered_supervisor(pid) @@ -42,9 +41,7 @@ class SupervisorTest < ActiveSupport::TestCase end test "start with empty configuration" do - config_as_hash = { workers: [], dispatchers: [] } - - pid = run_supervisor_as_fork(config_as_hash) + pid = run_supervisor_as_fork(workers: [], dispatchers: []) sleep(0.5) assert_no_registered_processes @@ -112,11 +109,7 @@ class SupervisorTest < ActiveSupport::TestCase # Simnulate orphaned executions by just wiping the claiming process process.delete - config_as_hash = { - workers: [ { queues: "background", polling_interval: 10, processes: 2 } ], - dispatchers: [] - } - pid = run_supervisor_as_fork(config_as_hash) + pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 10, processes: 2 } ]) wait_for_registered_processes(3) assert_registered_supervisor(pid) From 03334b1617d65de666ba1349d3cf5c015481ecaf Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 5 Sep 2024 20:46:08 +0200 Subject: [PATCH 03/11] Allow configuring queue and priority for recurring tasks And also setting a description that we can use from Mission Control in the future. --- app/models/solid_queue/recurring_task.rb | 18 +++++-- .../models/solid_queue/recurring_task_test.rb | 47 +++++++++++++++---- test/unit/configuration_test.rb | 16 ++++--- 3 files changed, 64 insertions(+), 17 deletions(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 3dc2a5ac..7a1a84c7 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -17,7 +17,15 @@ def wrap(args) end def from_configuration(key, **options) - new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) + new \ + key: key, + class_name: options[:class], + arguments: options[:args], + schedule: options[:schedule], + queue_name: options[:queue_name].presence, + priority: options[:priority].presence, + description: options[:description], + static: true end def create_or_update_all(tasks) @@ -89,7 +97,7 @@ def using_solid_queue_adapter? def enqueue_and_record(run_at:) RecurringExecution.record(key, run_at) do - job_class.new(*arguments_with_kwargs).tap do |active_job| + job_class.new(*arguments_with_kwargs).set(enqueue_options).tap do |active_job| active_job.run_callbacks(:enqueue) do Job.enqueue(active_job) end @@ -99,7 +107,7 @@ def enqueue_and_record(run_at:) end def perform_later(&block) - job_class.perform_later(*arguments_with_kwargs, &block) + job_class.set(enqueue_options).perform_later(*arguments_with_kwargs, &block) end def arguments_with_kwargs @@ -118,5 +126,9 @@ def parsed_schedule def job_class @job_class ||= class_name&.safe_constantize end + + def enqueue_options + { queue: queue_name, priority: priority }.compact + end end end diff --git a/test/models/solid_queue/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb index bad6ec30..e0676912 100644 --- a/test/models/solid_queue/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -25,6 +25,14 @@ def perform(value, options = {}, **kwargs) end end + class JobWithPriority < ApplicationJob + queue_with_priority 10 + + def perform + JobBuffer.add "job_with_priority" + end + end + class JobUsingAsyncAdapter < ApplicationJob self.queue_adapter = :async @@ -33,11 +41,6 @@ def perform end end - setup do - @worker = SolidQueue::Worker.new(queues: "*") - @worker.mode = :inline - end - test "job without arguments" do task = recurring_task_with(class_name: "JobWithoutArguments") enqueue_and_assert_performed_with_result task, "job_without_arguments" @@ -126,6 +129,31 @@ def perform assert_not SolidQueue::RecurringTask.new(key: "task-id", schedule: "every minute").valid? end + test "task with custom queue and priority" do + task = recurring_task_with(class_name: "JobWithoutArguments", queue_name: "my_new_queue", priority: 4) + enqueue_and_assert_performed_with_result task, "job_without_arguments" + + job = SolidQueue::Job.last + assert_equal "my_new_queue", job.queue_name + assert_equal 4, job.priority + end + + test "overriding existing priority" do + task = recurring_task_with(class_name: "JobWithPriority", priority: nil).tap(&:save!) + enqueue_and_assert_performed_with_result task.reload, "job_with_priority" + + job = SolidQueue::Job.last + assert_equal 10, job.priority + + task.destroy + + task = recurring_task_with(class_name: "JobWithPriority", priority: 4).tap(&:save!) + enqueue_and_assert_performed_with_result task.reload, "job_with_priority" + + job = SolidQueue::Job.last + assert_equal 4, job.priority + end + private def enqueue_and_assert_performed_with_result(task, result) assert_difference [ -> { SolidQueue::Job.count }, -> { SolidQueue::ReadyExecution.count } ], +1 do @@ -133,13 +161,16 @@ def enqueue_and_assert_performed_with_result(task, result) end assert_difference -> { JobBuffer.size }, +1 do - @worker.start + SolidQueue::Worker.new(queues: "*").tap do |worker| + worker.mode = :inline + worker.start + end end assert_equal result, JobBuffer.last_value end - def recurring_task_with(class_name:, schedule: "every hour", args: nil) - SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, arguments: args) + def recurring_task_with(class_name:, schedule: "every hour", args: nil, **options) + SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, arguments: args, **options) end end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 247a5f01..df6dda7a 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -111,11 +111,7 @@ def assert_processes(configuration, kind, count, **attributes) value = value.first end - if expected_value.nil? - assert_nil value - else - assert_equal expected_value, value - end + assert_equal_value expected_value, value end end @@ -124,7 +120,15 @@ def assert_has_recurring_task(dispatcher, key:, **attributes) task = dispatcher.recurring_schedule.configured_tasks.detect { |t| t.key == key } attributes.each do |attr, value| - assert_equal value, task.public_send(attr) + assert_equal_value value, task.public_send(attr) + end + end + + def assert_equal_value(expected_value, value) + if expected_value.nil? + assert_nil value + else + assert_equal expected_value, value end end From a4467dbe57b28f623007d6597d157a5eea2cd9aa Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 5 Sep 2024 23:32:05 +0200 Subject: [PATCH 04/11] Split recurring scheduling responsibilities into another process A Scheduler, instead of Dispatcher, that will be created only when there are recurring tasks to run. --- lib/solid_queue/cli.rb | 1 - lib/solid_queue/configuration.rb | 32 ++++------- lib/solid_queue/dispatcher.rb | 9 ++-- lib/solid_queue/log_subscriber.rb | 4 +- lib/solid_queue/processes/poller.rb | 3 -- lib/solid_queue/processes/runnable.rb | 5 ++ lib/solid_queue/scheduler.rb | 53 +++++++++++++++++++ .../recurring_schedule.rb | 2 +- test/dummy/config/empty_configuration.yml | 8 +-- test/integration/recurring_tasks_test.rb | 22 ++++---- test/test_helpers/processes_test_helper.rb | 8 ++- test/unit/configuration_test.rb | 38 +++++-------- test/unit/dispatcher_test.rb | 41 +------------- test/unit/log_subscriber_test.rb | 2 +- test/unit/scheduler_test.rb | 36 +++++++++++++ test/unit/worker_test.rb | 6 --- 16 files changed, 148 insertions(+), 122 deletions(-) create mode 100644 lib/solid_queue/scheduler.rb rename lib/solid_queue/{dispatcher => scheduler}/recurring_schedule.rb (97%) create mode 100644 test/unit/scheduler_test.rb diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index aa8d1878..050966da 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -14,7 +14,6 @@ class Cli < Thor desc: "Path to recurring schedule definition", banner: "SOLID_QUEUE_RECURRING_SCHEDULE" - class_option :dispatch_only, type: :boolean, default: false class_option :work_only, type: :boolean, default: false class_option :skip_recurring, type: :boolean, default: false diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index a2ebaf5f..6f999cbc 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -30,11 +30,9 @@ def initialize(**options) end def configured_processes - case - when only_work? then workers - when only_dispatch? then dispatchers + if only_work? then workers else - dispatchers + workers + dispatchers + workers + schedulers end end @@ -81,6 +79,14 @@ def dispatchers end end + def schedulers + if !skip_recurring_tasks? && recurring_tasks.any? + [ Process.new(:scheduler, recurring_tasks: recurring_tasks) ] + else + [] + end + end + def workers_options @workers_options ||= processes_config.fetch(:workers, []) .map { |options| options.dup.symbolize_keys } @@ -89,22 +95,6 @@ def workers_options def dispatchers_options @dispatchers_options ||= processes_config.fetch(:dispatchers, []) .map { |options| options.dup.symbolize_keys } - .then { |options| with_recurring_tasks(options) } - end - - def with_recurring_tasks(options) - if !skip_recurring_tasks? && recurring_tasks.any? - options.sort_by! { |attrs| attrs[:polling_interval] } - - if least_busy_dispatcher = options.pop - least_busy_dispatcher[:recurring_tasks] = recurring_tasks - options.push(least_busy_dispatcher) - else - [ DISPATCHER_DEFAULTS.merge(recurring_tasks: recurring_tasks) ] - end - else - options - end end def recurring_tasks @@ -128,7 +118,7 @@ def recurring_tasks_config def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env) load_config_from(file_or_hash).then do |config| config = config[env.to_sym] ? config[env.to_sym] : config - config = config.slice(*keys) if keys.any? + config = config.slice(*keys) if keys.any? && config.present? if config.empty? then fallback else diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index b2cd80af..a22a82d8 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -2,10 +2,10 @@ module SolidQueue class Dispatcher < Processes::Poller - attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule + attr_accessor :batch_size, :concurrency_maintenance - after_boot :start_concurrency_maintenance, :schedule_recurring_tasks - before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks + after_boot :start_concurrency_maintenance + before_shutdown :stop_concurrency_maintenance def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) @@ -13,13 +13,12 @@ def initialize(**options) @batch_size = options[:batch_size] @concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance] - @recurring_schedule = RecurringSchedule.new(options[:recurring_tasks]) super(**options) end def metadata - super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence) + super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval) end private diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 2639f042..3d2ec02c 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -94,7 +94,7 @@ def register_process(event) if error = event.payload[:error] warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error))) else - info formatted_event(event, action: "Register #{process_kind}", **attributes) + debug formatted_event(event, action: "Register #{process_kind}", **attributes) end end @@ -114,7 +114,7 @@ def deregister_process(event) if error = event.payload[:error] warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error))) else - info formatted_event(event, action: "Deregister #{process.kind}", **attributes) + debug formatted_event(event, action: "Deregister #{process.kind}", **attributes) end end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index 6a88b06d..bf5a7450 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -41,9 +41,6 @@ def poll raise NotImplementedError end - def shutdown - end - def with_polling_volume SolidQueue.instrument(:polling) do if SolidQueue.silence_polling? && ActiveRecord::Base.logger diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index c2ab045b..37618ffc 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -18,6 +18,8 @@ def start def stop @stopped = true + wake_up if running_async? + @thread&.join end @@ -61,6 +63,9 @@ def all_work_completed? false end + def shutdown + end + def set_procline end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb new file mode 100644 index 00000000..98403496 --- /dev/null +++ b/lib/solid_queue/scheduler.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module SolidQueue + class Scheduler < Processes::Base + include Processes::Runnable + + attr_accessor :recurring_schedule + + after_boot :schedule_recurring_tasks + before_shutdown :unschedule_recurring_tasks + + def initialize(recurring_tasks:, **options) + @recurring_schedule = RecurringSchedule.new(recurring_tasks) + + super(**options) + end + + def metadata + super.merge(recurring_schedule: recurring_schedule.task_keys.presence) + end + + private + SLEEP_INTERVAL = 300 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks + + def run + loop do + break if shutting_down? + + interruptible_sleep(SLEEP_INTERVAL) + end + ensure + SolidQueue.instrument(:shutdown_process, process: self) do + run_callbacks(:shutdown) { shutdown } + end + end + + def schedule_recurring_tasks + recurring_schedule.schedule_tasks + end + + def unschedule_recurring_tasks + recurring_schedule.unschedule_tasks + end + + def all_work_completed? + recurring_schedule.empty? + end + + def set_procline + procline "scheduling #{recurring_schedule.task_keys.join(",")}" + end + end +end diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb similarity index 97% rename from lib/solid_queue/dispatcher/recurring_schedule.rb rename to lib/solid_queue/scheduler/recurring_schedule.rb index c476ae6e..5b8ff6bb 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module SolidQueue - class Dispatcher::RecurringSchedule + class Scheduler::RecurringSchedule include AppExecutor attr_reader :configured_tasks, :scheduled_tasks diff --git a/test/dummy/config/empty_configuration.yml b/test/dummy/config/empty_configuration.yml index caf23fee..53bfecf3 100644 --- a/test/dummy/config/empty_configuration.yml +++ b/test/dummy/config/empty_configuration.yml @@ -1,7 +1,3 @@ -default: &default +development: [] -development: - <<: *default - -test: - <<: *default +test: [] diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index 56462fdb..aa48c12a 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -6,9 +6,9 @@ class RecurringTasksTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do - @pid = run_supervisor_as_fork - # 1 supervisor + 2 workers + 1 dispatcher - wait_for_registered_processes(4, timeout: 3.second) + @pid = run_supervisor_as_fork(skip_recurring: false) + # 1 supervisor + 2 workers + 1 dispatcher + 1 scheduler + wait_for_registered_processes(5, timeout: 3.second) end teardown do @@ -52,8 +52,8 @@ class RecurringTasksTest < ActiveSupport::TestCase task = SolidQueue::RecurringTask.find_by(key: "periodic_store_result") task.update!(class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ]) - @pid = run_supervisor_as_fork - wait_for_registered_processes(4, timeout: 3.second) + @pid = run_supervisor_as_fork(skip_recurring: false) + wait_for_registered_processes(5, timeout: 3.second) # Wait for concurrency schedule loading after process registration sleep(0.5) @@ -61,20 +61,20 @@ class RecurringTasksTest < ActiveSupport::TestCase assert_recurring_tasks configured_task another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } } - dispatcher1 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: another_task).tap(&:start) - wait_for_registered_processes(5, timeout: 1.second) + scheduler1 = SolidQueue::Scheduler.new(recurring_tasks: another_task).tap(&:start) + wait_for_registered_processes(6, timeout: 1.second) assert_recurring_tasks configured_task.merge(another_task) updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } } - dispatcher2 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: updated_task).tap(&:start) - wait_for_registered_processes(6, timeout: 1.second) + scheduler2 = SolidQueue::Scheduler.new(recurring_tasks: updated_task).tap(&:start) + wait_for_registered_processes(7, timeout: 1.second) assert_recurring_tasks configured_task.merge(updated_task) terminate_process(@pid) - dispatcher1.stop - dispatcher2.stop + scheduler1.stop + scheduler2.stop end private diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index d9eb38b3..729216bd 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -3,7 +3,7 @@ module ProcessesTestHelper def run_supervisor_as_fork(**options) fork do - SolidQueue::Supervisor.start(**options) + SolidQueue::Supervisor.start(**options.with_defaults(skip_recurring: true)) end end @@ -38,6 +38,12 @@ def assert_registered_processes(kind:, count: 1, supervisor_pid: nil, **attribut end end + def assert_metadata(process, metadata) + metadata.each do |attr, value| + assert_equal value, process.metadata[attr.to_s] + end + end + def find_processes_registered_as(kind) skip_active_record_query_cache do SolidQueue::Process.where(kind: kind) diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index df6dda7a..556a4930 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -4,13 +4,14 @@ class ConfigurationTest < ActiveSupport::TestCase test "default configuration to process all queues and dispatch" do configuration = SolidQueue::Configuration.new(config_file: nil) - assert_equal 2, configuration.configured_processes.count + assert_equal 3, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] + assert_processes configuration, :scheduler, 1 end test "default configuration when config given doesn't include any configuration" do - configuration = SolidQueue::Configuration.new(config_file: config_file_path(:invalid_configuration)) + configuration = SolidQueue::Configuration.new(config_file: config_file_path(:invalid_configuration), skip_recurring: true) assert_equal 2, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" @@ -18,7 +19,7 @@ class ConfigurationTest < ActiveSupport::TestCase end test "default configuration when config given is empty" do - configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration)) + configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration)) assert_equal 2, configuration.configured_processes.count assert_processes configuration, :worker, 1, queues: "*" @@ -69,30 +70,19 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ]) assert_processes configuration, :dispatcher, 1, polling_interval: 0.1 + assert_processes configuration, :scheduler, 1 - dispatcher = configuration.configured_processes.first.instantiate - assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" + scheduler = configuration.configured_processes.second.instantiate + assert_has_recurring_task scheduler, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" end - test "recurring tasks configuration with no dispatchers uses a default dispatcher" do + test "recurring tasks configuration adds a scheduler" do configuration = SolidQueue::Configuration.new(dispatchers: []) - assert_processes configuration, :dispatcher, 1, polling_interval: 1 - - dispatcher = configuration.configured_processes.first.instantiate - assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" - end - - test "recurring tasks configuration with multiple dispatchers uses the least busy one" do - configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 }, { polling_interval: 0.4 }, { polling_interval: 0.2 } ]) - - assert_processes configuration, :dispatcher, 3, polling_interval: [ 0.1, 0.2, 0.4 ] # sorted by polling interval - - dispatcher = configuration.configured_processes.last.instantiate - assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" + assert_processes configuration, :scheduler, 1 - dispatchers_without_recurring_tasks = configuration.configured_processes.first(2) - assert_nil dispatchers_without_recurring_tasks.map { |d| d.attributes[:recurring_tasks] }.uniq.first + scheduler = configuration.configured_processes.first.instantiate + assert_has_recurring_task scheduler, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" end test "no recurring tasks configuration when explicitly excluded" do @@ -115,9 +105,9 @@ def assert_processes(configuration, kind, count, **attributes) end end - def assert_has_recurring_task(dispatcher, key:, **attributes) - assert_equal 1, dispatcher.recurring_schedule.configured_tasks.count - task = dispatcher.recurring_schedule.configured_tasks.detect { |t| t.key == key } + def assert_has_recurring_task(scheduler, key:, **attributes) + assert_equal 1, scheduler.recurring_schedule.configured_tasks.count + task = scheduler.recurring_schedule.configured_tasks.detect { |t| t.key == key } attributes.each do |attr, value| assert_equal_value value, task.public_send(attr) diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 6332fa98..42d57c92 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -36,22 +36,6 @@ class DispatcherTest < ActiveSupport::TestCase no_concurrency_maintenance_dispatcher.stop end - test "recurring schedule" do - recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } - with_recurring_schedule = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) - - with_recurring_schedule.start - - wait_for_registered_processes(1, timeout: 1.second) - - process = SolidQueue::Process.first - assert_equal "Dispatcher", process.kind - - assert_metadata process, recurring_schedule: [ "example_task" ] - ensure - with_recurring_schedule.stop - end - test "polling queries are logged" do log = StringIO.new with_active_record_logger(ActiveSupport::Logger.new(log)) do @@ -89,7 +73,7 @@ class DispatcherTest < ActiveSupport::TestCase assert_no_registered_processes end - test "run more than one instance of the dispatcher without recurring tasks" do + test "run more than one instance of the dispatcher" do 15.times do AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled") end @@ -108,23 +92,6 @@ class DispatcherTest < ActiveSupport::TestCase another_dispatcher&.stop end - test "run more than one instance of the dispatcher with recurring tasks" do - recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } - dispatchers = 2.times.collect do - SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) - end - - dispatchers.each(&:start) - sleep 2 - dispatchers.each(&:stop) - - assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count - run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort - 0.upto(run_at_times.length - 2) do |i| - assert_equal 1, run_at_times[i + 1] - run_at_times[i] - end - end - private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence @@ -139,10 +106,4 @@ def with_active_record_logger(logger) ensure ActiveRecord::Base.logger = old_logger end - - def assert_metadata(process, metadata) - metadata.each do |attr, value| - assert_equal value, process.metadata[attr.to_s] - end - end end diff --git a/test/unit/log_subscriber_test.rb b/test/unit/log_subscriber_test.rb index fa79ad38..cafae676 100644 --- a/test/unit/log_subscriber_test.rb +++ b/test/unit/log_subscriber_test.rb @@ -57,7 +57,7 @@ def set_logger(logger) attach_log_subscriber instrument "deregister_process.solid_queue", process: process, pruned: false, claimed_size: 0 - assert_match_logged :info, "Deregister Worker", "process_id: #{process.id}, pid: 42, hostname: \"localhost\", name: \"worker-123\", last_heartbeat_at: \"#{last_heartbeat_at}\", claimed_size: 0, pruned: false" + assert_match_logged :debug, "Deregister Worker", "process_id: #{process.id}, pid: 42, hostname: \"localhost\", name: \"worker-123\", last_heartbeat_at: \"#{last_heartbeat_at}\", claimed_size: 0, pruned: false" end private diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb new file mode 100644 index 00000000..817d168f --- /dev/null +++ b/test/unit/scheduler_test.rb @@ -0,0 +1,36 @@ +require "test_helper" + +class SchedulerTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + test "recurring schedule" do + recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "example_task" ] + ensure + scheduler.stop + end + + test "run more than one instance of the dispatcher with recurring tasks" do + recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } + schedulers = 2.times.collect do + SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks) + end + + schedulers.each(&:start) + sleep 2 + schedulers.each(&:stop) + + assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count + run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort + 0.upto(run_at_times.length - 2) do |i| + assert_equal 1, run_at_times[i + 1] - run_at_times[i] + end + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 36f67bb0..786c2b8f 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -147,10 +147,4 @@ def with_active_record_logger(logger) ensure ActiveRecord::Base.logger = old_logger end - - def assert_metadata(process, metadata) - metadata.each do |attr, value| - assert_equal value, process.metadata[attr.to_s] - end - end end From 5d7c94962b1611f8c34e9f1c868d196f4ac49942 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 14:21:31 +0200 Subject: [PATCH 05/11] Fix enqueuing recurring task with job outside Solid Queue Calling `set` on the job class returns a `ConfiguredJob`, and calling `perform_later` on a `ConfiguredJob` doesn't yield the job to a block passed as parameter, so we can't check if the enqueuing was successful or not. To work around this, the only way is to instantiate the job ourselves with the arguments, then call `enqueue` on that, and rely on the result, which would be what `job_class.perform_later` would yield to the block. --- app/models/solid_queue/recurring_task.rb | 8 +++++--- test/integration/instrumentation_test.rb | 25 ++++++++++++------------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 7a1a84c7..997389bb 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -55,7 +55,7 @@ def enqueue(at:) else payload[:other_adapter] = true - perform_later do |job| + perform_later.tap do |job| unless job.successfully_enqueued? payload[:enqueue_error] = job.enqueue_error&.message end @@ -106,8 +106,10 @@ def enqueue_and_record(run_at:) end end - def perform_later(&block) - job_class.set(enqueue_options).perform_later(*arguments_with_kwargs, &block) + def perform_later + job_class.new(*arguments_with_kwargs).tap do |active_job| + active_job.enqueue(enqueue_options) + end end def arguments_with_kwargs diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index c997df55..c90d161a 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -305,12 +305,12 @@ class InstrumentationTest < ActiveSupport::TestCase test "enqueuing recurring task emits enqueue_recurring_task event" do recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } - dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_task) events = subscribed("enqueue_recurring_task.solid_queue") do - dispatcher.start + scheduler.start sleep 1.01 - dispatcher.stop + scheduler.stop end assert events.size >= 1 @@ -323,12 +323,12 @@ class InstrumentationTest < ActiveSupport::TestCase test "skipping a recurring task is reflected in the enqueue_recurring_task event" do recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } - dispatchers = 2.times.collect { SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) } + schedulers = 2.times.collect { SolidQueue::Scheduler.new(recurring_tasks: recurring_task) } events = subscribed("enqueue_recurring_task.solid_queue") do - dispatchers.each(&:start) + schedulers.each(&:start) sleep 1.01 - dispatchers.each(&:stop) + schedulers.each(&:stop) end assert events.size >= 2 @@ -349,12 +349,12 @@ class InstrumentationTest < ActiveSupport::TestCase recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked) - dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_task) events = subscribed("enqueue_recurring_task.solid_queue") do - dispatcher.start + scheduler.start sleep(1.01) - dispatcher.stop + scheduler.stop end assert events.size >= 1 @@ -368,14 +368,15 @@ class InstrumentationTest < ActiveSupport::TestCase test "an error enqueuing a recurring task with another adapter is reflected in the enqueue_recurring_task event" do AddToBufferJob.queue_adapter = :async ActiveJob::QueueAdapters::AsyncAdapter.any_instance.stubs(:enqueue).raises(ActiveJob::EnqueueError.new("All is broken")) + recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } - dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_task) events = subscribed("enqueue_recurring_task.solid_queue") do - dispatcher.start + scheduler.start sleep(1.01) - dispatcher.stop + scheduler.stop end assert events.size >= 1 From de2eca89f3fa07d81d1cad899b3871d0820e9608 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 14:59:24 +0200 Subject: [PATCH 06/11] Adjust scheduler's interval value and puma test --- lib/solid_queue/processes/runnable.rb | 2 +- lib/solid_queue/processes/supervised.rb | 1 - lib/solid_queue/scheduler.rb | 2 +- test/integration/puma/plugin_test.rb | 4 ++-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 37618ffc..d66ebb2b 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -18,7 +18,7 @@ def start def stop @stopped = true - wake_up if running_async? + wake_up @thread&.join end diff --git a/lib/solid_queue/processes/supervised.rb b/lib/solid_queue/processes/supervised.rb index 29b7f40e..73f41e1d 100644 --- a/lib/solid_queue/processes/supervised.rb +++ b/lib/solid_queue/processes/supervised.rb @@ -29,7 +29,6 @@ def register_signal_handlers %w[ INT TERM ].each do |signal| trap(signal) do stop - interrupt end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 98403496..da83bbd3 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -20,7 +20,7 @@ def metadata end private - SLEEP_INTERVAL = 300 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks + SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks def run loop do diff --git a/test/integration/puma/plugin_test.rb b/test/integration/puma/plugin_test.rb index dafe366f..bac98a2b 100644 --- a/test/integration/puma/plugin_test.rb +++ b/test/integration/puma/plugin_test.rb @@ -19,7 +19,7 @@ class PluginTest < ActiveSupport::TestCase exec(*cmd) end end - wait_for_registered_processes(4, timeout: 3.second) + wait_for_registered_processes 5, timeout: 3.second end teardown do @@ -38,7 +38,7 @@ class PluginTest < ActiveSupport::TestCase signal_process(@pid, :SIGUSR2) # Ensure the restart finishes before we try to continue with the test wait_for_registered_processes(0, timeout: 3.second) - wait_for_registered_processes(4, timeout: 3.second) + wait_for_registered_processes(5, timeout: 3.second) StoreResultJob.perform_later(:puma_plugin) wait_for_jobs_to_finish_for(2.seconds) From 3c94a36c7c6a6b5e610f59fcf8481b24fcef6785 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 15:30:18 +0200 Subject: [PATCH 07/11] Add `rake test` task to run tests over the 3 DBs we support --- Rakefile | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Rakefile b/Rakefile index e7793b5c..951e3527 100644 --- a/Rakefile +++ b/Rakefile @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require "bundler/setup" APP_RAKEFILE = File.expand_path("test/dummy/Rakefile", __dir__) @@ -6,3 +8,14 @@ load "rails/tasks/engine.rake" load "rails/tasks/statistics.rake" require "bundler/gem_tasks" + +def databases + %w[ mysql postgres sqlite ] +end + +task :test do + databases.each do |database| + sh("TARGET_DB=#{database} bin/setup") + sh("TARGET_DB=#{database} bin/rails test") + end +end From 8d47ed40a07bee7c090c2563b8225d22db14e851 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 15:49:45 +0200 Subject: [PATCH 08/11] Use `queue` to specify the queue for a recurring task in configuration To match worker's `queues` configuration. --- app/models/solid_queue/recurring_task.rb | 2 +- test/dummy/config/recurring.yml | 1 + test/models/solid_queue/recurring_task_test.rb | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 997389bb..596fe729 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -22,7 +22,7 @@ def from_configuration(key, **options) class_name: options[:class], arguments: options[:args], schedule: options[:schedule], - queue_name: options[:queue_name].presence, + queue_name: options[:queue].presence, priority: options[:priority].presence, description: options[:description], static: true diff --git a/test/dummy/config/recurring.yml b/test/dummy/config/recurring.yml index c2ce7437..9cbe895b 100644 --- a/test/dummy/config/recurring.yml +++ b/test/dummy/config/recurring.yml @@ -1,4 +1,5 @@ periodic_store_result: class: StoreResultJob + queue: default args: [ 42, { status: "custom_status" } ] schedule: every second diff --git a/test/models/solid_queue/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb index e0676912..5b7d5c3e 100644 --- a/test/models/solid_queue/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -130,7 +130,7 @@ def perform end test "task with custom queue and priority" do - task = recurring_task_with(class_name: "JobWithoutArguments", queue_name: "my_new_queue", priority: 4) + task = recurring_task_with(class_name: "JobWithoutArguments", queue: "my_new_queue", priority: 4) enqueue_and_assert_performed_with_result task, "job_without_arguments" job = SolidQueue::Job.last @@ -170,7 +170,7 @@ def enqueue_and_assert_performed_with_result(task, result) assert_equal result, JobBuffer.last_value end - def recurring_task_with(class_name:, schedule: "every hour", args: nil, **options) - SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, arguments: args, **options) + def recurring_task_with(class_name:, **options) + SolidQueue::RecurringTask.from_configuration("task-id", class: "SolidQueue::RecurringTaskTest::#{class_name}", **options.with_defaults(schedule: "every hour")) end end From 90f63c8a776a58c276ab34fd5b904e20051f550a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 16:16:57 +0200 Subject: [PATCH 09/11] Allow specifying recurring tasks just with a "command" No need to have a class, can specify this using just a command for which we'd provide a default RecurringtJob class that will just eval the command. --- app/jobs/solid_queue/recurring_job.rb | 9 ++++++ app/models/solid_queue/recurring_task.rb | 19 ++++++++++-- .../models/solid_queue/recurring_task_test.rb | 29 +++++++++++++++---- test/unit/scheduler_test.rb | 2 +- 4 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 app/jobs/solid_queue/recurring_job.rb diff --git a/app/jobs/solid_queue/recurring_job.rb b/app/jobs/solid_queue/recurring_job.rb new file mode 100644 index 00000000..6ccd4954 --- /dev/null +++ b/app/jobs/solid_queue/recurring_job.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class SolidQueue::RecurringJob < ActiveJob::Base + queue_as :solid_queue_recurring + + def perform(command) + eval(command) + end +end diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 596fe729..d1016991 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -7,10 +7,14 @@ class RecurringTask < Record serialize :arguments, coder: Arguments, default: [] validate :supported_schedule + validate :ensure_command_or_class_present validate :existing_job_class scope :static, -> { where(static: true) } + mattr_accessor :default_job_class + self.default_job_class = RecurringJob + class << self def wrap(args) args.is_a?(self) ? args : from_configuration(args.first, **args.second) @@ -20,6 +24,7 @@ def from_configuration(key, **options) new \ key: key, class_name: options[:class], + command: options[:command], arguments: options[:args], schedule: options[:schedule], queue_name: options[:queue].presence, @@ -85,8 +90,14 @@ def supported_schedule end end + def ensure_command_or_class_present + unless command.present? || class_name.present? + errors.add :base, :command_and_class_blank, message: "either command or class_name must be present" + end + end + def existing_job_class - unless job_class.present? + if class_name.present? && job_class.nil? errors.add :class_name, :undefined, message: "doesn't correspond to an existing class" end end @@ -113,7 +124,9 @@ def perform_later end def arguments_with_kwargs - if arguments.last.is_a?(Hash) + if class_name.nil? + command + elsif arguments.last.is_a?(Hash) arguments[0...-1] + [ Hash.ruby2_keywords_hash(arguments.last) ] else arguments @@ -126,7 +139,7 @@ def parsed_schedule end def job_class - @job_class ||= class_name&.safe_constantize + @job_class ||= class_name.present? ? class_name.safe_constantize : self.class.default_job_class end def enqueue_options diff --git a/test/models/solid_queue/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb index 5b7d5c3e..2f65cb01 100644 --- a/test/models/solid_queue/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -122,11 +122,17 @@ def perform assert_not SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::JobWithoutArguments").valid? end - test "undefined job class" do + test "valid and invalid job class and command" do + # Command + assert recurring_task_with(command: "puts '¡hola!'").valid? + # Class + assert recurring_task_with(class_name: "JobWithPriority").valid? + + # Invalid class name assert_not recurring_task_with(class_name: "UnknownJob").valid? - # Empty class name - assert_not SolidQueue::RecurringTask.new(key: "task-id", schedule: "every minute").valid? + # Empty class name and command + assert_not recurring_task_with(key: "task-id", schedule: "every minute").valid? end test "task with custom queue and priority" do @@ -154,6 +160,13 @@ def perform assert_equal 4, job.priority end + test "task configured with a command" do + task = recurring_task_with(command: "JobBuffer.add('from_a_command')") + enqueue_and_assert_performed_with_result(task, "from_a_command") + + assert_equal "SolidQueue::RecurringJob", SolidQueue::Job.last.class_name + end + private def enqueue_and_assert_performed_with_result(task, result) assert_difference [ -> { SolidQueue::Job.count }, -> { SolidQueue::ReadyExecution.count } ], +1 do @@ -170,7 +183,13 @@ def enqueue_and_assert_performed_with_result(task, result) assert_equal result, JobBuffer.last_value end - def recurring_task_with(class_name:, **options) - SolidQueue::RecurringTask.from_configuration("task-id", class: "SolidQueue::RecurringTaskTest::#{class_name}", **options.with_defaults(schedule: "every hour")) + def recurring_task_with(class_name: nil, **options) + options = options.dup.with_defaults(schedule: "every hour") + + if class_name.present? + options[:class] = "SolidQueue::RecurringTaskTest::#{class_name}" + end + + SolidQueue::RecurringTask.from_configuration("task-id", **options) end end diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index 817d168f..9478b9f1 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -17,7 +17,7 @@ class SchedulerTest < ActiveSupport::TestCase scheduler.stop end - test "run more than one instance of the dispatcher with recurring tasks" do + test "run more than one instance of the scheduler with recurring tasks" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } schedulers = 2.times.collect do SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks) From 8df600df2d22261cdce3e51e103c1cf58d4da391 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 20:33:29 +0200 Subject: [PATCH 10/11] Update README and UPGRADING with new instructions about recurring tasks --- README.md | 82 +++++++++++++++++++++++++++++---------------- UPGRADING.md | 5 +++ solid_queue.gemspec | 8 +++-- 3 files changed, 64 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 07d0a924..256a7526 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind. -Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). +Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). -Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading. +Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading. ## Installation @@ -13,9 +13,9 @@ Solid Queue is configured by default in new Rails 8 applications. But if you're 1. `bundle add solid_queue` 2. `bin/rails solid_queue:install` -This will configure Solid Queue as the production Active Job backend, create `config/solid_queue.yml`, and create the `db/queue_schema.rb`. +This will configure Solid Queue as the production Active Job backend, create the configuration files `config/solid_queue.yml` and `config/recurring.yml`, and create the `db/queue_schema.rb`. It'll also create a `bin/jobs` executable wrapper that you can use to start Solid Queue. -You will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this: +Once you've done that, you will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this: ```yaml production: @@ -55,7 +55,7 @@ For small projects, you can run Solid Queue on the same machine as your webserve It's also possibile to use one single database for both production data: -1. Shovel `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb` +1. Copy the contents of `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb` 2. Remove `config.solid_queue.connects_to` from `production.rb` 3. Migrate your database. You are ready to run `bin/jobs` @@ -73,22 +73,31 @@ class MyJob < ApplicationJob # ... end ``` + ## High performance requirements Solid Queue was designed for the highest throughput when used with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications. ## Configuration -### Workers and dispatchers +### Workers, dispatchers and scheduler -We have three types of actors in Solid Queue: +We have several types of actors in Solid Queue: - _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table. -- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls). +- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls). +- The _scheduler_ manages [recurring tasks](#recurring-tasks), enqueuing jobs for them when they're due. - The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed. -Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher. +Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher/scheduler. + +By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this: + +``` +bin/jobs -c config/calendar.yml +``` + -By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like: +This is what this configuration looks like: ```yml production: @@ -117,6 +126,7 @@ production: ``` the supervisor will run 1 dispatcher and no workers. + Here's an overview of the different options: - `polling_interval`: the time interval in seconds that workers and dispatchers will wait before checking for more jobs. This time defaults to `1` second for dispatchers and `0.1` seconds for workers. @@ -139,7 +149,7 @@ Here's an overview of the different options: - `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting. - `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. -- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section. + ### Queue order and priorities @@ -305,27 +315,42 @@ to your `puma.rb` configuration. ## Recurring tasks -Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this: +Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by the scheduler process and are defined in their own configuration file. By default, the file is located in `config/recurring.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_RECURRING_SCHEDULE` or by using the `--recurring_schedule_file` option with `bin/jobs`, like this: + +``` +bin/jobs --recurring_schedule_file=config/schedule.yml +``` + +The configuration itself looks like this: + ```yml - dispatchers: - - polling_interval: 1 - batch_size: 500 - recurring_tasks: - my_periodic_job: - class: MyJob - args: [ 42, { status: "custom_status" } ] - schedule: every second +a_periodic_job: + class: MyJob + args: [ 42, { status: "custom_status" } ] + schedule: every second +a_cleanup_task: + command: "DeletedStuff.clear_all" + schedule: every day at 9am ``` -`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array. + +Tasks are specified as a hash/dictionary, where the key will be the task's key internally. Each task needs to either have a `class`, which will be the job class to enqueue, or a `command`, which will be eval'ed in the context of a job (`SolidQueue::RecurringJob`) that will be enqueued according to its schedule, in the `solid_queue_recurring` queue. + +Each task needs to have also a schedule, which is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can optionally supply the following for each task: +- `args`: the arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array. The job in the example configuration above will be enqueued every second as: ```ruby MyJob.perform_later(42, status: "custom_status") ``` -Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb). +- `queue`: a different queue to be used when enqueuing the job. If none, the queue set up for the job class. -It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around. +- `priority`: a numeric priority value to be used when enqueuing the job. + + +Tasks are enqueued at their corresponding times by the scheduler, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb). + +It's possible to run multiple schedulers with the same `recurring_tasks` configuration, for example, if you have multiple servers for redundancy, and you run the `scheduler` in more than one of them. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around. Finally, it's possible to configure jobs that aren't handled by Solid Queue. That is, you can have a job like this in your app: ```ruby @@ -340,13 +365,12 @@ end You can still configure this in Solid Queue: ```yml - dispatchers: - - recurring_tasks: - my_periodic_resque_job: - class: MyResqueJob - args: 22 - schedule: "*/5 * * * *" +my_periodic_resque_job: + class: MyResqueJob + args: 22 + schedule: "*/5 * * * *" ``` + and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. ## Inspiration diff --git a/UPGRADING.md b/UPGRADING.md index eedfcb27..bb3d7f60 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,3 +1,8 @@ +# Upgrading to version 0.9.x +This version changes how recurring tasks are configured. Before, they would be defined as part of the _dispatcher_ configuration. Now they've been upgraded to their own configuration file, and a dedicated process (the _scheduler_) to manage them. Check the _Recurring tasks_ section in the `README` to learn how to configure them in detail. + +In short, they live now in `config/recurring.yml` (by default) and follow the same format as before when they lived under `dispatchers > recurring_tasks`. + # Upgrading to version 0.8.x *IMPORTANT*: This version collapsed all migrations into a single `db/queue_schema.rb`, that will use a separate `queue` database. If you're upgrading from a version < 0.6.0, you need to upgrade to 0.6.0 first, ensure all migrations are up-to-date, and then upgrade further. diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 953c781a..7faeabfa 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -11,11 +11,15 @@ Gem::Specification.new do |spec| spec.license = "MIT" spec.post_install_message = <<~MESSAGE - Upgrading to Solid Queue 0.8.0 from < 0.6.0? You need to upgrade to 0.6.0 first. Check https://github.com/rails/solid_queue/blob/main/UPGRADING.md - for upgrade instructions. + Upgrading to Solid Queue 0.9.0? There are some breaking changes about how recurring tasks are configured. + + Upgrading to Solid Queue 0.8.0 from < 0.6.0? You need to upgrade to 0.6.0 first. Upgrading to Solid Queue 0.4.x, 0.5.x, 0.6.x or 0.7.x? There are some breaking changes about how Solid Queue is started, configuration and new migrations. + + --> Check https://github.com/rails/solid_queue/blob/main/UPGRADING.md + for upgrade instructions. MESSAGE spec.metadata["homepage_uri"] = spec.homepage From 2e53a64c82abfa8996799263af3e6fd7214809e7 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 10 Sep 2024 18:44:40 +0200 Subject: [PATCH 11/11] Remove unused CLI argument --- lib/solid_queue/cli.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 050966da..806269eb 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -14,7 +14,6 @@ class Cli < Thor desc: "Path to recurring schedule definition", banner: "SOLID_QUEUE_RECURRING_SCHEDULE" - class_option :work_only, type: :boolean, default: false class_option :skip_recurring, type: :boolean, default: false def self.exit_on_failure?