Skip to content

Commit

Permalink
[storage] Emit spans for Elasticsearch backend (#1128)
Browse files Browse the repository at this point in the history
* [Storage] Emit spans for elastic storage backend

Signed-off-by: Annanay <annanay.a@media.net>

* [Storage] Pass context carrying the span, add tags to span

Signed-off-by: Annanay <annanay.a@media.net>

* [Storage] Edit span tags to add more useful information

Signed-off-by: Annanay <annanay.a@media.net>

* Fix test coverage

Signed-off-by: Annanay <annanay.a@media.net>
  • Loading branch information
annanay25 authored and yurishkuro committed Nov 11, 2018
1 parent de8a9ad commit be62816
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
35 changes: 30 additions & 5 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand Down Expand Up @@ -130,8 +133,10 @@ func newSpanReader(p SpanReaderParams) *SpanReader {

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace")
defer span.Finish()
currentTime := time.Now()
traces, err := s.multiRead([]string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime)
traces, err := s.multiRead(ctx, []string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,13 +188,17 @@ func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time,

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "GetServices")
	defer sp.Finish()
	// Query only the service indices that fall inside the retention window.
	// NOTE(review): the derived ctx is not forwarded to getServices, so the
	// child relationship stops here — confirm this is intentional.
	now := time.Now()
	indices := s.indicesForTimeRange(s.serviceIndexPrefix, now.Add(-s.maxSpanAge), now)
	return s.serviceOperationStorage.getServices(indices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()
currentTime := time.Now()
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
Expand All @@ -209,20 +218,27 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string,

// FindTraces retrieves traces that match the traceQuery.
//
// It validates the query, applies the default result limit when the caller
// did not set one, resolves the matching trace IDs, and then fetches the
// full traces for those IDs within the query's time window. The scraped
// diff contained both the pre- and post-change call sites; this is the
// coherent post-change version (context is threaded through to the helpers
// so their child spans attach to this one).
func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces")
	defer span.Finish()

	if err := validateQuery(traceQuery); err != nil {
		return nil, err
	}
	if traceQuery.NumTraces == 0 {
		// Callers that leave NumTraces unset get the package default
		// rather than an unbounded (or zero-result) query.
		traceQuery.NumTraces = defaultNumTraces
	}
	uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery)
	if err != nil {
		return nil, err
	}
	return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
}

func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {

childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead")
childSpan.LogFields(otlog.Object("trace_ids", traceIDs))
defer childSpan.Finish()

if len(traceIDs) == 0 {
return []*model.Trace{}, nil
Expand Down Expand Up @@ -256,6 +272,7 @@ func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time)
results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(s.ctx)

if err != nil {
logErrorToSpan(childSpan, err)
return nil, err
}

Expand All @@ -269,6 +286,7 @@ func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time)
}
spans, err := s.collectSpans(result.Hits.Hits)
if err != nil {
logErrorToSpan(childSpan, err)
return nil, err
}
lastSpan := spans[len(spans)-1]
Expand Down Expand Up @@ -313,7 +331,9 @@ func validateQuery(p *spanstore.TraceQueryParameters) error {
return nil
}

func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([]string, error) {
func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs")
defer childSpan.Finish()
// Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this.
// {
// "size": 0,
Expand Down Expand Up @@ -492,3 +512,8 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic.
keyQuery := elastic.NewMatchQuery(keyField, v)
return elastic.NewBoolQuery().Must(keyQuery)
}

// logErrorToSpan records err on the span and flags the span as errored.
func logErrorToSpan(span opentracing.Span, err error) {
	span.LogFields(otlog.Error(err))
	ottag.Error.Set(span, true)
}
2 changes: 1 addition & 1 deletion plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func returnSearchFunc(typ string, r *spanReaderTest) ([]string, error) {
} else if typ == operationsAggregation {
return r.reader.GetOperations(context.Background(), "someService")
} else if typ == traceIDAggregation {
return r.reader.findTraceIDs(&spanstore.TraceQueryParameters{})
return r.reader.findTraceIDs(context.Background(), &spanstore.TraceQueryParameters{})
}
return nil, errors.New("Specify services, operations, traceIDs only")
}
Expand Down

0 comments on commit be62816

Please sign in to comment.