Skip to content

Commit

Permalink
Merge pull request #434 from fpbouchard/master
Browse files Browse the repository at this point in the history
Only apply journal for current index when resetting
  • Loading branch information
pyromaniac authored Oct 7, 2016
2 parents 7522dc1 + 48e44db commit 62d4cfb
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 13 deletions.
73 changes: 61 additions & 12 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def identify(objects)
end

class << self
def apply_changes_from(time)
group(entries_from(time)).each do |entry|
def apply_changes_from(time, options = {})
group(entries_from(time, options[:only])).each do |entry|
Chewy.derive_type(entry.full_type_name).import(entry.object_ids, journal: false)
end
end
Expand All @@ -65,8 +65,8 @@ def group(entries)
entries.group_by(&:full_type_name).map { |_, grouped_entries| grouped_entries.reduce(:merge) }
end

def entries_from(time)
query = query(time, :gte)
def entries_from(time, indices = [])
query = query(time, :gte, indices)
size = Chewy.client.search(index: index_name, type: type_name, body: query, search_type: 'count')['hits']['total']
if size > 0
Chewy.client.search(index: index_name, type: type_name, body: query, size: size, sort: 'created_at')['hits']['hits'].map { |r| Entry.new(r['_source']) }
Expand Down Expand Up @@ -94,7 +94,7 @@ def delete
end

def clean_until(time)
query = query(time, :lte, :query)
query = query(time, :lte, nil, false)
search_query = query.merge(fields: ['_id'], size: DELETE_BATCH_SIZE)

count = Chewy.client.count(index: index_name, body: query)['count']
Expand Down Expand Up @@ -123,19 +123,68 @@ def type_name
JOURNAL_MAPPING.keys.first
end

def query(time, comparator, query_type = :filter)
def query(time, comparator, indices, use_filter = true)
indices ||= []
{
query: {
filtered: {
query_type => {
range: {
created_at: {
comparator => time.to_i
filtered: use_filter ? using_filter_query(time, comparator, indices) : using_query_query(time, comparator, indices)
}
}
end

def using_filter_query(time, comparator, indices)
if indices.any?
{
filter: {
bool: {
must: [
range_filter(comparator, time),
bool: {
should: indices.collect { |i| index_filter(i) }
}
}
]
}
}
}
else
{
filter: range_filter(comparator, time)
}
end
end

def using_query_query(time, comparator, indices)
if indices.any?
{
query: range_filter(comparator, time),
filter: {
bool: {
should: indices.collect { |i| index_filter(i) }
}
}
}
else
{
query: range_filter(comparator, time)
}
end
end

def range_filter(comparator, time)
{
range: {
created_at: {
comparator => time.to_i
}
}
}
end

def index_filter(index)
{
term: {
index_name: index.derivable_index_name
}
}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def reset_index(*indexes)
index.reset!((time.to_f * 1000).round)
if index.journal?
Chewy::Journal.create
Chewy::Journal.apply_changes_from(time)
Chewy::Journal.apply_changes_from(time, only: [index])
end
end
end
Expand Down
54 changes: 54 additions & 0 deletions spec/chewy/journal_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,59 @@ def timestamp(time)
end
end
end

context '.apply_changes_from with an index filter' do
let(:time) { Time.now }

before do
stub_model(:city) do
update_index 'city', :self
end
stub_model(:country) do
update_index 'country', :self
end

stub_index('city') do
define_type City do
default_import_options journal: true
end
end
stub_index('country') do
define_type Country do
default_import_options journal: true
end
end

Chewy.massacre
Timecop.freeze(time)
end

specify do
Chewy.strategy(:urgent) do
Array.new(2) { |i| City.create!(id: i + 1) }
Array.new(2) { |i| Country.create!(id: i + 1) }
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 2

# simulate lost data
Chewy.client.delete(index: 'city', type: 'city', id: 1, refresh: true)
Chewy.client.delete(index: 'country', type: 'country', id: 1, refresh: true)
expect(CityIndex.all.to_a.length).to eq 1
expect(CountryIndex.all.to_a.length).to eq 1

# Replay on specific index
Chewy::Journal.apply_changes_from(time, only: [CityIndex])
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 1

# Replay on both
Chewy.client.delete(index: 'city', type: 'city', id: 1, refresh: true)
expect(CityIndex.all.to_a.length).to eq 1
Chewy::Journal.apply_changes_from(time, only: [CityIndex, CountryIndex])
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 2
end
end
end
end
end

0 comments on commit 62d4cfb

Please sign in to comment.