[coordinator] Configurable writes to leaving shards count towards consistency, add read level unstrict all #2687

Merged
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -411,6 +411,7 @@ func TestConfiguration(t *testing.T) {
fetchSeriesBlocksBatchConcurrency: null
fetchSeriesBlocksBatchSize: null
writeShardsInitializing: null
shardsLeavingCountTowardsConsistency: null
gcPercentage: 100
writeNewSeriesLimitPerSecond: 1048576
writeNewSeriesBackoffDuration: 2ms
56 changes: 56 additions & 0 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions src/dbnode/client/config.go
@@ -117,6 +117,10 @@ type Configuration struct {

// WriteShardsInitializing sets whether or not to write to nodes that are initializing.
WriteShardsInitializing *bool `yaml:"writeShardsInitializing"`

// ShardsLeavingCountTowardsConsistency sets whether or not writes to leaving
// shards count towards consistency; by default they do not.
ShardsLeavingCountTowardsConsistency *bool `yaml:"shardsLeavingCountTowardsConsistency"`
}

// ProtoConfiguration is the configuration for running with ProtoDataMode enabled.
@@ -425,6 +429,9 @@ func (c Configuration) NewAdminClient(
if c.WriteShardsInitializing != nil {
v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing)
}
if c.ShardsLeavingCountTowardsConsistency != nil {
v = v.SetShardsLeavingCountTowardsConsistency(*c.ShardsLeavingCountTowardsConsistency)
}

// Cast to admin options to apply admin config options.
opts := v.(AdminOptions)
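For context, here is a minimal sketch of opting in through this config struct from a hypothetical caller; the module path github.com/m3db/m3 is assumed from the repo layout, and all other Configuration fields are left at their defaults. The field corresponds to the shardsLeavingCountTowardsConsistency YAML key added to the expected config above.

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/client"
)

func main() {
	leaving := true
	cfg := client.Configuration{
		// A nil pointer keeps the default (false); a non-nil pointer overrides it.
		ShardsLeavingCountTowardsConsistency: &leaving,
	}
	fmt.Println(*cfg.ShardsLeavingCountTowardsConsistency) // true
}
```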
15 changes: 15 additions & 0 deletions src/dbnode/client/options.go
@@ -92,6 +92,9 @@ const (
// defaultWriteShardsInitializing is the default write to shards initializing value
defaultWriteShardsInitializing = true

// defaultShardsLeavingCountTowardsConsistency is the default for whether writes to leaving shards count towards consistency
defaultShardsLeavingCountTowardsConsistency = false

// defaultIdentifierPoolSize is the default identifier pool size
defaultIdentifierPoolSize = 8192

@@ -253,6 +256,7 @@ type options struct {
fetchRetrier xretry.Retrier
streamBlocksRetrier xretry.Retrier
writeShardsInitializing bool
shardsLeavingCountTowardsConsistency bool
newConnectionFn NewConnectionFn
readerIteratorAllocate encoding.ReaderIteratorAllocate
writeOperationPoolSize int
@@ -370,6 +374,7 @@ func newOptions() *options {
writeRetrier: defaultWriteRetrier,
fetchRetrier: defaultFetchRetrier,
writeShardsInitializing: defaultWriteShardsInitializing,
shardsLeavingCountTowardsConsistency: defaultShardsLeavingCountTowardsConsistency,
tagEncoderPoolSize: defaultTagEncoderPoolSize,
tagEncoderOpts: serialize.NewTagEncoderOptions(),
tagDecoderPoolSize: defaultTagDecoderPoolSize,
@@ -719,6 +724,16 @@ func (o *options) WriteShardsInitializing() bool {
return o.writeShardsInitializing
}

func (o *options) SetShardsLeavingCountTowardsConsistency(value bool) Options {
opts := *o
opts.shardsLeavingCountTowardsConsistency = value
return &opts
}

func (o *options) ShardsLeavingCountTowardsConsistency() bool {
return o.shardsLeavingCountTowardsConsistency
}

func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
opts := *o
opts.tagEncoderOpts = value
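A hedged usage sketch of the new accessor pair; it assumes client.NewOptions is the exported constructor backing newOptions, which is not shown in this diff. As with the other setters in this file, the options value is copied rather than mutated in place.

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/client"
)

func main() {
	// Assumption: client.NewOptions() returns Options with the defaults above.
	base := client.NewOptions()
	withLeaving := base.SetShardsLeavingCountTowardsConsistency(true)

	// The setter copies the options value, so the original is unchanged.
	fmt.Println(base.ShardsLeavingCountTowardsConsistency())        // false (default)
	fmt.Println(withLeaving.ShardsLeavingCountTowardsConsistency()) // true
}
```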
56 changes: 29 additions & 27 deletions src/dbnode/client/session.go
@@ -136,31 +136,32 @@ type sessionState struct {
}

type session struct {
state sessionState
opts Options
runtimeOptsListenerCloser xclose.Closer
scope tally.Scope
nowFn clock.NowFn
log *zap.Logger
logWriteErrorSampler *sampler.Sampler
logFetchErrorSampler *sampler.Sampler
newHostQueueFn newHostQueueFn
writeRetrier xretry.Retrier
fetchRetrier xretry.Retrier
streamBlocksRetrier xretry.Retrier
pools sessionPools
fetchBatchSize int
newPeerBlocksQueueFn newPeerBlocksQueueFn
reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn
pickBestPeerFn pickBestPeerFn
origin topology.Host
streamBlocksMaxBlockRetries int
streamBlocksWorkers xsync.WorkerPool
streamBlocksBatchSize int
streamBlocksMetadataBatchTimeout time.Duration
streamBlocksBatchTimeout time.Duration
writeShardsInitializing bool
metrics sessionMetrics
state sessionState
opts Options
runtimeOptsListenerCloser xclose.Closer
scope tally.Scope
nowFn clock.NowFn
log *zap.Logger
logWriteErrorSampler *sampler.Sampler
logFetchErrorSampler *sampler.Sampler
newHostQueueFn newHostQueueFn
writeRetrier xretry.Retrier
fetchRetrier xretry.Retrier
streamBlocksRetrier xretry.Retrier
pools sessionPools
fetchBatchSize int
newPeerBlocksQueueFn newPeerBlocksQueueFn
reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn
pickBestPeerFn pickBestPeerFn
origin topology.Host
streamBlocksMaxBlockRetries int
streamBlocksWorkers xsync.WorkerPool
streamBlocksBatchSize int
streamBlocksMetadataBatchTimeout time.Duration
streamBlocksBatchTimeout time.Duration
writeShardsInitializing bool
shardsLeavingCountTowardsConsistency bool
metrics sessionMetrics
}

type shardMetricsKey struct {
@@ -289,8 +290,9 @@ func newSession(opts Options) (clientSession, error) {
context: opts.ContextPool(),
id: opts.IdentifierPool(),
},
writeShardsInitializing: opts.WriteShardsInitializing(),
metrics: newSessionMetrics(scope),
writeShardsInitializing: opts.WriteShardsInitializing(),
shardsLeavingCountTowardsConsistency: opts.ShardsLeavingCountTowardsConsistency(),
metrics: newSessionMetrics(scope),
}
s.reattemptStreamBlocksFromPeersFn = s.streamBlocksReattemptFromPeers
s.pickBestPeerFn = s.streamBlocksPickBestPeer
8 changes: 8 additions & 0 deletions src/dbnode/client/types.go
@@ -419,6 +419,14 @@ type Options interface {
// initializing or not.
WriteShardsInitializing() bool

// SetShardsLeavingCountTowardsConsistency sets whether or not writes to
// shards that are leaving count towards consistency level calculations.
SetShardsLeavingCountTowardsConsistency(value bool) Options

// ShardsLeavingCountTowardsConsistency returns whether or not writes to
// shards that are leaving count towards consistency level calculations.
ShardsLeavingCountTowardsConsistency() bool

// SetTagEncoderOptions sets the TagEncoderOptions.
SetTagEncoderOptions(value serialize.TagEncoderOptions) Options

54 changes: 32 additions & 22 deletions src/dbnode/client/write_state.go
@@ -48,15 +48,16 @@ type writeState struct {
sync.Mutex
refCounter

consistencyLevel topology.ConsistencyLevel
topoMap topology.Map
op writeOp
nsID ident.ID
tsID ident.ID
tagEncoder serialize.TagEncoder
majority, pending int32
success int32
errors []error
consistencyLevel topology.ConsistencyLevel
shardsLeavingCountTowardsConsistency bool
topoMap topology.Map
op writeOp
nsID ident.ID
tsID ident.ID
tagEncoder serialize.TagEncoder
majority, pending int32
success int32
errors []error

queues []hostQueue
tagEncoderPool serialize.TagEncoderPool
@@ -128,20 +129,29 @@ func (w *writeState) completionFn(result interface{}, err error) {
} else if shardState, err := hostShardSet.ShardSet().LookupStateByID(w.op.ShardID()); err != nil {
errStr := "missing shard %d in host %s"
wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID))
} else if shardState != shard.Available {
// NB(bl): only count writes to available shards towards success
var errStr string
switch shardState {
case shard.Initializing:
errStr = "shard %d in host %s is not available (initializing)"
case shard.Leaving:
errStr = "shard %d in host %s not available (leaving)"
default:
errStr = "shard %d in host %s not available (unknown state)"
}
wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID))
} else {
w.success++
available := shardState == shard.Available
leaving := shardState == shard.Leaving
leavingAndShardsLeavingCountTowardsConsistency := leaving &&
w.shardsLeavingCountTowardsConsistency
// NB(bl): Only count writes to available shards towards success.
// NB(r): If the shard is leaving and the client is configured to let writes
// to leaving shards count towards consistency, then this write also counts
// towards success.
if !available && !leavingAndShardsLeavingCountTowardsConsistency {
var errStr string
switch shardState {
case shard.Initializing:
errStr = "shard %d in host %s is not available (initializing)"
case shard.Leaving:
errStr = "shard %d in host %s not available (leaving)"
default:
errStr = "shard %d in host %s not available (unknown state)"
}
wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID))
} else {
w.success++
}
}

if wErr != nil {
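A hypothetical helper (not part of this change) that restates the branch above, just to make the success-counting rule explicit; the shard package import path is assumed from the repo layout.

```go
package client

import "github.com/m3db/m3/src/cluster/shard"

// countsTowardsSuccess: a write counts towards write consistency when the
// target shard is Available, or when it is Leaving and the client opted in
// via shardsLeavingCountTowardsConsistency.
func countsTowardsSuccess(state shard.State, shardsLeavingCountTowardsConsistency bool) bool {
	return state == shard.Available ||
		(state == shard.Leaving && shardsLeavingCountTowardsConsistency)
}
```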
15 changes: 12 additions & 3 deletions src/dbnode/topology/consistency_level.go
@@ -222,6 +222,11 @@ const (
// ReadConsistencyLevelMajority corresponds to reading from the majority of nodes
ReadConsistencyLevelMajority

// ReadConsistencyLevelUnstrictAll corresponds to reading from all nodes,
// but relaxing the constraint when it cannot be met: the read succeeds if
// at least one node responds after attempting to read from all nodes.
ReadConsistencyLevelUnstrictAll

// ReadConsistencyLevelAll corresponds to reading from all of the nodes
ReadConsistencyLevelAll
)
@@ -237,6 +242,8 @@ func (l ReadConsistencyLevel) String() string {
return unstrictMajority
case ReadConsistencyLevelMajority:
return majority
case ReadConsistencyLevelUnstrictAll:
return unstrictAll
case ReadConsistencyLevelAll:
return all
}
@@ -248,6 +255,7 @@ var validReadConsistencyLevels = []ReadConsistencyLevel{
ReadConsistencyLevelOne,
ReadConsistencyLevelUnstrictMajority,
ReadConsistencyLevelMajority,
ReadConsistencyLevelUnstrictAll,
ReadConsistencyLevelAll,
}

@@ -302,6 +310,7 @@ const (
unknown = "unknown"
any = "any"
all = "all"
unstrictAll = "unstrict_all"
one = "one"
none = "none"
majority = "majority"
@@ -348,7 +357,7 @@ func ReadConsistencyTermination(
return success > 0 || doneAll
case ReadConsistencyLevelMajority, ReadConsistencyLevelUnstrictMajority:
return success >= majority || doneAll
case ReadConsistencyLevelAll:
case ReadConsistencyLevelAll, ReadConsistencyLevelUnstrictAll:
return doneAll
}
panic(fmt.Errorf("unrecognized consistency level: %s", level.String()))
@@ -366,7 +375,7 @@ func ReadConsistencyAchieved(
return numSuccess == numPeers // Meets all
case ReadConsistencyLevelMajority:
return numSuccess >= majority // Meets majority
case ReadConsistencyLevelOne, ReadConsistencyLevelUnstrictMajority:
case ReadConsistencyLevelOne, ReadConsistencyLevelUnstrictMajority, ReadConsistencyLevelUnstrictAll:
return numSuccess > 0 // Meets one
case ReadConsistencyLevelNone:
return true // Always meets none
@@ -378,7 +387,7 @@
// satisfy the read consistency.
func NumDesiredForReadConsistency(level ReadConsistencyLevel, numReplicas, majority int) int {
switch level {
case ReadConsistencyLevelAll:
case ReadConsistencyLevelAll, ReadConsistencyLevelUnstrictAll:
return numReplicas
case ReadConsistencyLevelMajority, ReadConsistencyLevelUnstrictMajority:
return majority
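Finally, a small sketch of what unstrict_all means in practice, using the helper whose signature appears above (the topology import path is assumed from the repo layout): the level still targets every replica, but per ReadConsistencyAchieved a single successful read is enough once all replicas have been attempted.

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/topology"
)

func main() {
	const (
		numReplicas = 3
		majority    = 2
	)
	level := topology.ReadConsistencyLevelUnstrictAll

	// unstrict_all still wants to read from every replica...
	fmt.Println(topology.NumDesiredForReadConsistency(level, numReplicas, majority)) // 3
	// ...but the read is considered consistent if at least one replica succeeds
	// after all have been attempted.
}
```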