Skip to content

Commit

Permalink
Apply changes following review
Browse files Browse the repository at this point in the history
  • Loading branch information
François-Pierre Bouchard committed Oct 5, 2016
1 parent af62448 commit 48e44db
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 37 deletions.
76 changes: 41 additions & 35 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ def any_records?
@records.any?
end

def apply_changes_from(time)
Chewy::Journal.apply_changes_from(time, only: @index)
end

private

def identify(objects)
Expand All @@ -69,8 +65,8 @@ def group(entries)
entries.group_by(&:full_type_name).map { |_, grouped_entries| grouped_entries.reduce(:merge) }
end

def entries_from(time, index = nil)
query = query(time, :gte, index)
def entries_from(time, indices = [])
query = query(time, :gte, indices)
size = Chewy.client.search(index: index_name, type: type_name, body: query, search_type: 'count')['hits']['total']
if size > 0
Chewy.client.search(index: index_name, type: type_name, body: query, size: size, sort: 'created_at')['hits']['hits'].map { |r| Entry.new(r['_source']) }
Expand Down Expand Up @@ -127,41 +123,51 @@ def type_name
JOURNAL_MAPPING.keys.first
end

def query(time, comparator, index = nil, use_filter = true)
filter_query =
if use_filter
if index.present?
{
filter: {
def query(time, comparator, indices, use_filter = true)
indices ||= []
{
query: {
filtered: use_filter ? using_filter_query(time, comparator, indices) : using_query_query(time, comparator, indices)
}
}
end

def using_filter_query(time, comparator, indices)
if indices.any?
{
filter: {
bool: {
must: [
range_filter(comparator, time),
bool: {
must: [
range_filter(comparator, time),
index_filter(index)
]
should: indices.collect { |i| index_filter(i) }
}
}
]
}
else
{
filter: range_filter(comparator, time)
}
end
elsif index.present?
{
query: range_filter(comparator, time),
filter: index_filter(index)
}
else
{
query: range_filter(comparator, time)
}
end
}
else
{
filter: range_filter(comparator, time)
}
end
end

{
query: {
filtered: filter_query
def using_query_query(time, comparator, indices)
if indices.any?
{
query: range_filter(comparator, time),
filter: {
bool: {
should: indices.collect { |i| index_filter(i) }
}
}
}
}
else
{
query: range_filter(comparator, time)
}
end
end

def range_filter(comparator, time)
Expand Down
2 changes: 1 addition & 1 deletion lib/chewy/rake_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def reset_index(*indexes)
index.reset!((time.to_f * 1000).round)
if index.journal?
Chewy::Journal.create
Chewy::Journal.apply_changes_from(time, only: index)
Chewy::Journal.apply_changes_from(time, only: [index])
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion spec/chewy/journal_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,16 @@ def timestamp(time)
expect(CountryIndex.all.to_a.length).to eq 1

# Replay on specific index
Chewy::Journal.new(CityIndex).apply_changes_from(time)
Chewy::Journal.apply_changes_from(time, only: [CityIndex])
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 1

# Replay on both
Chewy.client.delete(index: 'city', type: 'city', id: 1, refresh: true)
expect(CityIndex.all.to_a.length).to eq 1
Chewy::Journal.apply_changes_from(time, only: [CityIndex, CountryIndex])
expect(CityIndex.all.to_a.length).to eq 2
expect(CountryIndex.all.to_a.length).to eq 2
end
end
end
Expand Down

0 comments on commit 48e44db

Please sign in to comment.