diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74c3a5573..51db56ca0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 ### New Features
 
+* [#857](https://github.com/toptal/chewy/pull/857): Allow passing `wait_for_completion`, `requests_per_second` and `scroll_size` options to the `chewy:journal:clean` rake task and the `delete_all` query builder method. ([@konalegi][])([@barthez][])
+
 ### Changes
 
 ### Bugs Fixed
diff --git a/README.md b/README.md
index 38c1431fc..a84d0c4df 100644
--- a/README.md
+++ b/README.md
@@ -677,6 +677,8 @@ You may be wondering why do you need it? The answer is simple: not to lose the d
 
 Imagine that you reset your index in a zero-downtime manner (to separate index), and at the meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset using the `Chewy::Journal` interface.
 
+When enabled, the journal can grow to an enormous size. Consider setting up a cron job that cleans it occasionally using the [`chewy:journal:clean` rake task](#chewyjournal).
+
 ### Index manipulation
 
 ```ruby
@@ -1144,6 +1146,17 @@ rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes f
 rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only
 ```
 
+When the journal grows very large, cleaning it in the usual synchronous way can be slow and resource-consuming. Fortunately, Chewy internally uses the [delete-by-query](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-delete-by-query.html#docs-delete-by-query-task-api) ES function, which supports asynchronous execution with batching and [throttling](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#docs-delete-by-query-throttle).
+
+The available options, which can be set via ENV variables, are listed below:
+* `WAIT_FOR_COMPLETION` - a boolean flag. It controls async execution and waits for completion by default. When set to `false` (`0`, `f`, `false` or `off`, in any letter case), Elasticsearch performs some preflight checks, launches the request, and returns a task reference you can use to cancel the task or get its status.
+* `REQUESTS_PER_SECOND` - float. The throttle for this request in sub-requests per second. No throttling is enforced by default.
+* `SCROLL_SIZE` - integer. The number of documents to be deleted in a single sub-request. The default batch size is 1000.
+
+```bash
+rake chewy:journal:clean WAIT_FOR_COMPLETION=false REQUESTS_PER_SECOND=10 SCROLL_SIZE=5000
+```
+
 ### RSpec integration
 
 Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features:
diff --git a/lib/chewy/journal.rb b/lib/chewy/journal.rb
index d1a887140..056601465 100644
--- a/lib/chewy/journal.rb
+++ b/lib/chewy/journal.rb
@@ -43,8 +43,12 @@ def apply(since_time, fetch_limit: 10, **import_options)
     #
     # @param until_time [Time, DateTime] time to clean up until it
     # @return [Hash] delete_by_query ES API call result
-    def clean(until_time = nil)
-      Chewy::Stash::Journal.clean(until_time, only: @only)
+    def clean(until_time = nil, delete_by_query_options: {})
+      Chewy::Stash::Journal.clean(
+        until_time,
+        only: @only,
+        delete_by_query_options: delete_by_query_options.merge(refresh: false)
+      )
     end
 
     private
diff --git a/lib/chewy/rake_helper.rb b/lib/chewy/rake_helper.rb
index 8e0de510d..c9150b8e8 100644
--- a/lib/chewy/rake_helper.rb
+++ b/lib/chewy/rake_helper.rb
@@ -19,6 +19,9 @@ module RakeHelper
       output.puts "  Applying journal to #{targets}, #{count} entries, stage #{payload[:stage]}"
     end
 
+    DELETE_BY_QUERY_OPTIONS = %w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze
+    FALSE_VALUES = %w[0 f false off].freeze
+
     class << self
       # Performs zero-downtime reindexing of all documents for the specified indexes
       #
@@ -162,7 +165,7 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
         subscribed_task_stats(output) do
           output.puts "Applying journal entries created after #{time}"
 
-          count = Chewy::Journal.new(indexes_from(only: only, except: except)).apply(time)
+          count = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).apply(time)
           output.puts 'No journal entries were created after the specified time' if count.zero?
         end
       end
 
@@ -181,12 +184,16 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
       # @param except [Array, Chewy::Index, String] indexes to exclude from processing
       # @param output [IO] output io for logging
       # @return [Array] indexes that were actually updated
-      def journal_clean(time: nil, only: nil, except: nil, output: $stdout)
+      def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
         subscribed_task_stats(output) do
           output.puts "Cleaning journal entries created before #{time}" if time
-          response = Chewy::Journal.new(indexes_from(only: only, except: except)).clean(time)
-          count = response['deleted'] || response['_indices']['_all']['deleted']
-          output.puts "Cleaned up #{count} journal entries"
+          response = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).clean(time, delete_by_query_options: delete_by_query_options)
+          if response.key?('task')
+            output.puts "Task to cleanup the journal has been created, #{response['task']}"
+          else
+            count = response['deleted'] || response['_indices']['_all']['deleted']
+            output.puts "Cleaned up #{count} journal entries"
+          end
         end
       end
 
@@ -228,6 +235,26 @@ def update_mapping(name:, output: $stdout)
         end
       end
 
+      # Reads options that are required to run journal cleanup asynchronously from ENV hash
+      # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
+      #
+      # @example
+      #   Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'})
+      #   # => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 }
+      #
+      def delete_by_query_options_from_env(env)
+        env
+          .slice(*DELETE_BY_QUERY_OPTIONS)
+          .transform_keys { |k| k.downcase.to_sym }
+          .to_h do |key, value|
+            case key
+            when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)]
+            when :requests_per_second then [key, value.to_f]
+            when :scroll_size then [key, value.to_i]
+            end
+          end
+      end
+
       def normalize_indexes(*identifiers)
         identifiers.flatten(1).map { |identifier| normalize_index(identifier) }
       end
@@ -248,6 +275,12 @@ def subscribed_task_stats(output = $stdout, &block)
 
       private
 
+      def journal_indexes_from(only: nil, except: nil)
+        return if Array.wrap(only).empty? && Array.wrap(except).empty?
+
+        indexes_from(only: only, except: except)
+      end
+
       def indexes_from(only: nil, except: nil)
         indexes = if only.present?
                     normalize_indexes(Array.wrap(only))
diff --git a/lib/chewy/search/request.rb b/lib/chewy/search/request.rb
index 687e8a093..6c7a62156 100644
--- a/lib/chewy/search/request.rb
+++ b/lib/chewy/search/request.rb
@@ -962,10 +962,22 @@ def pluck(*fields)
       #
       # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
       # @note The result hash is different for different API used.
-      # @param refresh [true, false] field names
+      # @param refresh [true, false] Refreshes all shards involved in the delete by query
+      # @param wait_for_completion [true, false] wait for request completion or run it asynchronously
+      #   and return task reference at `.tasks/task/${taskId}`.
+      # @param requests_per_second [Float] The throttle for this request in sub-requests per second
+      # @param scroll_size [Integer] Size of the scroll request that powers the operation
       # @return [Hash] the result of query execution
-      def delete_all(refresh: true)
-        request_body = only(WHERE_STORAGES).render.merge(refresh: refresh)
+      def delete_all(refresh: true, wait_for_completion: nil, requests_per_second: nil, scroll_size: nil)
+        request_body = only(WHERE_STORAGES).render.merge(
+          {
+            refresh: refresh,
+            wait_for_completion: wait_for_completion,
+            requests_per_second: requests_per_second,
+            scroll_size: scroll_size
+          }.compact
+        )
         ActiveSupport::Notifications.instrument 'delete_query.chewy', notification_payload(request: request_body) do
           request_body[:body] = {query: {match_all: {}}} if request_body[:body].empty?
           Chewy.client.delete_by_query(request_body)
diff --git a/lib/chewy/stash.rb b/lib/chewy/stash.rb
index 2181ee492..d49957d50 100644
--- a/lib/chewy/stash.rb
+++ b/lib/chewy/stash.rb
@@ -28,12 +28,12 @@ def self.entries(since_time, only: [])
     # Cleans up all the journal entries until the specified time. If nothing is
     # specified - cleans up everything.
     #
-    # @param since_time [Time, DateTime] the time top boundary
+    # @param until_time [Time, DateTime] Clean everything before that date
     # @param only [Chewy::Index, Array] indexes to clean up journal entries for
-    def self.clean(until_time = nil, only: [])
+    def self.clean(until_time = nil, only: [], delete_by_query_options: {})
       scope = self.for(only)
       scope = scope.filter(range: {created_at: {lte: until_time}}) if until_time
-      scope.delete_all
+      scope.delete_all(**delete_by_query_options)
     end
 
     # Selects all the journal entries for the specified indices.
diff --git a/lib/tasks/chewy.rake b/lib/tasks/chewy.rake
index 5cf4ef17c..6071ccdca 100644
--- a/lib/tasks/chewy.rake
+++ b/lib/tasks/chewy.rake
@@ -94,7 +94,13 @@ namespace :chewy do
 
     desc 'Removes journal records created before the specified timestamp for the specified indexes/types or all of them'
     task clean: :environment do |_task, args|
-      Chewy::RakeHelper.journal_clean(**parse_journal_args(args.extras))
+      delete_options = Chewy::RakeHelper.delete_by_query_options_from_env(ENV)
+      Chewy::RakeHelper.journal_clean(
+        [
+          parse_journal_args(args.extras),
+          {delete_by_query_options: delete_options}
+        ].reduce({}, :merge)
+      )
     end
   end
 end
diff --git a/spec/chewy/rake_helper_spec.rb b/spec/chewy/rake_helper_spec.rb
index d9754ef68..d0f84b15c 100644
--- a/spec/chewy/rake_helper_spec.rb
+++ b/spec/chewy/rake_helper_spec.rb
@@ -426,6 +426,33 @@
       described_class.journal_clean(except: CitiesIndex, output: output)
       expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
 \\ACleaned up 1 journal entries
+Total: \\d+s\\Z
+      OUTPUT
+    end
+
+    it 'executes asynchronously' do
+      output = StringIO.new
+      expect(Chewy.client).to receive(:delete_by_query).with(
+        {
+          body: {query: {match_all: {}}},
+          index: ['chewy_journal'],
+          refresh: false,
+          requests_per_second: 10.0,
+          scroll_size: 200,
+          wait_for_completion: false
+        }
+      ).and_call_original
+      described_class.journal_clean(
+        output: output,
+        delete_by_query_options: {
+          wait_for_completion: false,
+          requests_per_second: 10.0,
+          scroll_size: 200
+        }
+      )
+
+      expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE))
+\\ATask to cleanup the journal has been created, [^\\n]*
 Total: \\d+s\\Z
       OUTPUT
     end
@@ -502,4 +529,45 @@
       end
     end
   end
+
+  describe '.delete_by_query_options_from_env' do
+    subject(:options) { described_class.delete_by_query_options_from_env(env) }
+    let(:env) do
+      {
+        'WAIT_FOR_COMPLETION' => 'false',
+        'REQUESTS_PER_SECOND' => '10',
+        'SCROLL_SIZE' => '5000'
+      }
+    end
+
+    it 'parses the options' do
+      expect(options).to eq(
+        wait_for_completion: false,
+        requests_per_second: 10.0,
+        scroll_size: 5000
+      )
+    end
+
+    context 'with different boolean values' do
+      it 'parses the option correctly' do
+        %w[1 t true TRUE on ON].each do |v|
+          expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
+            .to eq(wait_for_completion: true)
+        end
+
+        %w[0 f false FALSE off OFF].each do |v|
+          expect(described_class.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => v}))
+            .to eq(wait_for_completion: false)
+        end
+      end
+    end
+
+    context 'with other env' do
+      let(:env) { {'SOME_ENV' => '123', 'REQUESTS_PER_SECOND' => '15'} }
+
+      it 'parses only the options' do
+        expect(options).to eq(requests_per_second: 15.0)
+      end
+    end
+  end
 end
diff --git a/spec/chewy/search/request_spec.rb b/spec/chewy/search/request_spec.rb
index f8ea8d1ea..4aede10f3 100644
--- a/spec/chewy/search/request_spec.rb
+++ b/spec/chewy/search/request_spec.rb
@@ -817,6 +817,31 @@
         request: {index: ['products'], body: {query: {match: {name: 'name3'}}}, refresh: false}
       )
     end
+
+    it 'deletes records asynchronously' do
+      outer_payload = nil
+      ActiveSupport::Notifications.subscribe('delete_query.chewy') do |_name, _start, _finish, _id, payload|
+        outer_payload = payload
+      end
+      subject.query(match: {name: 'name3'}).delete_all(
+        refresh: false,
+        wait_for_completion: false,
+        requests_per_second: 10.0,
+        scroll_size: 2000
+      )
+      expect(outer_payload).to eq(
+        index: ProductsIndex,
+        indexes: [ProductsIndex],
+        request: {
+          index: ['products'],
+          body: {query: {match: {name: 'name3'}}},
+          refresh: false,
+          wait_for_completion: false,
+          requests_per_second: 10.0,
+          scroll_size: 2000
+        }
+      )
+    end
   end
 
   describe '#response=' do
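
As a usage illustration of the options this diff introduces, here is a small sketch combining the query builder and the rake helper. It only exercises calls defined in the diff above; `UsersIndex`, its `created_at` field, and the cutoff date are assumed examples and are not part of the change.

```ruby
# Sketch only: UsersIndex, created_at and the cutoff date are assumed examples.

# Query builder: run delete-by-query asynchronously, throttled and batched.
# With wait_for_completion: false the response carries a task reference
# instead of deletion stats.
response = UsersIndex
  .filter(range: {created_at: {lte: '2024-01-01'}})
  .delete_all(
    refresh: false,
    wait_for_completion: false,
    requests_per_second: 10.0,
    scroll_size: 2000
  )
response['task'] # task id, usable with the Elasticsearch Tasks API

# Rake helper: the same options arrive as ENV variables and are parsed
# into a delete-by-query options hash before being passed to journal_clean.
options = Chewy::RakeHelper.delete_by_query_options_from_env(
  {'WAIT_FOR_COMPLETION' => 'false', 'REQUESTS_PER_SECOND' => '10', 'SCROLL_SIZE' => '5000'}
)
Chewy::RakeHelper.journal_clean(delete_by_query_options: options)
```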