Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visibility into and ability to limit number of encoders per block #2516

Merged
merged 10 commits into from
Aug 24, 2020
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func TestConfiguration(t *testing.T) {
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
maxOutstandingRepairedBytes: 0
maxEncodersPerBlock: 0
tchannel: null
coordinator: null
`
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/services/m3dbnode/config/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type LimitsConfiguration struct {
// process would pause until some of the repaired bytes had been persisted to disk (and subsequently
// evicted from memory) at which point it would resume.
MaxOutstandingRepairedBytes int64 `yaml:"maxOutstandingRepairedBytes" validate:"min=0"`

// MaxEncodersPerBlock is the maximum number of encoders permitted in a block.
// When there are too many encoders, merging them (during a tick) puts a high
// load on the CPU, which can prevent other DB operations.
// A setting of 0 means there is no maximum.
MaxEncodersPerBlock int `yaml:"maxEncodersPerBlock" validate:"min=0"`
}

// MaxRecentlyQueriedSeriesBlocksConfiguration sets the upper limit on time
Expand Down
118 changes: 118 additions & 0 deletions src/dbnode/integration/encoder_limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

// TestEncoderLimit is an integration test for the runtime encoders-per-block
// limit. It checks that out-of-order writes needing a new encoder are rejected
// with a bad request error once the limit is reached, that writes which fit
// into existing encoders keep succeeding, and that a limit of 0 lifts the
// restriction.
func TestEncoderLimit(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice test!

if testing.Short() {
t.SkipNow()
}

// We don't want a tick to happen during this test, since that will
// interfere with testing encoders due to the tick merging them.
testOpts := NewTestOptions(t).SetTickMinimumInterval(time.Minute)
testSetup, err := NewTestSetup(t, testOpts, nil)
require.NoError(t, err)
defer testSetup.Close()

log := testSetup.StorageOpts().InstrumentOptions().Logger()
require.NoError(t, testSetup.StartServer())
log.Info("server is now up")

defer func() {
require.NoError(t, testSetup.StopServer())
log.Info("server is now down")
}()

now := testSetup.NowFn()()

// Install an encoder limit of 5 through the runtime options manager.
db := testSetup.DB()
mgr := db.Options().RuntimeOptionsManager()
encoderLimit := 5
newRuntimeOpts := mgr.Get().SetEncodersPerBlockLimit(encoderLimit)
mgr.Update(newRuntimeOpts)

session, err := testSetup.M3DBClient().DefaultSession()
require.NoError(t, err)
nsID := testNamespaces[0]
seriesID := ident.StringID("foo")

// Attempt encoderLimit+5 writes to the same series: the first encoderLimit
// writes are expected to succeed and the remaining ones to be rejected.
for i := 0; i < encoderLimit+5; i++ {
err = session.Write(
nsID, seriesID,
// Write backwards so that a new encoder gets created every write.
now.Add(time.Duration(50-i)*time.Second),
123, xtime.Second, nil,
)

if i >= encoderLimit {
require.Error(t, err)
// A rejected write due to hitting the max encoder limit should be
// a bad request so that the client knows to not repeat the write
// request, since that will exacerbate the problem.
require.True(t, client.IsBadRequestError(err))
} else {
require.NoError(t, err)
}
}

// Write timestamps that increase from 51s onwards, i.e. later than every
// timestamp written above.
for i := 0; i < 10; i++ {
err = session.Write(
nsID, seriesID,
now.Add(time.Duration(51+i)*time.Second),
123, xtime.Second, nil,
)

// Even though we're doing more writes, these can fit into existing
// encoders since they are all ahead of existing writes, so expect
// no errors writing.
require.NoError(t, err)
}

// Now allow an unlimited number of encoders.
encoderLimit = 0
newRuntimeOpts = mgr.Get().SetEncodersPerBlockLimit(encoderLimit)
mgr.Update(newRuntimeOpts)

// Write backwards again (20s down to 1s), earlier than everything already
// written.
for i := 0; i < 20; i++ {
err = session.Write(
nsID, seriesID,
now.Add(time.Duration(20-i)*time.Second),
123, xtime.Second, nil,
)

// Now there's no encoder limit, so no error even though each of these
// additional writes create a new encoder.
require.NoError(t, err)
}
}
5 changes: 5 additions & 0 deletions src/dbnode/kvconfig/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// configuration specifying a hard limit for a cluster new series insertions.
ClusterNewSeriesInsertLimitKey = "m3db.node.cluster-new-series-insert-limit"

// EncodersPerBlockLimitKey is the KV config key for the runtime
// configuration specifying a hard limit on the number of active encoders
// per block.
EncodersPerBlockLimitKey = "m3db.node.encoders-per-block-limit"

// ClientBootstrapConsistencyLevel is the KV config key for the runtime
// configuration specifying the client bootstrap consistency level
ClientBootstrapConsistencyLevel = "m3db.client.bootstrap-consistency-level"
Expand Down
28 changes: 28 additions & 0 deletions src/dbnode/runtime/runtime_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/dbnode/runtime/runtime_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type options struct {
writeNewSeriesAsync bool
writeNewSeriesBackoffDuration time.Duration
writeNewSeriesLimitPerShardPerSecond int
encodersPerBlockLimit int
tickSeriesBatchSize int
tickPerSeriesSleepDuration time.Duration
tickMinimumInterval time.Duration
Expand Down Expand Up @@ -160,6 +161,16 @@ func (o *options) WriteNewSeriesLimitPerShardPerSecond() int {
return o.writeNewSeriesLimitPerShardPerSecond
}

// SetEncodersPerBlockLimit returns a copy of the options with the encoders
// per block limit set to value. The receiver itself is not modified.
func (o *options) SetEncodersPerBlockLimit(value int) Options {
	updated := new(options)
	*updated = *o
	updated.encodersPerBlockLimit = value
	return updated
}

// EncodersPerBlockLimit returns the configured encoders per block limit.
func (o *options) EncodersPerBlockLimit() int {
	limit := o.encodersPerBlockLimit
	return limit
}

func (o *options) SetTickSeriesBatchSize(value int) Options {
opts := *o
opts.tickSeriesBatchSize = value
Expand Down
14 changes: 14 additions & 0 deletions src/dbnode/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ type Options interface {
// time series being inserted.
WriteNewSeriesLimitPerShardPerSecond() int

// SetEncodersPerBlockLimit sets the maximum number of encoders per block
// allowed. Setting to zero means an unlimited number of encoders are
// permitted. This limit is primarily offered to defend against bursts of
// out of order writes, which create many encoders and subsequently cause
// a large burst in CPU load when trying to merge them.
SetEncodersPerBlockLimit(value int) Options

// EncodersPerBlockLimit returns the maximum number of encoders per block
// allowed. A value of zero means an unlimited number of encoders are
// permitted. This limit is primarily offered to defend against bursts of
// out of order writes, which create many encoders and subsequently cause
// a large burst in CPU load when trying to merge them.
EncodersPerBlockLimit() int

// SetTickSeriesBatchSize sets the batch size to process series together
// during a tick before yielding and sleeping the per series duration
// multiplied by the batch size.
Expand Down
73 changes: 73 additions & 0 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,8 @@ func Run(runOpts RunOptions) {
// Only set the write new series limit after bootstrapping
kvWatchNewSeriesLimitPerShard(syncCfg.KVStore, logger, topo,
runtimeOptsMgr, cfg.WriteNewSeriesLimitPerSecond)
kvWatchEncodersPerBlockLimit(syncCfg.KVStore, logger,
runtimeOptsMgr, cfg.Limits.MaxEncodersPerBlock)
}()

// Wait for process interrupt.
Expand Down Expand Up @@ -1061,6 +1063,62 @@ func kvWatchNewSeriesLimitPerShard(
}()
}

// kvWatchEncodersPerBlockLimit applies the encoders per block limit to the
// runtime options and keeps it in sync with the KV store.
//
// The initial limit is read from kvconfig.EncodersPerBlockLimitKey. If the key
// is not found, cannot be read or unmarshalled, or holds a negative value,
// defaultEncodersPerBlockLimit is used instead. A watch on the same key is
// then started and each update is applied from a background goroutine; when
// the watched value is nil the limit reverts to defaultEncodersPerBlockLimit.
// If the watch cannot be created an error is logged and no goroutine is
// started.
//
// Negative values are never applied: any non-zero limit is treated as active
// by the write path, so a negative limit would reject every write that needs
// a new encoder.
func kvWatchEncodersPerBlockLimit(
	store kv.Store,
	logger *zap.Logger,
	runtimeOptsMgr m3dbruntime.OptionsManager,
	defaultEncodersPerBlockLimit int,
) {
	initEncoderLimit := defaultEncodersPerBlockLimit

	value, err := store.Get(kvconfig.EncodersPerBlockLimitKey)
	if err == nil {
		protoValue := &commonpb.Int64Proto{}
		err = value.Unmarshal(protoValue)
		if err == nil {
			if protoValue.Value >= 0 {
				initEncoderLimit = int(protoValue.Value)
			} else {
				logger.Warn("ignoring negative encoder per block limit",
					zap.Int64("value", protoValue.Value))
			}
		}
	}

	// A missing key is the normal "not configured" case, so only other
	// failures are worth a warning; either way the default stays in place.
	if err != nil && err != kv.ErrNotFound {
		logger.Warn("error resolving encoder per block limit", zap.Error(err))
	}

	err = setEncodersPerBlockLimitOnChange(runtimeOptsMgr, initEncoderLimit)
	if err != nil {
		logger.Warn("unable to set encoder per block limit", zap.Error(err))
	}

	watch, err := store.Watch(kvconfig.EncodersPerBlockLimitKey)
	if err != nil {
		logger.Error("could not watch encoder per block limit", zap.Error(err))
		return
	}

	go func() {
		protoValue := &commonpb.Int64Proto{}
		for range watch.C() {
			value := defaultEncodersPerBlockLimit
			if newValue := watch.Get(); newValue != nil {
				if err := newValue.Unmarshal(protoValue); err != nil {
					logger.Warn("unable to parse new encoder per block limit", zap.Error(err))
					continue
				}
				if protoValue.Value < 0 {
					// Keep the currently applied limit rather than
					// installing a value that would block writes.
					logger.Warn("ignoring negative encoder per block limit",
						zap.Int64("value", protoValue.Value))
					continue
				}
				value = int(protoValue.Value)
			}

			// Errors are scoped to this iteration instead of being written
			// to the enclosing function's err variable.
			if err := setEncodersPerBlockLimitOnChange(runtimeOptsMgr, value); err != nil {
				logger.Warn("unable to set encoder per block limit", zap.Error(err))
			}
		}
	}()
}

func kvWatchClientConsistencyLevels(
store kv.Store,
logger *zap.Logger,
Expand Down Expand Up @@ -1220,6 +1278,21 @@ func clusterLimitToPlacedShardLimit(topo topology.Topology, clusterLimit int) in
return nodeLimit
}

// setEncodersPerBlockLimitOnChange stores encoderLimit in the runtime options
// held by runtimeOptsMgr, but only when it differs from the limit currently
// set. It returns the error from the manager's Update call, or nil when no
// update was needed.
func setEncodersPerBlockLimitOnChange(
	runtimeOptsMgr m3dbruntime.OptionsManager,
	encoderLimit int,
) error {
	current := runtimeOptsMgr.Get()
	if current.EncodersPerBlockLimit() != encoderLimit {
		updated := current.SetEncodersPerBlockLimit(encoderLimit)
		return runtimeOptsMgr.Update(updated)
	}

	// Same value as before: skip the update so that no runtime options
	// change is triggered.
	return nil
}

// this function will block for at most waitTimeout to try to get an initial value
// before we kick off the bootstrap
func kvWatchBootstrappers(
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func NewSeriesOptionsFromOptions(opts Options, ropts retention.Options) series.O
SetMultiReaderIteratorPool(opts.MultiReaderIteratorPool()).
SetIdentifierPool(opts.IdentifierPool()).
SetBufferBucketPool(opts.BufferBucketPool()).
SetBufferBucketVersionsPool(opts.BufferBucketVersionsPool())
SetBufferBucketVersionsPool(opts.BufferBucketVersionsPool()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager())
}

type options struct {
Expand Down
22 changes: 21 additions & 1 deletion src/dbnode/storage/series/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
var (
timeZero time.Time
errIncompleteMerge = errors.New("bucket merge did not result in only one encoder")
errTooManyEncoders = xerrors.NewInvalidParamsError(errors.New("too many encoders per block"))
logger, _ = zap.NewProduction()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why create this here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch... not sure where that came from.

)

const (
Expand Down Expand Up @@ -506,6 +508,8 @@ func (b *dbBuffer) Tick(blockStates ShardBlockStateSnapshot, nsCtx namespace.Con
}
}

buckets.recordActiveEncoders()

// Once we've evicted all eligible buckets, we merge duplicate encoders
// in the remaining ones to try and reclaim memory.
merges, err := buckets.merge(WarmWrite, nsCtx)
Expand Down Expand Up @@ -1156,6 +1160,16 @@ func (b *BufferBucketVersions) mergeToStreams(ctx context.Context, opts streamsO
return res, nil
}

// recordActiveEncoders sums the encoders held by every bucket whose version
// equals writableBucketVersion and reports the total through the stats
// recorder's RecordEncodersPerBlock.
func (b *BufferBucketVersions) recordActiveEncoders() {
	total := 0
	for i := range b.buckets {
		if b.buckets[i].version != writableBucketVersion {
			// Only buckets still at the writable version are counted.
			continue
		}
		total += len(b.buckets[i].encoders)
	}
	b.opts.Stats().RecordEncodersPerBlock(total)
}

type streamsOptions struct {
filterWriteType bool
writeType WriteType
Expand Down Expand Up @@ -1266,7 +1280,13 @@ func (b *BufferBucket) write(
return err == nil, err
}

// Need a new encoder, we didn't find an encoder to write to
// Need a new encoder, we didn't find an encoder to write to.
maxEncoders := b.opts.RuntimeOptionsManager().Get().EncodersPerBlockLimit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to add tests

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if maxEncoders != 0 && len(b.encoders) >= int(maxEncoders) {
b.opts.Stats().IncEncoderLimitWriteRejected()
return false, errTooManyEncoders
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one addition: a metric indicating a write was rejected due to this reason (can be done at this level) or you can make it ns/db level based on a check of an exported error, your call.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, that's what IncEncoderLimitWriteRejected() right above this line does. Or am I missing something?

}

b.opts.Stats().IncCreatedEncoders()
bopts := b.opts.DatabaseBlockOptions()
blockSize := b.opts.RetentionOptions().BlockSize()
Expand Down
Loading