Skip to content

Commit

Permalink
[dbnode] additional query limit on bytes read (#2627)
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Sep 15, 2020
1 parent 3aa2f51 commit b71785e
Show file tree
Hide file tree
Showing 19 changed files with 729 additions and 609 deletions.
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func TestConfiguration(t *testing.T) {
meta_event_reporting_enabled: false
limits:
maxRecentlyQueriedSeriesBlocks: null
maxRecentlyQueriedSeriesDiskBytesRead: null
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
maxOutstandingRepairedBytes: 0
Expand Down
21 changes: 12 additions & 9 deletions src/cmd/services/m3dbnode/config/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ type LimitsConfiguration struct {
// MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks
// count within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesBlocks *MaxRecentlyQueriedSeriesBlocksConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"`
MaxRecentlyQueriedSeriesBlocks *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"`

// MaxRecentlyQueriedSeriesDiskBytesRead sets the upper limit on time series bytes
// read from disk within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"`

// MaxOutstandingWriteRequests controls the maximum number of outstanding write requests
// that the server will allow before it begins rejecting requests. Note that this value
Expand Down Expand Up @@ -55,14 +60,12 @@ type LimitsConfiguration struct {
MaxEncodersPerBlock int `yaml:"maxEncodersPerBlock" validate:"min=0"`
}

// MaxRecentlyQueriedSeriesBlocksConfiguration sets the upper limit on time
// series blocks count within a given lookback period. Queries which are issued
// while this max is surpassed encounter an error.
type MaxRecentlyQueriedSeriesBlocksConfiguration struct {
// Value sets the max recently queried time series blocks for the given
// time window.
// MaxRecentQueryResourceLimitConfiguration sets an upper limit on resources consumed by all queries
// globally within a dbnode per some lookback period of time. Once exceeded, queries within that period
// of time will be abandoned.
type MaxRecentQueryResourceLimitConfiguration struct {
// Value sets the max value for the resource limit.
Value int64 `yaml:"value" validate:"min=0"`
// Lookback is the period to time window the max value of time series
// blocks allowed to be queried.
// Lookback is the period in which a given resource limit is enforced.
Lookback time.Duration `yaml:"lookback" validate:"min=0"`
}
30 changes: 27 additions & 3 deletions src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/checked"
Expand Down Expand Up @@ -79,9 +80,11 @@ const (
type blockRetriever struct {
sync.RWMutex

opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit

newSeekerMgrFn newSeekerMgrFn

Expand Down Expand Up @@ -113,6 +116,8 @@ func NewBlockRetriever(
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
Expand Down Expand Up @@ -288,6 +293,13 @@ func (r *blockRetriever) fetchBatch(
reqs []*retrieveRequest,
seekerResources ReusableSeekerResources,
) {
if err := r.queryLimits.AnyExceeded(); err != nil {
for _, req := range reqs {
req.onError(err)
}
return
}

// Resolve the seeker from the seeker mgr
seeker, err := seekerMgr.Borrow(shard, blockStart)
if err != nil {
Expand All @@ -299,13 +311,25 @@ func (r *blockRetriever) fetchBatch(

// Sort the requests by offset into the file before seeking
// to ensure all seeks are in ascending order
var limitErr error
for _, req := range reqs {
if limitErr != nil {
req.onError(limitErr)
continue
}

entry, err := seeker.SeekIndexEntry(req.id, seekerResources)
if err != nil && err != errSeekIDNotFound {
req.onError(err)
continue
}

if err := r.bytesReadLimit.Inc(int(entry.Size)); err != nil {
req.onError(err)
limitErr = err
continue
}

if err == errSeekIDNotFound {
req.notFound = true
}
Expand Down
13 changes: 13 additions & 0 deletions src/dbnode/persist/fs/retriever_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"runtime"

"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -43,6 +44,7 @@ type blockRetrieverOptions struct {
fetchConcurrency int
identifierPool ident.Pool
blockLeaseManager block.LeaseManager
queryLimits limits.QueryLimits
}

// NewBlockRetrieverOptions creates a new set of block retriever options
Expand All @@ -65,6 +67,7 @@ func NewBlockRetrieverOptions() BlockRetrieverOptions {
bytesPool: bytesPool,
fetchConcurrency: defaultFetchConcurrency,
identifierPool: ident.NewPool(bytesPool, ident.PoolOptions{}),
queryLimits: limits.NoOpQueryLimits(),
}

return o
Expand Down Expand Up @@ -126,3 +129,13 @@ func (o *blockRetrieverOptions) SetBlockLeaseManager(leaseMgr block.LeaseManager
func (o *blockRetrieverOptions) BlockLeaseManager() block.LeaseManager {
return o.blockLeaseManager
}

func (o *blockRetrieverOptions) SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions {
opts := *o
opts.queryLimits = value
return &opts
}

func (o *blockRetrieverOptions) QueryLimits() limits.QueryLimits {
return o.queryLimits
}
7 changes: 7 additions & 0 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
Expand Down Expand Up @@ -561,6 +562,12 @@ type BlockRetrieverOptions interface {

// BlockLeaseManager returns the block leaser.
BlockLeaseManager() block.LeaseManager

// SetQueryLimits sets query limits.
SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions

// QueryLimits returns the query limits.
QueryLimits() limits.QueryLimits
}

// ForEachRemainingFn is the function that is run on each of the remaining
Expand Down
41 changes: 17 additions & 24 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/cluster"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
Expand Down Expand Up @@ -138,9 +138,6 @@ type RunOptions struct {
// interrupt and shutdown the server.
InterruptCh <-chan error

// QueryStatsTrackerFn returns a tracker for tracking query stats.
QueryStatsTrackerFn func(instrument.Options, stats.QueryStatsOptions) stats.QueryStatsTracker

// CustomOptions are custom options to apply to the session.
CustomOptions []client.CustomAdminOption

Expand Down Expand Up @@ -410,27 +407,22 @@ func Run(runOpts RunOptions) {
defer stopReporting()

// Setup query stats tracking.
statsOpts := stats.QueryStatsOptions{
Lookback: stats.DefaultLookback,
}
if max := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; max != nil {
statsOpts = stats.QueryStatsOptions{
MaxDocs: max.Value,
Lookback: max.Lookback,
}
docsLimit := limits.DefaultLookbackLimitOptions()
bytesReadLimit := limits.DefaultLookbackLimitOptions()
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil {
docsLimit.Limit = limitConfig.Value
docsLimit.Lookback = limitConfig.Lookback
}
if err := statsOpts.Validate(); err != nil {
logger.Fatal("could not construct query stats options from config", zap.Error(err))
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskBytesRead; limitConfig != nil {
bytesReadLimit.Limit = limitConfig.Value
bytesReadLimit.Lookback = limitConfig.Lookback
}

tracker := stats.DefaultQueryStatsTracker(iopts, statsOpts)
if runOpts.QueryStatsTrackerFn != nil {
tracker = runOpts.QueryStatsTrackerFn(iopts, statsOpts)
queryLimits, err := limits.NewQueryLimits(iopts, docsLimit, bytesReadLimit)
if err != nil {
logger.Fatal("could not construct docs query limits from config", zap.Error(err))
}

queryStats := stats.NewQueryStats(tracker)
queryStats.Start()
defer queryStats.Stop()
queryLimits.Start()
defer queryLimits.Stop()

// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
indexOpts := opts.IndexOptions()
Expand All @@ -445,7 +437,7 @@ func Run(runOpts RunOptions) {
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
}).
SetMmapReporter(mmapReporter).
SetQueryStats(queryStats)
SetQueryLimits(queryLimits)
opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down Expand Up @@ -559,7 +551,8 @@ func Run(runOpts RunOptions) {
SetBytesPool(opts.BytesPool()).
SetRetrieveRequestPool(opts.RetrieveRequestPool()).
SetIdentifierPool(opts.IdentifierPool()).
SetBlockLeaseManager(blockLeaseManager)
SetBlockLeaseManager(blockLeaseManager).
SetQueryLimits(queryLimits)
if blockRetrieveCfg := cfg.BlockRetrieve; blockRetrieveCfg != nil {
retrieverOpts = retrieverOpts.
SetFetchConcurrency(blockRetrieveCfg.FetchConcurrency)
Expand Down
10 changes: 10 additions & 0 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/block"
dberrors "github.com/m3db/m3/src/dbnode/storage/errors"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
Expand Down Expand Up @@ -114,6 +115,8 @@ type db struct {
log *zap.Logger

writeBatchPool *writes.WriteBatchPool

queryLimits limits.QueryLimits
}

type databaseMetrics struct {
Expand Down Expand Up @@ -186,6 +189,7 @@ func NewDatabase(
metrics: newDatabaseMetrics(scope),
log: logger,
writeBatchPool: opts.WriteBatchPool(),
queryLimits: opts.IndexOptions().QueryLimits(),
}

databaseIOpts := iopts.SetMetricsScope(scope)
Expand Down Expand Up @@ -826,6 +830,12 @@ func (d *db) QueryIDs(
}
defer sp.Finish()

// Check if exceeding query limits at very beginning of
// query path to abandon as early as possible.
if err := d.queryLimits.AnyExceeded(); err != nil {
return index.QueryResult{}, err
}

n, err := d.namespaceFor(namespace)
if err != nil {
sp.LogFields(opentracinglog.Error(err))
Expand Down
12 changes: 7 additions & 5 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/m3ninx/doc"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
Expand Down Expand Up @@ -141,7 +141,8 @@ type block struct {
blockOpts BlockOptions
nsMD namespace.Metadata
namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager
queryStats stats.QueryStats
queryLimits limits.QueryLimits
docsLimit limits.LookbackLimit

metrics blockMetrics
logger *zap.Logger
Expand Down Expand Up @@ -249,7 +250,8 @@ func NewBlock(
namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr,
metrics: newBlockMetrics(scope),
logger: iopts.Logger(),
queryStats: opts.QueryStats(),
queryLimits: opts.QueryLimits(),
docsLimit: opts.QueryLimits().DocsLimit(),
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorWithRLockFn = b.executorWithRLock
Expand Down Expand Up @@ -533,7 +535,7 @@ func (b *block) addQueryResults(
batch []doc.Document,
) ([]doc.Document, int, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
if err := b.docsLimit.Inc(len(batch)); err != nil {
return batch, 0, 0, err
}

Expand Down Expand Up @@ -794,7 +796,7 @@ func (b *block) addAggregateResults(
batch []AggregateResultsEntry,
) ([]AggregateResultsEntry, int, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
if err := b.docsLimit.Inc(len(batch)); err != nil {
return batch, 0, 0, err
}

Expand Down
28 changes: 14 additions & 14 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b71785e

Please sign in to comment.