Reindexer
Reindexing is necessary, for example, when you want to apply mapping changes to existing data.
There are currently two ways to reindex: a home-grown Reindexer that is specific to Elastic (and available in all recent versions of Elastic), and the native Reindex API that was introduced in Elasticsearch 2.3.0.
The Reindex API is documented in the Elasticsearch reference. Elastic supports it in version 3.0.29 or later.
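Since the native API only exists from Elasticsearch 2.3.0 on, an application that has to support older clusters needs to choose between the two approaches at runtime. The following is a minimal sketch of such a version check, assuming the server version is already known as a "major.minor.patch" string; supportsReindexAPI is a hypothetical helper, not part of Elastic:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// supportsReindexAPI is a hypothetical helper that reports whether an
// Elasticsearch version string such as "2.3.0" or "5.6.1" is at least
// 2.3.0, the first release with the native Reindex API.
func supportsReindexAPI(version string) bool {
	// Drop any suffix such as "-SNAPSHOT" or "-beta1" before parsing.
	if i := strings.IndexAny(version, "-+"); i >= 0 {
		version = version[:i]
	}
	parts := strings.Split(version, ".")
	want := []int{2, 3, 0}
	for i, w := range want {
		n := 0 // missing components count as 0, so "2.3" equals "2.3.0"
		if i < len(parts) {
			v, err := strconv.Atoi(parts[i])
			if err != nil {
				return false // unparseable version: treat as too old
			}
			n = v
		}
		if n != w {
			return n > w
		}
	}
	return true // exactly 2.3.0
}

func main() {
	for _, v := range []string{"1.7.5", "2.2.2", "2.3.0", "5.6.1"} {
		fmt.Printf("%s: native Reindex API = %v\n", v, supportsReindexAPI(v))
	}
}
```

Unparseable versions are treated as too old, so the sketch falls back to the client-side Reindexer whenever in doubt.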
Here's an example of using the Reindex API with Elastic:
With elastic.v3:
client, err := elastic.NewClient()
if err != nil {
	panic(err)
}
src := elastic.NewReindexSource().Index("source_index")
dst := elastic.NewReindexDestination().Index("target_index")
// In v3 the native API is ReindexTask(); Reindex() is the client-side Reindexer.
res, err := client.ReindexTask().Source(src).Destination(dst).Refresh(true).Do()
if err != nil {
	panic(err)
}
fmt.Printf("Reindexed a total of %d documents\n", res.Total)
With elastic.v5 and later:
client, err := elastic.NewClient()
if err != nil {
	panic(err)
}
src := elastic.NewReindexSource().Index("source_index")
dst := elastic.NewReindexDestination().Index("target_index")
// Refresh takes a string here, and Do requires a context.
res, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Do(context.Background())
if err != nil {
	panic(err)
}
fmt.Printf("Reindexed a total of %d documents\n", res.Total)
The tests in reindex_test.go illustrate how to use the API in detail.
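Under the hood, the native API is a single server-side request, POST /_reindex, whose body names the source and destination indices. The sketch below builds that minimal body with encoding/json so you can see what is sent; the structs are hypothetical stand-ins, not Elastic's own types, and Elastic's NewReindexSource/NewReindexDestination may add more fields when you set more options:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical types mirroring the minimal _reindex request body;
// they are not Elastic's own types.
type indexRef struct {
	Index string `json:"index"`
}

type reindexBody struct {
	Source indexRef `json:"source"`
	Dest   indexRef `json:"dest"`
}

// buildReindexBody returns the JSON body for POST /_reindex that copies
// all documents from index src into index dst.
func buildReindexBody(src, dst string) (string, error) {
	b, err := json.Marshal(reindexBody{
		Source: indexRef{Index: src},
		Dest:   indexRef{Index: dst},
	})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := buildReindexBody("source_index", "target_index")
	if err != nil {
		panic(err)
	}
	// Refresh is a URL parameter of the request, not part of the body.
	fmt.Println("POST /_reindex?refresh=true")
	fmt.Println(body)
}
```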
Before Elasticsearch 2.3.0, reindexing could only be done as a client-side operation. To make things easier for application programmers, Elastic contains a helper, the Reindexer, that reindexes indices, even between clusters. You can still use the Reindexer with Elasticsearch versions later than 2.3.0. The Elasticsearch documentation has a section about reindexing if you want more information.
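Conceptually, the client-side approach reads every document of the source index page by page (as a scroll does) and writes each page back to the target index with a bulk request. The in-memory sketch below only illustrates that loop: doc, bulkIndex and the pre-split pages are hypothetical stand-ins for search hits, bulk requests and scroll responses, not Elastic APIs:

```go
package main

import "fmt"

// doc is a hypothetical stand-in for a search hit: an ID plus its _source.
type doc struct {
	ID     string
	Source map[string]interface{}
}

// bulkIndex stands in for one bulk request: it writes a batch of docs
// into the target "index" and returns how many were written.
func bulkIndex(target map[string]map[string]interface{}, batch []doc) int {
	for _, d := range batch {
		target[d.ID] = d.Source
	}
	return len(batch)
}

// reindex walks the source page by page (as a scroll would) and sends
// each page to the target as one bulk request.
func reindex(pages [][]doc, target map[string]map[string]interface{}) int {
	total := 0
	for _, page := range pages {
		total += bulkIndex(target, page)
	}
	return total
}

func main() {
	pages := [][]doc{
		{{ID: "1", Source: map[string]interface{}{"user": "alice"}}},
		{{ID: "2", Source: map[string]interface{}{"user": "bob"}}},
	}
	target := make(map[string]map[string]interface{})
	n := reindex(pages, target)
	fmt.Printf("copied %d documents, target has %d\n", n, len(target))
}
```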
Here's an example of the Reindexer with various options:
client, err := elastic.NewClient()
if err != nil {
	panic(err)
}
// Copy all documents from index "source" into index "target".
ix := client.Reindex("source", "target")
// Print progress as documents are processed; \r keeps it on one line.
ix = ix.Progress(func(current, total int64) {
	fmt.Printf("%d of %d\r", current, total)
})
result, err := ix.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d operations succeeded, %d failed\n", result.Success, result.Failed)
As you can see in the example above, you can provide a progress callback function with Progress(func(int64, int64)).
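The callback receives the number of documents processed so far and the total. A slightly richer callback could also report a percentage; in this sketch, formatProgress is a hypothetical helper that also guards against a total of zero:

```go
package main

import "fmt"

// formatProgress is a hypothetical helper that renders the two values a
// Progress callback receives: documents processed so far and the total.
func formatProgress(current, total int64) string {
	if total <= 0 {
		return fmt.Sprintf("%d of ? documents", current)
	}
	pct := float64(current) * 100 / float64(total)
	return fmt.Sprintf("%d of %d documents (%.1f%%)", current, total, pct)
}

func main() {
	// Same signature as the function passed to Progress(func(int64, int64)).
	progress := func(current, total int64) {
		fmt.Printf("%s\r", formatProgress(current, total))
	}
	for _, c := range []int64{0, 250, 500, 1000} {
		progress(c, 1000)
	}
	fmt.Println()
}
```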
If you need to know exactly which bulk item requests failed, use StatsOnly(false). The result will then contain all failed items in result.Errors ([]*elastic.BulkResponseItem).
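The difference between the two modes can be pictured as follows: with stats only, failures are merely counted; otherwise every failed item is also kept. This is a sketch with hypothetical item and result types that only model the idea; it assumes an item succeeded when its HTTP status is 2xx, and in Elastic the kept items are the *elastic.BulkResponseItem values in result.Errors:

```go
package main

import "fmt"

// item is a hypothetical stand-in for one bulk response item.
type item struct {
	ID     string
	Status int // HTTP status of the individual bulk operation
}

// result is a hypothetical stand-in for the Reindexer's result.
type result struct {
	Success, Failed int64
	Errors          []item // only filled when statsOnly is false
}

// tally counts successes and failures; when statsOnly is false it also
// keeps every failed item so the caller can see which ones failed.
func tally(items []item, statsOnly bool) result {
	var r result
	for _, it := range items {
		if it.Status >= 200 && it.Status < 300 {
			r.Success++
			continue
		}
		r.Failed++
		if !statsOnly {
			r.Errors = append(r.Errors, it)
		}
	}
	return r
}

func main() {
	items := []item{{ID: "1", Status: 201}, {ID: "2", Status: 400}, {ID: "3", Status: 200}}
	r := tally(items, false)
	fmt.Printf("%d succeeded, %d failed\n", r.Success, r.Failed)
	for _, e := range r.Errors {
		fmt.Printf("failed: id=%s status=%d\n", e.ID, e.Status)
	}
}
```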
You can also specify the number of items per bulk request sent to Elasticsearch with BulkSize(int). The default is 500.
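If every hit becomes one bulk item, copying N documents takes ceil(N / BulkSize) bulk requests; with the default of 500, 1,234 documents would go out as batches of 500, 500 and 234. A sketch of that batching, where chunk is a hypothetical helper:

```go
package main

import "fmt"

// chunk (hypothetical) splits ids into consecutive batches of at most
// size elements, the way items are grouped into bulk requests.
func chunk(ids []int, size int) [][]int {
	if size <= 0 {
		size = 500 // fall back to the default mentioned above
	}
	var batches [][]int
	for len(ids) > 0 {
		n := size
		if len(ids) < n {
			n = len(ids)
		}
		batches = append(batches, ids[:n])
		ids = ids[n:]
	}
	return batches
}

func main() {
	ids := make([]int, 1234)
	for i := range ids {
		ids[i] = i
	}
	for i, b := range chunk(ids, 500) {
		fmt.Printf("bulk request %d: %d items\n", i+1, len(b))
	}
}
```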
A scroll timeout can be specified with Scroll(string), e.g. Scroll("15m"). The default is 5 minutes ("5m").
The Reindexer has become more versatile with this PR (thanks to @nwolff). In fact, copying data from a source index to a target index is just a special case of the more general Reindexer.
If you use the Reindexer type directly, via NewReindexer instead of client.Reindex(...), you can pass a handler that is called for each hit. Here's an example of how to use that to, e.g., preserve the _ttl of each hit:
// Carries over the source item's ttl to the reindexed item
copyWithTTL := func(hit *elastic.SearchHit, bulkService *elastic.BulkService) error {
	source := make(map[string]interface{})
	if err := json.Unmarshal(*hit.Source, &source); err != nil {
		return err
	}
	req := elastic.NewBulkIndexRequest().Index(targetIndexName).Type(hit.Type).Id(hit.Id).Doc(source)
	// Only set a TTL if the hit actually returned one in its fields.
	if ttl, ok := hit.Fields["_ttl"].(float64); ok {
		req.Ttl(int64(ttl))
	}
	bulkService.Add(req)
	return nil
}
// Fetch _source and _ttl for each hit so the handler can use both.
r := elastic.NewReindexer(client, sourceIndexName, copyWithTTL).ScanFields("_source", "_ttl")
ret, err := r.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d succeeded, %d failed\n", ret.Success, ret.Failed)
You can also use the Reindexer to copy data from one cluster to another by providing a different client for the target. The following code reads sourceIndexName through sourceClient and writes it to targetIndexName through targetClient.
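sourceClient, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
if err != nil {
	panic(err)
}
targetClient, err := elastic.NewClient(elastic.SetURL("http://localhost:8200"))
if err != nil {
	panic(err)
}
// Read from the source cluster; CopyToTargetIndex writes each hit to targetIndexName.
r := elastic.NewReindexer(sourceClient, sourceIndexName, elastic.CopyToTargetIndex(targetIndexName))
// Send the bulk requests to the other cluster instead of the source cluster.
r = r.TargetClient(targetClient)
ret, err := r.Do()
if err != nil {
	panic(err)
}
fmt.Printf("%d succeeded, %d failed\n", ret.Success, ret.Failed)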
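Both Reindexer examples follow the same design: the Reindexer walks the hits and hands each one to a per-hit function, and CopyToTargetIndex conceptually just returns one such function. The following stdlib-only sketch models that pattern with hypothetical types (hit, bulk, hitFunc, copyTo and renameField are illustrative, not Elastic APIs), including a handler that transforms documents on the way:

```go
package main

import "fmt"

// hit, bulkOp and bulk are hypothetical stand-ins for a search hit,
// a bulk index request and a bulk service.
type hit struct {
	ID     string
	Source map[string]interface{}
}

type bulkOp struct {
	Index, ID string
	Doc       map[string]interface{}
}

type bulk struct{ ops []bulkOp }

func (b *bulk) Add(op bulkOp) { b.ops = append(b.ops, op) }

// hitFunc is called once per hit, like the handler passed to NewReindexer.
type hitFunc func(h hit, b *bulk) error

// copyTo returns a handler that copies each hit unchanged into target.
func copyTo(target string) hitFunc {
	return func(h hit, b *bulk) error {
		b.Add(bulkOp{Index: target, ID: h.ID, Doc: h.Source})
		return nil
	}
}

// renameField returns a handler that renames one field while copying.
func renameField(target, from, to string) hitFunc {
	return func(h hit, b *bulk) error {
		doc := make(map[string]interface{}, len(h.Source))
		for k, v := range h.Source {
			if k == from {
				k = to
			}
			doc[k] = v
		}
		b.Add(bulkOp{Index: target, ID: h.ID, Doc: doc})
		return nil
	}
}

// run feeds every hit to the handler, stopping at the first error.
func run(hits []hit, fn hitFunc) (*bulk, error) {
	b := &bulk{}
	for _, h := range hits {
		if err := fn(h, b); err != nil {
			return nil, err
		}
	}
	return b, nil
}

func main() {
	hits := []hit{{ID: "1", Source: map[string]interface{}{"name": "a"}}}
	b, err := run(hits, renameField("target", "name", "title"))
	if err != nil {
		panic(err)
	}
	fmt.Println(b.ops[0].Index, b.ops[0].ID, b.ops[0].Doc["title"]) // target 1 a
}
```

Keeping the traversal separate from the per-hit logic is what lets the same loop copy, transform or filter documents.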
More examples are available in reindexer_test.go.