forked from geronime/es-reindex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
es-reindex.rb
executable file
·192 lines (169 loc) · 5.4 KB
/
es-reindex.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env ruby
#encoding:utf-8
require 'bundler/setup'
require 'rest-client'
require 'oj'
VERSION = '0.0.8'
STDOUT.sync = true
# Show the usage text and stop when the script is started without any
# argument or with a help switch (-h, -help or --help).
if ARGV.empty? || ARGV[0] =~ /^-(?:h|-?help)$/
  puts "
Script to copy particular ES index including its (re)creation w/options set
and mapping copied.
Usage:
#{__FILE__} [-r] [-f <frame>] [-u] [source_url/]<index> [destination_url/]<index>
- -r - remove the index in the new location first
- -f - specify frame size to be obtained with one fetch during scrolling
- -u - update existing documents (default: only create non-existing)
- optional source/destination urls default to http://127.0.0.1:9200
\n"
  # The exit status stays 1 in both cases, exactly as before.
  exit 1
end
# Make :compat the default mode for every Oj.load / Oj.dump call below.
Oj.default_options = {:mode => :compat}

# Command line state (see the usage text for the switches):
#   remove - remove the destination index first (-r)
#   update - update existing documents instead of only creating new ones (-u)
#   frame  - frame size fetched per scroll request (-f <frame>), default 1000
#   src    - first positional argument, [source_url/]<index>
#   dst    - second positional argument, [destination_url/]<index>
remove, update, frame, src, dst = false, false, 1000, nil, nil
until ARGV.empty?
  arg = ARGV.shift
  case arg
  when '-r' then remove = true
  when '-f' then frame = ARGV.shift.to_i # missing/non-numeric value becomes 0
  when '-u' then update = true
  else
    location = arg.chomp '/'
    if !src
      src = location
    elsif !dst
      dst = location
    else
      raise("Unexpected parameter '#{arg}'. Use '-h' for help.")
    end
  end
end

# Both locations are mandatory; without this check a missing one only
# surfaces later as an unhelpful TypeError when the locations are split.
raise("Missing source and/or destination index. Use '-h' for help.") \
  unless src && dst
# A frame of zero (or less) cannot fetch any document.
raise("Invalid frame size given to '-f'. Use '-h' for help.") \
  unless frame > 0
# Split a "[url/]index" location at the last slash into [url, index].
# A location without any slash is taken as a bare index name and paired with
# the default URL http://127.0.0.1:9200.
split_location = lambda do |location|
  parts = %r{^(.*)/(.*?)$}.match(location)
  if parts
    [parts[1], parts[2]]
  else
    ['http://127.0.0.1:9200', String.new(location)]
  end
end

surl, sidx = split_location.call(src)
durl, didx = split_location.call(dst)
# Describe the planned copy, then block until a line is read from stdin so the
# operator can still abort with Ctrl-c.
mode_note =
  if remove
    ' with rewriting destination mapping!'
  elsif update
    ' with updating existing documents!'
  else
    '.'
  end
printf "Copying '%s/%s' to '%s/%s'%s\n Confirm or hit Ctrl-c to abort...\n",
       surl, sidx, durl, didx, mode_note
$stdin.readline
# Format a duration in seconds as "H:MM:SS", prefixed with "N days, " when the
# day count is not zero.
#
# l - duration in seconds (the callers in this script pass the Float obtained
#     by subtracting two Time objects; Integers work as well)
#
# Returns the formatted String.
def tm_len(l)
  # Peel off days, hours and minutes in turn; what is left are the seconds.
  parts = [86_400, 3_600, 60].map do |unit|
    count = l / unit
    l %= unit
    count
  end
  parts << l
  days = format('%u', parts.shift)
  prefix = days == '0' ? '' : days + ' days, '
  prefix + format('%u:%02u:%02u', *parts)
end
# Perform an HTTP request through RestClient, retrying until it succeeds.
#
# method - RestClient verb as a Symbol (the script uses :get, :post, :put
#          and :delete)
# url    - target URL
# data   - optional request body; when nil the verb is called with the URL only
#
# Returns whatever RestClient returns for the request, or nil when
# RestClient::ResourceNotFound is raised (that case is never retried).
# NOTE: every other StandardError is retried without limit and without delay.
def retried_request(method, url, data = nil)
  loop do
    begin
      return data ? RestClient.send(method, url, data) : RestClient.send(method, url)
    rescue RestClient::ResourceNotFound # no point to retry
      return nil
    rescue => e
      warn "\nRetrying #{method.to_s.upcase} ERROR: #{e.class} - #{e.message}"
      # Not every exception carries a response (e.g. connection errors);
      # calling it blindly would raise NoMethodError and end the retry loop.
      warn e.response if e.respond_to?(:response)
    end
  end
end
# With remove=true, delete the old destination index - but only when the
# _status probe finds it (retried_request yields nil for a missing resource).
if remove && retried_request(:get, "#{durl}/#{didx}/_status")
  retried_request(:delete, "#{durl}/#{didx}")
