Skip to content

Commit

Permalink
Change tootctl search deploy algorithm (mastodon#14300)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gargron authored and Mage committed Jan 14, 2022
1 parent 2d40190 commit c14c98e
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 17 deletions.
2 changes: 1 addition & 1 deletion app/chewy/statuses_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StatusesIndex < Chewy::Index
},
}

define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments), delete_if: ->(status) { status.searchable_by.empty? } do
define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) do
crutch :mentions do |collection|
data = ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false).pluck(:status_id, :account_id)
data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) }
Expand Down
1 change: 1 addition & 0 deletions lib/mastodon/cli_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ActiveJob::Base.logger = dev_null
HttpLog.configuration.logger = dev_null
Paperclip.options[:log] = false
Chewy.logger = dev_null

module Mastodon
module CLIHelper
Expand Down
142 changes: 126 additions & 16 deletions lib/mastodon/search_cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,146 @@

module Mastodon
class SearchCLI < Thor
  include CLIHelper

  # Indices are sorted by amount of data to be expected in each, so that
  # smaller indices can go online sooner
  INDICES = [
    AccountsIndex,
    TagsIndex,
    StatusesIndex,
  ].freeze

  # `deploy` brings the ElasticSearch indices up to date in three phases:
  #
  # 1. Every selected index whose specification changed is purged and its new
  #    specification is locked.
  # 2. The number of records to import is counted to size the progress bar.
  # 3. Records are read from the database in batches; each batch is cut into
  #    slices that are turned into bulk requests on a fixed-size thread pool.
  #
  # Options (read through Thor's `options`):
  #   --concurrency / -c  number of worker threads, must be at least 1
  #   --only              restrict the run to the listed indices
  #
  # Exits with status 1 when the concurrency setting is below 1. Errors
  # raised while importing a slice are logged to the progress bar and do not
  # abort the run.
  option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
  option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
  desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them'
  long_desc <<~LONG_DESC
    If ElasticSearch is empty, this command will create the necessary indices
    and then import data from the database into those indices.
    This command will also upgrade indices if the underlying schema has been
    changed since the last run.
    Even if creating or upgrading indices is not necessary, data from the
    database will be imported into the indices.
  LONG_DESC
  def deploy
    if options[:concurrency] < 1
      say('Cannot run with this concurrency setting, must be at least 1', :red)
      exit(1)
    end

    # Either the indices named via --only (in the order given) or all of them
    indices = begin
      if options[:only]
        options[:only].map { |str| "#{str.camelize}Index".constantize }
      else
        INDICES
      end
    end

    # total: nil starts the bar without a known total; the total is filled in
    # by the workload estimate below
    progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

    # First, ensure all indices are created and have the correct
    # structure, so that live data can already be written
    indices.select { |index| index.specification.changed? }.each do |index|
      progress.title = "Upgrading #{index} "
      index.purge
      index.specification.lock!
    end

    # One database connection per worker thread plus one extra
    # NOTE(review): the extra connection is presumably for the thread that
    # iterates find_in_batches -- confirm
    ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1

    pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
    added   = Concurrent::AtomicFixnum.new(0)
    removed = Concurrent::AtomicFixnum.new(0)

    progress.title = 'Estimating workload '

    # Estimate the amount of data that has to be imported first
    indices.each do |index|
      index.types.each do |type|
        progress.total = (progress.total || 0) + type.adapter.default_scope.count
      end
    end

    # Now import all the actual data. Mind that unlike chewy:sync, we don't
    # fetch and compare all record IDs from the database and the index to
    # find out which to add and which to remove from the index. Because with
    # potentially millions of rows, the memory footprint of such a calculation
    # is uneconomical. So we only ever add.
    indices.each do |index|
      progress.title = "Importing #{index} "
      batch_size = 1_000

      # Divide as a float before rounding up. Integer division would round
      # down, producing one more slice than there are threads whenever the
      # concurrency does not divide the batch size, and a slice size of 0
      # (which each_slice rejects) when the concurrency exceeds the batch size.
      slice_size = (batch_size / options[:concurrency].to_f).ceil

      index.types.each do |type|
        type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
          futures = []

          batch.each_slice(slice_size) do |records|
            futures << Concurrent::Future.execute(executor: pool) do
              begin
                if !progress.total.nil? && progress.progress + records.size > progress.total
                  # The number of items has changed between start and now,
                  # since there is no good way to predict the final count from
                  # here, just change the progress bar to an indeterminate one

                  progress.total = nil
                end

                grouped_records = nil
                bulk_body       = nil
                index_count     = 0
                delete_count    = 0

                # Only the part that builds the bulk body runs while holding a
                # database connection; it is returned to the pool afterwards
                ActiveRecord::Base.connection_pool.with_connection do
                  grouped_records = type.adapter.send(:grouped_objects, records)
                  bulk_body       = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body
                end

                index_count  = grouped_records[:index].size  if grouped_records.key?(:index)
                delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)

                # The following is an optimization for statuses specifically, since
                # we want to de-index statuses that cannot be searched by anybody,
                # but can't use Chewy's delete_if logic because it doesn't use
                # crutches and our searchable_by logic depends on them
                if type == StatusesIndex::Status
                  bulk_body.map! do |entry|
                    if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
                      index_count  -= 1
                      delete_count += 1

                      # Turn the index operation into a delete for the same document
                      { delete: entry[:index].except(:data) }
                    else
                      entry
                    end
                  end
                end

                Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)

                progress.progress += records.size

                added.increment(index_count)
                removed.increment(delete_count)

                # NOTE(review): presumably throttles the load put on the
                # database and ElasticSearch -- confirm
                sleep 1
              rescue => e
                progress.log pastel.red("Error importing #{index}: #{e}")
              end
            end
          end

          # Wait for every slice of this batch before loading the next batch
          futures.map(&:value)
        end
      end
    end

    progress.title = ''
    progress.stop

    say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
  end
end
end

0 comments on commit c14c98e

Please sign in to comment.