Skip to content

Commit

Permalink
Fix incorrectly paginated journaling (toptal#848)
Browse files Browse the repository at this point in the history
  • Loading branch information
Danil Nurgaliev authored and Çağatay Yücelen committed Jan 28, 2023
1 parent 0be08ec commit 66db1fa
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 54 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bugs Fixed

* [#842](https://github.com/toptal/chewy/issues/842): Fix `ignore_blank` handling. ([@rabotyaga][])
* [#848](https://github.com/toptal/chewy/issues/848): Fix invalid journal pagination. ([@konalegi][])

## 7.2.5 (2022-03-04)

Expand All @@ -19,7 +20,7 @@
* [#827](https://github.com/toptal/chewy/pull/827): Add `:lazy_sidekiq` strategy, that defers not only importing but also `update_index` callback evaluation for created and updated objects. ([@sl4vr][])
* [#827](https://github.com/toptal/chewy/pull/827): Add `:atomic_no_refresh` strategy. Like `:atomic`, but `refresh=false` parameter is set. ([@barthez][])
* [#827](https://github.com/toptal/chewy/pull/827): Add `:no_refresh` chain call to `update_index` matcher to ensure import was called with `refresh=false`. ([@barthez][])

### Bugs Fixed

* [#835](https://github.com/toptal/chewy/pull/835): Support keyword arguments in named scopes. ([@milk1000cc][])
Expand Down
15 changes: 11 additions & 4 deletions lib/chewy/journal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ def initialize(*only)
# specified indexes.
#
# @param since_time [Time, DateTime] timestamp from which changes will be applied
# @param retries [Integer] maximum number of attempts to make journal empty, 10 by default
# @param fetch_limit [Integer] number of entries to be fetched on each cycle
# @return [Integer] the number of journal entries found
def apply(since_time, retries: 10, **import_options)
def apply(since_time, fetch_limit: 10, **import_options)
stage = 1
since_time -= 1
count = 0
while stage <= retries
entries = Chewy::Stash::Journal.entries(since_time, only: @only).to_a.presence or break

total_count = entries(since_time, fetch_limit).total_count

while count < total_count
entries = entries(since_time, fetch_limit).to_a.presence or break
count += entries.size
groups = reference_groups(entries)
ActiveSupport::Notifications.instrument 'apply_journal.chewy', stage: stage, groups: groups
Expand All @@ -46,6 +49,10 @@ def clean(until_time = nil)

private

# Builds the journal query used for a single fetch cycle.
#
# Starts from `Chewy::Stash::Journal.entries` for the given `since_time`
# (restricted by `@only`), sorts the result by `:created_at` and caps it
# at `fetch_limit` records.
#
# @param since_time timestamp passed through to `Chewy::Stash::Journal.entries`
# @param fetch_limit maximum number of entries requested via `limit`
# @return the ordered, limited result of the journal query chain
def entries(since_time, fetch_limit)
  journal_scope = Chewy::Stash::Journal.entries(since_time, only: @only)
  chronological = journal_scope.order(:created_at)
  chronological.limit(fetch_limit)
end

def reference_groups(entries)
entries.group_by(&:index_name)
.transform_keys { |index_name| Chewy.derive_name(index_name) }
Expand Down
62 changes: 13 additions & 49 deletions spec/chewy/journal_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,59 +199,23 @@ def timestamp(time)
end
end

context 'retries' do
let(:time) { Time.now.to_i }
before do
Timecop.freeze
Chewy.strategy(:urgent)
City.create!(id: 1)
end

after do
Chewy.strategy.pop
Timecop.return
end

specify 'journal was cleaned after the first call' do
expect(Chewy::Stash::Journal).to receive(:entries).exactly(2).and_call_original
expect(described_class.new.apply(time)).to eq(1)
end

context 'endless journal' do
let(:count_of_checks) { 10 } # default
let!(:journal_entries) do
record = Chewy::Stash::Journal.entries(time).first
Array.new(count_of_checks) do |i|
Chewy::Stash::Journal.new(
record.attributes.merge(
'created_at' => time.to_i + i,
'references' => [i.to_s]
)
)
end
end

specify '10 retries by default' do
expect(Chewy::Stash::Journal)
.to receive(:entries).exactly(count_of_checks) { [journal_entries.shift].compact }
expect(described_class.new.apply(time)).to eq(10)
end
context 'when order is not preserved' do
let(:time) { Time.now }

specify 'with :once parameter set' do
expect(Chewy::Stash::Journal)
.to receive(:entries).exactly(1) { [journal_entries.shift].compact }
expect(described_class.new.apply(time, retries: 1)).to eq(1)
it 'paginates properly through all items' do
Chewy.strategy(:urgent) do
Timecop.travel(time + 1.minute) { City.create!(id: 2) }
Timecop.travel(time + 3.minute) { City.create!(id: 4) }
Timecop.travel(time + 2.minute) { City.create!(id: 1) }
Timecop.travel(time + 4.minute) { City.create!(id: 3) }
end

context 'with retries parameter set' do
let(:retries) { 5 }
CitiesIndex.purge!
expect(CitiesIndex.all.to_a.length).to eq 0

specify do
expect(Chewy::Stash::Journal)
.to receive(:entries).exactly(retries) { [journal_entries.shift].compact }
expect(described_class.new.apply(time, retries: retries)).to eq(5)
end
end
# Replay on specific index
expect(described_class.new(CitiesIndex).apply(time, fetch_limit: 2)).to eq(4)
expect(CitiesIndex.all.to_a.map(&:id).sort).to eq([1, 2, 3, 4])
end
end
end
Expand Down

0 comments on commit 66db1fa

Please sign in to comment.