Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp recurring tasks configuration and management #338

Merged
merged 11 commits into from
Sep 11, 2024
Merged
82 changes: 53 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.

Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).
Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).

Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails multi-threading.
Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading.

## Installation

Expand All @@ -13,9 +13,9 @@ Solid Queue is configured by default in new Rails 8 applications. But if you're
1. `bundle add solid_queue`
2. `bin/rails solid_queue:install`

This will configure Solid Queue as the production Active Job backend, create `config/solid_queue.yml`, and create the `db/queue_schema.rb`.
This will configure Solid Queue as the production Active Job backend, create the configuration files `config/solid_queue.yml` and `config/recurring.yml`, and create the `db/queue_schema.rb`. It'll also create a `bin/jobs` executable wrapper that you can use to start Solid Queue.

You will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this:
Once you've done that, you will then have to add the configuration for the queue database in `config/database.yml`. If you're using sqlite, it'll look like this:

```yaml
production:
Expand Down Expand Up @@ -55,7 +55,7 @@ For small projects, you can run Solid Queue on the same machine as your webserve

It's also possibile to use one single database for both production data:

1. Shovel `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb`
1. Copy the contents of `db/queue_schema.rb` into a normal migration and delete `db/queue_schema.rb`
2. Remove `config.solid_queue.connects_to` from `production.rb`
3. Migrate your database. You are ready to run `bin/jobs`

Expand All @@ -73,22 +73,31 @@ class MyJob < ApplicationJob
# ...
end
```

## High performance requirements

Solid Queue was designed for the highest throughput when used with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications.

## Configuration

### Workers and dispatchers
### Workers, dispatchers and scheduler

