Skip to content

Commit

Permalink
[dbnode/query] Optionally error if query exceeds series limit via Req…
Browse files Browse the repository at this point in the history
…uireExhaustive config (#2400)
  • Loading branch information
rallen090 authored Jun 11, 2020
1 parent ccf5669 commit 44d463a
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 131 deletions.
9 changes: 7 additions & 2 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ type PerQueryLimitsConfiguration struct {

// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int `yaml:"maxFetchedSeries"`

// RequireExhaustive results in an error if the query exceeds the series limit.
RequireExhaustive bool `yaml:"requireExhaustive"`
}

// AsLimitManagerOptions converts this configuration to
Expand All @@ -306,12 +309,14 @@ func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerO
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleroptions.FetchOptionsBuilderOptions {
if l.MaxFetchedSeries <= 0 {
return handleroptions.FetchOptionsBuilderOptions{
Limit: defaultStorageQueryLimit,
Limit: defaultStorageQueryLimit,
RequireExhaustive: l.RequireExhaustive,
}
}

return handleroptions.FetchOptionsBuilderOptions{
Limit: int(l.MaxFetchedSeries),
Limit: int(l.MaxFetchedSeries),
RequireExhaustive: l.RequireExhaustive,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/dbnode/generated/thrift/rpc.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ struct FetchTaggedRequest {
5: required bool fetchData
6: optional i64 limit
7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS
8: optional bool requireExhaustive = false
}

struct FetchTaggedResult {
Expand Down
57 changes: 50 additions & 7 deletions src/dbnode/generated/thrift/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3049,14 +3049,16 @@ func (p *Segment) String() string {
// - FetchData
// - Limit
// - RangeTimeType
// - RequireExhaustive
type FetchTaggedRequest struct {
NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"`
Query []byte `thrift:"query,2,required" db:"query" json:"query"`
RangeStart int64 `thrift:"rangeStart,3,required" db:"rangeStart" json:"rangeStart"`
RangeEnd int64 `thrift:"rangeEnd,4,required" db:"rangeEnd" json:"rangeEnd"`
FetchData bool `thrift:"fetchData,5,required" db:"fetchData" json:"fetchData"`
Limit *int64 `thrift:"limit,6" db:"limit" json:"limit,omitempty"`
RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"`
NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"`
Query []byte `thrift:"query,2,required" db:"query" json:"query"`
RangeStart int64 `thrift:"rangeStart,3,required" db:"rangeStart" json:"rangeStart"`
RangeEnd int64 `thrift:"rangeEnd,4,required" db:"rangeEnd" json:"rangeEnd"`
FetchData bool `thrift:"fetchData,5,required" db:"fetchData" json:"fetchData"`
Limit *int64 `thrift:"limit,6" db:"limit" json:"limit,omitempty"`
RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"`
RequireExhaustive bool `thrift:"requireExhaustive,8" db:"requireExhaustive" json:"requireExhaustive,omitempty"`
}

func NewFetchTaggedRequest() *FetchTaggedRequest {
Expand Down Expand Up @@ -3099,6 +3101,12 @@ var FetchTaggedRequest_RangeTimeType_DEFAULT TimeType = 0
func (p *FetchTaggedRequest) GetRangeTimeType() TimeType {
return p.RangeTimeType
}

var FetchTaggedRequest_RequireExhaustive_DEFAULT bool = false

func (p *FetchTaggedRequest) GetRequireExhaustive() bool {
return p.RequireExhaustive
}
func (p *FetchTaggedRequest) IsSetLimit() bool {
return p.Limit != nil
}
Expand All @@ -3107,6 +3115,10 @@ func (p *FetchTaggedRequest) IsSetRangeTimeType() bool {
return p.RangeTimeType != FetchTaggedRequest_RangeTimeType_DEFAULT
}

func (p *FetchTaggedRequest) IsSetRequireExhaustive() bool {
return p.RequireExhaustive != FetchTaggedRequest_RequireExhaustive_DEFAULT
}

func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
Expand Down Expand Up @@ -3160,6 +3172,10 @@ func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if err := p.ReadField7(iprot); err != nil {
return err
}
case 8:
if err := p.ReadField8(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
Expand Down Expand Up @@ -3254,6 +3270,15 @@ func (p *FetchTaggedRequest) ReadField7(iprot thrift.TProtocol) error {
return nil
}

func (p *FetchTaggedRequest) ReadField8(iprot thrift.TProtocol) error {
if v, err := iprot.ReadBool(); err != nil {
return thrift.PrependError("error reading field 8: ", err)
} else {
p.RequireExhaustive = v
}
return nil
}

func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("FetchTaggedRequest"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
Expand All @@ -3280,6 +3305,9 @@ func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := p.writeField7(oprot); err != nil {
return err
}
if err := p.writeField8(oprot); err != nil {
return err
}
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
Expand Down Expand Up @@ -3385,6 +3413,21 @@ func (p *FetchTaggedRequest) writeField7(oprot thrift.TProtocol) (err error) {
return err
}

func (p *FetchTaggedRequest) writeField8(oprot thrift.TProtocol) (err error) {
if p.IsSetRequireExhaustive() {
if err := oprot.WriteFieldBegin("requireExhaustive", thrift.BOOL, 8); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 8:requireExhaustive: ", p), err)
}
if err := oprot.WriteBool(bool(p.RequireExhaustive)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.requireExhaustive (8) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 8:requireExhaustive: ", p), err)
}
}
return err
}

func (p *FetchTaggedRequest) String() string {
if p == nil {
return "<nil>"
Expand Down
53 changes: 50 additions & 3 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,32 @@ func (i *nsIndex) query(
exhaustive, err := i.queryWithSpan(ctx, query, results, opts, execBlockFn, sp, logFields)
if err != nil {
sp.LogFields(opentracinglog.Error(err))

if exhaustive {
i.metrics.queryExhaustiveInternalError.Inc(1)
} else {
i.metrics.queryNonExhaustiveInternalError.Inc(1)
}
return exhaustive, err
}

if exhaustive {
i.metrics.queryExhaustiveSuccess.Inc(1)
return exhaustive, nil
}

return exhaustive, err
// If require exhaustive but not, return error.
if opts.RequireExhaustive {
i.metrics.queryNonExhaustiveLimitError.Inc(1)
return exhaustive, fmt.Errorf("query matched too many time series: require_exhaustive=%v, limit=%d, matched=%d",
opts.RequireExhaustive,
opts.Limit,
results.Size())
}

// Otherwise non-exhaustive but not required to be.
i.metrics.queryNonExhaustiveSuccess.Inc(1)
return exhaustive, nil
}

func (i *nsIndex) queryWithSpan(
Expand Down Expand Up @@ -1370,7 +1393,6 @@ func (i *nsIndex) queryWithSpan(
if err != nil {
return false, err
}

return exhaustive, nil
}

Expand Down Expand Up @@ -1847,7 +1869,12 @@ type nsIndexMetrics struct {
blocksEvictedMutableSegments tally.Counter
blockMetrics nsIndexBlocksMetrics

loadedDocsPerQuery tally.Histogram
loadedDocsPerQuery tally.Histogram
queryExhaustiveSuccess tally.Counter
queryExhaustiveInternalError tally.Counter
queryNonExhaustiveSuccess tally.Counter
queryNonExhaustiveInternalError tally.Counter
queryNonExhaustiveLimitError tally.Counter
}

func newNamespaceIndexMetrics(
Expand Down Expand Up @@ -1897,6 +1924,26 @@ func newNamespaceIndexMetrics(
"loaded-docs-per-query",
tally.MustMakeExponentialValueBuckets(10, 2, 16),
),
queryExhaustiveSuccess: scope.Tagged(map[string]string{
"exhaustive": "true",
"result": "success",
}).Counter("query"),
queryExhaustiveInternalError: scope.Tagged(map[string]string{
"exhaustive": "true",
"result": "error_internal",
}).Counter("query"),
queryNonExhaustiveSuccess: scope.Tagged(map[string]string{
"exhaustive": "false",
"result": "success",
}).Counter("query"),
queryNonExhaustiveInternalError: scope.Tagged(map[string]string{
"exhaustive": "false",
"result": "error_internal",
}).Counter("query"),
queryNonExhaustiveLimitError: scope.Tagged(map[string]string{
"exhaustive": "false",
"result": "error_require_exhaustive",
}).Counter("query"),
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ type Query struct {
// QueryOptions enables users to specify constraints and
// preferences on query execution.
type QueryOptions struct {
StartInclusive time.Time
EndExclusive time.Time
Limit int
IterationOptions IterationOptions
StartInclusive time.Time
EndExclusive time.Time
Limit int
RequireExhaustive bool
IterationOptions IterationOptions
}

// IterationOptions enables users to specify iteration preferences.
Expand Down
Loading

0 comments on commit 44d463a

Please sign in to comment.