Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The implementation of FindTraceIDs function for ElasticSearch reader. #1280

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
23bbaf5
The implementations of "FindTraceIDs" function for ElasticSearch reader.
Jan 11, 2019
c55fde1
The using validation of query and added default numTraces value.
Jan 11, 2019
4350e3a
The tests for FindTraceIDs were added.
Jan 11, 2019
0ace6df
The test for checking incorrect traceID.
Jan 11, 2019
d4a2644
The tests are fixed.
Jan 11, 2019
e90848d
The improvements by code-review.
Jan 15, 2019
ae75936
The new function validateQueryAndFindTraceIDs.
Jan 15, 2019
8a1ec2e
Merge branch 'master' into find_trace_ids_for_es_reader
vlamug Jan 16, 2019
0c399da
Merge branch 'master' into find_trace_ids_for_es_reader
vprithvi Jan 28, 2019
424bc31
The span validateQueryAndFindTraceIDs was removed.
Jan 29, 2019
5aae75f
The using slice of model.TraceID instead slice of string in multiRead…
Jan 29, 2019
7a70584
Merge remote-tracking branch 'origin/find_trace_ids_for_es_reader' in…
Jan 29, 2019
2600634
The converting slice of string to slice of mode.TraceID was extracted…
Jan 30, 2019
b108809
The method "convertTraceIDsModelsToStrings" was added to convert slic…
Jan 30, 2019
debd4d6
The fix of passing traceID into NewTermQuery function.
Jan 30, 2019
66f1851
Merge branch 'master' into find_trace_ids_for_es_reader
vprithvi Jan 30, 2019
75c30d3
The convertTraceIDsModelsToStrings method was remove, because it is r…
Jan 31, 2019
0e7139a
Merge remote-tracking branch 'origin/find_trace_ids_for_es_reader' in…
Jan 31, 2019
49a132a
Merge branch 'master' into find_trace_ids_for_es_reader
vprithvi Jan 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode
span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace")
defer span.Finish()
currentTime := time.Now()
traces, err := s.multiRead(ctx, []string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime)
traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,25 +251,34 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces")
defer span.Finish()

uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery)
if err != nil {
return nil, err
}
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
}

// FindTraceIDs retrieves traces IDs that match the traceQuery
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs")
defer span.Finish()

if err := validateQuery(traceQuery); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a lot of repeated code between FindTraces and FindTraceIDs, are we able to refactor such that the initial validation and retrieval are shared?

Also, could you create a new span here - similar to L221

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks.

return nil, err
}
if traceQuery.NumTraces == 0 {
traceQuery.NumTraces = defaultNumTraces
}
uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery)

esTraceIDs, err := s.findTraceIDs(ctx, traceQuery)
if err != nil {
return nil, err
}
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
}

// FindTraceIDs is not implemented.
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
return nil, errors.New("not implemented") // TODO: Implement
return convertTraceIDsStringsToModels(esTraceIDs)
}

