Skip to content

Commit

Permalink
Use elasticsearch bulk API (#656)
Browse files Browse the repository at this point in the history
* Use elasticsearch bulk API

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix review comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Jan 23, 2018
1 parent 5d13dcd commit e52ecff
Show file tree
Hide file tree
Showing 23 changed files with 241 additions and 287 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/''
DOCKER_NAMESPACE?=jaegertracing
DOCKER_TAG?=latest

MOCKERY=mockery

.DEFAULT_GOAL := test-and-lint

.PHONY: test-and-lint
Expand Down Expand Up @@ -236,3 +238,11 @@ thrift-image:
generate-zipkin-swagger: idl-submodule
$(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go

.PHONY: install-mockery
install-mockery:
go get github.com/vektra/mockery

.PHONY: generate-mocks
generate-mocks: install-mockery
$(MOCKERY) -all -dir ./pkg/es/ -output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go
8 changes: 8 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"fmt"
"io"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -148,6 +149,13 @@ func main() {
hc.Ready()
select {
case <-signalsChannel:
if closer, ok := spanWriter.(io.Closer); ok {
err := closer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
}
}

logger.Info("Jaeger Collector is finishing")
}
return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package es

import (
"context"
"io"

"gopkg.in/olivere/elastic.v5"
)
Expand All @@ -27,6 +28,7 @@ type Client interface {
Index() IndexService
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
io.Closer
}

// IndicesExistsService is an abstraction for elastic.IndicesExistsService
Expand All @@ -40,13 +42,13 @@ type IndicesCreateService interface {
Do(ctx context.Context) (*elastic.IndicesCreateResult, error)
}

// IndexService is an abstraction for elastic.IndexService
// IndexService is an abstraction for elastic BulkService
type IndexService interface {
Index(index string) IndexService
Type(typ string) IndexService
Id(id string) IndexService
BodyJson(body interface{}) IndexService
Do(ctx context.Context) (*elastic.IndexResponse, error)
Add()
}

// SearchService is an abstraction for elastic.SearchService
Expand Down
61 changes: 51 additions & 10 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,72 @@
package config

import (
"bytes"
"context"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Servers []string
Username string
Password string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
}

// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient() (es.Client, error)
NewClient(logger *zap.Logger) (es.Client, error)
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
}

// NewClient creates a new ElasticSearch client
func (c *Configuration) NewClient() (es.Client, error) {
func (c *Configuration) NewClient(logger *zap.Logger) (es.Client, error) {
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)
if err != nil {
return nil, err
}
return es.WrapESClient(rawClient), nil
service, err := rawClient.BulkProcessor().
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if err != nil {
var buffer bytes.Buffer
for i, r := range requests {
buffer.WriteString(r.String())
if i+1 < len(requests) {
buffer.WriteByte('\n')
}
}
logger.Error("Elasticsearch could not process bulk request", zap.Error(err),
zap.Any("response", response), zap.String("requests", buffer.String()))
}
}).
BulkSize(c.BulkSize).
Workers(c.BulkWorkers).
BulkActions(c.BulkActions).
FlushInterval(c.BulkFlushInterval).
Do(context.Background())
if err != nil {
return nil, err
}
return es.WrapESClient(rawClient, service), nil
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand All @@ -74,6 +103,18 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
if c.BulkSize == 0 {
c.BulkSize = source.BulkSize
}
if c.BulkWorkers == 0 {
c.BulkWorkers = source.BulkWorkers
}
if c.BulkActions == 0 {
c.BulkActions = source.BulkActions
}
if c.BulkFlushInterval == 0 {
c.BulkFlushInterval = source.BulkFlushInterval
}
}

// GetNumShards returns number of shards from Configuration
Expand Down
16 changes: 15 additions & 1 deletion pkg/es/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 6 additions & 26 deletions pkg/es/mocks/IndexService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/IndicesCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/IndicesExistsService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/MultiSearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 22 additions & 14 deletions pkg/es/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (

// ESClient is a wrapper around elastic.Client
type ESClient struct {
client *elastic.Client
client *elastic.Client
bulkService *elastic.BulkProcessor
}

// WrapESClient creates a ESClient out of *elastic.Client.
func WrapESClient(client *elastic.Client) ESClient {
return ESClient{client: client}
func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ESClient {
return ESClient{client: client, bulkService: s}
}

// IndexExists calls this function to internal client.
Expand All @@ -44,7 +45,8 @@ func (c ESClient) CreateIndex(index string) IndicesCreateService {

// Index calls this function to internal client.
func (c ESClient) Index() IndexService {
return WrapESIndexService(c.client.Index())
r := elastic.NewBulkIndexRequest()
return WrapESIndexService(r, c.bulkService)
}

// Search calls this function to internal client.
Expand All @@ -57,6 +59,11 @@ func (c ESClient) MultiSearch() MultiSearchService {
return WrapESMultiSearchService(c.client.MultiSearch())
}

// Close closes ESClient and flushes all data to the storage.
func (c ESClient) Close() error {
return c.bulkService.Close()
}

// ---

// ESIndicesExistsService is a wrapper around elastic.IndicesExistsService
Expand Down Expand Up @@ -100,37 +107,38 @@ func (c ESIndicesCreateService) Do(ctx context.Context) (*elastic.IndicesCreateR

// ESIndexService is a wrapper around elastic.ESIndexService
type ESIndexService struct {
indexService *elastic.IndexService
bulkIndexReq *elastic.BulkIndexRequest
bulkService *elastic.BulkProcessor
}

// WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService.
func WrapESIndexService(indexService *elastic.IndexService) ESIndexService {
return ESIndexService{indexService: indexService}
func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) ESIndexService {
return ESIndexService{bulkIndexReq: indexService, bulkService: bulkService}
}

// Index calls this function to internal service.
func (i ESIndexService) Index(index string) IndexService {
return WrapESIndexService(i.indexService.Index(index))
return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService)
}

// Type calls this function to internal service.
func (i ESIndexService) Type(typ string) IndexService {
return WrapESIndexService(i.indexService.Type(typ))
return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService)
}

// Id calls this function to internal service.
func (i ESIndexService) Id(id string) IndexService {
return WrapESIndexService(i.indexService.Id(id))
return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService)
}

// BodyJson calls this function to internal service.
func (i ESIndexService) BodyJson(body interface{}) IndexService {
return WrapESIndexService(i.indexService.BodyJson(body))
return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService)
}

// Do calls this function to internal service.
func (i ESIndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) {
return i.indexService.Do(ctx)
// Add adds the request to bulk service
func (i ESIndexService) Add() {
i.bulkService.Add(i.bulkIndexReq)
}

// ---
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/cassandra/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func NewSpanWriter(
}
}

// Close closes SpanWriter
func (s *SpanWriter) Close() error {
s.session.Close()
return nil
}

// WriteSpan saves the span into Cassandra
func (s *SpanWriter) WriteSpan(span *model.Span) error {
ds := dbmodel.FromDomain(span)
Expand Down
Loading

0 comments on commit e52ecff

Please sign in to comment.