diff --git a/lib/chewy/type/import.rb b/lib/chewy/type/import.rb index 16bb81316..70f0cf268 100644 --- a/lib/chewy/type/import.rb +++ b/lib/chewy/type/import.rb @@ -181,15 +181,7 @@ def import_parallel(objects, routine) ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base) errors, import, leftovers = process_parallel_import_results(results) - if leftovers.present? - batches = leftovers.each_slice(routine.options[:batch_size]) - results = ::Parallel.map_with_index( - batches, - routine.parallel_options, - &LEFTOVERS_WORKER.curry[self, routine.options, batches.size] - ) - errors.concat(results.flatten(1)) - end + execute_leftovers(leftovers, routine, self) payload[:import] = import payload[:errors] = payload_errors(errors) if errors.present? @@ -197,6 +189,18 @@ def import_parallel(objects, routine) end end + def execute_leftovers(leftovers, routine, self_object) + return unless leftovers.present? + + batches = leftovers.each_slice(routine.options[:batch_size]) + results = ::Parallel.map_with_index( + batches, + routine.parallel_options, + &LEFTOVERS_WORKER.curry[self_object, routine.options, batches.size] + ) + errors.concat(results.flatten(1)) + end + def process_parallel_import_results(results) results.each_with_object([[], {}, []]) do |r, (e, i, l)| e.concat(r[:errors])