[dbnode] Fail if FetchTagged partially retrieves results due to error (
nbroyles authored Sep 10, 2020
1 parent d0586ca commit 8223666
Showing 3 changed files with 105 additions and 13 deletions.
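In short: before this change, a per-series read error was recorded on the element (FetchTaggedIDResult.err) and the call kept going, so a caller could receive a partially populated response; after it, the first error aborts the call, an error metric is reported, and the per-element err field is deprecated. A minimal, self-contained sketch of the two patterns (illustrative names only, not the actual m3 types):

package main

import "fmt"

type element struct {
	ID  string
	Err error // per-element error: the old, now-deprecated pattern
}

// partialResults records each error on its element and keeps going,
// so the caller may get a mix of good and failed elements.
func partialResults(ids []string, read func(string) error) []element {
	out := make([]element, 0, len(ids))
	for _, id := range ids {
		out = append(out, element{ID: id, Err: read(id)})
	}
	return out
}

// failFast returns on the first error, so the caller gets either every
// element or a call-level error -- the behavior this commit adopts.
func failFast(ids []string, read func(string) error) ([]element, error) {
	out := make([]element, 0, len(ids))
	for _, id := range ids {
		if err := read(id); err != nil {
			return nil, err
		}
		out = append(out, element{ID: id})
	}
	return out, nil
}

func main() {
	read := func(id string) error {
		if id == "bad" {
			return fmt.Errorf("read failed for %s", id)
		}
		return nil
	}
	fmt.Println(partialResults([]string{"foo", "bad"}, read))
	fmt.Println(failFast([]string{"foo", "bad"}, read))
}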
2 changes: 2 additions & 0 deletions src/dbnode/generated/thrift/rpc.thrift
@@ -183,6 +183,8 @@ struct FetchTaggedIDResult {
2: required binary nameSpace
3: required binary encodedTags
4: optional list<Segments> segments

// Deprecated -- do not use.
5: optional Error err
}

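Note that the err field is deprecated in place rather than deleted: leaving field ID 5 declared keeps that ID reserved, so it is not later reused for a different type, which would break wire compatibility with clients built against the older schema. (Rationale inferred from common Thrift practice; the commit itself only adds the deprecation comment.)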
26 changes: 13 additions & 13 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -680,12 +680,16 @@ func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc
encodedDataResults = make([][][]xio.BlockReader, results.Size())
}
if err := s.fetchReadEncoded(ctx, db, response, results, nsID, nsIDBytes, callStart, opts, fetchData, encodedDataResults); err != nil {
s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart))
return nil, err
}

// Step 2: If fetching data read the results of the asynchronuous block readers.
// Step 2: If fetching data read the results of the asynchronous block readers.
if fetchData {
s.fetchReadResults(ctx, response, nsID, encodedDataResults)
if err := s.fetchReadResults(ctx, response, nsID, encodedDataResults); err != nil {
s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart))
return nil, err
}
}

s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart))
@@ -723,7 +727,6 @@ func (s *service) fetchReadEncoded(ctx context.Context,
ctx.RegisterFinalizer(enc)
encodedTags, err := s.encodeTags(enc, tags)
if err != nil { // This is an invariant, should never happen
s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart))
return tterrors.NewInternalError(err)
}

@@ -740,19 +743,20 @@
encoded, err := db.ReadEncoded(ctx, nsID, tsID,
opts.StartInclusive, opts.EndExclusive)
if err != nil {
elem.Err = convert.ToRPCError(err)
return convert.ToRPCError(err)
} else {
encodedDataResults[idx] = encoded
}
}
return nil
}

func (s *service) fetchReadResults(ctx context.Context,
func (s *service) fetchReadResults(
ctx context.Context,
response *rpc.FetchTaggedResult_,
nsID ident.ID,
encodedDataResults [][][]xio.BlockReader,
) {
) error {
ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadResults)
if sampled {
sp.LogFields(
@@ -762,19 +766,15 @@ func (s *service) fetchReadResults(ctx context.Context,
}
defer sp.Finish()

for idx, elem := range response.Elements {
if elem.Err != nil {
continue
}

for idx := range response.Elements {
segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedDataResults[idx])
if rpcErr != nil {
elem.Err = rpcErr
continue
return rpcErr
}

response.Elements[idx].Segments = segments
}
return nil
}

func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) {
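A design note on the metrics: fetchReadEncoded previously reported the error metric itself on the invariant failure; after this change, reporting moves up to fetchTagged, so each call reports exactly one of ReportSuccess or ReportError. One way to keep that invariant as return paths multiply is a deferred reporter keyed on the named error result; a minimal sketch under that assumption (illustrative names, not the m3 code):

package main

import (
	"errors"
	"fmt"
	"time"
)

type fetchMetrics struct{ errs, oks int }

func (m *fetchMetrics) ReportError(time.Duration)   { m.errs++ }
func (m *fetchMetrics) ReportSuccess(time.Duration) { m.oks++ }

// fetchTagged reports exactly one outcome per call, whichever return
// path is taken, by deferring on the named error result.
func fetchTagged(m *fetchMetrics, fail bool) (err error) {
	start := time.Now()
	defer func() {
		if err != nil {
			m.ReportError(time.Since(start))
			return
		}
		m.ReportSuccess(time.Since(start))
	}()

	if fail {
		return errors.New("read failed")
	}
	return nil
}

func main() {
	m := &fetchMetrics{}
	_ = fetchTagged(m, true)
	_ = fetchTagged(m, false)
	fmt.Printf("errors=%d successes=%d\n", m.errs, m.oks)
}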
90 changes: 90 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
@@ -1879,6 +1879,96 @@ func TestServiceFetchTaggedErrs(t *testing.T) {
require.Error(t, err)
}

func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

mockDB := storage.NewMockDatabase(ctrl)
mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes()
mockDB.EXPECT().IsOverloaded().Return(false)

service := NewService(mockDB, testTChannelThriftOptions).(*service)

tctx, _ := tchannelthrift.NewContext(time.Minute)
ctx := tchannelthrift.Context(tctx)
defer ctx.Close()

mtr := mocktracer.New()
sp := mtr.StartSpan("root")
ctx.SetGoContext(opentracing.ContextWithSpan(gocontext.Background(), sp))

start := time.Now().Add(-2 * time.Hour)
end := start.Add(2 * time.Hour)
start, end = start.Truncate(time.Second), end.Truncate(time.Second)

nsID := "metrics"

id := "foo"
s := []struct {
t time.Time
v float64
}{
{start.Add(10 * time.Second), 1.0},
{start.Add(20 * time.Second), 2.0},
}
enc := testStorageOpts.EncoderPool().Get()
enc.Reset(start, 0, nil)
for _, v := range s {
dp := ts.Datapoint{
Timestamp: v.t,
Value: v.v,
}
require.NoError(t, enc.Encode(dp, xtime.Second, nil))
}

stream, _ := enc.Stream(ctx)
mockDB.EXPECT().
ReadEncoded(gomock.Any(), ident.NewIDMatcher(nsID), ident.NewIDMatcher(id), start, end).
Return([][]xio.BlockReader{{
xio.BlockReader{
SegmentReader: stream,
},
}}, fmt.Errorf("random err")) // Return error that should trigger failure of the entire call

req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*"))
require.NoError(t, err)
qry := index.Query{Query: req}

resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
)))

mockDB.EXPECT().QueryIDs(
gomock.Any(),
ident.NewIDMatcher(nsID),
index.NewQueryMatcher(qry),
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
var limit int64 = 10
data, err := idx.Marshal(req)
require.NoError(t, err)
_, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{
NameSpace: []byte(nsID),
Query: data,
RangeStart: startNanos,
RangeEnd: endNanos,
FetchData: true,
Limit: &limit,
})
require.Error(t, err)
}
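The new test wires ReadEncoded to return block readers together with an error; with this change that error now surfaces as the return value of FetchTagged (hence require.Error) instead of being recorded on the element. It should be runnable in isolation with something like go test -run TestServiceFetchTaggedReturnOnFirstErr ./src/dbnode/network/server/tchannelthrift/node/ (exact invocation may vary).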

func TestServiceAggregate(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
