diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go
index c1c2b0a235..6db0bc5980 100644
--- a/src/cmd/services/m3dbnode/config/config_test.go
+++ b/src/cmd/services/m3dbnode/config/config_test.go
@@ -410,6 +410,7 @@ func TestConfiguration(t *testing.T) {
     writeTimestampOffset: null
     fetchSeriesBlocksBatchConcurrency: null
     fetchSeriesBlocksBatchSize: null
+    writeShardsInitializing: null
   gcPercentage: 100
   writeNewSeriesLimitPerSecond: 1048576
   writeNewSeriesBackoffDuration: 2ms
diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go
index 9da52ea71a..3a9842dceb 100644
--- a/src/dbnode/client/client_mock.go
+++ b/src/dbnode/client/client_mock.go
@@ -1776,6 +1776,34 @@ func (mr *MockOptionsMockRecorder) FetchRetrier() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRetrier", reflect.TypeOf((*MockOptions)(nil).FetchRetrier))
 }
 
+// SetWriteShardsInitializing mocks base method
+func (m *MockOptions) SetWriteShardsInitializing(value bool) Options {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SetWriteShardsInitializing", value)
+	ret0, _ := ret[0].(Options)
+	return ret0
+}
+
+// SetWriteShardsInitializing indicates an expected call of SetWriteShardsInitializing
+func (mr *MockOptionsMockRecorder) SetWriteShardsInitializing(value interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteShardsInitializing", reflect.TypeOf((*MockOptions)(nil).SetWriteShardsInitializing), value)
+}
+
+// WriteShardsInitializing mocks base method
+func (m *MockOptions) WriteShardsInitializing() bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "WriteShardsInitializing")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// WriteShardsInitializing indicates an expected call of WriteShardsInitializing
+func (mr *MockOptionsMockRecorder) WriteShardsInitializing() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteShardsInitializing", reflect.TypeOf((*MockOptions)(nil).WriteShardsInitializing))
+}
+
 // SetTagEncoderOptions mocks base method
 func (m *MockOptions) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
 	m.ctrl.T.Helper()
@@ -3255,6 +3283,34 @@ func (mr *MockAdminOptionsMockRecorder) FetchRetrier() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRetrier", reflect.TypeOf((*MockAdminOptions)(nil).FetchRetrier))
 }
 
+// SetWriteShardsInitializing mocks base method
+func (m *MockAdminOptions) SetWriteShardsInitializing(value bool) Options {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SetWriteShardsInitializing", value)
+	ret0, _ := ret[0].(Options)
+	return ret0
+}
+
+// SetWriteShardsInitializing indicates an expected call of SetWriteShardsInitializing
+func (mr *MockAdminOptionsMockRecorder) SetWriteShardsInitializing(value interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteShardsInitializing", reflect.TypeOf((*MockAdminOptions)(nil).SetWriteShardsInitializing), value)
+}
+
+// WriteShardsInitializing mocks base method
+func (m *MockAdminOptions) WriteShardsInitializing() bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "WriteShardsInitializing")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// WriteShardsInitializing indicates an expected call of WriteShardsInitializing
+func (mr *MockAdminOptionsMockRecorder) WriteShardsInitializing() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteShardsInitializing", reflect.TypeOf((*MockAdminOptions)(nil).WriteShardsInitializing))
+}
+
 // SetTagEncoderOptions mocks base method
 func (m *MockAdminOptions) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
 	m.ctrl.T.Helper()
diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go
index cea4240e87..7108970d44 100644
--- a/src/dbnode/client/config.go
+++ b/src/dbnode/client/config.go
@@ -114,6 +114,9 @@ type Configuration struct {
 	// FetchSeriesBlocksBatchSize sets the number of blocks to retrieve in a single batch
 	// from the remote peer. Defaults to 4096.
 	FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"`
+
+	// WriteShardsInitializing sets whether or not to write to shards that are initializing.
+	WriteShardsInitializing *bool `yaml:"writeShardsInitializing"`
 }
 
 // ProtoConfiguration is the configuration for running with ProtoDataMode enabled.
@@ -419,6 +422,10 @@ func (c Configuration) NewAdminClient(
 		v = v.SetSchemaRegistry(schemaRegistry)
 	}
 
+	if c.WriteShardsInitializing != nil {
+		v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing)
+	}
+
 	// Cast to admin options to apply admin config options.
 	opts := v.(AdminOptions)
 
diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go
index 5ddfe73a35..6463d4087e 100644
--- a/src/dbnode/client/options.go
+++ b/src/dbnode/client/options.go
@@ -89,6 +89,9 @@ const (
 	// defaultTruncateRequestTimeout is the default truncate request timeout
 	defaultTruncateRequestTimeout = 60 * time.Second
 
+	// defaultWriteShardsInitializing is the default value for writing to initializing shards
+	defaultWriteShardsInitializing = true
+
 	// defaultIdentifierPoolSize is the default identifier pool size
 	defaultIdentifierPoolSize = 8192
 
@@ -246,6 +249,7 @@ type options struct {
 	writeRetrier            xretry.Retrier
 	fetchRetrier            xretry.Retrier
 	streamBlocksRetrier     xretry.Retrier
+	writeShardsInitializing bool
 	newConnectionFn         NewConnectionFn
 	readerIteratorAllocate  encoding.ReaderIteratorAllocate
 	writeOperationPoolSize  int
@@ -361,6 +365,7 @@ func newOptions() *options {
 		backgroundHealthCheckFailThrottleFactor: defaultBackgroundHealthCheckFailThrottleFactor,
 		writeRetrier:                            defaultWriteRetrier,
 		fetchRetrier:                            defaultFetchRetrier,
+		writeShardsInitializing:                 defaultWriteShardsInitializing,
 		tagEncoderPoolSize:                      defaultTagEncoderPoolSize,
 		tagEncoderOpts:                          serialize.NewTagEncoderOptions(),
 		tagDecoderPoolSize:                      defaultTagDecoderPoolSize,
@@ -699,6 +704,16 @@ func (o *options) FetchRetrier() xretry.Retrier {
 	return o.fetchRetrier
 }
 
+func (o *options) SetWriteShardsInitializing(value bool) Options {
+	opts := *o
+	opts.writeShardsInitializing = value
+	return &opts
+}
+
+func (o *options) WriteShardsInitializing() bool {
+	return o.writeShardsInitializing
+}
+
 func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
 	opts := *o
 	opts.tagEncoderOpts = value
diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go
index c85fe493cf..c53661c728 100644
--- a/src/dbnode/client/session.go
+++ b/src/dbnode/client/session.go
@@ -159,6 +159,7 @@ type session struct {
 	streamBlocksBatchSize            int
 	streamBlocksMetadataBatchTimeout time.Duration
 	streamBlocksBatchTimeout         time.Duration
+	writeShardsInitializing          bool
 	metrics                          sessionMetrics
 }
 
@@ -288,7 +289,8 @@ func newSession(opts Options) (clientSession, error) {
 			context: opts.ContextPool(),
 			id:      opts.IdentifierPool(),
 		},
-		metrics: newSessionMetrics(scope),
+		writeShardsInitializing: opts.WriteShardsInitializing(),
+		metrics:                 newSessionMetrics(scope),
 	}
 	s.reattemptStreamBlocksFromPeersFn = s.streamBlocksReattemptFromPeers
 	s.pickBestPeerFn = s.streamBlocksPickBestPeer
@@ -731,9 +733,9 @@ func (s *session) hostQueues(
 	}
 	// Be optimistic
 	clusterAvailable := true
-	for _, shard := range shards {
+	for _, shardID := range shards {
 		shardReplicasAvailable := 0
-		routeErr := topoMap.RouteShardForEach(shard, func(idx int, _ topology.Host) {
+		routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
 			if queues[idx].ConnectionCount() >= minConnectionCount {
 				shardReplicasAvailable++
 			}
@@ -1142,7 +1144,19 @@ func (s *session) writeAttemptWithRLock(
 	state.nsID, state.tsID, state.tagEncoder = nsID, tsID, tagEncoder
 	op.SetCompletionFn(state.completionFn)
 
-	if err := s.state.topoMap.RouteForEach(tsID, func(idx int, host topology.Host) {
+	if err := s.state.topoMap.RouteForEach(tsID, func(
+		idx int,
+		hostShard shard.Shard,
+		host topology.Host,
+	) {
+		if !s.writeShardsInitializing && hostShard.State() == shard.Initializing {
+			// NB(r): Do not write to this node, since the shard is initializing
+			// and writing to initializing shards is not enabled (under the
+			// current defaults initializing shards do not count towards write
+			// quorum, so skipping them is safe consistency wise).
+			return
+		}
+
 		// Count pending write requests before we enqueue the completion fns,
 		// which rely on the count when executing
 		state.pending++
@@ -1706,7 +1720,11 @@ func (s *session) fetchIDsAttempt(
 		}
 	}
 
-	if err := s.state.topoMap.RouteForEach(tsID, func(hostIdx int, host topology.Host) {
+	if err := s.state.topoMap.RouteForEach(tsID, func(
+		hostIdx int,
+		hostShard shard.Shard,
+		host topology.Host,
+	) {
 		// Inc safely as this for each is sequential
 		enqueued++
 		pending++
@@ -1943,17 +1961,21 @@ func (p peers) selfExcludedAndSelfHasShardAvailable() bool {
 	return state == shard.Available
 }
 
-func (s *session) peersForShard(shard uint32) (peers, error) {
+func (s *session) peersForShard(shardID uint32) (peers, error) {
 	s.state.RLock()
 	var (
 		lookupErr error
 		result    = peers{
 			peers:            make([]peer, 0, s.state.topoMap.Replicas()),
-			shard:            shard,
+			shard:            shardID,
 			majorityReplicas: s.state.topoMap.MajorityReplicas(),
 		}
 	)
-	err := s.state.topoMap.RouteShardForEach(shard, func(idx int, host topology.Host) {
+	err := s.state.topoMap.RouteShardForEach(shardID, func(
+		idx int,
+		_ shard.Shard,
+		host topology.Host,
+	) {
 		if s.origin != nil && s.origin.ID() == host.ID() {
 			// Don't include the origin host
 			result.selfExcluded = true
diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go
index f30a196671..3c4ba2ed03 100644
--- a/src/dbnode/client/types.go
+++ b/src/dbnode/client/types.go
@@ -411,6 +411,14 @@ type Options interface {
 	// a fetch operation. Only retryable errors are retried.
 	FetchRetrier() xretry.Retrier
 
+	// SetWriteShardsInitializing sets whether to write to shards that are
+	// initializing or not.
+	SetWriteShardsInitializing(value bool) Options
+
+	// WriteShardsInitializing returns whether to write to shards that are
+	// initializing or not.
+	WriteShardsInitializing() bool
+
 	// SetTagEncoderOptions sets the TagEncoderOptions.
 	SetTagEncoderOptions(value serialize.TagEncoderOptions) Options
diff --git a/src/dbnode/topology/map.go b/src/dbnode/topology/map.go
index 482a6d7cac..31229c0f45 100644
--- a/src/dbnode/topology/map.go
+++ b/src/dbnode/topology/map.go
@@ -21,20 +21,21 @@
 package topology
 
 import (
+	"github.com/m3db/m3/src/cluster/shard"
 	"github.com/m3db/m3/src/dbnode/sharding"
 	"github.com/m3db/m3/src/x/ident"
 	xwatch "github.com/m3db/m3/src/x/watch"
 )
 
 type staticMap struct {
-	shardSet            sharding.ShardSet
-	hostShardSets       []HostShardSet
-	hostShardSetsByID   map[string]HostShardSet
-	orderedHosts        []Host
-	hostsByShard        [][]Host
-	orderedHostsByShard [][]orderedHost
-	replicas            int
-	majority            int
+	shardSet                 sharding.ShardSet
+	hostShardSets            []HostShardSet
+	hostShardSetsByID        map[string]HostShardSet
+	orderedHosts             []Host
+	hostsByShard             [][]Host
+	orderedShardHostsByShard [][]orderedShardHost
+	replicas                 int
+	majority                 int
 }
 
 // NewStaticMap creates a new static topology map
@@ -42,35 +43,40 @@ func NewStaticMap(opts StaticOptions) Map {
 	totalShards := len(opts.ShardSet().AllIDs())
 	hostShardSets := opts.HostShardSets()
 	topoMap := staticMap{
-		shardSet:            opts.ShardSet(),
-		hostShardSets:       hostShardSets,
-		hostShardSetsByID:   make(map[string]HostShardSet),
-		orderedHosts:        make([]Host, 0, len(hostShardSets)),
-		hostsByShard:        make([][]Host, totalShards),
-		orderedHostsByShard: make([][]orderedHost, totalShards),
-		replicas:            opts.Replicas(),
-		majority:            Majority(opts.Replicas()),
+		shardSet:                 opts.ShardSet(),
+		hostShardSets:            hostShardSets,
+		hostShardSetsByID:        make(map[string]HostShardSet),
+		orderedHosts:             make([]Host, 0, len(hostShardSets)),
+		hostsByShard:             make([][]Host, totalShards),
+		orderedShardHostsByShard: make([][]orderedShardHost, totalShards),
+		replicas:                 opts.Replicas(),
+		majority:                 Majority(opts.Replicas()),
 	}
 
 	for idx, hostShardSet := range hostShardSets {
 		host := hostShardSet.Host()
 		topoMap.hostShardSetsByID[host.ID()] = hostShardSet
 		topoMap.orderedHosts = append(topoMap.orderedHosts, host)
-		for _, shard := range hostShardSet.ShardSet().AllIDs() {
-			topoMap.hostsByShard[shard] = append(topoMap.hostsByShard[shard], host)
-			topoMap.orderedHostsByShard[shard] = append(topoMap.orderedHostsByShard[shard], orderedHost{
-				idx:  idx,
-				host: host,
-			})
+		for _, shard := range hostShardSet.ShardSet().All() {
+			id := shard.ID()
+			topoMap.hostsByShard[id] = append(topoMap.hostsByShard[id], host)
+			elem := orderedShardHost{
+				idx:   idx,
+				shard: shard,
+				host:  host,
+			}
+			topoMap.orderedShardHostsByShard[id] =
+				append(topoMap.orderedShardHostsByShard[id], elem)
 		}
 	}
 
 	return &topoMap
 }
 
-type orderedHost struct {
-	idx  int
-	host Host
+type orderedShardHost struct {
+	idx   int
+	shard shard.Shard
+	host  Host
 }
 
 func (t *staticMap) Hosts() []Host {
@@ -114,12 +120,12 @@ func (t *staticMap) RouteShard(shard uint32) ([]Host, error) {
 }
 
 func (t *staticMap) RouteShardForEach(shard uint32, forEachFn RouteForEachFn) error {
-	if int(shard) >= len(t.orderedHostsByShard) {
+	if int(shard) >= len(t.orderedShardHostsByShard) {
 		return errUnownedShard
 	}
-	orderedHosts := t.orderedHostsByShard[shard]
-	for i := range orderedHosts {
-		forEachFn(orderedHosts[i].idx, orderedHosts[i].host)
+	orderedShardHosts := t.orderedShardHostsByShard[shard]
+	for _, elem := range orderedShardHosts {
+		forEachFn(elem.idx, elem.shard, elem.host)
 	}
 	return nil
 }
diff --git a/src/dbnode/topology/map_test.go b/src/dbnode/topology/map_test.go
index ab3967bace..fdc78241a5 100644
--- a/src/dbnode/topology/map_test.go
+++ b/src/dbnode/topology/map_test.go
@@ -33,15 +33,34 @@ import (
 
 func newTestShardSet(
 	t *testing.T,
-	shards []uint32,
+	shards []testShard,
 	hashFn sharding.HashFn,
 ) sharding.ShardSet {
-	values := sharding.NewShards(shards, shard.Available)
+	var values []shard.Shard
+	for _, elem := range shards {
+		value := shard.NewShard(elem.id).SetState(elem.state)
+		values = append(values, value)
+	}
 	shardSet, err := sharding.NewShardSet(values, hashFn)
 	require.NoError(t, err)
 	return shardSet
 }
 
+type testShard struct {
+	id    uint32
+	state shard.State
+}
+
+type testShards []testShard
+
+func (s testShards) IDs() []uint32 {
+	var ids []uint32
+	for _, elem := range s {
+		ids = append(ids, elem.id)
+	}
+	return ids
+}
+
 func TestStaticMap(t *testing.T) {
 	hashFn := func(id ident.ID) uint32 {
 		switch id.String() {
@@ -59,12 +78,12 @@ func TestStaticMap(t *testing.T) {
 	hosts := []struct {
 		id     string
 		addr   string
-		shards []uint32
+		shards testShards
 	}{
-		{"h1", "h1:9000", []uint32{0}},
-		{"h2", "h2:9000", []uint32{1}},
-		{"h3", "h3:9000", []uint32{0}},
-		{"h4", "h4:9000", []uint32{1}},
+		{"h1", "h1:9000", []testShard{{id: 0, state: shard.Available}}},
+		{"h2", "h2:9000", []testShard{{id: 1, state: shard.Available}}},
+		{"h3", "h3:9000", []testShard{{id: 0, state: shard.Available}}},
+		{"h4", "h4:9000", []testShard{{id: 1, state: shard.Initializing}}},
 	}
 
 	var hostShardSets []HostShardSet
@@ -75,8 +94,12 @@
 			newTestShardSet(t, h.shards, hashFn)))
 	}
 
+	seedShardSet := newTestShardSet(t, []testShard{
+		{id: 0, state: shard.Available},
+		{id: 1, state: shard.Available},
+	}, hashFn)
 	opts := NewStaticOptions().
-		SetShardSet(newTestShardSet(t, []uint32{0, 1}, hashFn)).
+		SetShardSet(seedShardSet).
 		SetReplicas(2).
 		SetHostShardSets(hostShardSets)
 
@@ -93,12 +116,12 @@ func TestStaticMap(t *testing.T) {
 	for i, h := range hosts {
 		assert.Equal(t, h.id, m.HostShardSets()[i].Host().ID())
 		assert.Equal(t, h.addr, m.HostShardSets()[i].Host().Address())
-		assert.Equal(t, h.shards, m.HostShardSets()[i].ShardSet().AllIDs())
+		assert.Equal(t, h.shards.IDs(), m.HostShardSets()[i].ShardSet().AllIDs())
 	}
 
-	shard, targetHosts, err := m.Route(ident.StringID("foo"))
+	targetShard, targetHosts, err := m.Route(ident.StringID("foo"))
 	require.NoError(t, err)
-	assert.Equal(t, uint32(0), shard)
+	assert.Equal(t, uint32(0), targetShard)
 	require.Equal(t, 2, len(targetHosts))
 	assert.Equal(t, "h1", targetHosts[0].ID())
 	assert.Equal(t, "h3", targetHosts[1].ID())
@@ -117,19 +140,25 @@
 	require.Error(t, err)
 	assert.Equal(t, errUnownedShard, err)
 
-	err = m.RouteForEach(ident.StringID("bar"), func(idx int, h Host) {
+	err = m.RouteForEach(ident.StringID("bar"), func(
+		idx int,
+		s shard.Shard,
+		h Host,
+	) {
 		switch idx {
 		case 1:
 			assert.Equal(t, "h2", h.ID())
+			assert.Equal(t, shard.Available, s.State())
 		case 3:
 			assert.Equal(t, "h4", h.ID())
+			assert.Equal(t, shard.Initializing, s.State())
 		default:
 			assert.Fail(t, "routed to wrong host")
 		}
 	})
 	assert.NoError(t, err)
 
-	err = m.RouteForEach(ident.StringID("unowned"), func(idx int, h Host) {})
+	err = m.RouteForEach(ident.StringID("unowned"), func(_ int, _ shard.Shard, _ Host) {})
 	require.Error(t, err)
 	assert.Equal(t, errUnownedShard, err)
diff --git a/src/dbnode/topology/types.go b/src/dbnode/topology/types.go
index 9c3ffbd18f..1756b93bbd 100644
--- a/src/dbnode/topology/types.go
+++ b/src/dbnode/topology/types.go
@@ -134,7 +134,7 @@ type Map interface {
 }
 
 // RouteForEachFn is a function to execute for each routed to host
-type RouteForEachFn func(idx int, host Host)
+type RouteForEachFn func(idx int, shard shard.Shard, host Host)
 
 // StaticConfiguration is used for standing up M3DB with a static topology
 type StaticConfiguration struct {
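
Usage note: the new option defaults to true (write to initializing shards, matching previous behavior). It can be disabled via the new writeShardsInitializing YAML key on the client Configuration, or programmatically. A minimal sketch of the programmatic route, assuming the package's existing client.NewOptions constructor (NewOptions and newStrictWriteOptions are not part of this diff):

    package example

    import "github.com/m3db/m3/src/dbnode/client"

    // newStrictWriteOptions builds client options that skip replicas whose
    // shard is still Initializing on the write path. Under this change's
    // defaults, initializing shards do not count towards write quorum, so
    // skipping them does not affect write consistency.
    func newStrictWriteOptions() client.Options {
        return client.NewOptions().
            SetWriteShardsInitializing(false)
    }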
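Because RouteForEachFn now receives the shard.Shard alongside the host index and host, routing callbacks can branch on shard state, which is exactly what the session write path above does. A short illustrative helper against the same interfaces (availableReplicas is hypothetical, not part of this diff):

    package example

    import (
        "github.com/m3db/m3/src/cluster/shard"
        "github.com/m3db/m3/src/dbnode/topology"
    )

    // availableReplicas counts the replicas of a shard whose shard state is
    // Available, using the shard metadata now passed to RouteForEachFn.
    func availableReplicas(topoMap topology.Map, shardID uint32) (int, error) {
        available := 0
        err := topoMap.RouteShardForEach(shardID, func(
            _ int,
            hostShard shard.Shard,
            _ topology.Host,
        ) {
            if hostShard.State() == shard.Available {
                available++
            }
        })
        return available, err
    }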