Skip to content

Commit

Permalink
Only apply journal for current index when resetting
Browse files Browse the repository at this point in the history
  • Loading branch information
François-Pierre Bouchard committed Sep 27, 2016
1 parent 4ae2065 commit af62448
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 14 deletions.
69 changes: 56 additions & 13 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ def any_records?
@records.any?
end

def apply_changes_from(time)
Chewy::Journal.apply_changes_from(time, only: @index)
end

private

def identify(objects)
@index.adapter.identify(objects)
end

class << self
def apply_changes_from(time)
group(entries_from(time)).each do |entry|
def apply_changes_from(time, options = {})
group(entries_from(time, options[:only])).each do |entry|
Chewy.derive_type(entry.full_type_name).import(entry.object_ids, journal: false)
end
end
Expand All @@ -65,8 +69,8 @@ def group(entries)
entries.group_by(&:full_type_name).map { |_, grouped_entries| grouped_entries.reduce(:merge) }
end

def entries_from(time)
query = query(time, :gte)
def entries_from(time, index = nil)
query = query(time, :gte, index)
size = Chewy.client.search(index: index_name, type: type_name, body: query, search_type: 'count')['hits']['total']
if size > 0
Chewy.client.search(index: index_name, type: type_name, body: query, size: size, sort: 'created_at')['hits']['hits'].map { |r| Entry.new(r['_source']) }
Expand Down Expand Up @@ -94,7 +98,7 @@ def delete
end

def clean_until(time)
query = query(time, :lte, :query)
query = query(time, :lte, nil, false)
search_query = query.merge(fields: ['_id'], size: DELETE_BATCH_SIZE)

count = Chewy.client.count(index: index_name, body: query)['count']
Expand Down Expand Up @@ -123,21 +127,60 @@ def type_name
JOURNAL_MAPPING.keys.first
end

def query(time, comparator, query_type = :filter)
{
query: {
filtered: {
query_type => {
range: {
created_at: {
comparator => time.to_i
def query(time, comparator, index = nil, use_filter = true)
filter_query =
if use_filter
if index.present?
{
filter: {
bool: {
must: [
range_filter(comparator, time),
index_filter(index)
]
}
}
}
else
{
filter: range_filter(comparator, time)
}
end
elsif index.present?
{
query: range_filter(comparator, time),
filter: index_filter(index)
}
else
{
query: range_filter(comparator, time)
}
end

{
query: {
filtered: filter_query
}
}
end

def range_filter(comparator, time)
{
range: {
created_at: {
comparator => time.to_i
}
}
}
end

def index_filter(index)
{
term: {
index_name: index.derivable_index_name
}
}
end
end

class Entry
Expand Down
2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def reset_index(*indexes)
index.reset!((time.to_f * 1000).round)
if index.journal?
Chewy::Journal.create
Chewy::Journal.apply_changes_from(time)
Chewy::Journal.apply_changes_from(time, only: index)
end
end
end
Expand Down
47 changes: 47 additions & 0 deletions spec/chewy/journal_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,52 @@ def timestamp(time)
end
end
end

context '.apply_changes_from with an index filter' do
let(:time) { Time.now }

before do
stub_model(:city) do
update_index 'city', :self
end
stub_model(:country) do
update_index 'country', :self
end

stub_index('city') do
define_type City do
default_import_options journal: true
end
end
stub_index('country') do
define_type Country do
default_import_options journal: true
end
end

Chewy.massacre
Timecop.freeze(time)
end

specify do
Chewy.strategy(:urgent) do
Array.new(2) { |i| City.create!(id: i + 1) }
Array.new(2) { |i| Country.create!(id: i + 1) }
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 2

# simulate lost data
Chewy.client.delete(index: 'city', type: 'city', id: 1, refresh: true)
Chewy.client.delete(index: 'country', type: 'country', id: 1, refresh: true)
expect(CityIndex.all.to_a.length).to eq 1
expect(CountryIndex.all.to_a.length).to eq 1

# Replay on specific index
Chewy::Journal.new(CityIndex).apply_changes_from(time)
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 1
end
end
end
end
end

0 comments on commit af62448

Please sign in to comment.