diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 6db0bc5980..1a7210267d 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -719,6 +719,7 @@ func TestConfiguration(t *testing.T) { meta_event_reporting_enabled: false limits: maxRecentlyQueriedSeriesBlocks: null + maxRecentlyQueriedSeriesDiskBytesRead: null maxOutstandingWriteRequests: 0 maxOutstandingReadRequests: 0 maxOutstandingRepairedBytes: 0 diff --git a/src/cmd/services/m3dbnode/config/limits.go b/src/cmd/services/m3dbnode/config/limits.go index f2edac16ba..98e649457a 100644 --- a/src/cmd/services/m3dbnode/config/limits.go +++ b/src/cmd/services/m3dbnode/config/limits.go @@ -27,7 +27,12 @@ type LimitsConfiguration struct { // MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks // count within a given lookback period. Queries which are issued while this // max is surpassed encounter an error. - MaxRecentlyQueriedSeriesBlocks *MaxRecentlyQueriedSeriesBlocksConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"` + MaxRecentlyQueriedSeriesBlocks *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"` + + // MaxRecentlyQueriedSeriesDiskBytesRead sets the upper limit on time series bytes + // read from disk within a given lookback period. Queries which are issued while this + // max is surpassed encounter an error. + MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"` // MaxOutstandingWriteRequests controls the maximum number of outstanding write requests // that the server will allow before it begins rejecting requests. Note that this value @@ -55,14 +60,12 @@ type LimitsConfiguration struct { MaxEncodersPerBlock int `yaml:"maxEncodersPerBlock" validate:"min=0"` } -// MaxRecentlyQueriedSeriesBlocksConfiguration sets the upper limit on time -// series blocks count within a given lookback period. Queries which are issued -// while this max is surpassed encounter an error. -type MaxRecentlyQueriedSeriesBlocksConfiguration struct { - // Value sets the max recently queried time series blocks for the given - // time window. +// MaxRecentQueryResourceLimitConfiguration sets an upper limit on resources consumed by all queries +// globally within a dbnode per some lookback period of time. Once exceeded, queries within that period +// of time will be abandoned. +type MaxRecentQueryResourceLimitConfiguration struct { + // Value sets the max value for the resource limit. Value int64 `yaml:"value" validate:"min=0"` - // Lookback is the period to time window the max value of time series - // blocks allowed to be queried. + // Lookback is the period in which a given resource limit is enforced. Lookback time.Duration `yaml:"lookback" validate:"min=0"` } diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 19e28763df..7332e4705a 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -41,6 +41,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" @@ -79,9 +80,11 @@ const ( type blockRetriever struct { sync.RWMutex - opts BlockRetrieverOptions - fsOpts Options - logger *zap.Logger + opts BlockRetrieverOptions + fsOpts Options + logger *zap.Logger + queryLimits limits.QueryLimits + bytesReadLimit limits.LookbackLimit newSeekerMgrFn newSeekerMgrFn @@ -113,6 +116,8 @@ func NewBlockRetriever( opts: opts, fsOpts: fsOpts, logger: fsOpts.InstrumentOptions().Logger(), + queryLimits: opts.QueryLimits(), + bytesReadLimit: opts.QueryLimits().BytesReadLimit(), newSeekerMgrFn: NewSeekerManager, reqPool: opts.RetrieveRequestPool(), bytesPool: opts.BytesPool(), @@ -288,6 +293,13 @@ func (r *blockRetriever) fetchBatch( reqs []*retrieveRequest, seekerResources ReusableSeekerResources, ) { + if err := r.queryLimits.AnyExceeded(); err != nil { + for _, req := range reqs { + req.onError(err) + } + return + } + // Resolve the seeker from the seeker mgr seeker, err := seekerMgr.Borrow(shard, blockStart) if err != nil { @@ -299,13 +311,25 @@ func (r *blockRetriever) fetchBatch( // Sort the requests by offset into the file before seeking // to ensure all seeks are in ascending order + var limitErr error for _, req := range reqs { + if limitErr != nil { + req.onError(limitErr) + continue + } + entry, err := seeker.SeekIndexEntry(req.id, seekerResources) if err != nil && err != errSeekIDNotFound { req.onError(err) continue } + if err := r.bytesReadLimit.Inc(int(entry.Size)); err != nil { + req.onError(err) + limitErr = err + continue + } + if err == errSeekIDNotFound { req.notFound = true } diff --git a/src/dbnode/persist/fs/retriever_options.go b/src/dbnode/persist/fs/retriever_options.go index 7bc9947406..8e0d9920b2 100644 --- a/src/dbnode/persist/fs/retriever_options.go +++ b/src/dbnode/persist/fs/retriever_options.go @@ -25,6 +25,7 @@ import ( "runtime" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" @@ -43,6 +44,7 @@ type blockRetrieverOptions struct { fetchConcurrency int identifierPool ident.Pool blockLeaseManager block.LeaseManager + queryLimits limits.QueryLimits } // NewBlockRetrieverOptions creates a new set of block retriever options @@ -65,6 +67,7 @@ func NewBlockRetrieverOptions() BlockRetrieverOptions { bytesPool: bytesPool, fetchConcurrency: defaultFetchConcurrency, identifierPool: ident.NewPool(bytesPool, ident.PoolOptions{}), + queryLimits: limits.NoOpQueryLimits(), } return o @@ -126,3 +129,13 @@ func (o *blockRetrieverOptions) SetBlockLeaseManager(leaseMgr block.LeaseManager func (o *blockRetrieverOptions) BlockLeaseManager() block.LeaseManager { return o.blockLeaseManager } + +func (o *blockRetrieverOptions) SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions { + opts := *o + opts.queryLimits = value + return &opts +} + +func (o *blockRetrieverOptions) QueryLimits() limits.QueryLimits { + return o.queryLimits +} diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 3c1ed6154e..6997b35c17 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst" @@ -561,6 +562,12 @@ type BlockRetrieverOptions interface { // BlockLeaseManager returns the block leaser. BlockLeaseManager() block.LeaseManager + + // SetQueryLimits sets query limits. + SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions + + // QueryLimits returns the query limits. + QueryLimits() limits.QueryLimits } // ForEachRemainingFn is the function that is run on each of the remaining diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index da343e8a86..b8f48cce49 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -65,8 +65,8 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/cluster" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/stats" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -138,9 +138,6 @@ type RunOptions struct { // interrupt and shutdown the server. InterruptCh <-chan error - // QueryStatsTrackerFn returns a tracker for tracking query stats. - QueryStatsTrackerFn func(instrument.Options, stats.QueryStatsOptions) stats.QueryStatsTracker - // CustomOptions are custom options to apply to the session. CustomOptions []client.CustomAdminOption @@ -410,27 +407,22 @@ func Run(runOpts RunOptions) { defer stopReporting() // Setup query stats tracking. - statsOpts := stats.QueryStatsOptions{ - Lookback: stats.DefaultLookback, - } - if max := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; max != nil { - statsOpts = stats.QueryStatsOptions{ - MaxDocs: max.Value, - Lookback: max.Lookback, - } + docsLimit := limits.DefaultLookbackLimitOptions() + bytesReadLimit := limits.DefaultLookbackLimitOptions() + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil { + docsLimit.Limit = limitConfig.Value + docsLimit.Lookback = limitConfig.Lookback } - if err := statsOpts.Validate(); err != nil { - logger.Fatal("could not construct query stats options from config", zap.Error(err)) + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskBytesRead; limitConfig != nil { + bytesReadLimit.Limit = limitConfig.Value + bytesReadLimit.Lookback = limitConfig.Lookback } - - tracker := stats.DefaultQueryStatsTracker(iopts, statsOpts) - if runOpts.QueryStatsTrackerFn != nil { - tracker = runOpts.QueryStatsTrackerFn(iopts, statsOpts) + queryLimits, err := limits.NewQueryLimits(iopts, docsLimit, bytesReadLimit) + if err != nil { + logger.Fatal("could not construct docs query limits from config", zap.Error(err)) } - - queryStats := stats.NewQueryStats(tracker) - queryStats.Start() - defer queryStats.Stop() + queryLimits.Start() + defer queryLimits.Stop() // FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done indexOpts := opts.IndexOptions() @@ -445,7 +437,7 @@ func Run(runOpts RunOptions) { CacheTerms: plCacheConfig.CacheTermsOrDefault(), }). SetMmapReporter(mmapReporter). - SetQueryStats(queryStats) + SetQueryLimits(queryLimits) opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { @@ -559,7 +551,8 @@ func Run(runOpts RunOptions) { SetBytesPool(opts.BytesPool()). SetRetrieveRequestPool(opts.RetrieveRequestPool()). SetIdentifierPool(opts.IdentifierPool()). - SetBlockLeaseManager(blockLeaseManager) + SetBlockLeaseManager(blockLeaseManager). + SetQueryLimits(queryLimits) if blockRetrieveCfg := cfg.BlockRetrieve; blockRetrieveCfg != nil { retrieverOpts = retrieverOpts. SetFetchConcurrency(blockRetrieveCfg.FetchConcurrency) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index f1e2670951..b6241b95b2 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" dberrors "github.com/m3db/m3/src/dbnode/storage/errors" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -114,6 +115,8 @@ type db struct { log *zap.Logger writeBatchPool *writes.WriteBatchPool + + queryLimits limits.QueryLimits } type databaseMetrics struct { @@ -186,6 +189,7 @@ func NewDatabase( metrics: newDatabaseMetrics(scope), log: logger, writeBatchPool: opts.WriteBatchPool(), + queryLimits: opts.IndexOptions().QueryLimits(), } databaseIOpts := iopts.SetMetricsScope(scope) @@ -826,6 +830,12 @@ func (d *db) QueryIDs( } defer sp.Finish() + // Check if exceeding query limits at very beginning of + // query path to abandon as early as possible. + if err := d.queryLimits.AnyExceeded(); err != nil { + return index.QueryResult{}, err + } + n, err := d.namespaceFor(namespace) if err != nil { sp.LogFields(opentracinglog.Error(err)) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index d7f2668fd8..2fab38bdb0 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -30,7 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/storage/stats" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -141,7 +141,8 @@ type block struct { blockOpts BlockOptions nsMD namespace.Metadata namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager - queryStats stats.QueryStats + queryLimits limits.QueryLimits + docsLimit limits.LookbackLimit metrics blockMetrics logger *zap.Logger @@ -249,7 +250,8 @@ func NewBlock( namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr, metrics: newBlockMetrics(scope), logger: iopts.Logger(), - queryStats: opts.QueryStats(), + queryLimits: opts.QueryLimits(), + docsLimit: opts.QueryLimits().DocsLimit(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator b.newExecutorWithRLockFn = b.executorWithRLock @@ -533,7 +535,7 @@ func (b *block) addQueryResults( batch []doc.Document, ) ([]doc.Document, int, int, error) { // update recently queried docs to monitor memory. - if err := b.queryStats.Update(len(batch)); err != nil { + if err := b.docsLimit.Inc(len(batch)); err != nil { return batch, 0, 0, err } @@ -794,7 +796,7 @@ func (b *block) addAggregateResults( batch []AggregateResultsEntry, ) ([]AggregateResultsEntry, int, int, error) { // update recently queried docs to monitor memory. - if err := b.queryStats.Update(len(batch)); err != nil { + if err := b.docsLimit.Inc(len(batch)); err != nil { return batch, 0, 0, err } diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 3498361b80..cf36717d23 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -31,7 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" - "github.com/m3db/m3/src/dbnode/storage/stats" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" @@ -1667,30 +1667,30 @@ func (mr *MockOptionsMockRecorder) MmapReporter() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MmapReporter", reflect.TypeOf((*MockOptions)(nil).MmapReporter)) } -// SetQueryStats mocks base method -func (m *MockOptions) SetQueryStats(value stats.QueryStats) Options { +// SetQueryLimits mocks base method +func (m *MockOptions) SetQueryLimits(value limits.QueryLimits) Options { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetQueryStats", value) + ret := m.ctrl.Call(m, "SetQueryLimits", value) ret0, _ := ret[0].(Options) return ret0 } -// SetQueryStats indicates an expected call of SetQueryStats -func (mr *MockOptionsMockRecorder) SetQueryStats(value interface{}) *gomock.Call { +// SetQueryLimits indicates an expected call of SetQueryLimits +func (mr *MockOptionsMockRecorder) SetQueryLimits(value interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetQueryStats", reflect.TypeOf((*MockOptions)(nil).SetQueryStats), value) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetQueryLimits", reflect.TypeOf((*MockOptions)(nil).SetQueryLimits), value) } -// QueryStats mocks base method -func (m *MockOptions) QueryStats() stats.QueryStats { +// QueryLimits mocks base method +func (m *MockOptions) QueryLimits() limits.QueryLimits { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "QueryStats") - ret0, _ := ret[0].(stats.QueryStats) + ret := m.ctrl.Call(m, "QueryLimits") + ret0, _ := ret[0].(limits.QueryLimits) return ret0 } -// QueryStats indicates an expected call of QueryStats -func (mr *MockOptionsMockRecorder) QueryStats() *gomock.Call { +// QueryLimits indicates an expected call of QueryLimits +func (mr *MockOptionsMockRecorder) QueryLimits() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryStats", reflect.TypeOf((*MockOptions)(nil).QueryStats)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryLimits", reflect.TypeOf((*MockOptions)(nil).QueryLimits)) } diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 15f7738d16..ba5243272e 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -25,7 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/storage/index/compaction" - "github.com/m3db/m3/src/dbnode/storage/stats" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/builder" "github.com/m3db/m3/src/m3ninx/index/segment/fst" @@ -68,7 +68,7 @@ var ( errOptionsAggResultsEntryPoolUnspecified = errors.New("aggregate results entry array pool is unset") errIDGenerationDisabled = errors.New("id generation is disabled") errPostingsListCacheUnspecified = errors.New("postings list cache is unset") - errOptionsQueryStatsUnspecified = errors.New("query stats is unset") + errOptionsQueryLimitsUnspecified = errors.New("query limits is unset") defaultForegroundCompactionOpts compaction.PlannerOptions defaultBackgroundCompactionOpts compaction.PlannerOptions @@ -124,7 +124,7 @@ type opts struct { postingsListCache *PostingsListCache readThroughSegmentOptions ReadThroughSegmentOptions mmapReporter mmap.Reporter - queryStats stats.QueryStats + queryLimits limits.QueryLimits } var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled } @@ -175,7 +175,7 @@ func NewOptions() Options { aggResultsEntryArrayPool: aggResultsEntryArrayPool, foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts, backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts, - queryStats: stats.NoOpQueryStats(), + queryLimits: limits.NoOpQueryLimits(), } resultsPool.Init(func() QueryResults { return NewQueryResults(nil, QueryResultsOptions{}, opts) @@ -212,9 +212,6 @@ func (o *opts) Validate() error { if o.postingsListCache == nil { return errPostingsListCacheUnspecified } - if o.queryStats == nil { - return errOptionsQueryStatsUnspecified - } return nil } @@ -422,12 +419,12 @@ func (o *opts) MmapReporter() mmap.Reporter { return o.mmapReporter } -func (o *opts) SetQueryStats(value stats.QueryStats) Options { +func (o *opts) SetQueryLimits(value limits.QueryLimits) Options { opts := *o - opts.queryStats = value + opts.queryLimits = value return &opts } -func (o *opts) QueryStats() stats.QueryStats { - return o.queryStats +func (o *opts) QueryLimits() limits.QueryLimits { + return o.queryLimits } diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 4a667f57a6..cc0366e975 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -29,7 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" - "github.com/m3db/m3/src/dbnode/storage/stats" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/m3ninx/index/segment" @@ -962,9 +962,9 @@ type Options interface { // MmapReporter returns the mmap reporter. MmapReporter() mmap.Reporter - // SetQueryStatsTracker sets current query stats. - SetQueryStats(value stats.QueryStats) Options + // SetQueryLimits sets current query limits. + SetQueryLimits(value limits.QueryLimits) Options - // QueryStats returns the current query stats. - QueryStats() stats.QueryStats + // QueryLimits returns the current query limits. + QueryLimits() limits.QueryLimits } diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go new file mode 100644 index 0000000000..533c7cd2d1 --- /dev/null +++ b/src/dbnode/storage/limits/noop_query_limits.go @@ -0,0 +1,76 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +type noOpQueryLimits struct { +} + +type noOpLookbackLimit struct { +} + +var ( + _ QueryLimits = (*noOpQueryLimits)(nil) + _ LookbackLimit = (*noOpLookbackLimit)(nil) +) + +// NoOpQueryLimits returns inactive query limits. +func NoOpQueryLimits() QueryLimits { + return &noOpQueryLimits{} +} + +func (q *noOpQueryLimits) DocsLimit() LookbackLimit { + return &noOpLookbackLimit{} +} + +func (q *noOpQueryLimits) BytesReadLimit() LookbackLimit { + return &noOpLookbackLimit{} +} + +func (q *noOpQueryLimits) AnyExceeded() error { + return nil +} + +func (q *noOpQueryLimits) Stop() { +} + +func (q *noOpQueryLimits) Start() { +} + +func (q *noOpLookbackLimit) Inc(int) error { + return nil +} + +func (q *noOpLookbackLimit) Exceeded() error { + return nil +} + +func (q *noOpLookbackLimit) Stop() { +} + +func (q *noOpLookbackLimit) Start() { +} + +func (q *noOpLookbackLimit) current() int64 { + return 0 +} + +func (q *noOpLookbackLimit) reset() { +} diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go new file mode 100644 index 0000000000..191669dc20 --- /dev/null +++ b/src/dbnode/storage/limits/query_limits.go @@ -0,0 +1,220 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +import ( + "fmt" + "time" + + "github.com/m3db/m3/src/x/instrument" + "github.com/uber-go/tally" + "go.uber.org/atomic" +) + +const defaultLookback = time.Second * 15 + +type queryLimits struct { + docsLimit *lookbackLimit + bytesReadLimit *lookbackLimit +} + +type lookbackLimit struct { + name string + options LookbackLimitOptions + metrics lookbackLimitMetrics + recent *atomic.Int64 + stopCh chan struct{} +} + +type lookbackLimitMetrics struct { + recentCount tally.Gauge + recentMax tally.Gauge + total tally.Counter + exceeded tally.Counter +} + +var ( + _ QueryLimits = (*queryLimits)(nil) + _ LookbackLimit = (*lookbackLimit)(nil) +) + +// DefaultLookbackLimitOptions returns a new query limits manager. +func DefaultLookbackLimitOptions() LookbackLimitOptions { + return LookbackLimitOptions{ + // Default to no limit. + Limit: 0, + Lookback: defaultLookback, + } +} + +// NewQueryLimits returns a new query limits manager. +func NewQueryLimits( + instrumentOpts instrument.Options, + docsLimitOpts LookbackLimitOptions, + bytesReadLimitOpts LookbackLimitOptions, +) (QueryLimits, error) { + if err := docsLimitOpts.validate(); err != nil { + return nil, err + } + if err := bytesReadLimitOpts.validate(); err != nil { + return nil, err + } + docsLimit := newLookbackLimit(instrumentOpts, docsLimitOpts, "docs-matched") + bytesReadLimit := newLookbackLimit(instrumentOpts, bytesReadLimitOpts, "disk-bytes-read") + return &queryLimits{ + docsLimit: docsLimit, + bytesReadLimit: bytesReadLimit, + }, nil +} + +func newLookbackLimit( + instrumentOpts instrument.Options, + opts LookbackLimitOptions, + name string, +) *lookbackLimit { + return &lookbackLimit{ + name: name, + options: opts, + metrics: newLookbackLimitMetrics(instrumentOpts, name), + recent: atomic.NewInt64(0), + stopCh: make(chan struct{}), + } +} + +func newLookbackLimitMetrics(instrumentOpts instrument.Options, name string) lookbackLimitMetrics { + scope := instrumentOpts. + MetricsScope(). + SubScope("query-limit") + return lookbackLimitMetrics{ + recentCount: scope.Gauge(fmt.Sprintf("recent-count-%s", name)), + recentMax: scope.Gauge(fmt.Sprintf("recent-max-%s", name)), + total: scope.Counter(fmt.Sprintf("total-%s", name)), + exceeded: scope.Tagged(map[string]string{"limit": name}).Counter("exceeded"), + } +} + +func (q *queryLimits) DocsLimit() LookbackLimit { + return q.docsLimit +} + +func (q *queryLimits) BytesReadLimit() LookbackLimit { + return q.bytesReadLimit +} + +func (q *queryLimits) Start() { + q.docsLimit.start() + q.bytesReadLimit.start() +} + +func (q *queryLimits) Stop() { + q.docsLimit.stop() + q.bytesReadLimit.stop() +} + +func (q *queryLimits) AnyExceeded() error { + if err := q.docsLimit.exceeded(); err != nil { + return err + } + if err := q.bytesReadLimit.exceeded(); err != nil { + return err + } + return nil +} + +// Inc increments the current value and returns an error if above the limit. +func (q *lookbackLimit) Inc(val int) error { + if val < 0 { + return fmt.Errorf("invalid negative query limit inc %d", val) + } + if val == 0 { + return q.exceeded() + } + + // Add the new stats to the global state. + valI64 := int64(val) + recent := q.recent.Add(valI64) + + // Update metrics. + q.metrics.recentCount.Update(float64(recent)) + q.metrics.total.Inc(valI64) + + // Enforce limit (if specified). + return q.checkLimit(recent) +} + +func (q *lookbackLimit) exceeded() error { + return q.checkLimit(q.recent.Load()) +} + +func (q *lookbackLimit) checkLimit(recent int64) error { + if q.options.Limit > 0 && recent > q.options.Limit { + q.metrics.exceeded.Inc(1) + return fmt.Errorf( + "query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", + q.name, q.options.Limit, recent, q.options.Lookback) + } + return nil +} + +func (q *lookbackLimit) start() { + ticker := time.NewTicker(q.options.Lookback) + go func() { + for { + select { + case <-ticker.C: + q.reset() + case <-q.stopCh: + ticker.Stop() + return + } + } + }() +} + +func (q *lookbackLimit) stop() { + close(q.stopCh) +} + +func (q *lookbackLimit) current() int64 { + return q.recent.Load() +} + +func (q *lookbackLimit) reset() { + // Update peak gauge only on resets so it only tracks + // the peak values for each lookback period. + recent := q.recent.Load() + q.metrics.recentMax.Update(float64(recent)) + + // Update the standard recent gauge to reflect drop back to zero. + q.metrics.recentCount.Update(0) + + q.recent.Store(0) +} + +func (opts LookbackLimitOptions) validate() error { + if opts.Limit < 0 { + return fmt.Errorf("query limit requires limit >= 0 (%d)", opts.Limit) + } + if opts.Lookback <= 0 { + return fmt.Errorf("query limit requires lookback > 0 (%d)", opts.Lookback) + } + return nil +} diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go new file mode 100644 index 0000000000..8e55458d3b --- /dev/null +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -0,0 +1,258 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +import ( + "fmt" + "testing" + "time" + + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/instrument" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" +) + +func TestQueryLimits(t *testing.T) { + docOpts := LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second, + } + bytesOpts := LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second, + } + queryLimits, err := NewQueryLimits(instrument.NewOptions(), docOpts, bytesOpts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + // No error yet. + require.NoError(t, queryLimits.AnyExceeded()) + + // Limit from docs. + queryLimits.DocsLimit().Inc(2) + require.Error(t, queryLimits.AnyExceeded()) + + queryLimits, err = NewQueryLimits(instrument.NewOptions(), docOpts, bytesOpts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + // No error yet. + require.NoError(t, queryLimits.AnyExceeded()) + + // Limit from bytes. + queryLimits.BytesReadLimit().Inc(2) + require.Error(t, queryLimits.AnyExceeded()) +} + +func TestLookbackLimit(t *testing.T) { + for _, test := range []struct { + name string + limit int64 + }{ + {name: "no limit", limit: 0}, + {name: "limit", limit: 5}, + } { + t.Run(test.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + opts := LookbackLimitOptions{ + Limit: test.limit, + Lookback: time.Millisecond * 100, + } + name := "test" + limit := newLookbackLimit(iOpts, opts, name) + + require.Equal(t, int64(0), limit.current()) + err := limit.exceeded() + require.NoError(t, err) + + // Validate ascending while checking limits. + var exceededCount int64 + exceededCount += verifyLimit(t, limit, 3, test.limit) + require.Equal(t, int64(3), limit.current()) + verifyMetrics(t, scope, name, 3, 0, 3, exceededCount) + + exceededCount += verifyLimit(t, limit, 2, test.limit) + require.Equal(t, int64(5), limit.current()) + verifyMetrics(t, scope, name, 5, 0, 5, exceededCount) + + exceededCount += verifyLimit(t, limit, 1, test.limit) + require.Equal(t, int64(6), limit.current()) + verifyMetrics(t, scope, name, 6, 0, 6, exceededCount) + + exceededCount += verifyLimit(t, limit, 4, test.limit) + require.Equal(t, int64(10), limit.current()) + verifyMetrics(t, scope, name, 10, 0, 10, exceededCount) + + // Validate first reset. + limit.reset() + require.Equal(t, int64(0), limit.current()) + verifyMetrics(t, scope, name, 0, 10, 10, exceededCount) + + // Validate ascending again post-reset. + exceededCount += verifyLimit(t, limit, 2, test.limit) + require.Equal(t, int64(2), limit.current()) + verifyMetrics(t, scope, name, 2, 10, 12, exceededCount) + + exceededCount += verifyLimit(t, limit, 5, test.limit) + require.Equal(t, int64(7), limit.current()) + verifyMetrics(t, scope, name, 7, 10, 17, exceededCount) + + // Validate second reset. + limit.reset() + + require.Equal(t, int64(0), limit.current()) + verifyMetrics(t, scope, name, 0, 7, 17, exceededCount) + + // Validate consecutive reset (ensure peak goes to zero). + limit.reset() + + require.Equal(t, int64(0), limit.current()) + verifyMetrics(t, scope, name, 0, 0, 17, exceededCount) + }) + } +} + +func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int64) int64 { + var exceededCount int64 + err := limit.Inc(inc) + if limit.current() <= expectedLimit || expectedLimit == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + exceededCount++ + } + err = limit.exceeded() + if limit.current() <= expectedLimit || expectedLimit == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + exceededCount++ + } + return exceededCount +} + +func TestLookbackReset(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + opts := LookbackLimitOptions{ + Limit: 5, + Lookback: time.Millisecond * 100, + } + name := "test" + limit := newLookbackLimit(iOpts, opts, name) + + err := limit.Inc(3) + require.NoError(t, err) + require.Equal(t, int64(3), limit.current()) + + limit.start() + defer limit.stop() + time.Sleep(opts.Lookback * 2) + + success := xclock.WaitUntil(func() bool { + return limit.current() == 0 + }, 5*time.Second) + require.True(t, success, "did not eventually reset to zero") +} + +func TestValidateLookbackLimitOptions(t *testing.T) { + for _, test := range []struct { + name string + max int64 + lookback time.Duration + expectError bool + }{ + { + name: "valid lookback without limit", + max: 0, + lookback: time.Millisecond, + }, + { + name: "valid lookback with valid limit", + max: 1, + lookback: time.Millisecond, + }, + { + name: "negative lookback", + max: 0, + lookback: -time.Millisecond, + expectError: true, + }, + { + name: "zero lookback", + max: 0, + lookback: time.Duration(0), + expectError: true, + }, + { + name: "negative max", + max: -1, + lookback: time.Millisecond, + expectError: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + err := LookbackLimitOptions{ + Limit: test.max, + Lookback: test.lookback, + }.validate() + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + // Validate empty. + require.Error(t, LookbackLimitOptions{}.validate()) + }) + } +} + +func verifyMetrics(t *testing.T, + scope tally.TestScope, + name string, + expectedRecent float64, + expectedRecentPeak float64, + expectedTotal int64, + expectedExceeded int64, +) { + snapshot := scope.Snapshot() + + recent, exists := snapshot.Gauges()[fmt.Sprintf("query-limit.recent-count-%s+", name)] + assert.True(t, exists) + assert.Equal(t, expectedRecent, recent.Value(), "recent count wrong") + + recentPeak, exists := snapshot.Gauges()[fmt.Sprintf("query-limit.recent-max-%s+", name)] + assert.True(t, exists) + assert.Equal(t, expectedRecentPeak, recentPeak.Value(), "recent max wrong") + + total, exists := snapshot.Counters()[fmt.Sprintf("query-limit.total-%s+", name)] + assert.True(t, exists) + assert.Equal(t, expectedTotal, total.Value(), "total wrong") + + exceeded, exists := snapshot.Counters()[fmt.Sprintf("query-limit.exceeded+limit=%s", name)] + assert.True(t, exists) + assert.Equal(t, expectedExceeded, exceeded.Value(), "exceeded wrong") +} diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go new file mode 100644 index 0000000000..ebe027a2df --- /dev/null +++ b/src/dbnode/storage/limits/types.go @@ -0,0 +1,54 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +import ( + "time" +) + +// QueryLimits provides an interface for managing query limits. +type QueryLimits interface { + // DocsLimit limits queries by a global concurrent count of index docs matched. + DocsLimit() LookbackLimit + // DocsLimit limits queries by a global concurrent count of bytes read from disk. + BytesReadLimit() LookbackLimit + + // AnyExceeded returns an error if any of the query limits are exceeded. + AnyExceeded() error + // Start begins background resetting of the query limits. + Start() + // Stop end background resetting of the query limits. + Stop() +} + +// LookbackLimit provides an interface for a specific query limit. +type LookbackLimit interface { + // Inc increments the recent value for the limit. + Inc(new int) error +} + +// LookbackLimitOptions holds options for a lookback limit to be enforced. +type LookbackLimitOptions struct { + // Limit past which errors will be returned. + Limit int64 + // Lookback is the period over which the limit is enforced. + Lookback time.Duration +} diff --git a/src/dbnode/storage/stats/query_stats.go b/src/dbnode/storage/stats/query_stats.go deleted file mode 100644 index 1e925f97a2..0000000000 --- a/src/dbnode/storage/stats/query_stats.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package stats - -import ( - "fmt" - "time" - - "go.uber.org/atomic" -) - -// For tracking query stats in past X duration such as blocks queried. -type queryStats struct { - tracker QueryStatsTracker - - recentDocs *atomic.Int64 - stopCh chan struct{} -} - -type noOpQueryStats struct { -} - -var ( - _ QueryStats = (*queryStats)(nil) - _ QueryStats = (*noOpQueryStats)(nil) -) - -// QueryStats provides an interface for updating query stats. -type QueryStats interface { - Update(newDocs int) error - Start() - Stop() -} - -// QueryStatsOptions holds options for how a tracker should handle query stats. -type QueryStatsOptions struct { - // MaxDocs limits how many recently queried max - // documents are allowed before queries are abandoned. - MaxDocs int64 - // Lookback specifies the lookback period over which stats are aggregated. - Lookback time.Duration -} - -// QueryStatsValues stores values of query stats. -type QueryStatsValues struct { - RecentDocs int64 - NewDocs int64 -} - -var zeros = QueryStatsValues{ - RecentDocs: 0, - NewDocs: 0, -} - -// QueryStatsTracker provides an interface for tracking current query stats. -type QueryStatsTracker interface { - Lookback() time.Duration - TrackStats(stats QueryStatsValues) error -} - -// NewQueryStats enables query stats to be tracked within a recency lookback duration. -func NewQueryStats(tracker QueryStatsTracker) QueryStats { - return &queryStats{ - tracker: tracker, - recentDocs: atomic.NewInt64(0), - stopCh: make(chan struct{}), - } -} - -// NoOpQueryStats returns inactive query stats. -func NoOpQueryStats() QueryStats { - return &noOpQueryStats{} -} - -// UpdateQueryStats adds new query stats which are being tracked. -func (q *queryStats) Update(newDocs int) error { - if q == nil { - return nil - } - if newDocs <= 0 { - return nil - } - - newDocsI64 := int64(newDocs) - - // Add the new stats to the global state. - recentDocs := q.recentDocs.Add(newDocsI64) - - values := QueryStatsValues{ - RecentDocs: recentDocs, - NewDocs: newDocsI64, - } - - // Invoke the custom tracker based on the new stats values. - return q.tracker.TrackStats(values) -} - -// Start initializes background processing for handling query stats. -func (q *queryStats) Start() { - if q == nil { - return - } - go func() { - ticker := time.NewTicker(q.tracker.Lookback()) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // Clear recent docs every X duration. - q.recentDocs.Store(0) - - // Also invoke the track func for having zero value. - _ = q.tracker.TrackStats(zeros) - case <-q.stopCh: - return - } - } - }() -} - -func (q *queryStats) Stop() { - if q == nil { - return - } - close(q.stopCh) -} - -func (q *noOpQueryStats) Update(int) error { - return nil -} - -func (q *noOpQueryStats) Stop() { -} - -func (q *noOpQueryStats) Start() { -} - -// Validate returns an error if the query stats options are invalid. -func (opts QueryStatsOptions) Validate() error { - if opts.MaxDocs < 0 { - return fmt.Errorf("query stats tracker requires max docs >= 0 (%d)", opts.MaxDocs) - } - if opts.Lookback <= 0 { - return fmt.Errorf("query stats tracker requires lookback > 0 (%d)", opts.Lookback) - } - return nil -} diff --git a/src/dbnode/storage/stats/query_stats_default_tracker.go b/src/dbnode/storage/stats/query_stats_default_tracker.go deleted file mode 100644 index 220ba1c5ef..0000000000 --- a/src/dbnode/storage/stats/query_stats_default_tracker.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package stats - -import ( - "fmt" - "time" - - "github.com/m3db/m3/src/x/instrument" - - "github.com/uber-go/tally" -) - -// DefaultLookback is the default lookback used for query stats tracking. -const DefaultLookback = time.Second * 5 - -// Tracker implementation that emits query stats as metrics. -type queryStatsTracker struct { - recentDocs tally.Gauge - totalDocs tally.Counter - - options QueryStatsOptions -} - -var _ QueryStatsTracker = (*queryStatsTracker)(nil) - -// DefaultQueryStatsTracker provides a tracker -// implementation that emits query stats as metrics -// and enforces limits. -func DefaultQueryStatsTracker( - instrumentOpts instrument.Options, - queryStatsOpts QueryStatsOptions, -) QueryStatsTracker { - scope := instrumentOpts. - MetricsScope(). - SubScope("query-stats") - return &queryStatsTracker{ - options: queryStatsOpts, - recentDocs: scope.Gauge("recent-docs-per-block"), - totalDocs: scope.Counter("total-docs-per-block"), - } -} - -func (t *queryStatsTracker) TrackStats(values QueryStatsValues) error { - // Track stats as metrics. - t.recentDocs.Update(float64(values.RecentDocs)) - t.totalDocs.Inc(values.NewDocs) - - // Enforce max queried docs (if specified). - if t.options.MaxDocs > 0 && values.RecentDocs > t.options.MaxDocs { - return fmt.Errorf( - "query aborted, global recent time series blocks over limit: "+ - "limit=%d, current=%d, within=%s", - t.options.MaxDocs, values.RecentDocs, t.options.Lookback) - } - return nil -} - -func (t *queryStatsTracker) Lookback() time.Duration { - return t.options.Lookback -} diff --git a/src/dbnode/storage/stats/query_stats_default_tracker_test.go b/src/dbnode/storage/stats/query_stats_default_tracker_test.go deleted file mode 100644 index 8913cb1972..0000000000 --- a/src/dbnode/storage/stats/query_stats_default_tracker_test.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package stats - -import ( - "testing" - "time" - - "github.com/m3db/m3/src/x/instrument" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" -) - -func TestValidateTrackerInputs(t *testing.T) { - for _, test := range []struct { - name string - maxDocs int64 - lookback time.Duration - expectedError string - }{ - { - name: "valid lookback without limit", - maxDocs: 0, - lookback: time.Millisecond, - }, - { - name: "valid lookback with valid limit", - maxDocs: 1, - lookback: time.Millisecond, - }, - { - name: "negative lookback", - maxDocs: 0, - lookback: -time.Millisecond, - expectedError: "query stats tracker requires lookback > 0 (-1000000)", - }, - { - name: "zero lookback", - maxDocs: 0, - lookback: time.Duration(0), - expectedError: "query stats tracker requires lookback > 0 (0)", - }, - { - name: "negative max", - maxDocs: -1, - lookback: time.Millisecond, - expectedError: "query stats tracker requires max docs >= 0 (-1)", - }, - } { - t.Run(test.name, func(t *testing.T) { - err := QueryStatsOptions{ - MaxDocs: test.maxDocs, - Lookback: test.lookback, - }.Validate() - if test.expectedError != "" { - require.Error(t, err) - require.Equal(t, test.expectedError, err.Error()) - } else { - require.NoError(t, err) - } - }) - } -} - -func TestEmitQueryStatsBasedMetrics(t *testing.T) { - for _, test := range []struct { - name string - opts QueryStatsOptions - }{ - { - name: "metrics only", - opts: QueryStatsOptions{ - Lookback: time.Second, - }, - }, - { - name: "metrics and limits", - opts: QueryStatsOptions{ - MaxDocs: 1000, - Lookback: time.Second, - }, - }, - } { - t.Run(test.name, func(t *testing.T) { - scope := tally.NewTestScope("", nil) - opts := instrument.NewOptions().SetMetricsScope(scope) - - tracker := DefaultQueryStatsTracker(opts, test.opts) - - err := tracker.TrackStats(QueryStatsValues{RecentDocs: 100, NewDocs: 5}) - require.NoError(t, err) - verifyMetrics(t, scope, 100, 5) - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: 140, NewDocs: 10}) - require.NoError(t, err) - verifyMetrics(t, scope, 140, 15) - }) - } -} - -func TestLimitMaxDocs(t *testing.T) { - scope := tally.NewTestScope("", nil) - opts := instrument.NewOptions().SetMetricsScope(scope) - - maxDocs := int64(100) - - for _, test := range []struct { - name string - opts QueryStatsOptions - expectLimitError string - }{ - { - name: "metrics only", - opts: QueryStatsOptions{ - Lookback: time.Second, - }, - }, - { - name: "metrics and limits", - opts: QueryStatsOptions{ - MaxDocs: 100, - Lookback: time.Second, - }, - expectLimitError: "query aborted, global recent time series blocks over limit: limit=100, current=101, within=1s", - }, - } { - t.Run(test.name, func(t *testing.T) { - tracker := DefaultQueryStatsTracker(opts, test.opts) - - err := tracker.TrackStats(QueryStatsValues{RecentDocs: maxDocs + 1}) - if test.expectLimitError != "" { - require.Error(t, err) - require.Equal(t, test.expectLimitError, err.Error()) - } else { - require.NoError(t, err) - } - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: maxDocs - 1}) - require.NoError(t, err) - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: 0}) - require.NoError(t, err) - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: maxDocs + 1}) - if test.expectLimitError != "" { - require.Error(t, err) - require.Equal(t, test.expectLimitError, err.Error()) - } else { - require.NoError(t, err) - } - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: maxDocs - 1}) - require.NoError(t, err) - - err = tracker.TrackStats(QueryStatsValues{RecentDocs: 0}) - require.NoError(t, err) - }) - } -} - -func verifyMetrics(t *testing.T, scope tally.TestScope, expectedRecent float64, expectedTotal int64) { - snapshot := scope.Snapshot() - - recent, exists := snapshot.Gauges()["query-stats.recent-docs-per-block+"] - assert.True(t, exists) - assert.Equal(t, expectedRecent, recent.Value()) - - total, exists := snapshot.Counters()["query-stats.total-docs-per-block+"] - assert.True(t, exists) - assert.Equal(t, expectedTotal, total.Value()) -} diff --git a/src/dbnode/storage/stats/query_stats_test.go b/src/dbnode/storage/stats/query_stats_test.go deleted file mode 100644 index db746c7094..0000000000 --- a/src/dbnode/storage/stats/query_stats_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package stats - -import ( - "sync" - "testing" - "time" - - xclock "github.com/m3db/m3/src/x/clock" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type testQueryStatsTracker struct { - sync.RWMutex - QueryStatsValues - lookback time.Duration -} - -var _ QueryStatsTracker = (*testQueryStatsTracker)(nil) - -func (t *testQueryStatsTracker) TrackStats(values QueryStatsValues) error { - t.Lock() - defer t.Unlock() - - t.QueryStatsValues = values - return nil -} - -func (t *testQueryStatsTracker) StatsValues() QueryStatsValues { - t.RLock() - defer t.RUnlock() - - return t.QueryStatsValues -} - -func (t *testQueryStatsTracker) Lookback() time.Duration { - return t.lookback -} - -func TestUpdateTracker(t *testing.T) { - tracker := &testQueryStatsTracker{} - - queryStats := NewQueryStats(tracker) - defer queryStats.Stop() - - err := queryStats.Update(3) - require.NoError(t, err) - verifyStats(t, tracker, 3, 3) - - err = queryStats.Update(2) - require.NoError(t, err) - verifyStats(t, tracker, 2, 5) -} - -func TestPeriodicallyResetRecentDocs(t *testing.T) { - tracker := &testQueryStatsTracker{lookback: time.Millisecond} - - queryStats := NewQueryStats(tracker) - - err := queryStats.Update(1) - require.NoError(t, err) - verifyStats(t, tracker, 1, 1) - - queryStats.Start() - defer queryStats.Stop() - time.Sleep(tracker.lookback * 2) - - success := xclock.WaitUntil(func() bool { - return statsEqual(tracker.StatsValues(), 0, 0) - }, 10*time.Second) - require.True(t, success, "did not eventually reset") -} - -func verifyStats(t *testing.T, tracker *testQueryStatsTracker, expectedNew int64, expectedRecent int64) { - values := tracker.StatsValues() - assert.True(t, statsEqual(values, expectedNew, expectedRecent)) -} - -func statsEqual(values QueryStatsValues, expectedNew int64, expectedRecent int64) bool { - return expectedNew == values.NewDocs && - expectedRecent == values.RecentDocs -}