-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
The implementation of FindTraceIDs function for ElasticSearch reader. #1280
Changes from 5 commits
23bbaf5
c55fde1
4350e3a
0ace6df
d4a2644
e90848d
ae75936
8a1ec2e
0c399da
424bc31
5aae75f
7a70584
2600634
b108809
debd4d6
66f1851
75c30d3
0e7139a
49a132a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -234,9 +234,35 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace | |
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) | ||
} | ||
|
||
// FindTraceIDs is not implemented. | ||
// FindTraceIDs retrieves traces IDs that match the traceQuery | ||
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { | ||
return nil, errors.New("not implemented") // TODO: Implement | ||
if err := validateQuery(traceQuery); err != nil { | ||
return nil, err | ||
} | ||
if traceQuery.NumTraces == 0 { | ||
traceQuery.NumTraces = defaultNumTraces | ||
} | ||
|
||
esTraceIDs, err := s.findTraceIDs(ctx, traceQuery) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
traceIDs := []model.TraceID{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You know the length of result slice, so you can preallocate it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. Thanks. |
||
for _, ID := range esTraceIDs { | ||
if len(traceIDs) >= traceQuery.NumTraces { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this required? I see that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I studied the implementation of such function for cassandra and there are some troubles with it, therefore the implementation for cassandra uses this condition. It seems that here it is ok. I will remove it. |
||
break | ||
} | ||
|
||
traceID, err := model.TraceIDFromString(ID) | ||
if err != nil { | ||
return nil, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Could you log the traceID string? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion. Thanks. |
||
} | ||
|
||
traceIDs = append(traceIDs, traceID) | ||
} | ||
|
||
return traceIDs, nil | ||
} | ||
|
||
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -668,11 +668,142 @@ func TestFindTraceIDs(t *testing.T) { | |
testGet(traceIDAggregation, t) | ||
} | ||
|
||
func TestFindTraceIDNotImplemented(t *testing.T) { | ||
func TestSpanReader_TestFindTraceIDs(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests look very similar to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, of course, I will make change soon. |
||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
// find trace IDs | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
NumTraces: 1, | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 1) | ||
assert.EqualValues(t, 1, traceIDs[0].Low) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsInvalidQuery(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil) | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: "", | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsAggregationFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
assert.EqualError(t, err, "not implemented") | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsNoTraceIDs(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": []}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsReadTraceIDsFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(nil, errors.New("read error")) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, "Search service failed: read error") | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsIncorrectTraceIDFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "sdfsdfdssd234nsdvsdfldjsf","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, `strconv.ParseUint: parsing "sdfsdfdss": invalid syntax`) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a lot of repeated code between
FindTraces
andFindTraceIDs
, are we able to refactor such that the initial validation and retrieval are shared?Also, could you create a new span here - similar to L221
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks.