Skip to content

Commit

Permalink
[dbnode] Add ability to force repair regardless of whether namespace has option …
Browse files Browse the repository at this point in the history
…set and add compare only repair type (#3550)
  • Loading branch information
robskillington authored Jun 10, 2021
1 parent 5f0956a commit d95a5df
Show file tree
Hide file tree
Showing 14 changed files with 439 additions and 28 deletions.
20 changes: 15 additions & 5 deletions src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ import (
"strings"
"time"

"github.com/m3dbx/vellum/regexp"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/transport"
"go.etcd.io/etcd/pkg/types"

coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/discovery"
"github.com/m3db/m3/src/dbnode/environment"
"github.com/m3db/m3/src/dbnode/storage/repair"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/x/config/hostid"
"github.com/m3db/m3/src/x/debug/config"
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
"github.com/m3db/m3/src/x/opentracing"

"github.com/m3dbx/vellum/regexp"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/transport"
"go.etcd.io/etcd/pkg/types"
)

const (
Expand Down Expand Up @@ -528,11 +529,20 @@ type CommitLogQueuePolicy struct {
Size int `yaml:"size" validate:"nonzero"`
}

// RepairPolicyMode is the repair policy mode, represented as an unsigned
// integer.
//
// NOTE(review): the RepairPolicy struct declared directly below selects the
// kind of repair through its Type field (a repair.Type), not through this
// type — confirm whether RepairPolicyMode is still referenced anywhere.
type RepairPolicyMode uint

// RepairPolicy is the repair policy.
type RepairPolicy struct {
// Enabled or disabled.
Enabled bool `yaml:"enabled"`

// Type is the type of repair to run.
Type repair.Type `yaml:"type"`

// Force the repair to run regardless of whether namespaces have repair enabled or not.
Force bool `yaml:"force"`

// The repair throttle.
Throttle time.Duration `yaml:"throttle"`

Expand Down
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ func TestConfiguration(t *testing.T) {
queueChannel: null
repair:
enabled: false
type: 0
force: false
throttle: 2m0s
checkInterval: 1m0s
debugShadowComparisonsEnabled: false
Expand Down
15 changes: 11 additions & 4 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/zap"

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/integration/generate"
Expand All @@ -43,6 +47,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/repair"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/topology/testutil"
xmetrics "github.com/m3db/m3/src/dbnode/x/metrics"
Expand All @@ -53,10 +58,6 @@ import (
"github.com/m3db/m3/src/x/instrument"
xretry "github.com/m3db/m3/src/x/retry"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -131,6 +132,8 @@ type BootstrappableTestSetupOptions struct {
DisablePeersBootstrapper bool
UseTChannelClientForWriting bool
EnableRepairs bool
ForceRepairs bool
RepairType repair.Type
AdminClientCustomOpts []client.CustomAdminOption
}

Expand Down Expand Up @@ -180,6 +183,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
topologyInitializer = setupOpts[i].TopologyInitializer
testStatsReporter = setupOpts[i].TestStatsReporter
enableRepairs = setupOpts[i].EnableRepairs
forceRepairs = setupOpts[i].ForceRepairs
repairType = setupOpts[i].RepairType
origin topology.Host
instanceOpts = newMultiAddrTestOptions(opts, instance)
adminClientCustomOpts = setupOpts[i].AdminClientCustomOpts
Expand Down Expand Up @@ -346,6 +351,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
SetRepairEnabled(true).
SetRepairOptions(
setup.StorageOpts().RepairOptions().
SetType(repairType).
SetForce(forceRepairs).
SetRepairThrottle(time.Millisecond).
SetRepairCheckInterval(time.Millisecond).
SetAdminClients([]client.AdminClient{adminClient}).
Expand Down
204 changes: 204 additions & 0 deletions src/dbnode/integration/repair_force_only_compare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// +build integration

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/repair"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
)

// TestRepairForceAndOnlyCompare exercises forced, compare-only repairs.
//
// The namespace under test has repair disabled, while every node is set up
// with ForceRepairs set and RepairType set to repair.OnlyCompareRepair. The
// test seeds differing data on disk per node, starts the servers, waits until
// each node reports a positive "repair.run" counter tagged with
// repair_type=only_compare, and finally calls verifySeriesMaps for each node
// with exactly the data that was originally written to that node.
//
// The test is skipped in short mode.
func TestRepairForceAndOnlyCompare(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	// Test both disjoint and shared series repair.
	genRepairData := genRepairDatafn(func(now xtime.UnixNano, blockSize time.Duration) (
		node0Data generate.SeriesBlocksByStart,
		node1Data generate.SeriesBlocksByStart,
		node2Data generate.SeriesBlocksByStart,
		allData generate.SeriesBlocksByStart,
	) {
		currBlockStart := now.Truncate(blockSize)
		node0Data = generate.BlocksByStart([]generate.BlockConfig{
			{IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)},
			{IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)},
		})
		node1Data = generate.BlocksByStart([]generate.BlockConfig{
			{IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)},
			{IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)},
		})
		// node2Data is never assigned, so the third node starts with no
		// seeded data.

		// Merge every node's blocks into a single view keyed by block start.
		allData = make(map[xtime.UnixNano]generate.SeriesBlock)
		for _, nodeData := range []generate.SeriesBlocksByStart{node0Data, node1Data, node2Data} {
			for start, data := range nodeData {
				allData[start] = append(allData[start], data...)
			}
		}

		return node0Data, node1Data, node2Data, allData
	})

	// Test setups.
	log := xtest.NewLogger(t)
	retentionOpts := retention.NewOptions().
		SetRetentionPeriod(20 * time.Hour).
		SetBlockSize(2 * time.Hour).
		SetBufferPast(10 * time.Minute).
		SetBufferFuture(2 * time.Minute)
	nsOpts := namespace.NewOptions().
		// Test needing to force enable repairs.
		SetRepairEnabled(false).
		SetRetentionOptions(retentionOpts)
	namesp, err := namespace.NewMetadata(testNamespaces[0], nsOpts)
	require.NoError(t, err)
	opts := NewTestOptions(t).
		SetNamespaces([]namespace.Metadata{namesp}).
		// Use TChannel clients for writing / reading because we want to target individual nodes at a time
		// and not write/read all nodes in the cluster.
		SetUseTChannelClientForWriting(true).
		SetUseTChannelClientForReading(true)

	// All three nodes share the same configuration.
	setupOpts := make([]BootstrappableTestSetupOptions, 3)
	for i := range setupOpts {
		setupOpts[i] = BootstrappableTestSetupOptions{
			DisablePeersBootstrapper: true,
			EnableRepairs:            true,
			// Test forcing repair of type compare only repair.
			ForceRepairs: true,
			RepairType:   repair.OnlyCompareRepair,
		}
	}

	// nolint: govet
	setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
	defer closeFn()

	// Ensure that the current time is set such that the previous block is flushable.
	blockSize := retentionOpts.BlockSize()
	now := setups[0].NowFn()().Truncate(blockSize).Add(retentionOpts.BufferPast()).Add(time.Second)
	for _, setup := range setups {
		setup.SetNowFn(now)
	}

	// Seed each node's disk with its own data; nodes without data are skipped.
	node0Data, node1Data, node2Data, _ := genRepairData(now, blockSize)
	perNodeData := []generate.SeriesBlocksByStart{node0Data, node1Data, node2Data}
	for i, data := range perNodeData {
		if data == nil {
			continue
		}
		require.NoError(t, writeTestDataToDisk(namesp, setups[i], data, 0))
	}

	// Start the servers with filesystem bootstrappers.
	setups.parallel(func(s TestSetup) {
		if err := s.StartServer(); err != nil {
			panic(err)
		}
	})
	log.Debug("servers are now up")

	// Stop the servers.
	defer func() {
		setups.parallel(func(s TestSetup) {
			// Report failures with t.Errorf rather than require.NoError:
			// require fails via t.FailNow, which must only be called from
			// the goroutine running the test, and this callback presumably
			// runs on a separate goroutine per setup (verify against
			// testSetups.parallel). t.Errorf is safe from any goroutine and
			// still marks the test as failed.
			if err := s.StopServer(); err != nil {
				t.Errorf("unexpected error stopping server: %v", err)
			}
		})
		log.Debug("servers are now down")
	}()

	// Wait for repairs to occur at least once per node.
	log.Debug("waiting for repairs to run")
	var runSuccessPerNodeCounters []tally.CounterSnapshot
	require.True(t, waitUntil(func() bool {
		var successCounters []tally.CounterSnapshot
		for _, setup := range setups {
			scope := setup.Scope()
			for _, v := range scope.Snapshot().Counters() {
				if v.Name() != "repair.run" {
					continue
				}
				repairType, ok := v.Tags()["repair_type"]
				if !ok || repairType != "only_compare" {
					continue
				}
				if v.Value() > 0 {
					// One matching counter per node is enough.
					successCounters = append(successCounters, v)
					break
				}
			}
		}

		// Only succeed once every node has reported a compare-only run.
		if len(successCounters) != len(setups) {
			return false
		}
		runSuccessPerNodeCounters = successCounters
		return true
	}, 60*time.Second))

	// Verify that the repair runs only ran comparisons without repairing data.
	log.Debug("verifying repairs that ran")
	require.Equal(t, len(setups), len(runSuccessPerNodeCounters),
		"unexpected number of successful nodes ran repairs")
	for _, counter := range runSuccessPerNodeCounters {
		repairType, ok := counter.Tags()["repair_type"]
		require.True(t, ok)
		require.Equal(t, "only_compare", repairType)
		require.True(t, counter.Value() > 0)
	}

	// Verify data did not change (repair type is compare only).
	for i, data := range perNodeData {
		verifySeriesMaps(t, setups[i], namesp.ID(), data)
	}
}
4 changes: 3 additions & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,10 @@ func Run(runOpts RunOptions) {
repairOpts := opts.RepairOptions().
SetAdminClients(repairClients)

if cfg.Repair != nil {
if repairCfg := cfg.Repair; repairCfg != nil {
repairOpts = repairOpts.
SetType(repairCfg.Type).
SetForce(repairCfg.Force).
SetResultOptions(rsOpts).
SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled)
if cfg.Repair.Throttle > 0 {
Expand Down
20 changes: 17 additions & 3 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type dbNamespace struct {
snapshotFilesFn snapshotFilesFn
log *zap.Logger
bootstrapState BootstrapState
repairsAny bool

// schemaDescr caches the latest schema for the namespace.
// schemaDescr is updated whenever schema registry is updated.
Expand Down Expand Up @@ -1272,11 +1273,12 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
return errNamespaceNotBootstrapped
}
nsCtx := n.nsContextWithRLock()
repairsAny := n.repairsAny
n.RUnlock()

// If repair is enabled we still need cold flush regardless of whether cold writes is
// If repair has run we still need cold flush regardless of whether cold writes is
// enabled since repairs are dependent on the cold flushing logic.
enabled := n.nopts.ColdWritesEnabled() || n.nopts.RepairEnabled()
enabled := n.nopts.ColdWritesEnabled() || repairsAny
if n.ReadOnly() || !enabled {
n.metrics.flushColdData.ReportSuccess(n.nowFn().Sub(callStart))
return nil
Expand Down Expand Up @@ -1499,11 +1501,23 @@ func (n *dbNamespace) Truncate() (int64, error) {
func (n *dbNamespace) Repair(
repairer databaseShardRepairer,
tr xtime.Range,
opts NamespaceRepairOptions,
) error {
if !n.nopts.RepairEnabled() {
shouldRun := opts.Force || n.nopts.RepairEnabled()
if !shouldRun {
return nil
}

n.RLock()
repairsAny := n.repairsAny
n.RUnlock()
if !repairsAny {
// Only acquire write lock if required.
n.Lock()
n.repairsAny = true
n.Unlock()
}

var (
wg sync.WaitGroup
mutex sync.Mutex
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func TestNamespaceRepair(t *testing.T) {
ns.shards[testShardIDs[i].ID()] = shard
}

require.Equal(t, "foo", ns.Repair(repairer, repairTimeRange).Error())
require.Equal(t, "foo", ns.Repair(repairer, repairTimeRange, NamespaceRepairOptions{}).Error())
}

func TestNamespaceShardAt(t *testing.T) {
Expand Down
Loading

0 comments on commit d95a5df

Please sign in to comment.