diff --git a/kube/bundle.yaml b/kube/bundle.yaml index f9df985bb3..8cd5a6e7fa 100644 --- a/kube/bundle.yaml +++ b/kube/bundle.yaml @@ -176,6 +176,8 @@ data: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/kube/m3dbnode-configmap.yaml b/kube/m3dbnode-configmap.yaml index 277663bc6f..927c48c9b7 100644 --- a/kube/m3dbnode-configmap.yaml +++ b/kube/m3dbnode-configmap.yaml @@ -68,6 +68,8 @@ data: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index b74c377618..8476f96582 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" bfs "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/peers" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/uninitialized" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" ) @@ -120,7 +121,7 @@ func (bsc BootstrapConfiguration) New( case bootstrapper.NoOpNoneBootstrapperName: bs = bootstrapper.NewNoOpNoneBootstrapperProvider() case bfs.FileSystemBootstrapperName: - fsbopts := bfs.NewOptions(). + fsbOpts := bfs.NewOptions(). SetInstrumentOptions(opts.InstrumentOptions()). SetResultOptions(rsOpts). SetFilesystemOptions(fsOpts). @@ -129,12 +130,12 @@ func (bsc BootstrapConfiguration) New( SetDatabaseBlockRetrieverManager(opts.DatabaseBlockRetrieverManager()). SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetIdentifierPool(opts.IdentifierPool()) - bs, err = bfs.NewFileSystemBootstrapperProvider(fsbopts, bs) + bs, err = bfs.NewFileSystemBootstrapperProvider(fsbOpts, bs) if err != nil { return nil, err } case commitlog.CommitLogBootstrapperName: - copts := commitlog.NewOptions(). + cOpts := commitlog.NewOptions(). SetResultOptions(rsOpts). SetCommitLogOptions(opts.CommitLogOptions()) @@ -142,22 +143,27 @@ func (bsc BootstrapConfiguration) New( if err != nil { return nil, err } - bs, err = commitlog.NewCommitLogBootstrapperProvider(copts, inspection, bs) + bs, err = commitlog.NewCommitLogBootstrapperProvider(cOpts, inspection, bs) if err != nil { return nil, err } case peers.PeersBootstrapperName: - popts := peers.NewOptions(). + pOpts := peers.NewOptions(). SetResultOptions(rsOpts). SetAdminClient(adminClient). SetPersistManager(opts.PersistManager()). SetDatabaseBlockRetrieverManager(opts.DatabaseBlockRetrieverManager()). SetFetchBlocksMetadataEndpointVersion(bsc.peersFetchBlocksMetadataEndpointVersion()). SetRuntimeOptionsManager(opts.RuntimeOptionsManager()) - bs, err = peers.NewPeersBootstrapperProvider(popts, bs) + bs, err = peers.NewPeersBootstrapperProvider(pOpts, bs) if err != nil { return nil, err } + case uninitialized.UninitializedTopologyBootstrapperName: + uopts := uninitialized.NewOptions(). + SetResultOptions(rsOpts). + SetInstrumentOptions(opts.InstrumentOptions()) + bs = uninitialized.NewuninitializedTopologyBootstrapperProvider(uopts, bs) default: return nil, fmt.Errorf("unknown bootstrapper: %s", bsc.Bootstrappers[i]) } @@ -189,12 +195,20 @@ func ValidateBootstrappersOrder(names []string) error { peers.PeersBootstrapperName: []string{ // Peers must always appear after filesystem bfs.FileSystemBootstrapperName, + // Peers may appear before OR after commitlog + commitlog.CommitLogBootstrapperName, }, commitlog.CommitLogBootstrapperName: []string{ // Commit log bootstrapper may appear after filesystem or peers bfs.FileSystemBootstrapperName, peers.PeersBootstrapperName, }, + uninitialized.UninitializedTopologyBootstrapperName: []string{ + // Unintialized bootstrapper may appear after filesystem or peers or commitlog + bfs.FileSystemBootstrapperName, + commitlog.CommitLogBootstrapperName, + peers.PeersBootstrapperName, + }, } validated := make(map[string]struct{}) diff --git a/src/cmd/services/m3dbnode/main/common_test.go b/src/cmd/services/m3dbnode/main/common_test.go index a48cfed732..15791dfe10 100644 --- a/src/cmd/services/m3dbnode/main/common_test.go +++ b/src/cmd/services/m3dbnode/main/common_test.go @@ -34,8 +34,10 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3cluster/shard" "github.com/m3db/m3x/ident" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -139,3 +141,35 @@ func newNamespaceWithIndexProtoValue(id string, indexEnabled bool) (proto.Messag } return namespace.ToProto(nsMap), nil } + +// waitUntilAllShardsAreAvailable continually polls the session checking to see if the topology.Map +// that the session is currently storing contains a non-zero number of host shard sets, and if so, +// makes sure that all their shard states are Available. +func waitUntilAllShardsAreAvailable(t *testing.T, session client.AdminSession) { +outer: + for { + time.Sleep(10 * time.Millisecond) + + topoMap, err := session.TopologyMap() + require.NoError(t, err) + + var ( + hostShardSets = topoMap.HostShardSets() + ) + + if len(hostShardSets) == 0 { + // We haven't received an actual topology yet. + continue + } + + for _, hostShardSet := range hostShardSets { + for _, hostShard := range hostShardSet.ShardSet().All() { + if hostShard.State() != shard.Available { + continue outer + } + } + } + + break + } +} diff --git a/src/cmd/services/m3dbnode/main/main_index_test.go b/src/cmd/services/m3dbnode/main/main_index_test.go index 3f9678934d..cbf0181d2a 100644 --- a/src/cmd/services/m3dbnode/main/main_index_test.go +++ b/src/cmd/services/m3dbnode/main/main_index_test.go @@ -327,6 +327,8 @@ db: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index 0173bfe5ef..146f5e50c5 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -37,7 +37,6 @@ import ( "github.com/m3db/m3cluster/integration/etcd" "github.com/m3db/m3cluster/placement" "github.com/m3db/m3cluster/services" - "github.com/m3db/m3cluster/shard" xconfig "github.com/m3db/m3x/config" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" @@ -488,6 +487,8 @@ db: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 @@ -632,35 +633,3 @@ db: endpoint: {{.InitialClusterEndpoint}} ` ) - -// waitUntilAllShardsAreAvailable continually polls the session checking to see if the topology.Map -// that the session is currently storing contains a non-zero number of host shard sets, and if so, -// makes sure that all their shard states are Available. -func waitUntilAllShardsAreAvailable(t *testing.T, session client.AdminSession) { -outer: - for { - time.Sleep(10 * time.Millisecond) - - topoMap, err := session.TopologyMap() - require.NoError(t, err) - - var ( - hostShardSets = topoMap.HostShardSets() - ) - - if len(hostShardSets) == 0 { - // We haven't received an actual topology yet. - continue - } - - for _, hostShardSet := range hostShardSets { - for _, hostShard := range hostShardSet.ShardSet().All() { - if hostShard.State() != shard.Available { - continue outer - } - } - } - - break - } -} diff --git a/src/dbnode/config/m3dbnode-local-etcd.yml b/src/dbnode/config/m3dbnode-local-etcd.yml index 86ada7bde1..37c1c95ff6 100644 --- a/src/dbnode/config/m3dbnode-local-etcd.yml +++ b/src/dbnode/config/m3dbnode-local-etcd.yml @@ -63,7 +63,8 @@ db: bootstrappers: - filesystem - commitlog - - noop-none + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/dbnode/config/m3dbnode-local.yml b/src/dbnode/config/m3dbnode-local.yml index 2762db550a..929c656c81 100644 --- a/src/dbnode/config/m3dbnode-local.yml +++ b/src/dbnode/config/m3dbnode-local.yml @@ -63,7 +63,8 @@ db: bootstrappers: - filesystem - commitlog - - noop-none + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/dbnode/example/m3db-node-config.yaml b/src/dbnode/example/m3db-node-config.yaml index 7b668eed79..10276cda72 100644 --- a/src/dbnode/example/m3db-node-config.yaml +++ b/src/dbnode/example/m3db-node-config.yaml @@ -47,6 +47,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go b/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go index 9f1c604ee4..05303b9c5a 100644 --- a/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go +++ b/src/dbnode/integration/bootstrap_after_buffer_rotation_regression_test.go @@ -118,6 +118,7 @@ func TestBootstrapAfterBufferRotation(t *testing.T) { signalCh := make(chan struct{}) bootstrapper, err := commitlogBootstrapperProvider.Provide() require.NoError(t, err) + test := newTestBootstrapperSource(testBootstrapperSourceOptions{ readData: func( _ namespace.Metadata, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 28dbfbe849..3b092a420f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -36,8 +36,10 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" + "github.com/m3db/m3cluster/shard" "github.com/m3db/m3x/checked" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" @@ -104,10 +106,7 @@ func (s *commitLogSource) AvailableData( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { - // Commit log bootstrapper is a last ditch effort, so fulfill all - // time ranges requested even if not enough data, just to succeed - // the bootstrap - return shardsTimeRanges + return s.availability(ns, shardsTimeRanges, runOpts) } // ReadData will read a combination of the available snapshot files and commit log files to @@ -1249,10 +1248,7 @@ func (s *commitLogSource) AvailableIndex( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { - // Commit log bootstrapper is a last ditch effort, so fulfill all - // time ranges requested even if not enough data, just to succeed - // the bootstrap - return shardsTimeRanges + return s.availability(ns, shardsTimeRanges, runOpts) } func (s *commitLogSource) ReadIndex( @@ -1425,6 +1421,76 @@ func (s commitLogSource) maybeAddToIndex( return err } +// The commitlog bootstrapper determines availability primarily by checking if the +// origin host has ever reached the "Available" state for the shard that is being +// bootstrapped. If not, then it can't provide data for that shard because it doesn't +// have all of it by definition. +func (s *commitLogSource) availability( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) result.ShardTimeRanges { + var ( + topoState = runOpts.InitialTopologyState() + availableShardTimeRanges = result.ShardTimeRanges{} + ) + + for shardIDUint := range shardsTimeRanges { + shardID := topology.ShardID(shardIDUint) + hostShardStates, ok := topoState.ShardStates[shardID] + if !ok { + // This shard was not part of the topology when the bootstrapping + // process began. + continue + } + + originHostShardState, ok := hostShardStates[topology.HostID(topoState.Origin.ID())] + if !ok { + // TODO(rartoul): Make this a hard error once we refactor the interface to support + // returning errors. + iOpts := s.opts.CommitLogOptions().InstrumentOptions() + invariantLogger := instrument.EmitInvariantViolationAndGetLogger(iOpts) + invariantLogger.Errorf( + "initial topology state does not contain shard state for origin node and shard: %d", shardIDUint) + continue + } + + originShardState := originHostShardState.ShardState + switch originShardState { + // In the Initializing state we have to assume that the commit log + // is missing data and can't satisfy the bootstrap request. + case shard.Initializing: + // In the Leaving and Available case, we assume that the commit log contains + // all the data required to satisfy the bootstrap request because the node + // had (at some point) been completely bootstrapped for the requested shard. + // This doesn't mean that the node can't be missing any data or wasn't down + // for some period of time and missing writes in a multi-node deployment, it + // only means that technically the node has successfully taken ownership of + // the data for this shard and made it to a "bootstrapped" state which is + // all that is required to maintain our cluster-level consistency guarantees. + case shard.Leaving: + fallthrough + case shard.Available: + // Assume that we can bootstrap any requested time range, which is valid as + // long as the FS bootstrapper precedes the commit log bootstrapper. + // TODO(rartoul): Once we make changes to the bootstrapping interfaces + // to distinguish between "unfulfilled" data and "corrupt" data, then + // modify this to only say the commit log bootstrapper can fullfil + // "unfulfilled" data, but not corrupt data. + availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] + case shard.Unknown: + fallthrough + default: + // TODO(rartoul): Make this a hard error once we refactor the interface to support + // returning errors. + s.log.Errorf("unknown shard state: %v", originShardState) + return result.ShardTimeRanges{} + } + } + + return availableShardTimeRanges +} + func newReadSeriesPredicate(ns namespace.Metadata) commitlog.SeriesFilterPredicate { nsID := ns.ID() return func(id ident.ID, namespace ident.ID) bool { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 5964e46407..f47cb335f4 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -86,14 +86,14 @@ func testOptions() Options { } func TestAvailableEmptyRangeError(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts src := newCommitLogSource(opts, fs.Inspection{}) res := src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts) require.True(t, result.ShardTimeRanges{}.Equal(res)) } func TestReadEmpty(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts src := newCommitLogSource(opts, fs.Inspection{}) @@ -105,7 +105,7 @@ func TestReadEmpty(t *testing.T) { } func TestReadErrorOnNewIteratorError(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) src.newIteratorFn = func(_ commitlog.IteratorOpts) (commitlog.Iterator, error) { @@ -124,7 +124,7 @@ func TestReadErrorOnNewIteratorError(t *testing.T) { } func TestReadOrderedValues(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts md := testNsMetadata(t) src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) @@ -169,7 +169,7 @@ func TestReadOrderedValues(t *testing.T) { } func TestReadUnorderedValues(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts md := testNsMetadata(t) src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) @@ -216,7 +216,7 @@ func TestReadUnorderedValues(t *testing.T) { // files can span multiple M3DB processes which means that unique indexes could be re-used for multiple // different series. func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts md := testNsMetadata(t) src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) @@ -257,7 +257,7 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) { } func TestReadTrimsToRanges(t *testing.T) { - opts := testOptions() + opts := testDefaultOpts md := testNsMetadata(t) src := newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) @@ -302,7 +302,7 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) { defer ctrl.Finish() var ( - opts = testOptions() + opts = testDefaultOpts md = testNsMetadata(t) src = newCommitLogSource(opts, fs.Inspection{}).(*commitLogSource) blockSize = md.Options().RetentionOptions().BlockSize() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 1097e3c887..ef5ff1142b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -40,7 +40,9 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" + tu "github.com/m3db/m3/src/dbnode/topology/testutil" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3cluster/shard" "github.com/m3db/m3x/checked" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" @@ -306,19 +308,31 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { // Determine which shards we need to bootstrap (based on the randomly // generated data) - allShards := map[uint32]bool{} + var ( + allShardsMap = map[uint32]bool{} + allShardsSlice = []uint32{} + ) for _, write := range input.writes { - allShards[write.series.Shard] = true + shard := write.series.Shard + if _, ok := allShardsMap[shard]; !ok { + allShardsSlice = append(allShardsSlice, shard) + } + allShardsMap[shard] = true } // Assign the previously-determined bootstrap range to each known shard shardTimeRanges := result.ShardTimeRanges{} - for shard := range allShards { + for shard := range allShardsMap { shardTimeRanges[shard] = ranges } // Perform the bootstrap - runOpts := testDefaultRunOpts + var ( + initialTopoState = tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.Shards(allShardsSlice, shard.Available), + }) + runOpts = testDefaultRunOpts.SetInitialTopologyState(initialTopoState) + ) dataResult, err := source.BootstrapData(nsMeta, shardTimeRanges, runOpts) if err != nil { return false, err @@ -341,7 +355,7 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { return false, err } - indexResult, err := source.BootstrapIndex(nsMeta, shardTimeRanges, testDefaultRunOpts) + indexResult, err := source.BootstrapIndex(nsMeta, shardTimeRanges, runOpts) if err != nil { return false, err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go new file mode 100644 index 0000000000..96cbc56485 --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go @@ -0,0 +1,132 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package commitlog + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/topology" + tu "github.com/m3db/m3/src/dbnode/topology/testutil" + "github.com/m3db/m3cluster/shard" + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +var ( + testDefaultOpts = testOptions() + notSelfID = "not-self" +) + +func TestAvailableData(t *testing.T) { + var ( + nsMetadata = testNsMetadata(t) + blockSize = 2 * time.Hour + numShards = uint32(4) + blockStart = time.Now().Truncate(blockSize) + shardTimeRangesToBootstrap = result.ShardTimeRanges{} + bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ + Start: blockStart, + End: blockStart.Add(blockSize), + }) + ) + + for i := 0; i < int(numShards); i++ { + shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges + } + + testCases := []struct { + title string + topoState *topology.StateSnapshot + shardsTimeRangesToBootstrap result.ShardTimeRanges + expectedAvailableShardsTimeRanges result.ShardTimeRanges + }{ + { + title: "Single node - Shard initializing", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + { + title: "Single node - Shard unknown", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Unknown), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + { + title: "Single node - Shard leaving", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Leaving), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + { + title: "Single node - Shard available", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + { + title: "Multi node - Origin available", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + notSelfID: tu.ShardsRange(0, numShards, shard.Initializing), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + { + title: "Multi node - Origin not available", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID: tu.ShardsRange(0, numShards, shard.Available), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.title, func(t *testing.T) { + + var ( + src = newCommitLogSource(testOptions(), fs.Inspection{}) + runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) + dataRes = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + ) + + require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) + + indexRes := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes) + }) + } +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/types.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/types.go index 4407f1e8fd..047a100369 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/types.go @@ -30,10 +30,10 @@ type Options interface { // Validate validates the options Validate() error - // SetResultOptions sets the instrumentation options + // SetResultOptions sets the result options SetResultOptions(value result.Options) Options - // ResultOptions returns the instrumentation options + // ResultOptions returns the result options ResultOptions() result.Options // SetCommitLogOptions sets the commit log options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index cfba16e48a..ef12104442 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -710,20 +710,22 @@ func (s *peersSource) peerAvailability( shardState := hostShardState.ShardState switch shardState { - // Skip cases - We cannot bootstrap from this host + // Don't want to peer bootstrap from a node that has not yet completely + // taken ownership of the shard. case shard.Initializing: - // Don't want to peer bootstrap from a node that has not yet completely - // taken ownership of the shard. - case shard.Unknown: // Success cases - We can bootstrap from this host, which is enough to // mark this shard as bootstrappable. case shard.Leaving: fallthrough case shard.Available: shardPeers.numAvailablePeers++ + case shard.Unknown: + fallthrough default: - panic( - fmt.Sprintf("encountered unknown shard state: %s", shardState.String())) + // TODO(rartoul): Make this a hard error once we refactor the interface to support + // returning errors. + s.log.Errorf("unknown shard state: %v", shardState) + return result.ShardTimeRanges{} } } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index dd88225650..419ca8d6a4 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -25,9 +25,9 @@ import ( "time" m3dbruntime "github.com/m3db/m3/src/dbnode/runtime" - "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" + tu "github.com/m3db/m3/src/dbnode/topology/testutil" "github.com/m3db/m3cluster/shard" xtime "github.com/m3db/m3x/time" @@ -36,51 +36,18 @@ import ( ) const ( - selfID = "self" + notSelfID1 = "not-self1" + notSelfID2 = "not-self2" ) -type sourceAvailableHost struct { - name string - shards []uint32 - shardStates shard.State -} - -type sourceAvailableHosts []sourceAvailableHost - -func (s sourceAvailableHosts) topologyState() *topology.StateSnapshot { - topoState := &topology.StateSnapshot{ - Origin: topology.NewHost(selfID, "127.0.0.1"), - MajorityReplicas: 2, - ShardStates: make(map[topology.ShardID]map[topology.HostID]topology.HostShardState), - } - - for _, host := range s { - for _, shard := range host.shards { - hostShardStates, ok := topoState.ShardStates[topology.ShardID(shard)] - if !ok { - hostShardStates = make(map[topology.HostID]topology.HostShardState) - } - - hostShardStates[topology.HostID(host.name)] = topology.HostShardState{ - Host: topology.NewHost(host.name, host.name+"address"), - ShardState: host.shardStates, - } - topoState.ShardStates[topology.ShardID(shard)] = hostShardStates - } - } - - return topoState -} - func TestPeersSourceAvailableDataAndIndex(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() var ( - replicaMajority = 2 blockSize = 2 * time.Hour nsMetadata = testNamespaceMetadata(t) - shards = []uint32{0, 1, 2, 3} + numShards = uint32(4) blockStart = time.Now().Truncate(blockSize) shardTimeRangesToBootstrap = result.ShardTimeRanges{} bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ @@ -89,8 +56,8 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }) ) - for _, shard := range shards { - shardTimeRangesToBootstrap[shard] = bootstrapRanges + for i := 0; i < int(numShards); i++ { + shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges } shardTimeRangesToBootstrapOneExtra := shardTimeRangesToBootstrap.Copy() @@ -98,112 +65,60 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { testCases := []struct { title string - hosts sourceAvailableHosts + topoState *topology.StateSnapshot bootstrapReadConsistency topology.ReadConsistencyLevel shardsTimeRangesToBootstrap result.ShardTimeRanges expectedAvailableShardsTimeRanges result.ShardTimeRanges }{ { title: "Returns empty if only self is available", - hosts: []sourceAvailableHost{ - sourceAvailableHost{ - name: selfID, - shards: shards, - shardStates: shard.Available, - }, - }, + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, }, { title: "Returns empty if all other peers initializing/unknown", - hosts: []sourceAvailableHost{ - sourceAvailableHost{ - name: selfID, - shards: shards, - shardStates: shard.Available, - }, - sourceAvailableHost{ - name: "other1", - shards: shards, - shardStates: shard.Initializing, - }, - sourceAvailableHost{ - name: "other2", - shards: shards, - shardStates: shard.Unknown, - }, - }, + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + notSelfID1: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID2: tu.ShardsRange(0, numShards, shard.Unknown), + }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, }, { title: "Returns success if consistency can be met (available/leaving)", - hosts: []sourceAvailableHost{ - sourceAvailableHost{ - name: selfID, - shards: shards, - shardStates: shard.Initializing, - }, - sourceAvailableHost{ - name: "other1", - shards: shards, - shardStates: shard.Available, - }, - sourceAvailableHost{ - name: "other2", - shards: shards, - shardStates: shard.Leaving, - }, - }, + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Leaving), + }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, }, { title: "Skips shards that were not in the topology at start", - hosts: []sourceAvailableHost{ - sourceAvailableHost{ - name: selfID, - shards: shards, - shardStates: shard.Initializing, - }, - sourceAvailableHost{ - name: "other1", - shards: shards, - shardStates: shard.Available, - }, - sourceAvailableHost{ - name: "other2", - shards: shards, - shardStates: shard.Available, - }, - }, + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Available), + }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrapOneExtra, expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, }, { title: "Returns empty if consistency can not be met", - hosts: []sourceAvailableHost{ - sourceAvailableHost{ - name: selfID, - shards: shards, - shardStates: shard.Initializing, - }, - sourceAvailableHost{ - name: "other1", - shards: shards, - shardStates: shard.Available, - }, - sourceAvailableHost{ - name: "other2", - shards: shards, - shardStates: shard.Available, - }, - }, + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Available), + }), bootstrapReadConsistency: topology.ReadConsistencyLevelAll, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, @@ -212,22 +127,6 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { for _, tc := range testCases { t.Run(tc.title, func(t *testing.T) { - hostShardSets := []topology.HostShardSet{} - - for _, host := range tc.hosts { - shards := sharding.NewShards(host.shards, host.shardStates) - shardSet, err := sharding.NewShardSet(shards, sharding.DefaultHashFn(0)) - require.NoError(t, err) - - hostShardSet := topology.NewHostShardSet( - topology.NewHost(host.name, host.name+"address"), shardSet) - hostShardSets = append(hostShardSets, hostShardSet) - } - - mockMap := topology.NewMockMap(ctrl) - mockMap.EXPECT().HostShardSets().Return(hostShardSets).AnyTimes() - mockMap.EXPECT().MajorityReplicas().Return(replicaMajority).AnyTimes() - mockRuntimeOpts := m3dbruntime.NewMockOptions(ctrl) mockRuntimeOpts. EXPECT(). @@ -243,14 +142,15 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { AnyTimes() opts := testDefaultOpts. - // SetAdminClient(mockClient). SetRuntimeOptionsManager(mockRuntimeOptsMgr) src, err := newPeersSource(opts) require.NoError(t, err) - runOpts := testDefaultRunOpts.SetInitialTopologyState(tc.hosts.topologyState()) - dataRes := src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + var ( + runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) + dataRes = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + ) require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) indexRes := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/options.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/options.go new file mode 100644 index 0000000000..dfc75bbf9e --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/options.go @@ -0,0 +1,59 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uninitialized + +import ( + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3x/instrument" +) + +type options struct { + resultOpts result.Options + iOpts instrument.Options +} + +// NewOptions creates a new Options. +func NewOptions() Options { + return &options{ + resultOpts: result.NewOptions(), + iOpts: instrument.NewOptions(), + } +} + +func (o *options) SetResultOptions(value result.Options) Options { + opts := *o + opts.resultOpts = value + return &opts +} + +func (o *options) ResultOptions() result.Options { + return o.resultOpts +} + +func (o *options) SetInstrumentOptions(value instrument.Options) Options { + opts := *o + opts.iOpts = value + return &opts +} + +func (o *options) InstrumentOptions() instrument.Options { + return o.iOpts +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/provider.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/provider.go new file mode 100644 index 0000000000..a47dd97f05 --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/provider.go @@ -0,0 +1,79 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uninitialized + +import ( + "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" +) + +const ( + // UninitializedTopologyBootstrapperName is the name of the uninitialized bootstrapper. + UninitializedTopologyBootstrapperName = "uninitialized_topology" +) + +type uninitializedTopologyBootstrapperProvider struct { + opts Options + next bootstrap.BootstrapperProvider +} + +// NewuninitializedTopologyBootstrapperProvider creates a new uninitialized bootstrapper +// provider. +func NewuninitializedTopologyBootstrapperProvider( + opts Options, + next bootstrap.BootstrapperProvider, +) bootstrap.BootstrapperProvider { + return uninitializedTopologyBootstrapperProvider{ + opts: opts, + next: next, + } +} + +func (p uninitializedTopologyBootstrapperProvider) Provide() (bootstrap.Bootstrapper, error) { + var ( + src = newTopologyUninitializedSource(p.opts) + b = &uninitializedTopologyBootstrapper{} + next bootstrap.Bootstrapper + err error + ) + + if p.next != nil { + next, err = p.next.Provide() + if err != nil { + return nil, err + } + } + + return bootstrapper.NewBaseBootstrapper( + b.String(), src, p.opts.ResultOptions(), next) +} + +func (p uninitializedTopologyBootstrapperProvider) String() string { + return UninitializedTopologyBootstrapperName +} + +type uninitializedTopologyBootstrapper struct { + bootstrap.Bootstrapper +} + +func (*uninitializedTopologyBootstrapper) String() string { + return UninitializedTopologyBootstrapperName +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go new file mode 100644 index 0000000000..0e40be5876 --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -0,0 +1,178 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uninitialized + +import ( + "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3cluster/shard" +) + +// The purpose of the unitializedSource is to succeed bootstraps for any +// shard/time-ranges if the cluster they're associated with has never +// been completely initialized (is a new cluster). This is required for +// allowing us to configure the bootstrappers such that the commitlog +// bootstrapper can precede the peers bootstrapper and still succeed bootstraps +// for brand new namespaces without permitting unintentional data loss by +// putting the noop-all or noop-none bootstrappers at the end of the process. +// Behavior is best understood by reading the test cases for the test: +// TestUnitializedSourceAvailableDataAndAvailableIndex +type uninitializedTopologySource struct { + opts Options +} + +// newTopologyUninitializedSource creates a new uninitialized source. +func newTopologyUninitializedSource(opts Options) bootstrap.Source { + return &uninitializedTopologySource{ + opts: opts, + } +} + +func (s *uninitializedTopologySource) Can(strategy bootstrap.Strategy) bool { + switch strategy { + case bootstrap.BootstrapSequential: + return true + } + + return false +} + +func (s *uninitializedTopologySource) AvailableData( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) result.ShardTimeRanges { + return s.availability(ns, shardsTimeRanges, runOpts) +} + +func (s *uninitializedTopologySource) AvailableIndex( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) result.ShardTimeRanges { + return s.availability(ns, shardsTimeRanges, runOpts) +} + +func (s *uninitializedTopologySource) availability( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) result.ShardTimeRanges { + var ( + topoState = runOpts.InitialTopologyState() + availableShardTimeRanges = result.ShardTimeRanges{} + ) + + for shardIDUint := range shardsTimeRanges { + shardID := topology.ShardID(shardIDUint) + hostShardStates, ok := topoState.ShardStates[shardID] + if !ok { + // This shard was not part of the topology when the bootstrapping + // process began. + continue + } + + // The basic idea for the algorithm is that on a shard-by-shard basis we + // need to determine if the cluster is "new" in the sense that it has + // never been completely initialized (reached a state where all the hosts + // in the topology are "available" for that specific shard). + // In order to determine this, we simply count the number of hosts in the + // "initializing" state. If this number is larger than zero, than the + // cluster is "new". + // The one exception to this case is when we perform topology changes and + // we end up with one extra node that is initializing which should be offset + // by the corresponding node that is leaving. I.E if numInitializing > 0 + // BUT numLeaving >= numInitializing then it is still not a new namespace. + // See the TestUnitializedSourceAvailableDataAndAvailableIndex test for more details. + var ( + numInitializing = 0 + numLeaving = 0 + ) + for _, hostState := range hostShardStates { + shardState := hostState.ShardState + switch shardState { + case shard.Initializing: + numInitializing++ + case shard.Leaving: + numLeaving++ + case shard.Available: + case shard.Unknown: + fallthrough + default: + // TODO(rartoul): Make this a hard error once we refactor the interface to support + // returning errors. + s.opts.InstrumentOptions().Logger().Errorf("unknown shard state: %v", shardState) + return result.ShardTimeRanges{} + } + } + + // This heuristic works for all scenarios except for if we tried to change the replication + // factor of a cluster that was already initialized. In that case, we might have to come + // up with a new heuristic, or simply require that the peers bootstrapper be configured as + // a bootstrapper if users want to change the replication factor dynamically, which is fine + // because otherwise you'd have to wait for one entire retention period for the replicaiton + // factor to actually increase correctly. + shardHasNeverBeenCompletelyInitialized := numInitializing-numLeaving > 0 + if shardHasNeverBeenCompletelyInitialized { + availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] + } + } + + return availableShardTimeRanges +} + +func (s *uninitializedTopologySource) ReadData( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) (result.DataBootstrapResult, error) { + var ( + availability = s.availability(ns, shardsTimeRanges, runOpts) + missing = shardsTimeRanges.Copy() + ) + missing.Subtract(availability) + + if missing.IsEmpty() { + return result.NewDataBootstrapResult(), nil + } + + return missing.ToUnfulfilledDataResult(), nil +} + +func (s *uninitializedTopologySource) ReadIndex( + ns namespace.Metadata, + shardsTimeRanges result.ShardTimeRanges, + runOpts bootstrap.RunOptions, +) (result.IndexBootstrapResult, error) { + var ( + availability = s.availability(ns, shardsTimeRanges, runOpts) + missing = shardsTimeRanges.Copy() + ) + missing.Subtract(availability) + + if missing.IsEmpty() { + return result.NewIndexBootstrapResult(), nil + } + + return missing.ToUnfulfilledIndexResult(), nil +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go new file mode 100644 index 0000000000..be20411eea --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go @@ -0,0 +1,206 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uninitialized + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/topology" + tu "github.com/m3db/m3/src/dbnode/topology/testutil" + "github.com/m3db/m3cluster/shard" + "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +var ( + testNamespaceID = ident.StringID("testnamespace") + testDefaultRunOpts = bootstrap.NewRunOptions() + notSelfID1 = "not-self-1" + notSelfID2 = "not-self-2" + notSelfID3 = "not-self-3" +) + +func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { + var ( + blockSize = 2 * time.Hour + numShards = uint32(4) + blockStart = time.Now().Truncate(blockSize) + shardTimeRangesToBootstrap = result.ShardTimeRanges{} + bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ + Start: blockStart, + End: blockStart.Add(blockSize), + }) + ) + nsMetadata, err := namespace.NewMetadata(testNamespaceID, namespace.NewOptions()) + require.NoError(t, err) + + for i := 0; i < int(numShards); i++ { + shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges + } + + testCases := []struct { + title string + topoState *topology.StateSnapshot + shardsTimeRangesToBootstrap result.ShardTimeRanges + expectedAvailableShardsTimeRanges result.ShardTimeRanges + }{ + // Snould return that it can bootstrap everything because + // it's a new namespace. + { + title: "Single node - Shard initializing", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + // Snould return that it can't bootstrap anything because we don't + // know how to handle unknown shard states. + { + title: "Single node - Shard unknown", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Unknown), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + // Snould return that it can't bootstrap anything because it's not + // a new namespace. + { + title: "Single node - Shard leaving", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Leaving), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + // Snould return that it can't bootstrap anything because it's not + // a new namespace. + { + title: "Single node - Shard available", + topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + // Snould return that it can bootstrap everything because + // it's a new namespace. + { + title: "Multi node - Brand new namespace (all nodes initializing)", + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID1: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID2: tu.ShardsRange(0, numShards, shard.Initializing), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + // Snould return that it can bootstrap everything because + // it's a new namespace (one of the nodes hasn't completed + // initializing yet.) + { + title: "Multi node - Recently created namespace (one node still initializing)", + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Available), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: shardTimeRangesToBootstrap, + }, + // Snould return that it can't bootstrap anything because it's not + // a new namespace. + { + title: "Multi node - Initialized namespace (no nodes initializing)", + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Available), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + // Snould return that it can't bootstrap anything because it's not + // a new namespace, we're just doing a node replace. + { + title: "Multi node - Node replace (one node leaving, one initializing)", + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + notSelfID1: tu.ShardsRange(0, numShards, shard.Leaving), + notSelfID2: tu.ShardsRange(0, numShards, shard.Available), + notSelfID3: tu.ShardsRange(0, numShards, shard.Initializing), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + // Snould return that it can't bootstrap anything because we don't + // know how to interpret the unknown host. + { + title: "Multi node - One node unknown", + topoState: tu.NewStateSnapshot(2, tu.HostShardStates{ + tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), + notSelfID1: tu.ShardsRange(0, numShards, shard.Available), + notSelfID2: tu.ShardsRange(0, numShards, shard.Unknown), + }), + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.title, func(t *testing.T) { + + var ( + srcOpts = NewOptions().SetInstrumentOptions(instrument.NewOptions()) + src = newTopologyUninitializedSource(srcOpts) + runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) + dataAvailabilityResult = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + indexAvailabilityResult = src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + ) + + // Make sure AvailableData and AvailableIndex return the correct result + require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataAvailabilityResult) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexAvailabilityResult) + + // Make sure ReadData marks anything that AvailableData wouldn't return as unfulfilled + dataResult, err := src.ReadData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + require.NoError(t, err) + expectedDataUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() + expectedDataUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) + require.Equal(t, expectedDataUnfulfilled, dataResult.Unfulfilled()) + + // Make sure ReadIndex marks anything that AvailableIndex wouldn't return as unfulfilled + indexResult, err := src.ReadIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + require.NoError(t, err) + expectedIndexUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() + expectedIndexUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) + require.Equal(t, expectedIndexUnfulfilled, indexResult.Unfulfilled()) + }) + } +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/types.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/types.go new file mode 100644 index 0000000000..5e529ad6e0 --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/types.go @@ -0,0 +1,41 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uninitialized + +import ( + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3x/instrument" +) + +// Options is the options interface for the uninitialized source. +type Options interface { + // SetResultOptions sets the result options + SetResultOptions(value result.Options) Options + + // ResultOptions returns the result options + ResultOptions() result.Options + + // Set the instrument options. + SetInstrumentOptions(value instrument.Options) Options + + // Return the instrument options. + InstrumentOptions() instrument.Options +} diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 09e391fa4d..9f184a11f6 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -132,7 +132,7 @@ func (b *bootstrapProcessProvider) newInitialTopologyState() (*topology.StateSna topologyState.ShardStates[shardID] = existing } - hostID := topology.HostID(hostShardSet.Host().String()) + hostID := topology.HostID(hostShardSet.Host().ID()) existing[hostID] = topology.HostShardState{ Host: hostShardSet.Host(), ShardState: currShard.State(), diff --git a/src/dbnode/storage/bootstrap/result/result_data_test.go b/src/dbnode/storage/bootstrap/result/result_data_test.go index 642af841a7..c65cb3819b 100644 --- a/src/dbnode/storage/bootstrap/result/result_data_test.go +++ b/src/dbnode/storage/bootstrap/result/result_data_test.go @@ -363,7 +363,7 @@ func TestShardTimeRangesCopy(t *testing.T) { assert.True(t, str.Equal(copied)) } -func TestShardTimeRangesToUnfulfilledResult(t *testing.T) { +func TestShardTimeRangesToUnfulfilledDataResult(t *testing.T) { str := ShardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: time.Now(), @@ -374,7 +374,7 @@ func TestShardTimeRangesToUnfulfilledResult(t *testing.T) { End: time.Now().Add(4 * time.Minute), }), } - r := str.ToUnfulfilledResult() + r := str.ToUnfulfilledDataResult() assert.Equal(t, 0, len(r.ShardResults())) assert.True(t, r.Unfulfilled().Equal(str)) } diff --git a/src/dbnode/storage/bootstrap/result/result_index_test.go b/src/dbnode/storage/bootstrap/result/result_index_test.go index 318bf3189a..eb05686729 100644 --- a/src/dbnode/storage/bootstrap/result/result_index_test.go +++ b/src/dbnode/storage/bootstrap/result/result_index_test.go @@ -135,6 +135,22 @@ func TestIndexResultAdd(t *testing.T) { require.Equal(t, testRanges, results.Unfulfilled()) } +func TestShardTimeRangesToUnfulfilledIndexResult(t *testing.T) { + str := ShardTimeRanges{ + 0: xtime.NewRanges(xtime.Range{ + Start: time.Now(), + End: time.Now().Add(time.Minute), + }), + 1: xtime.NewRanges(xtime.Range{ + Start: time.Now().Add(3 * time.Minute), + End: time.Now().Add(4 * time.Minute), + }), + } + r := str.ToUnfulfilledIndexResult() + assert.Equal(t, 0, len(r.IndexResults())) + assert.True(t, r.Unfulfilled().Equal(str)) +} + func TestIndexResulsMarkFulfilled(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/storage/bootstrap/result/shard_ranges.go b/src/dbnode/storage/bootstrap/result/shard_ranges.go index 1e47e6741f..0ee87b8565 100644 --- a/src/dbnode/storage/bootstrap/result/shard_ranges.go +++ b/src/dbnode/storage/bootstrap/result/shard_ranges.go @@ -99,9 +99,9 @@ func (r ShardTimeRanges) AddRanges(other ShardTimeRanges) { } } -// ToUnfulfilledResult will return a result that is comprised of wholly +// ToUnfulfilledDataResult will return a result that is comprised of wholly // unfufilled time ranges from the set of shard time ranges. -func (r ShardTimeRanges) ToUnfulfilledResult() DataBootstrapResult { +func (r ShardTimeRanges) ToUnfulfilledDataResult() DataBootstrapResult { result := NewDataBootstrapResult() for shard, ranges := range r { result.Add(shard, nil, ranges) @@ -109,6 +109,14 @@ func (r ShardTimeRanges) ToUnfulfilledResult() DataBootstrapResult { return result } +// ToUnfulfilledIndexResult will return a result that is comprised of wholly +// unfufilled time ranges from the set of shard time ranges. +func (r ShardTimeRanges) ToUnfulfilledIndexResult() IndexBootstrapResult { + result := NewIndexBootstrapResult() + result.SetUnfulfilled(r) + return result +} + // Subtract will subtract another range from the current range. func (r ShardTimeRanges) Subtract(other ShardTimeRanges) { for shard, ranges := range r { diff --git a/src/dbnode/topology/testutil/topology.go b/src/dbnode/topology/testutil/topology.go index 446b0c6c3b..27df4ffbbc 100644 --- a/src/dbnode/topology/testutil/topology.go +++ b/src/dbnode/topology/testutil/topology.go @@ -28,6 +28,11 @@ import ( "github.com/m3db/m3cluster/shard" ) +const ( + // SelfID is the string used to represent the ID of the origin node. + SelfID = "self" +) + // MustNewTopologyMap returns a new topology.Map with provided parameters. // It's a utility method to make tests easier to write. func MustNewTopologyMap( @@ -101,3 +106,34 @@ func (v TopologyView) Map() (topology.Map, error) { return topology.NewStaticMap(opts), nil } + +// HostShardStates is a human-readable way of describing an initial state topology +// on a host-by-host basis. +type HostShardStates map[string][]shard.Shard + +// NewStateSnapshot creates a new initial topology state snapshot using HostShardStates +// as input. +func NewStateSnapshot(numMajorityReplicas int, hostShardStates HostShardStates) *topology.StateSnapshot { + topoState := &topology.StateSnapshot{ + Origin: topology.NewHost(SelfID, "127.0.0.1"), + MajorityReplicas: numMajorityReplicas, + ShardStates: make(map[topology.ShardID]map[topology.HostID]topology.HostShardState), + } + + for host, shards := range hostShardStates { + for _, shard := range shards { + hostShardStates, ok := topoState.ShardStates[topology.ShardID(shard.ID())] + if !ok { + hostShardStates = make(map[topology.HostID]topology.HostShardState) + } + + hostShardStates[topology.HostID(host)] = topology.HostShardState{ + Host: topology.NewHost(host, host+"address"), + ShardState: shard.State(), + } + topoState.ShardStates[topology.ShardID(shard.ID())] = hostShardStates + } + } + + return topoState +} diff --git a/src/query/benchmark/benchmarker/main/m3dbnode-local-config.yaml b/src/query/benchmark/benchmarker/main/m3dbnode-local-config.yaml index 9c8613fdf3..c161b7ada5 100644 --- a/src/query/benchmark/benchmarker/main/m3dbnode-local-config.yaml +++ b/src/query/benchmark/benchmarker/main/m3dbnode-local-config.yaml @@ -50,6 +50,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/query/benchmark/configs/m3db_config.yaml b/src/query/benchmark/configs/m3db_config.yaml index 70eaca9b9f..45c28e86c8 100644 --- a/src/query/benchmark/configs/m3db_config.yaml +++ b/src/query/benchmark/configs/m3db_config.yaml @@ -49,6 +49,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server1-config.yaml b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server1-config.yaml index f92e5b92f9..8056f15006 100644 --- a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server1-config.yaml +++ b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server1-config.yaml @@ -48,6 +48,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server2-config.yaml b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server2-config.yaml index fd913b50a7..e2710995e5 100644 --- a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server2-config.yaml +++ b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server2-config.yaml @@ -48,6 +48,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125 diff --git a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server3-config.yaml b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server3-config.yaml index 2488a5e822..5aafa571c4 100644 --- a/src/query/benchmark/configs/multi_node_setup/m3dbnode-server3-config.yaml +++ b/src/query/benchmark/configs/multi_node_setup/m3dbnode-server3-config.yaml @@ -48,6 +48,8 @@ bootstrap: bootstrappers: - filesystem - commitlog + - peers + - uninitialized_topology fs: numProcessorsPerCPU: 0.125