[dbnode] Add ability to configure writes to initializing shards (#2615)

robskillington authored Sep 15, 2020
1 parent f83f576 commit 2ae9792
Showing 9 changed files with 195 additions and 51 deletions.
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -410,6 +410,7 @@ func TestConfiguration(t *testing.T) {
writeTimestampOffset: null
fetchSeriesBlocksBatchConcurrency: null
fetchSeriesBlocksBatchSize: null
writeShardsInitializing: null
gcPercentage: 100
writeNewSeriesLimitPerSecond: 1048576
writeNewSeriesBackoffDuration: 2ms
56 changes: 56 additions & 0 deletions src/dbnode/client/client_mock.go

Generated mock file; diff not rendered.

7 changes: 7 additions & 0 deletions src/dbnode/client/config.go
@@ -114,6 +114,9 @@ type Configuration struct {
// FetchSeriesBlocksBatchSize sets the number of blocks to retrieve in a single batch
// from the remote peer. Defaults to 4096.
FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"`

// WriteShardsInitializing sets whether or not to write to shards that are initializing.
WriteShardsInitializing *bool `yaml:"writeShardsInitializing"`
}

// ProtoConfiguration is the configuration for running with ProtoDataMode enabled.
@@ -419,6 +422,10 @@ func (c Configuration) NewAdminClient(
v = v.SetSchemaRegistry(schemaRegistry)
}

if c.WriteShardsInitializing != nil {
v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing)
}

// Cast to admin options to apply admin config options.
opts := v.(AdminOptions)

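For reference, a minimal sketch of opting out via YAML once this field is available; the enclosing client block and its placement are assumptions based on the test fixture earlier in this diff, and only writeShardsInitializing itself is introduced by this change:

client:
  # Assumed placement alongside the other client keys; omitting the field
  # keeps the default behaviour of writing to initializing shards.
  writeShardsInitializing: false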
15 changes: 15 additions & 0 deletions src/dbnode/client/options.go
@@ -89,6 +89,9 @@ const (
// defaultTruncateRequestTimeout is the default truncate request timeout
defaultTruncateRequestTimeout = 60 * time.Second

// defaultWriteShardsInitializing is the default for writing to initializing shards
defaultWriteShardsInitializing = true

// defaultIdentifierPoolSize is the default identifier pool size
defaultIdentifierPoolSize = 8192

@@ -246,6 +249,7 @@ type options struct {
writeRetrier xretry.Retrier
fetchRetrier xretry.Retrier
streamBlocksRetrier xretry.Retrier
writeShardsInitializing bool
newConnectionFn NewConnectionFn
readerIteratorAllocate encoding.ReaderIteratorAllocate
writeOperationPoolSize int
@@ -361,6 +365,7 @@ func newOptions() *options {
backgroundHealthCheckFailThrottleFactor: defaultBackgroundHealthCheckFailThrottleFactor,
writeRetrier: defaultWriteRetrier,
fetchRetrier: defaultFetchRetrier,
writeShardsInitializing: defaultWriteShardsInitializing,
tagEncoderPoolSize: defaultTagEncoderPoolSize,
tagEncoderOpts: serialize.NewTagEncoderOptions(),
tagDecoderPoolSize: defaultTagDecoderPoolSize,
@@ -699,6 +704,16 @@ func (o *options) FetchRetrier() xretry.Retrier {
return o.fetchRetrier
}

func (o *options) SetWriteShardsInitializing(value bool) Options {
opts := *o
opts.writeShardsInitializing = value
return &opts
}

func (o *options) WriteShardsInitializing() bool {
return o.writeShardsInitializing
}

func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
opts := *o
opts.tagEncoderOpts = value
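A minimal sketch of flipping the option programmatically; client.NewOptions is the package's existing options constructor, and everything else here is illustrative:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/client"
)

func main() {
	// Disable writes to initializing shards when building client options
	// directly; the default introduced by this change is true (enabled).
	opts := client.NewOptions().
		SetWriteShardsInitializing(false)

	// Options are copy-on-write, so the setter returns a new Options value.
	fmt.Println("write to initializing shards:", opts.WriteShardsInitializing())
}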
38 changes: 30 additions & 8 deletions src/dbnode/client/session.go
@@ -159,6 +159,7 @@ type session struct {
streamBlocksBatchSize int
streamBlocksMetadataBatchTimeout time.Duration
streamBlocksBatchTimeout time.Duration
writeShardsInitializing bool
metrics sessionMetrics
}

@@ -288,7 +289,8 @@ func newSession(opts Options) (clientSession, error) {
context: opts.ContextPool(),
id: opts.IdentifierPool(),
},
metrics: newSessionMetrics(scope),
writeShardsInitializing: opts.WriteShardsInitializing(),
metrics: newSessionMetrics(scope),
}
s.reattemptStreamBlocksFromPeersFn = s.streamBlocksReattemptFromPeers
s.pickBestPeerFn = s.streamBlocksPickBestPeer
@@ -731,9 +733,9 @@ func (s *session) hostQueues(
}
// Be optimistic
clusterAvailable := true
for _, shard := range shards {
for _, shardID := range shards {
shardReplicasAvailable := 0
routeErr := topoMap.RouteShardForEach(shard, func(idx int, _ topology.Host) {
routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
if queues[idx].ConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
@@ -1142,7 +1144,19 @@ func (s *session) writeAttemptWithRLock(
state.nsID, state.tsID, state.tagEncoder = nsID, tsID, tagEncoder
op.SetCompletionFn(state.completionFn)

if err := s.state.topoMap.RouteForEach(tsID, func(idx int, host topology.Host) {
if err := s.state.topoMap.RouteForEach(tsID, func(
idx int,
hostShard shard.Shard,
host topology.Host,
) {
if !s.writeShardsInitializing && hostShard.State() == shard.Initializing {
// NB(r): Do not write to this node as the shard is initializing
// and writing to initializing shards is not enabled (also, depending
// on your config, initializing shards won't count towards quorum, as
// with the current defaults, so this is OK consistency-wise).
return
}

// Count pending write requests before we enqueue the completion fns,
// which rely on the count when executing
state.pending++
@@ -1706,7 +1720,11 @@ func (s *session) fetchIDsAttempt(
}
}

if err := s.state.topoMap.RouteForEach(tsID, func(hostIdx int, host topology.Host) {
if err := s.state.topoMap.RouteForEach(tsID, func(
hostIdx int,
hostShard shard.Shard,
host topology.Host,
) {
// Inc safely as this for each is sequential
enqueued++
pending++
@@ -1943,17 +1961,21 @@ func (p peers) selfExcludedAndSelfHasShardAvailable() bool {
return state == shard.Available
}

func (s *session) peersForShard(shard uint32) (peers, error) {
func (s *session) peersForShard(shardID uint32) (peers, error) {
s.state.RLock()
var (
lookupErr error
result = peers{
peers: make([]peer, 0, s.state.topoMap.Replicas()),
shard: shard,
shard: shardID,
majorityReplicas: s.state.topoMap.MajorityReplicas(),
}
)
err := s.state.topoMap.RouteShardForEach(shard, func(idx int, host topology.Host) {
err := s.state.topoMap.RouteShardForEach(shardID, func(
idx int,
_ shard.Shard,
host topology.Host,
) {
if s.origin != nil && s.origin.ID() == host.ID() {
// Don't include the origin host
result.selfExcluded = true
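Distilled from writeAttemptWithRLock above, the gating condition boils down to a small predicate; this is a standalone sketch (shouldWrite is not a helper added by this change):

package main

import (
	"fmt"

	"github.com/m3db/m3/src/cluster/shard"
)

// shouldWrite mirrors the check in writeAttemptWithRLock: a replica is only
// skipped when its shard is still initializing and the client has opted out
// of writing to initializing shards.
func shouldWrite(writeShardsInitializing bool, hostShard shard.Shard) bool {
	return writeShardsInitializing || hostShard.State() != shard.Initializing
}

func main() {
	s := shard.NewShard(42).SetState(shard.Initializing)
	fmt.Println(shouldWrite(false, s)) // false: write skipped
	fmt.Println(shouldWrite(true, s))  // true: write proceeds as before
}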
8 changes: 8 additions & 0 deletions src/dbnode/client/types.go
@@ -411,6 +411,14 @@ type Options interface {
// a fetch operation. Only retryable errors are retried.
FetchRetrier() xretry.Retrier

// SetWriteShardsInitializing sets whether to write to shards that are
// initializing or not.
SetWriteShardsInitializing(value bool) Options

// WriteShardsInitializing returns whether to write to shards that are
// initializing or not.
WriteShardsInitializing() bool

// SetTagEncoderOptions sets the TagEncoderOptions.
SetTagEncoderOptions(value serialize.TagEncoderOptions) Options

64 changes: 35 additions & 29 deletions src/dbnode/topology/map.go
@@ -21,56 +21,62 @@
package topology

import (
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/x/ident"
xwatch "github.com/m3db/m3/src/x/watch"
)

type staticMap struct {
shardSet sharding.ShardSet
hostShardSets []HostShardSet
hostShardSetsByID map[string]HostShardSet
orderedHosts []Host
hostsByShard [][]Host
orderedHostsByShard [][]orderedHost
replicas int
majority int
shardSet sharding.ShardSet
hostShardSets []HostShardSet
hostShardSetsByID map[string]HostShardSet
orderedHosts []Host
hostsByShard [][]Host
orderedShardHostsByShard [][]orderedShardHost
replicas int
majority int
}

// NewStaticMap creates a new static topology map
func NewStaticMap(opts StaticOptions) Map {
totalShards := len(opts.ShardSet().AllIDs())
hostShardSets := opts.HostShardSets()
topoMap := staticMap{
shardSet: opts.ShardSet(),
hostShardSets: hostShardSets,
hostShardSetsByID: make(map[string]HostShardSet),
orderedHosts: make([]Host, 0, len(hostShardSets)),
hostsByShard: make([][]Host, totalShards),
orderedHostsByShard: make([][]orderedHost, totalShards),
replicas: opts.Replicas(),
majority: Majority(opts.Replicas()),
shardSet: opts.ShardSet(),
hostShardSets: hostShardSets,
hostShardSetsByID: make(map[string]HostShardSet),
orderedHosts: make([]Host, 0, len(hostShardSets)),
hostsByShard: make([][]Host, totalShards),
orderedShardHostsByShard: make([][]orderedShardHost, totalShards),
replicas: opts.Replicas(),
majority: Majority(opts.Replicas()),
}

for idx, hostShardSet := range hostShardSets {
host := hostShardSet.Host()
topoMap.hostShardSetsByID[host.ID()] = hostShardSet
topoMap.orderedHosts = append(topoMap.orderedHosts, host)
for _, shard := range hostShardSet.ShardSet().AllIDs() {
topoMap.hostsByShard[shard] = append(topoMap.hostsByShard[shard], host)
topoMap.orderedHostsByShard[shard] = append(topoMap.orderedHostsByShard[shard], orderedHost{
idx: idx,
host: host,
})
for _, shard := range hostShardSet.ShardSet().All() {
id := shard.ID()
topoMap.hostsByShard[id] = append(topoMap.hostsByShard[id], host)
elem := orderedShardHost{
idx: idx,
shard: shard,
host: host,
}
topoMap.orderedShardHostsByShard[id] =
append(topoMap.orderedShardHostsByShard[id], elem)
}
}

return &topoMap
}

type orderedHost struct {
idx int
host Host
type orderedShardHost struct {
idx int
shard shard.Shard
host Host
}

func (t *staticMap) Hosts() []Host {
@@ -114,12 +120,12 @@ func (t *staticMap) RouteShard(shard uint32) ([]Host, error) {
}

func (t *staticMap) RouteShardForEach(shard uint32, forEachFn RouteForEachFn) error {
if int(shard) >= len(t.orderedHostsByShard) {
if int(shard) >= len(t.orderedShardHostsByShard) {
return errUnownedShard
}
orderedHosts := t.orderedHostsByShard[shard]
for i := range orderedHosts {
forEachFn(orderedHosts[i].idx, orderedHosts[i].host)
orderedShardHosts := t.orderedShardHostsByShard[shard]
for _, elem := range orderedShardHosts {
forEachFn(elem.idx, elem.shard, elem.host)
}
return nil
}
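Callers of RouteForEach and RouteShardForEach now receive the shard.Shard alongside the host, so per-replica shard state is visible at routing time; a compilable sketch of the new callback shape (the package, function, and variable names are illustrative):

package topologyexample

import (
	"github.com/m3db/m3/src/cluster/shard"
	"github.com/m3db/m3/src/dbnode/topology"
)

// countAvailableReplicas walks the replicas that own shardID and counts the
// ones whose shard state is Available, using the three-argument callback
// introduced by this change.
func countAvailableReplicas(topoMap topology.Map, shardID uint32) (int, error) {
	available := 0
	err := topoMap.RouteShardForEach(shardID, func(
		idx int,
		hostShard shard.Shard,
		host topology.Host,
	) {
		if hostShard.State() == shard.Available {
			available++
		}
	})
	return available, err
}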
(Remaining changed files not shown.)