end
# (Re)create the destination index when the _status probe does not find it,
# copying the settings and every type mapping over from the source index.
destination_status = retried_request(:get, "#{durl}/#{didx}/_status")
unless destination_status
  # The source settings come first: they are posted as the creation body.
  settings = retried_request(:get, "#{surl}/#{sidx}/_settings")
  unless settings
    warn "Failed to obtain original index '#{surl}/#{sidx}' settings!"
    exit 1
  end
  settings = Oj.load settings
  # From here on the first key of the settings response is used as the
  # source index name.
  sidx = settings.keys[0]
  settings[sidx].delete 'index.version.created'
  printf "Creating '%s/%s' index with settings from '%s/%s'... ",
         durl, didx, surl, sidx
  if retried_request(:post, "#{durl}/#{didx}", Oj.dump(settings[sidx]))
    puts 'OK.'
  else
    puts 'FAILED!'
    exit 1
  end

  mappings = retried_request(:get, "#{surl}/#{sidx}/_mapping")
  unless mappings
    warn "Failed to obtain original index '#{surl}/#{sidx}' mappings!"
    exit 1
  end
  mappings = Oj.load mappings
  # One PUT per type found under the source index in the mapping response.
  mappings[sidx].each_pair do |type, mapping|
    printf "Copying mapping '%s/%s/%s'... ", durl, didx, type
    payload = Oj.dump({type => mapping})
    if retried_request(:put, "#{durl}/#{didx}/#{type}/_mapping", payload)
      puts 'OK.'
    else
      puts 'FAILED!'
      exit 1
    end
  end
end
# Announce the copy, work out the per-shard fetch size and open the scan on
# the source index.
printf "Copying '%s/%s' to '%s/%s'... \n", surl, sidx, durl, didx
t, done = Time.now, 0
# The _count response is only used for its '_shards' => 'total' figure.
shards = retried_request :get, "#{surl}/#{sidx}/_count?q=*"
unless shards
  # nil means the source answered 404; Oj.load(nil) would raise a TypeError.
  warn "Failed to query source index '#{surl}/#{sidx}'!"
  exit 1
end
shards = Oj.load(shards)['_shards']['total'].to_i
# The frame is split across the shards (presumably because a scan search
# applies `size` per shard - confirm against the ES version in use). Clamp
# both values so a frame smaller than the shard count does not request
# size=0 and a zero shard total does not raise ZeroDivisionError.
per_shard = [frame / [shards, 1].max, 1].max
scan = retried_request(:get, "#{surl}/#{sidx}/_search" +
  "?search_type=scan&scroll=10m&size=#{per_shard}")
unless scan
  warn "Failed to start scan of source index '#{surl}/#{sidx}'!"
  exit 1
end
scan = Oj.load scan
scroll_id = scan['_scroll_id']
total = scan['hits']['total']
printf " %u/%u (%.1f%%) done.\r", done, total, 0
# -u switches the bulk action from 'create' to 'index'.
bulk_op = update ? 'index' : 'create'
# Fetch one scroll page at a time from the source and send it to the
# destination's _bulk endpoint until a page comes back without hits.
loop do
  data = retried_request(:get,
    "#{surl}/_search/scroll?scroll=10m&scroll_id=#{scroll_id}")
  unless data
    # nil means the scroll request answered 404; stop with a clear message
    # instead of letting Oj.load(nil) raise a TypeError.
    warn "\nScroll request to '#{surl}' failed: resource not found!"
    exit 1
  end
  data = Oj.load data
  hits = data['hits']['hits']
  break if hits.empty?
  # Each page carries the scroll id to use for the next request.
  scroll_id = data['_scroll_id']
  bulk = ''
  hits.each do |doc|
    ### === implement possible modifications to the document
    ### === end modifications to the document
    base = {'_index' => didx, '_id' => doc['_id'], '_type' => doc['_type']}
    # Carry over the optional per-document metadata when the hit has it.
    ['_timestamp', '_ttl'].each do |doc_arg|
      base[doc_arg] = doc[doc_arg] if doc.key? doc_arg
    end
    # One action line followed by one source line per document.
    bulk << Oj.dump({bulk_op => base}) << "\n"
    bulk << Oj.dump(doc['_source']) << "\n"
    done += 1
  end
  unless bulk.empty?
    bulk << "\n" # empty line in the end required
    # NOTE: the bulk response is not inspected, so per-document failures
    # are not reported here.
    retried_request :post, "#{durl}/_bulk", bulk
  end
  elapsed = Time.now - t
  # Linear extrapolation from the documents processed so far.
  eta = total * elapsed / done
  printf " %u/%u (%.1f%%) done in %s, E.T.A.: %s.\r",
         done, total, 100.0 * done / total, tm_len(elapsed), t + eta
end
# Blank out the progress line before printing the final summary.
printf "#{' ' * 80}\r %u/%u done in %s.\n",
       done, total, tm_len(Time.now - t)
# Compare the document counts of source and destination, polling once per
# second for at most 60 seconds until they agree.
# no point for large reindexation with data still being stored in index
require 'timeout' # Timeout is used below; do not rely on a transitive require
printf 'Checking document count... '
scount, dcount = 1, 0
begin
  Timeout.timeout(60) do
    loop do
      sres = retried_request :get, "#{surl}/#{sidx}/_count?q=*"
      dres = retried_request :get, "#{durl}/#{didx}/_count?q=*"
      # Only store fully parsed integers: if the timeout fires in the middle
      # of an iteration, scount/dcount must never hold a raw response string,
      # because the final printf '%u' would raise on it. A nil (404) response
      # is skipped and retried on the next round.
      if sres && dres
        parsed_s = Oj.load(sres)['count'].to_i
        parsed_d = Oj.load(dres)['count'].to_i
        scount, dcount = parsed_s, parsed_d
      end
      break if scount == dcount
      sleep 1
    end
  end
rescue Timeout::Error
  # The counts did not converge in time; the last values seen are reported.
end
printf "%u == %u (%s\n",
       scount, dcount, scount == dcount ? 'equals).' : 'NOT EQUAL)!'
exit 0