func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) {

childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead")
childSpan.LogFields(otlog.Object("trace_ids", traceIDs))
Expand All @@ -286,16 +295,16 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime
indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

searchAfterTime := make(map[string]uint64)
totalDocumentsFetched := make(map[string]int)
tracesMap := make(map[string]*model.Trace)
searchAfterTime := make(map[model.TraceID]uint64)
totalDocumentsFetched := make(map[model.TraceID]int)
tracesMap := make(map[model.TraceID]*model.Trace)
for {
if len(traceIDs) == 0 {
break
}

for i, traceID := range traceIDs {
query := elastic.NewTermQuery("traceID", traceID)
query := elastic.NewTermQuery("traceID", traceID.String())
if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}
Expand Down Expand Up @@ -333,18 +342,17 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that the integration tests failed. Perhaps L298 requires a traceID.String()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks 👍

}
lastSpan := spans[len(spans)-1]
lastSpanTraceID := lastSpan.TraceID.String()

if traceSpan, ok := tracesMap[lastSpanTraceID]; ok {
if traceSpan, ok := tracesMap[lastSpan.TraceID]; ok {
traceSpan.Spans = append(traceSpan.Spans, spans...)
} else {
tracesMap[lastSpanTraceID] = &model.Trace{Spans: spans}
tracesMap[lastSpan.TraceID] = &model.Trace{Spans: spans}
}

totalDocumentsFetched[lastSpanTraceID] = totalDocumentsFetched[lastSpanTraceID] + len(result.Hits.Hits)
if totalDocumentsFetched[lastSpanTraceID] < int(result.TotalHits()) {
traceIDs = append(traceIDs, lastSpanTraceID)
searchAfterTime[lastSpanTraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime)
totalDocumentsFetched[lastSpan.TraceID] = totalDocumentsFetched[lastSpan.TraceID] + len(result.Hits.Hits)
if totalDocumentsFetched[lastSpan.TraceID] < int(result.TotalHits()) {
traceIDs = append(traceIDs, lastSpan.TraceID)
searchAfterTime[lastSpan.TraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime)
}
}
}
Expand All @@ -355,6 +363,20 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime
return traces, nil
}

func convertTraceIDsStringsToModels(traceIDs []string) ([]model.TraceID, error) {
traceIDsModels := make([]model.TraceID, len(traceIDs))
for i, ID := range traceIDs {
traceID, err := model.TraceIDFromString(ID)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Making traceID from string '%s' failed", ID))
}

traceIDsModels[i] = traceID
}

return traceIDsModels, nil
}

func validateQuery(p *spanstore.TraceQueryParameters) error {
if p == nil {
return ErrMalformedRequestObject
Expand Down
49 changes: 26 additions & 23 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"testing"
"time"

"github.com/uber/jaeger-lib/metrics/metricstest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

Expand Down Expand Up @@ -145,23 +145,23 @@ func TestSpanReaderIndices(t *testing.T) {
dateFormat := date.UTC().Format("2006-01-02")
testCases := []struct {
indices []string
params SpanReaderParams
params SpanReaderParams
}{
{params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix:"", Archive: false},
indices: []string{spanIndex+dateFormat}},
{params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix:"foo:", Archive: false},
indices: []string{"foo:"+indexPrefixSeparator+spanIndex+dateFormat,"foo:"+indexPrefixSeparatorDeprecated+spanIndex+dateFormat}},
{params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix:"", Archive: true},
indices: []string{spanIndex+archiveIndexSuffix}},
{params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix:"foo:", Archive: true},
indices: []string{"foo:"+indexPrefixSeparator+spanIndex+archiveIndexSuffix}},
{params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix:"foo:", Archive: true, UseReadWriteAliases:true},
indices: []string{"foo:"+indexPrefixSeparator+spanIndex+archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
indices: []string{spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat, "foo:" + indexPrefixSeparatorDeprecated + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true},
indices: []string{spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
Expand Down Expand Up @@ -702,12 +702,15 @@ func TestFindTraceIDs(t *testing.T) {
testGet(traceIDAggregation, t)
}

func TestFindTraceIDNotImplemented(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil)
assert.Nil(t, traceIDs)
assert.EqualError(t, err, "not implemented")
})
func TestTraceIDsStringsToModelsConversion(t *testing.T) {
traceIDs, err := convertTraceIDsStringsToModels([]string{"1", "2", "3"})
assert.NoError(t, err)
assert.Equal(t, 3, len(traceIDs))
assert.Equal(t, "1", traceIDs[0].String())

traceIDs, err = convertTraceIDsStringsToModels([]string{"dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl"})
assert.EqualError(t, err, "Making traceID from string 'dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl' failed: TraceID cannot be longer than 32 hex characters: dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl")
assert.Equal(t, 0, len(traceIDs))
}

func mockMultiSearchService(r *spanReaderTest) *mock.Call {
Expand Down