From da9bc3c4929f6a587d87328c3c294c6b1a110dd9 Mon Sep 17 00:00:00 2001 From: Danil Nurgaliev Date: Mon, 19 Sep 2022 14:23:17 +0300 Subject: [PATCH] Allow asynchronously cleanup journal --- README.md | 11 +++++++++ lib/chewy/journal.rb | 8 +++++-- lib/chewy/rake_helper.rb | 39 +++++++++++++++++++++++++++---- lib/chewy/search/request.rb | 17 +++++++++++--- lib/chewy/stash.rb | 4 ++-- lib/chewy/version.rb | 2 +- lib/tasks/chewy.rake | 9 ++++++- spec/chewy/rake_helper_spec.rb | 27 +++++++++++++++++++++ spec/chewy/search/request_spec.rb | 25 ++++++++++++++++++++ 9 files changed, 128 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 38c1431fc..7e74e3737 100644 --- a/README.md +++ b/README.md @@ -1144,6 +1144,17 @@ rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes f rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only ``` +For the cases when journal has grown up to enormous size, classical way of deletion is not quite possible. Fortunately chewy internally uses internally uses [delete-by-query](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-delete-by-query.html#docs-delete-by-query-task-api) ES function which support async execution with batching and [throttling](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#docs-delete-by-query-throttle). +The full list of available options are listed below: +* `wait_for_completion` - boolean, options controls async execution, `false` - `async`, `true` - `sync`. When set to `false` Elasticsearch performs some preflight checks, launches the request, and returns a task you can use to cancel or get the status of the task +* `requests_per_second` - float, The throttle for this request in sub-requests per second +* `scroll_size` - integer, Size of the scroll request that powers the operation + + +```bash +rake chewy:journal:clean -- --wait_for_completion=false --requests_per_second=10 --scroll_size=5000 +``` + ### RSpec integration Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features: diff --git a/lib/chewy/journal.rb b/lib/chewy/journal.rb index d1a887140..056601465 100644 --- a/lib/chewy/journal.rb +++ b/lib/chewy/journal.rb @@ -43,8 +43,12 @@ def apply(since_time, fetch_limit: 10, **import_options) # # @param until_time [Time, DateTime] time to clean up until it # @return [Hash] delete_by_query ES API call result - def clean(until_time = nil) - Chewy::Stash::Journal.clean(until_time, only: @only) + def clean(until_time = nil, delete_by_query_options: {}) + Chewy::Stash::Journal.clean( + until_time, + only: @only, + delete_by_query_options: delete_by_query_options.merge(refresh: false) + ) end private diff --git a/lib/chewy/rake_helper.rb b/lib/chewy/rake_helper.rb index 8e0de510d..5cd250438 100644 --- a/lib/chewy/rake_helper.rb +++ b/lib/chewy/rake_helper.rb @@ -162,7 +162,7 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout) subscribed_task_stats(output) do output.puts "Applying journal entries created after #{time}" - count = Chewy::Journal.new(indexes_from(only: only, except: except)).apply(time) + count = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).apply(time) output.puts 'No journal entries were created after the specified time' if count.zero? end end @@ -181,12 +181,16 @@ def journal_apply(time: nil, only: nil, except: nil, output: $stdout) # @param except [Array, Chewy::Index, String] indexes to exclude from processing # @param output [IO] output io for logging # @return [Array] indexes that were actually updated - def journal_clean(time: nil, only: nil, except: nil, output: $stdout) + def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) subscribed_task_stats(output) do output.puts "Cleaning journal entries created before #{time}" if time - response = Chewy::Journal.new(indexes_from(only: only, except: except)).clean(time) - count = response['deleted'] || response['_indices']['_all']['deleted'] - output.puts "Cleaned up #{count} journal entries" + response = Chewy::Journal.new(journal_indexes_from(only: only, except: except)).clean(time, delete_by_query_options: delete_by_query_options) + if response.key?('task') + output.puts "Task to cleanup the journal has been created, #{response['task']}" + else + count = response['deleted'] || response['_indices']['_all']['deleted'] + output.puts "Cleaned up #{count} journal entries" + end end end @@ -228,6 +232,25 @@ def update_mapping(name:, output: $stdout) end end + # Parses options that are required to run journal cleanup asynchronously + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # + # @example + # Chewy::RakeHelper.parse_delete_by_query_options(["chewy:journal:clean", "--", "--wait_for_completion=false", "--requests_per_second=10", "--scroll_size=5000"]) + # + def parse_delete_by_query_options(args) + str_to_bool = ->(str) { {'true' => true, 'false' => false}.fetch(str) { raise('unknown value for boolean is provided') } } + options = {} + opts = OptionParser.new + opts.on('-w', '--wait_for_completion=ARG', String) { |val| options[:wait_for_completion] = str_to_bool.call(val) } + opts.on('-r', '--requests_per_second=ARG', Float) { |rps| options[:requests_per_second] = rps } + opts.on('-r', '--scroll_size=ARG', Integer) { |ss| options[:scroll_size] = ss } + + ordered_args = opts.order(args) {} # rubocop:disable Lint/EmptyBlock + opts.parse(ordered_args) + options + end + def normalize_indexes(*identifiers) identifiers.flatten(1).map { |identifier| normalize_index(identifier) } end @@ -248,6 +271,12 @@ def subscribed_task_stats(output = $stdout, &block) private + def journal_indexes_from(only: nil, except: nil) + return if Array.wrap(only).empty? && Array.wrap(except).empty? + + indexes_from(only: only, except: except) + end + def indexes_from(only: nil, except: nil) indexes = if only.present? normalize_indexes(Array.wrap(only)) diff --git a/lib/chewy/search/request.rb b/lib/chewy/search/request.rb index 687e8a093..f955c85a4 100644 --- a/lib/chewy/search/request.rb +++ b/lib/chewy/search/request.rb @@ -962,10 +962,21 @@ def pluck(*fields) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html # @note The result hash is different for different API used. - # @param refresh [true, false] field names + # @param refresh [true, false] Refreshes all shards involved in the delete by query + # @param wait_for_completion [true, false] Refreshes all shards involved in the delete by query + # @param requests_per_second [Float] The throttle for this request in sub-requests per second + # @param scroll_size [Integer] Size of the scroll request that powers the operation + # @return [Hash] the result of query execution - def delete_all(refresh: true) - request_body = only(WHERE_STORAGES).render.merge(refresh: refresh) + def delete_all(refresh: true, wait_for_completion: nil, requests_per_second: nil, scroll_size: nil) + request_body = only(WHERE_STORAGES).render.merge( + { + refresh: refresh, + wait_for_completion: wait_for_completion, + requests_per_second: requests_per_second, + scroll_size: scroll_size + }.compact + ) ActiveSupport::Notifications.instrument 'delete_query.chewy', notification_payload(request: request_body) do request_body[:body] = {query: {match_all: {}}} if request_body[:body].empty? Chewy.client.delete_by_query(request_body) diff --git a/lib/chewy/stash.rb b/lib/chewy/stash.rb index 2181ee492..a1e770c9a 100644 --- a/lib/chewy/stash.rb +++ b/lib/chewy/stash.rb @@ -30,10 +30,10 @@ def self.entries(since_time, only: []) # # @param since_time [Time, DateTime] the time top boundary # @param only [Chewy::Index, Array] indexes to clean up journal entries for - def self.clean(until_time = nil, only: []) + def self.clean(until_time = nil, only: [], delete_by_query_options: {}) scope = self.for(only) scope = scope.filter(range: {created_at: {lte: until_time}}) if until_time - scope.delete_all + scope.delete_all(**delete_by_query_options) end # Selects all the journal entries for the specified indices. diff --git a/lib/chewy/version.rb b/lib/chewy/version.rb index 5b896f45c..39af809b9 100644 --- a/lib/chewy/version.rb +++ b/lib/chewy/version.rb @@ -1,3 +1,3 @@ module Chewy - VERSION = '7.2.6'.freeze + VERSION = '7.2.7'.freeze end diff --git a/lib/tasks/chewy.rake b/lib/tasks/chewy.rake index 5cf4ef17c..fd80f0090 100644 --- a/lib/tasks/chewy.rake +++ b/lib/tasks/chewy.rake @@ -94,7 +94,14 @@ namespace :chewy do desc 'Removes journal records created before the specified timestamp for the specified indexes/types or all of them' task clean: :environment do |_task, args| - Chewy::RakeHelper.journal_clean(**parse_journal_args(args.extras)) + journal_args = parse_journal_args(args.extras) + delete_options = Chewy::RakeHelper.parse_delete_by_query_options(ARGV) + Chewy::RakeHelper.journal_clean( + [ + journal_args, + {delete_by_query_options: delete_options} + ].reduce({}, :merge) + ) end end end diff --git a/spec/chewy/rake_helper_spec.rb b/spec/chewy/rake_helper_spec.rb index d9754ef68..f5d9c6584 100644 --- a/spec/chewy/rake_helper_spec.rb +++ b/spec/chewy/rake_helper_spec.rb @@ -426,6 +426,33 @@ described_class.journal_clean(except: CitiesIndex, output: output) expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) \\ACleaned up 1 journal entries +Total: \\d+s\\Z + OUTPUT + end + + it 'executes asynchronously' do + output = StringIO.new + expect(Chewy.client).to receive(:delete_by_query).with( + { + body: {query: {match_all: {}}}, + index: ['chewy_journal'], + refresh: false, + requests_per_second: 10.0, + scroll_size: 200, + wait_for_completion: false + } + ).and_call_original + described_class.journal_clean( + output: output, + delete_by_query_options: { + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 200 + } + ) + + expect(output.string).to match(Regexp.new(<<-OUTPUT, Regexp::MULTILINE)) +\\ATask to cleanup the journal has been created, [^\\n]* Total: \\d+s\\Z OUTPUT end diff --git a/spec/chewy/search/request_spec.rb b/spec/chewy/search/request_spec.rb index f8ea8d1ea..4aede10f3 100644 --- a/spec/chewy/search/request_spec.rb +++ b/spec/chewy/search/request_spec.rb @@ -817,6 +817,31 @@ request: {index: ['products'], body: {query: {match: {name: 'name3'}}}, refresh: false} ) end + + it 'delete records asynchronously' do + outer_payload = nil + ActiveSupport::Notifications.subscribe('delete_query.chewy') do |_name, _start, _finish, _id, payload| + outer_payload = payload + end + subject.query(match: {name: 'name3'}).delete_all( + refresh: false, + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 2000 + ) + expect(outer_payload).to eq( + index: ProductsIndex, + indexes: [ProductsIndex], + request: { + index: ['products'], + body: {query: {match: {name: 'name3'}}}, + refresh: false, + wait_for_completion: false, + requests_per_second: 10.0, + scroll_size: 2000 + } + ) + end end describe '#response=' do