We have three types of actors in Solid Queue:
We have several types of actors in Solid Queue:
- _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
- The _scheduler_ manages [recurring tasks](#recurring-tasks), enqueuing jobs for them when they're due.
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.

Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher.
Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher/scheduler.

By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this:

```
bin/jobs -c config/calendar.yml
```


By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
This is what this configuration looks like:

```yml
production:
Expand Down Expand Up @@ -117,6 +126,7 @@ production:
```
the supervisor will run 1 dispatcher and no workers.


Here's an overview of the different options:

- `polling_interval`: the time interval in seconds that workers and dispatchers will wait before checking for more jobs. This time defaults to `1` second for dispatchers and `0.1` seconds for workers.
Expand All @@ -139,7 +149,7 @@ Here's an overview of the different options:
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.


### Queue order and priorities

Expand Down Expand Up @@ -305,27 +315,42 @@ to your `puma.rb` configuration.

## Recurring tasks

Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by dispatcher processes and as such, they can be defined in the dispatcher's configuration like this:
Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by the scheduler process and are defined in their own configuration file. By default, the file is located in `config/recurring.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_RECURRING_SCHEDULE` or by using the `--recurring_schedule_file` option with `bin/jobs`, like this:

```
bin/jobs --recurring_schedule_file=config/schedule.yml
```

The configuration itself looks like this:

```yml
dispatchers:
- polling_interval: 1
batch_size: 500
recurring_tasks:
my_periodic_job:
class: MyJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
a_periodic_job:
class: MyJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
a_cleanup_task:
command: "DeletedStuff.clear_all"
schedule: every day at 9am
```
`recurring_tasks` is a hash/dictionary, and the key will be the task key internally. Each task needs to have a class, which will be the job class to enqueue, and a schedule. The schedule is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can also provide arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.

Tasks are specified as a hash/dictionary, where the key will be the task's key internally. Each task needs to either have a `class`, which will be the job class to enqueue, or a `command`, which will be eval'ed in the context of a job (`SolidQueue::RecurringJob`) that will be enqueued according to its schedule, in the `solid_queue_recurring` queue.

Each task needs to have also a schedule, which is parsed using [Fugit](https://github.com/floraison/fugit), so it accepts anything [that Fugit accepts as a cron](https://github.com/floraison/fugit?tab=readme-ov-file#fugitcron). You can optionally supply the following for each task:
- `args`: the arguments to be passed to the job, as a single argument, a hash, or an array of arguments that can also include kwargs as the last element in the array.

The job in the example configuration above will be enqueued every second as:
```ruby
MyJob.perform_later(42, status: "custom_status")
```

Tasks are enqueued at their corresponding times by the dispatcher that owns them, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).
- `queue`: a different queue to be used when enqueuing the job. If none, the queue set up for the job class.

It's possible to run multiple dispatchers with the same `recurring_tasks` configuration. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.
- `priority`: a numeric priority value to be used when enqueuing the job.


Tasks are enqueued at their corresponding times by the scheduler, and each task schedules the next one. This is pretty much [inspired by what GoodJob does](https://github.com/bensheldon/good_job/blob/994ecff5323bf0337e10464841128fda100750e6/lib/good_job/cron_manager.rb).

It's possible to run multiple schedulers with the same `recurring_tasks` configuration, for example, if you have multiple servers for redundancy, and you run the `scheduler` in more than one of them. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.

Finally, it's possible to configure jobs that aren't handled by Solid Queue. That is, you can have a job like this in your app:
```ruby
Expand All @@ -340,13 +365,12 @@ end

You can still configure this in Solid Queue:
```yml
dispatchers:
- recurring_tasks:
my_periodic_resque_job:
class: MyResqueJob
args: 22
schedule: "*/5 * * * *"
my_periodic_resque_job:
class: MyResqueJob
args: 22
schedule: "*/5 * * * *"
```

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.

## Inspiration
Expand Down
13 changes: 13 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require "bundler/setup"

APP_RAKEFILE = File.expand_path("test/dummy/Rakefile", __dir__)
Expand All @@ -6,3 +8,14 @@ load "rails/tasks/engine.rake"
load "rails/tasks/statistics.rake"

require "bundler/gem_tasks"

def databases
%w[ mysql postgres sqlite ]
end

task :test do
databases.each do |database|
sh("TARGET_DB=#{database} bin/setup")
sh("TARGET_DB=#{database} bin/rails test")
end
end
5 changes: 5 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Upgrading to version 0.9.x
This version changes how recurring tasks are configured. Before, they would be defined as part of the _dispatcher_ configuration. Now they've been upgraded to their own configuration file, and a dedicated process (the _scheduler_) to manage them. Check the _Recurring tasks_ section in the `README` to learn how to configure them in detail.

In short, they live now in `config/recurring.yml` (by default) and follow the same format as before when they lived under `dispatchers > recurring_tasks`.

# Upgrading to version 0.8.x
*IMPORTANT*: This version collapsed all migrations into a single `db/queue_schema.rb`, that will use a separate `queue` database. If you're upgrading from a version < 0.6.0, you need to upgrade to 0.6.0 first, ensure all migrations are up-to-date, and then upgrade further.

Expand Down
9 changes: 9 additions & 0 deletions app/jobs/solid_queue/recurring_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

class SolidQueue::RecurringJob < ActiveJob::Base
queue_as :solid_queue_recurring

def perform(command)
eval(command)
end
end
43 changes: 35 additions & 8 deletions app/models/solid_queue/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,30 @@ class RecurringTask < Record
serialize :arguments, coder: Arguments, default: []

validate :supported_schedule
validate :ensure_command_or_class_present
validate :existing_job_class

scope :static, -> { where(static: true) }

mattr_accessor :default_job_class
self.default_job_class = RecurringJob

class << self
def wrap(args)
args.is_a?(self) ? args : from_configuration(args.first, **args.second)
end

def from_configuration(key, **options)
new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args])
new \
key: key,
class_name: options[:class],
command: options[:command],
arguments: options[:args],
schedule: options[:schedule],
queue_name: options[:queue].presence,
priority: options[:priority].presence,
description: options[:description],
static: true
end

def create_or_update_all(tasks)
Expand Down Expand Up @@ -47,7 +60,7 @@ def enqueue(at:)
else
payload[:other_adapter] = true

perform_later do |job|
perform_later.tap do |job|
unless job.successfully_enqueued?
payload[:enqueue_error] = job.enqueue_error&.message
end
Expand Down Expand Up @@ -77,8 +90,14 @@ def supported_schedule
end
end

def ensure_command_or_class_present
unless command.present? || class_name.present?
errors.add :base, :command_and_class_blank, message: "either command or class_name must be present"
end
end

def existing_job_class
unless job_class.present?
if class_name.present? && job_class.nil?
errors.add :class_name, :undefined, message: "doesn't correspond to an existing class"
end
end
Expand All @@ -89,7 +108,7 @@ def using_solid_queue_adapter?

def enqueue_and_record(run_at:)
RecurringExecution.record(key, run_at) do
job_class.new(*arguments_with_kwargs).tap do |active_job|
job_class.new(*arguments_with_kwargs).set(enqueue_options).tap do |active_job|
active_job.run_callbacks(:enqueue) do
Job.enqueue(active_job)
end
Expand All @@ -98,12 +117,16 @@ def enqueue_and_record(run_at:)
end
end

def perform_later(&block)
job_class.perform_later(*arguments_with_kwargs, &block)
def perform_later
job_class.new(*arguments_with_kwargs).tap do |active_job|
active_job.enqueue(enqueue_options)
end
end

def arguments_with_kwargs
if arguments.last.is_a?(Hash)
if class_name.nil?
command
elsif arguments.last.is_a?(Hash)
arguments[0...-1] + [ Hash.ruby2_keywords_hash(arguments.last) ]
else
arguments
Expand All @@ -116,7 +139,11 @@ def parsed_schedule
end

def job_class
@job_class ||= class_name&.safe_constantize
@job_class ||= class_name.present? ? class_name.safe_constantize : self.class.default_job_class
end

def enqueue_options
{ queue: queue_name, priority: priority }.compact
end
end
end
14 changes: 12 additions & 2 deletions lib/solid_queue/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@

module SolidQueue
class Cli < Thor
class_option :config_file, type: :string, aliases: "-c", default: Configuration::DEFAULT_CONFIG_FILE_PATH, desc: "Path to config file"
class_option :config_file, type: :string, aliases: "-c",
default: Configuration::DEFAULT_CONFIG_FILE_PATH,
desc: "Path to config file",
banner: "SOLID_QUEUE_CONFIG"

class_option :recurring_schedule_file, type: :string,
default: Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH,
desc: "Path to recurring schedule definition",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"

class_option :skip_recurring, type: :boolean, default: false

def self.exit_on_failure?
true
Expand All @@ -14,7 +24,7 @@ def self.exit_on_failure?
default_command :start

def start
SolidQueue::Supervisor.start(load_configuration_from: options["config_file"])
SolidQueue::Supervisor.start(**options.symbolize_keys)
end
end
end
Loading