Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] additional query limit on bytes read #2627

Merged
merged 56 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
b0cb1c2
prototype recent bytes
rallen090 Sep 3, 2020
62d57e8
prototype recent bytes
rallen090 Sep 3, 2020
16163e5
limit testing
rallen090 Sep 4, 2020
1feee14
limit testing 2
rallen090 Sep 4, 2020
f171993
limit testing 3
rallen090 Sep 4, 2020
4d92f3b
Remove caching for testing read perf /build-docker
nbroyles Sep 2, 2020
4e30cd8
allow zero update on query stats
rallen090 Sep 4, 2020
1e8f442
proper implementation for bytes-read limits
rallen090 Sep 8, 2020
ec0391c
proper implementation for bytes-read limits 2
rallen090 Sep 8, 2020
9bc1db4
proper implementation for bytes-read limits 3
rallen090 Sep 8, 2020
26b1500
proper implementation for bytes-read limits 4
rallen090 Sep 8, 2020
0cc8811
proper implementation for bytes-read limits 5
rallen090 Sep 8, 2020
193f105
tests 1
rallen090 Sep 8, 2020
708566e
tests 2
rallen090 Sep 8, 2020
4632616
revert makefil
rallen090 Sep 8, 2020
2833058
cleanup 1
rallen090 Sep 8, 2020
dcdf032
cleanup 2
rallen090 Sep 8, 2020
2ab180b
Merge remote-tracking branch 'origin/master' into ra/query-limit-test
rallen090 Sep 8, 2020
f8e9794
cleanup 3
rallen090 Sep 8, 2020
2228682
cleanup 4
rallen090 Sep 8, 2020
5df84b1
cleanup 5
rallen090 Sep 8, 2020
64090cc
feedback 1
rallen090 Sep 8, 2020
c20259c
feedback 2
rallen090 Sep 8, 2020
7fecc80
feedback 3
rallen090 Sep 8, 2020
6deb435
feedback 4
rallen090 Sep 8, 2020
5909159
feedback 5
rallen090 Sep 8, 2020
c715607
more cleanup
rallen090 Sep 8, 2020
5d2da73
Merge remote-tracking branch 'origin/master' into ra/query-limit-test
rallen090 Sep 8, 2020
1934e4d
fixed lookback ref
rallen090 Sep 8, 2020
6f36b33
feedback 1
rallen090 Sep 10, 2020
7707d2f
feedback 2
rallen090 Sep 10, 2020
b2d3ad2
refactor query limits 1
rallen090 Sep 10, 2020
927acaf
refactor query limits 2
rallen090 Sep 10, 2020
407a824
refactor query limits 3
rallen090 Sep 10, 2020
d5c8c59
refactor query limits 4
rallen090 Sep 10, 2020
0e892e2
feedback 1
rallen090 Sep 11, 2020
700fe85
feedback 2
rallen090 Sep 11, 2020
7d358a8
feedback 3
rallen090 Sep 11, 2020
ce0c814
feedback 4
rallen090 Sep 11, 2020
b71f8ef
feedback 5
rallen090 Sep 11, 2020
fc286d9
Merge remote-tracking branch 'origin/master' into ra/query-limit-test
rallen090 Sep 11, 2020
8d02d55
feedback 6
rallen090 Sep 11, 2020
46486ca
feedback 7
rallen090 Sep 11, 2020
95933c5
feedback 8
rallen090 Sep 11, 2020
0ed5355
feedback 0
rallen090 Sep 11, 2020
ed64402
feedback 10
rallen090 Sep 11, 2020
4dbce17
feedback 11
rallen090 Sep 11, 2020
e7c5cc6
more fixes 1
rallen090 Sep 11, 2020
334603e
more fixes 2
rallen090 Sep 11, 2020
a1a1b4d
more feedback 1
rallen090 Sep 14, 2020
4a85610
Merge branch 'master' into ra/query-limit-test
rallen090 Sep 14, 2020
b219187
Merge branch 'master' into ra/query-limit-test
rallen090 Sep 15, 2020
2aeefa0
metric name change
rallen090 Sep 15, 2020
7d98ae1
metric name change 2
rallen090 Sep 15, 2020
5a26a4a
Merge remote-tracking branch 'origin/master' into ra/query-limit-test
rallen090 Sep 15, 2020
63158be
Merge branch 'master' into ra/query-limit-test
rallen090 Sep 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func TestConfiguration(t *testing.T) {
meta_event_reporting_enabled: false
limits:
maxRecentlyQueriedSeriesBlocks: null
maxRecentlyQueriedSeriesDiskBytesRead: null
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
maxOutstandingRepairedBytes: 0
Expand Down
21 changes: 12 additions & 9 deletions src/cmd/services/m3dbnode/config/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ type LimitsConfiguration struct {
// MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks
// count within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesBlocks *MaxRecentlyQueriedSeriesBlocksConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"`
MaxRecentlyQueriedSeriesBlocks *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesBlocks"`

// MaxRecentlyQueriedSeriesDiskBytesRead sets the upper limit on time series bytes
// read from disk within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"`

// MaxOutstandingWriteRequests controls the maximum number of outstanding write requests
// that the server will allow before it begins rejecting requests. Note that this value
Expand Down Expand Up @@ -55,14 +60,12 @@ type LimitsConfiguration struct {
MaxEncodersPerBlock int `yaml:"maxEncodersPerBlock" validate:"min=0"`
}

// MaxRecentlyQueriedSeriesBlocksConfiguration sets the upper limit on time
// series blocks count within a given lookback period. Queries which are issued
// while this max is surpassed encounter an error.
type MaxRecentlyQueriedSeriesBlocksConfiguration struct {
// Value sets the max recently queried time series blocks for the given
// time window.
// MaxRecentQueryResourceLimitConfiguration sets an upper limit on resources consumed by all queries
// globally within a dbnode per some lookback period of time. Once exceeded, queries within that period
// of time will be abandoned.
type MaxRecentQueryResourceLimitConfiguration struct {
// Value sets the max value for the resource limit.
Value int64 `yaml:"value" validate:"min=0"`
// Lookback is the period to time window the max value of time series
// blocks allowed to be queried.
// Lookback is the period in which a given resource limit is enforced.
Lookback time.Duration `yaml:"lookback" validate:"min=0"`
}
30 changes: 27 additions & 3 deletions src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/checked"
Expand Down Expand Up @@ -79,9 +80,11 @@ const (
type blockRetriever struct {
sync.RWMutex

opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit

newSeekerMgrFn newSeekerMgrFn

Expand Down Expand Up @@ -113,6 +116,8 @@ func NewBlockRetriever(
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
Expand Down Expand Up @@ -288,6 +293,13 @@ func (r *blockRetriever) fetchBatch(
reqs []*retrieveRequest,
seekerResources ReusableSeekerResources,
) {
if err := r.queryLimits.AnyExceeded(); err != nil {
for _, req := range reqs {
req.onError(err)
}
return
}

// Resolve the seeker from the seeker mgr
seeker, err := seekerMgr.Borrow(shard, blockStart)
if err != nil {
Expand All @@ -299,13 +311,25 @@ func (r *blockRetriever) fetchBatch(

// Sort the requests by offset into the file before seeking
// to ensure all seeks are in ascending order
var limitErr error
for _, req := range reqs {
if limitErr != nil {
req.onError(limitErr)
continue
}

entry, err := seeker.SeekIndexEntry(req.id, seekerResources)
if err != nil && err != errSeekIDNotFound {
req.onError(err)
continue
}

if err := r.bytesReadLimit.Inc(int(entry.Size)); err != nil {
req.onError(err)
limitErr = err
continue
}

if err == errSeekIDNotFound {
req.notFound = true
}
Expand Down
13 changes: 13 additions & 0 deletions src/dbnode/persist/fs/retriever_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"runtime"

"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -43,6 +44,7 @@ type blockRetrieverOptions struct {
fetchConcurrency int
identifierPool ident.Pool
blockLeaseManager block.LeaseManager
queryLimits limits.QueryLimits
}

// NewBlockRetrieverOptions creates a new set of block retriever options
Expand All @@ -65,6 +67,7 @@ func NewBlockRetrieverOptions() BlockRetrieverOptions {
bytesPool: bytesPool,
fetchConcurrency: defaultFetchConcurrency,
identifierPool: ident.NewPool(bytesPool, ident.PoolOptions{}),
queryLimits: limits.NoOpQueryLimits(),
}

return o
Expand Down Expand Up @@ -126,3 +129,13 @@ func (o *blockRetrieverOptions) SetBlockLeaseManager(leaseMgr block.LeaseManager
func (o *blockRetrieverOptions) BlockLeaseManager() block.LeaseManager {
return o.blockLeaseManager
}

func (o *blockRetrieverOptions) SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions {
opts := *o
opts.queryLimits = value
return &opts
}

func (o *blockRetrieverOptions) QueryLimits() limits.QueryLimits {
return o.queryLimits
}
7 changes: 7 additions & 0 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
Expand Down Expand Up @@ -561,6 +562,12 @@ type BlockRetrieverOptions interface {

// BlockLeaseManager returns the block leaser.
BlockLeaseManager() block.LeaseManager

// SetQueryLimits sets query limits.
SetQueryLimits(value limits.QueryLimits) BlockRetrieverOptions

// QueryLimits returns the query limits.
QueryLimits() limits.QueryLimits
}

// ForEachRemainingFn is the function that is run on each of the remaining
Expand Down
41 changes: 17 additions & 24 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/cluster"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
Expand Down Expand Up @@ -138,9 +138,6 @@ type RunOptions struct {
// interrupt and shutdown the server.
InterruptCh <-chan error

// QueryStatsTrackerFn returns a tracker for tracking query stats.
QueryStatsTrackerFn func(instrument.Options, stats.QueryStatsOptions) stats.QueryStatsTracker

// CustomOptions are custom options to apply to the session.
CustomOptions []client.CustomAdminOption

Expand Down Expand Up @@ -410,27 +407,22 @@ func Run(runOpts RunOptions) {
defer stopReporting()

// Setup query stats tracking.
statsOpts := stats.QueryStatsOptions{
Lookback: stats.DefaultLookback,
}
if max := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; max != nil {
statsOpts = stats.QueryStatsOptions{
MaxDocs: max.Value,
Lookback: max.Lookback,
}
docsLimit := limits.DefaultLookbackLimitOptions()
bytesReadLimit := limits.DefaultLookbackLimitOptions()
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil {
docsLimit.Limit = limitConfig.Value
docsLimit.Lookback = limitConfig.Lookback
}
if err := statsOpts.Validate(); err != nil {
logger.Fatal("could not construct query stats options from config", zap.Error(err))
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskBytesRead; limitConfig != nil {
bytesReadLimit.Limit = limitConfig.Value
bytesReadLimit.Lookback = limitConfig.Lookback
}

tracker := stats.DefaultQueryStatsTracker(iopts, statsOpts)
if runOpts.QueryStatsTrackerFn != nil {
tracker = runOpts.QueryStatsTrackerFn(iopts, statsOpts)
queryLimits, err := limits.NewQueryLimits(iopts, docsLimit, bytesReadLimit)
if err != nil {
logger.Fatal("could not construct docs query limits from config", zap.Error(err))
}

queryStats := stats.NewQueryStats(tracker)
queryStats.Start()
defer queryStats.Stop()
queryLimits.Start()
defer queryLimits.Stop()

// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
indexOpts := opts.IndexOptions()
Expand All @@ -445,7 +437,7 @@ func Run(runOpts RunOptions) {
CacheTerms: plCacheConfig.CacheTermsOrDefault(),
}).
SetMmapReporter(mmapReporter).
SetQueryStats(queryStats)
SetQueryLimits(queryLimits)
opts = opts.SetIndexOptions(indexOpts)

if tick := cfg.Tick; tick != nil {
Expand Down Expand Up @@ -559,7 +551,8 @@ func Run(runOpts RunOptions) {
SetBytesPool(opts.BytesPool()).
SetRetrieveRequestPool(opts.RetrieveRequestPool()).
SetIdentifierPool(opts.IdentifierPool()).
SetBlockLeaseManager(blockLeaseManager)
SetBlockLeaseManager(blockLeaseManager).
SetQueryLimits(queryLimits)
if blockRetrieveCfg := cfg.BlockRetrieve; blockRetrieveCfg != nil {
retrieverOpts = retrieverOpts.
SetFetchConcurrency(blockRetrieveCfg.FetchConcurrency)
Expand Down
10 changes: 10 additions & 0 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/block"
dberrors "github.com/m3db/m3/src/dbnode/storage/errors"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/writes"
Expand Down Expand Up @@ -114,6 +115,8 @@ type db struct {
log *zap.Logger

writeBatchPool *writes.WriteBatchPool

queryLimits limits.QueryLimits
}

type databaseMetrics struct {
Expand Down Expand Up @@ -186,6 +189,7 @@ func NewDatabase(
metrics: newDatabaseMetrics(scope),
log: logger,
writeBatchPool: opts.WriteBatchPool(),
queryLimits: opts.IndexOptions().QueryLimits(),
}

databaseIOpts := iopts.SetMetricsScope(scope)
Expand Down Expand Up @@ -826,6 +830,12 @@ func (d *db) QueryIDs(
}
defer sp.Finish()

// Check if exceeding query limits at very beginning of
// query path to abandon as early as possible.
if err := d.queryLimits.AnyExceeded(); err != nil {
return index.QueryResult{}, err
}

n, err := d.namespaceFor(namespace)
if err != nil {
sp.LogFields(opentracinglog.Error(err))
Expand Down
12 changes: 7 additions & 5 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/stats"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/m3ninx/doc"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
Expand Down Expand Up @@ -141,7 +141,8 @@ type block struct {
blockOpts BlockOptions
nsMD namespace.Metadata
namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager
queryStats stats.QueryStats
queryLimits limits.QueryLimits
docsLimit limits.LookbackLimit

metrics blockMetrics
logger *zap.Logger
Expand Down Expand Up @@ -249,7 +250,8 @@ func NewBlock(
namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr,
metrics: newBlockMetrics(scope),
logger: iopts.Logger(),
queryStats: opts.QueryStats(),
queryLimits: opts.QueryLimits(),
docsLimit: opts.QueryLimits().DocsLimit(),
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorWithRLockFn = b.executorWithRLock
Expand Down Expand Up @@ -533,7 +535,7 @@ func (b *block) addQueryResults(
batch []doc.Document,
) ([]doc.Document, int, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
if err := b.docsLimit.Inc(len(batch)); err != nil {
return batch, 0, 0, err
}

Expand Down Expand Up @@ -794,7 +796,7 @@ func (b *block) addAggregateResults(
batch []AggregateResultsEntry,
) ([]AggregateResultsEntry, int, int, error) {
// update recently queried docs to monitor memory.
if err := b.queryStats.Update(len(batch)); err != nil {
if err := b.docsLimit.Inc(len(batch)); err != nil {
return batch, 0, 0, err
}

Expand Down
28 changes: 14 additions & 14 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading