-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
The implementation of FindTraceIDs function for ElasticSearch reader. #1280
Changes from 12 commits
23bbaf5
c55fde1
4350e3a
0ace6df
d4a2644
e90848d
ae75936
8a1ec2e
0c399da
424bc31
5aae75f
7a70584
2600634
b108809
debd4d6
66f1851
75c30d3
0e7139a
49a132a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,7 +179,7 @@ func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode | |
span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") | ||
defer span.Finish() | ||
currentTime := time.Now() | ||
traces, err := s.multiRead(ctx, []string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime) | ||
traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -251,25 +251,44 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace | |
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") | ||
defer span.Finish() | ||
|
||
uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) | ||
} | ||
|
||
// FindTraceIDs retrieves traces IDs that match the traceQuery | ||
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { | ||
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") | ||
defer span.Finish() | ||
|
||
if err := validateQuery(traceQuery); err != nil { | ||
return nil, err | ||
} | ||
if traceQuery.NumTraces == 0 { | ||
traceQuery.NumTraces = defaultNumTraces | ||
} | ||
uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery) | ||
|
||
esTraceIDs, err := s.findTraceIDs(ctx, traceQuery) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) | ||
} | ||
|
||
// FindTraceIDs is not implemented. | ||
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { | ||
return nil, errors.New("not implemented") // TODO: Implement | ||
traceIDs := make([]model.TraceID, len(esTraceIDs)) | ||
for i, ID := range esTraceIDs { | ||
traceID, err := model.TraceIDFromString(ID) | ||
if err != nil { | ||
return nil, errors.Wrap(err, fmt.Sprintf("Making traceID from string '%s' failed", ID)) | ||
} | ||
|
||
traceIDs[i] = traceID | ||
} | ||
|
||
return traceIDs, nil | ||
} | ||
|
||
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) { | ||
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { | ||
|
||
childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead") | ||
childSpan.LogFields(otlog.Object("trace_ids", traceIDs)) | ||
|
@@ -286,9 +305,9 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime | |
indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) | ||
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) | ||
|
||
searchAfterTime := make(map[string]uint64) | ||
totalDocumentsFetched := make(map[string]int) | ||
tracesMap := make(map[string]*model.Trace) | ||
searchAfterTime := make(map[model.TraceID]uint64) | ||
totalDocumentsFetched := make(map[model.TraceID]int) | ||
tracesMap := make(map[model.TraceID]*model.Trace) | ||
for { | ||
if len(traceIDs) == 0 { | ||
break | ||
|
@@ -333,18 +352,17 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime | |
return nil, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed that the integration tests failed. Perhaps L298 requires a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks 👍 |
||
} | ||
lastSpan := spans[len(spans)-1] | ||
lastSpanTraceID := lastSpan.TraceID.String() | ||
|
||
if traceSpan, ok := tracesMap[lastSpanTraceID]; ok { | ||
if traceSpan, ok := tracesMap[lastSpan.TraceID]; ok { | ||
traceSpan.Spans = append(traceSpan.Spans, spans...) | ||
} else { | ||
tracesMap[lastSpanTraceID] = &model.Trace{Spans: spans} | ||
tracesMap[lastSpan.TraceID] = &model.Trace{Spans: spans} | ||
} | ||
|
||
totalDocumentsFetched[lastSpanTraceID] = totalDocumentsFetched[lastSpanTraceID] + len(result.Hits.Hits) | ||
if totalDocumentsFetched[lastSpanTraceID] < int(result.TotalHits()) { | ||
traceIDs = append(traceIDs, lastSpanTraceID) | ||
searchAfterTime[lastSpanTraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime) | ||
totalDocumentsFetched[lastSpan.TraceID] = totalDocumentsFetched[lastSpan.TraceID] + len(result.Hits.Hits) | ||
if totalDocumentsFetched[lastSpan.TraceID] < int(result.TotalHits()) { | ||
traceIDs = append(traceIDs, lastSpan.TraceID) | ||
searchAfterTime[lastSpan.TraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime) | ||
} | ||
} | ||
} | ||
|
@@ -377,6 +395,7 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { | |
func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { | ||
childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs") | ||
defer childSpan.Finish() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: extraneous change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, thanks! |
||
// Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this. | ||
// { | ||
// "size": 0, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -702,11 +702,142 @@ func TestFindTraceIDs(t *testing.T) { | |
testGet(traceIDAggregation, t) | ||
} | ||
|
||
func TestFindTraceIDNotImplemented(t *testing.T) { | ||
func TestSpanReader_TestFindTraceIDs(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests look very similar to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, of course, I will make change soon. |
||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
// find trace IDs | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
NumTraces: 3, | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 3) | ||
assert.EqualValues(t, 1, traceIDs[0].Low) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsInvalidQuery(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil) | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: "", | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
assert.EqualError(t, err, "not implemented") | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsAggregationFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsNoTraceIDs(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": []}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsReadTraceIDsFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(nil, errors.New("read error")) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, "Search service failed: read error") | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsIncorrectTraceIDFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "sdfsdfdssd234nsdvsdfldjsf","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, `Making traceID from string 'sdfsdfdssd234nsdvsdfldjsf' failed: strconv.ParseUint: parsing "sdfsdfdss": invalid syntax`) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a lot of repeated code between
FindTraces
andFindTraceIDs
, are we able to refactor such that the initial validation and retrieval are shared?Also, could you create a new span here - similar to L221
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks.