From af624489a04e89d770a0190284f5c7e8e87c7017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois-Pierre=20Bouchard?= Date: Tue, 27 Sep 2016 16:30:18 -0400 Subject: [PATCH] Only apply journal for current index when resetting --- lib/chewy/journal.rb | 69 +++++++++++++++++++++++++++++++------- lib/chewy/rake_helper.rb | 2 +- spec/chewy/journal_spec.rb | 47 ++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 14 deletions(-) diff --git a/lib/chewy/journal.rb b/lib/chewy/journal.rb index 713cda458..9b7d4c5f5 100644 --- a/lib/chewy/journal.rb +++ b/lib/chewy/journal.rb @@ -48,6 +48,10 @@ def any_records? @records.any? end + def apply_changes_from(time) + Chewy::Journal.apply_changes_from(time, only: @index) + end + private def identify(objects) @@ -55,8 +59,8 @@ def identify(objects) end class << self - def apply_changes_from(time) - group(entries_from(time)).each do |entry| + def apply_changes_from(time, options = {}) + group(entries_from(time, options[:only])).each do |entry| Chewy.derive_type(entry.full_type_name).import(entry.object_ids, journal: false) end end @@ -65,8 +69,8 @@ def group(entries) entries.group_by(&:full_type_name).map { |_, grouped_entries| grouped_entries.reduce(:merge) } end - def entries_from(time) - query = query(time, :gte) + def entries_from(time, index = nil) + query = query(time, :gte, index) size = Chewy.client.search(index: index_name, type: type_name, body: query, search_type: 'count')['hits']['total'] if size > 0 Chewy.client.search(index: index_name, type: type_name, body: query, size: size, sort: 'created_at')['hits']['hits'].map { |r| Entry.new(r['_source']) } @@ -94,7 +98,7 @@ def delete end def clean_until(time) - query = query(time, :lte, :query) + query = query(time, :lte, nil, false) search_query = query.merge(fields: ['_id'], size: DELETE_BATCH_SIZE) count = Chewy.client.count(index: index_name, body: query)['count'] @@ -123,21 +127,60 @@ def type_name JOURNAL_MAPPING.keys.first end - def query(time, comparator, query_type = :filter) - { - query: { - filtered: { - query_type => { - range: { - created_at: { - comparator => time.to_i + def query(time, comparator, index = nil, use_filter = true) + filter_query = + if use_filter + if index.present? + { + filter: { + bool: { + must: [ + range_filter(comparator, time), + index_filter(index) + ] } } } + else + { + filter: range_filter(comparator, time) + } + end + elsif index.present? + { + query: range_filter(comparator, time), + filter: index_filter(index) + } + else + { + query: range_filter(comparator, time) + } + end + + { + query: { + filtered: filter_query + } + } + end + + def range_filter(comparator, time) + { + range: { + created_at: { + comparator => time.to_i } } } end + + def index_filter(index) + { + term: { + index_name: index.derivable_index_name + } + } + end end class Entry diff --git a/lib/chewy/rake_helper.rb b/lib/chewy/rake_helper.rb index b8576d4de..be4a90a1b 100644 --- a/lib/chewy/rake_helper.rb +++ b/lib/chewy/rake_helper.rb @@ -50,7 +50,7 @@ def reset_index(*indexes) index.reset!((time.to_f * 1000).round) if index.journal? Chewy::Journal.create - Chewy::Journal.apply_changes_from(time) + Chewy::Journal.apply_changes_from(time, only: index) end end end diff --git a/spec/chewy/journal_spec.rb b/spec/chewy/journal_spec.rb index 41828b666..07ea8a8ca 100644 --- a/spec/chewy/journal_spec.rb +++ b/spec/chewy/journal_spec.rb @@ -169,5 +169,52 @@ def timestamp(time) end end end + + context '.apply_changes_from with an index filter' do + let(:time) { Time.now } + + before do + stub_model(:city) do + update_index 'city', :self + end + stub_model(:country) do + update_index 'country', :self + end + + stub_index('city') do + define_type City do + default_import_options journal: true + end + end + stub_index('country') do + define_type Country do + default_import_options journal: true + end + end + + Chewy.massacre + Timecop.freeze(time) + end + + specify do + Chewy.strategy(:urgent) do + Array.new(2) { |i| City.create!(id: i + 1) } + Array.new(2) { |i| Country.create!(id: i + 1) } + expect(CityIndex.all.to_a.length).to eq 2 + expect(CountryIndex.all.to_a.length).to eq 2 + + # simulate lost data + Chewy.client.delete(index: 'city', type: 'city', id: 1, refresh: true) + Chewy.client.delete(index: 'country', type: 'country', id: 1, refresh: true) + expect(CityIndex.all.to_a.length).to eq 1 + expect(CountryIndex.all.to_a.length).to eq 1 + + # Replay on specific index + Chewy::Journal.new(CityIndex).apply_changes_from(time) + expect(CityIndex.all.to_a.length).to eq 2 + expect(CountryIndex.all.to_a.length).to eq 1 + end + end + end end end