This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

ElasticSearch 5 support #1607

Merged
merged 3 commits on Jun 23, 2017
10 changes: 10 additions & 0 deletions Godeps/Godeps.json


156 changes: 89 additions & 67 deletions common/elasticsearch/elasticsearch.go
@@ -20,9 +20,10 @@ import (
"time"

"github.com/golang/glog"
"github.com/pborman/uuid"

"gopkg.in/olivere/elastic.v3"
"errors"
elastic2 "gopkg.in/olivere/elastic.v3"
elastic5 "gopkg.in/olivere/elastic.v5"
"os"
)

@@ -32,10 +33,9 @@ const (
)

type ElasticSearchService struct {
EsClient *elastic.Client
bulkProcessor *elastic.BulkProcessor
baseIndex string
ClusterName string
EsClient *esClient
baseIndex string
ClusterName string
}
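The concrete *elastic.Client and *elastic.BulkProcessor fields are replaced by a single version-agnostic wrapper. That wrapper is defined outside this file; the following is a minimal sketch of what it might hold, inferred from the call sites below (field names are hypothetical):

type esClient struct {
	version  int
	clientV2 *elastic2.Client
	clientV5 *elastic5.Client
	bulkV2   *elastic2.BulkProcessor
	bulkV5   *elastic5.BulkProcessor
	pipeline string
}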

func (esSvc *ElasticSearchService) Index(date time.Time) string {
@@ -46,7 +46,7 @@ func (esSvc *ElasticSearchService) IndexAlias(date time.Time, typeName string) s
}

func (esSvc *ElasticSearchService) FlushData() error {
return esSvc.bulkProcessor.Flush()
return esSvc.EsClient.FlushBulk()
}
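FlushData now delegates to the wrapper, which flushes whichever bulk processor is active. A minimal sketch, using the hypothetical fields above:

func (es *esClient) FlushBulk() error {
	switch es.version {
	case 2:
		return es.bulkV2.Flush()
	case 5:
		return es.bulkV5.Flush()
	default:
		return UnsupportedVersion{}
	}
}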

// SaveData saves metrics and events to ES by using the ES client
@@ -58,44 +58,63 @@ func (esSvc *ElasticSearchService) SaveData(date time.Time, typeName string, sin
indexName := esSvc.Index(date)

// Use the IndexExists service to check if a specified index exists.
exists, err := esSvc.EsClient.IndexExists(indexName).Do()
exists, err := esSvc.EsClient.IndexExists(indexName)
if err != nil {
return err
}

if !exists {
// Create a new index.
createIndex, err := esSvc.EsClient.CreateIndex(indexName).BodyString(mapping).Do()
createIndex, err := esSvc.EsClient.CreateIndex(indexName, mapping)
if err != nil {
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("Failed to create Index in ES cluster: %s", err)

ack := false
switch i := createIndex.(type) {
case *elastic2.IndicesCreateResult:
ack = i.Acknowledged
case *elastic5.IndicesCreateResult:
ack = i.Acknowledged
}
if !ack {
return errors.New("Failed to acknoledge index creation")
}
}
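The type switch above inspects the raw, version-specific result that the wrapper returns. A sketch of how its CreateIndex might produce that result (a hypothetical implementation, not the PR's actual code; note that the v5 client's Do requires a context):

func (es *esClient) CreateIndex(name, mapping string) (interface{}, error) {
	switch es.version {
	case 2:
		// elastic.v3 API: Do() takes no context
		return es.clientV2.CreateIndex(name).BodyString(mapping).Do()
	case 5:
		return es.clientV5.CreateIndex(name).BodyString(mapping).Do(context.Background())
	default:
		return nil, UnsupportedVersion{}
	}
}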

aliases, err := esSvc.EsClient.Aliases().Index(indexName).Do()
aliases, err := esSvc.EsClient.GetAliases(indexName)
if err != nil {
return err
}
aliasName := esSvc.IndexAlias(date, typeName)
if !aliases.Indices[indexName].HasAlias(aliasName) {
createAlias, err := esSvc.EsClient.Alias().Add(indexName, esSvc.IndexAlias(date, typeName)).Do()

hasAlias := false
switch a := aliases.(type) {
case *elastic2.AliasesResult:
hasAlias = a.Indices[indexName].HasAlias(aliasName)
case *elastic5.AliasesResult:
hasAlias = a.Indices[indexName].HasAlias(aliasName)
}
if !hasAlias {
createAlias, err := esSvc.EsClient.AddAlias(indexName, esSvc.IndexAlias(date, typeName))
if err != nil {
return err
}
if !createAlias.Acknowledged {
return fmt.Errorf("Failed to create Index Alias in ES cluster: %s", err)

ack := false
switch i := createAlias.(type) {
case *elastic2.AliasResult:
ack = i.Acknowledged
case *elastic5.AliasResult:
ack = i.Acknowledged
}
if !ack {
return errors.New("Failed to acknoledge index alias creation")
}
}

for _, data := range sinkData {
indexID := uuid.NewUUID()
req := elastic.NewBulkIndexRequest().
Index(indexName).
Type(typeName).
Id(indexID.String()).
Doc(data)
esSvc.bulkProcessor.Add(req)
esSvc.EsClient.AddBulkReq(indexName, typeName, data)
}

return nil
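Per-document bulk requests are likewise built inside the wrapper instead of inline. A sketch of AddBulkReq on the v5 path, mirroring the removed request-building code above and applying the new pipeline option (ingest pipelines only exist in ES 5+); the implementation shown is an assumption, not the PR's actual code:

func (es *esClient) AddBulkReq(index, typeName string, data interface{}) error {
	req := elastic5.NewBulkIndexRequest().
		Index(index).
		Type(typeName).
		Id(uuid.NewUUID().String()).
		Doc(data)
	if es.pipeline != "" {
		req = req.Pipeline(es.pipeline) // route documents through an ES 5 ingest pipeline
	}
	es.bulkV5.Add(req)
	return nil
}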
@@ -111,6 +130,14 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
return nil, fmt.Errorf("Failed to parser url's query string: %s", err)
}

version := 5
if len(opts["ver"]) > 0 {
version, err = strconv.Atoi(opts["ver"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's version value into an int: %v", err)
}
}
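With this change, the client version becomes an explicit sink option. For example (hypothetical host), a sink URL ending in ?nodes=http://es.example.com:9200&ver=2 selects the ElasticSearch 2 client, while omitting ver defaults to ElasticSearch 5.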

esSvc.ClusterName = ESClusterName
if len(opts["cluster_name"]) > 0 {
esSvc.ClusterName = opts["cluster_name"][0]
@@ -122,45 +149,53 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
esSvc.baseIndex = opts["index"][0]
}

var startupFnsV5 []elastic5.ClientOptionFunc
var startupFnsV2 []elastic2.ClientOptionFunc

// Set the URL endpoints of the ES's nodes. Notice that when sniffing is
// enabled, these URLs are used to initially sniff the cluster on startup.
var startupFns []elastic.ClientOptionFunc
if len(opts["nodes"]) > 0 {
startupFns = append(startupFns, elastic.SetURL(opts["nodes"]...))
startupFnsV2 = append(startupFnsV2, elastic2.SetURL(opts["nodes"]...))
startupFnsV5 = append(startupFnsV5, elastic5.SetURL(opts["nodes"]...))
} else if uri.Scheme != "" && uri.Host != "" {
startupFns = append(startupFns, elastic.SetURL(uri.Scheme+"://"+uri.Host))
startupFnsV2 = append(startupFnsV2, elastic2.SetURL(uri.Scheme+"://"+uri.Host))
startupFnsV5 = append(startupFnsV5, elastic5.SetURL(uri.Scheme+"://"+uri.Host))
} else {
return nil, fmt.Errorf("There is no node assigned for connecting ES cluster")
return nil, errors.New("There is no node assigned for connecting ES cluster")
}

// If the ES cluster needs authentication, the username and secret
// should be set in the sink config. Otherwise, set the Authenticate flag to false
if len(opts["esUserName"]) > 0 && len(opts["esUserSecret"]) > 0 {
startupFns = append(startupFns, elastic.SetBasicAuth(opts["esUserName"][0], opts["esUserSecret"][0]))
startupFnsV2 = append(startupFnsV2, elastic2.SetBasicAuth(opts["esUserName"][0], opts["esUserSecret"][0]))
startupFnsV5 = append(startupFnsV5, elastic5.SetBasicAuth(opts["esUserName"][0], opts["esUserSecret"][0]))
}

if len(opts["maxRetries"]) > 0 {
maxRetries, err := strconv.Atoi(opts["maxRetries"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's maxRetries value into an int")
return nil, errors.New("Failed to parse URL's maxRetries value into an int")
}
startupFns = append(startupFns, elastic.SetMaxRetries(maxRetries))
startupFnsV2 = append(startupFnsV2, elastic2.SetMaxRetries(maxRetries))
startupFnsV5 = append(startupFnsV5, elastic5.SetMaxRetries(maxRetries))
}

if len(opts["healthCheck"]) > 0 {
healthCheck, err := strconv.ParseBool(opts["healthCheck"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's healthCheck value into a bool")
return nil, errors.New("Failed to parse URL's healthCheck value into a bool")
}
startupFns = append(startupFns, elastic.SetHealthcheck(healthCheck))
startupFnsV2 = append(startupFnsV2, elastic2.SetHealthcheck(healthCheck))
startupFnsV5 = append(startupFnsV5, elastic5.SetHealthcheck(healthCheck))
}

if len(opts["startupHealthcheckTimeout"]) > 0 {
timeout, err := time.ParseDuration(opts["startupHealthcheckTimeout"][0] + "s")
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's startupHealthcheckTimeout: %s", err.Error())
}
startupFns = append(startupFns, elastic.SetHealthcheckTimeoutStartup(timeout))
startupFnsV2 = append(startupFnsV2, elastic2.SetHealthcheckTimeoutStartup(timeout))
startupFnsV5 = append(startupFnsV5, elastic5.SetHealthcheckTimeoutStartup(timeout))
}

if os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "" ||
@@ -172,58 +207,45 @@ func CreateElasticSearchService(uri *url.URL) (*ElasticSearchService, error) {
return nil, err
}

startupFns = append(startupFns, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
startupFnsV2 = append(startupFnsV2, elastic2.SetHttpClient(awsClient), elastic2.SetSniff(false))
startupFnsV5 = append(startupFnsV5, elastic5.SetHttpClient(awsClient), elastic5.SetSniff(false))
} else {
if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
return nil, errors.New("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
startupFnsV2 = append(startupFnsV2, elastic2.SetSniff(sniff))
startupFnsV5 = append(startupFnsV5, elastic5.SetSniff(sniff))
}
}

esSvc.EsClient, err = elastic.NewClient(startupFns...)
if err != nil {
return nil, fmt.Errorf("Failed to create ElasticSearch client: %v", err)
}

bulkWorkers := 5
if len(opts["bulkWorkers"]) > 0 {
bulkWorkers, err = strconv.Atoi(opts["bulkWorkers"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's bulkWorkers value into an int")
return nil, errors.New("Failed to parse URL's bulkWorkers value into an int")
}
}
esSvc.bulkProcessor, err = esSvc.EsClient.BulkProcessor().
Name("ElasticSearchWorker").
Workers(bulkWorkers).
After(bulkAfterCB).
BulkActions(1000). // commit if # requests >= 1000
BulkSize(2 << 20). // commit if size of requests >= 2 MB
FlushInterval(10 * time.Second). // commit every 10s
Do()

pipeline := ""
if len(opts["pipeline"]) > 0 {
pipeline = opts["pipeline"][0]
}

switch version {
case 2:
esSvc.EsClient, err = newEsClientV2(startupFnsV2, bulkWorkers)
case 5:
esSvc.EsClient, err = newEsClientV5(startupFnsV5, bulkWorkers, pipeline)
default:
return nil, UnsupportedVersion{}
}
if err != nil {
return nil, fmt.Errorf("Failed to an ElasticSearch Bulk Processor: %v", err)
return nil, fmt.Errorf("Failed to create ElasticSearch client: %v", err)
}

glog.V(2).Infof("ElasticSearch sink configured successfully")

return &esSvc, nil
}
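The version-specific constructors newEsClientV2 and newEsClientV5 live in files not shown in this hunk. A sketch of newEsClientV5, assuming the bulk processor settings removed from this file carry over unchanged (elastic.v5's Do requires a context, and the After callback from bulkAfterCB below would have to be rewritten against v5 types):

func newEsClientV5(startupFns []elastic5.ClientOptionFunc, bulkWorkers int, pipeline string) (*esClient, error) {
	client, err := elastic5.NewClient(startupFns...)
	if err != nil {
		return nil, err
	}
	bulk, err := client.BulkProcessor().
		Name("ElasticSearchWorker").
		Workers(bulkWorkers).
		BulkActions(1000).               // commit if # requests >= 1000
		BulkSize(2 << 20).               // commit if size of requests >= 2 MB
		FlushInterval(10 * time.Second). // commit every 10s
		Do(context.Background())
	if err != nil {
		return nil, err
	}
	return &esClient{version: 5, clientV5: client, bulkV5: bulk, pipeline: pipeline}, nil
}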

func bulkAfterCB(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if err != nil {
glog.Warningf("Failed to execute bulk operation to ElasticSearch: %v", err)
}

if response.Errors {
for _, list := range response.Items {
for name, itm := range list {
if itm.Error != nil {
glog.V(3).Infof("Failed to execute bulk operation to ElasticSearch on %s: %v", name, itm.Error)
}
}
}
}
}