Skip to content

Commit

Permalink
Revert "[dbnode] Background cold flush process (#2460)"
Browse files Browse the repository at this point in the history
This reverts commit 3ac68a2.
  • Loading branch information
robskillington committed Aug 1, 2020
1 parent a9cba02 commit 6830a8c
Show file tree
Hide file tree
Showing 28 changed files with 328 additions and 957 deletions.
13 changes: 3 additions & 10 deletions src/dbnode/integration/disk_flush_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/m3db/m3/src/x/ident/testutil"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -85,8 +84,7 @@ func waitUntilSnapshotFilesFlushed(
namespace ident.ID,
expectedSnapshots []snapshotID,
timeout time.Duration,
) (uuid.UUID, error) {
var snapshotID uuid.UUID
) error {
dataFlushed := func() bool {
for _, shard := range shardSet.AllIDs() {
for _, e := range expectedSnapshots {
Expand All @@ -104,19 +102,14 @@ func waitUntilSnapshotFilesFlushed(
if !(latest.ID.VolumeIndex >= e.minVolume) {
return false
}

_, snapshotID, err = latest.SnapshotTimeAndID()
if err != nil {
panic(err)
}
}
}
return true
}
if waitUntil(dataFlushed, timeout) {
return snapshotID, nil
return nil
}
return snapshotID, errDiskFlushTimedOut
return errDiskFlushTimedOut
}

func waitUntilDataFilesFlushed(
Expand Down
12 changes: 5 additions & 7 deletions src/dbnode/integration/disk_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func TestDiskSnapshotSimple(t *testing.T) {
maxWaitTime := time.Minute
for i, ns := range testSetup.Namespaces() {
log.Info("waiting for snapshot files to flush")
_, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)
require.NoError(t, err)
require.NoError(t, waitUntilSnapshotFilesFlushed(
filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime))
log.Info("verifying snapshot files")
verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps)
}
Expand All @@ -167,17 +167,15 @@ func TestDiskSnapshotSimple(t *testing.T) {
for _, ns := range testSetup.Namespaces() {
log.Info("waiting for new snapshot files to be written out")
snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}}
// NB(bodu): We need to check if a specific snapshot ID was deleted since snapshotting logic now changed
// to always snapshotting every block start w/in retention.
snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)
require.NoError(t, err)
require.NoError(t, waitUntilSnapshotFilesFlushed(
filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime))
log.Info("waiting for old snapshot files to be deleted")
for _, shard := range shardSet.All() {
waitUntil(func() bool {
// Increase the time each check to ensure that the filesystem processes are able to progress (some
// of them throttle themselves based on time elapsed since the previous time.)
testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second))
exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize))
exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize))
require.NoError(t, err)
return !exists
}, maxWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/x/context"
xtime "github.com/m3db/m3/src/x/time"
"go.uber.org/zap"
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
} else {
snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize)
}
_, err := waitUntilSnapshotFilesFlushed(
err := waitUntilSnapshotFilesFlushed(
filePathPrefix,
setup.ShardSet(),
nsID,
Expand Down
19 changes: 2 additions & 17 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,32 +1441,17 @@ func DataFileSetExists(
}

// SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time.
func SnapshotFileSetExistsAt(
prefix string,
namespace ident.ID,
snapshotID uuid.UUID,
shard uint32,
blockStart time.Time,
) (bool, error) {
func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) {
snapshotFiles, err := SnapshotFiles(prefix, namespace, shard)
if err != nil {
return false, err
}

latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
_, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
if !ok {
return false, nil
}

_, latestSnapshotID, err := latest.SnapshotTimeAndID()
if err != nil {
return false, err
}

if !uuid.Equal(latestSnapshotID, snapshotID) {
return false, nil
}

// LatestVolumeForBlock checks for a complete checkpoint file, so we don't
// need to recheck it here.
return true, nil
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func TestSnapshotFileSetExistsAt(t *testing.T) {

writeOutTestSnapshot(t, dir, shard, ts, 0)

exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, testSnapshotID, shard, ts)
exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, shard, ts)
require.NoError(t, err)
require.True(t, exists)
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {

// NB(xichen): disable filesystem manager before we bootstrap to minimize
// the impact of file operations on bootstrapping performance
m.mediator.DisableFileOpsAndWait()
m.mediator.DisableFileOps()
defer m.mediator.EnableFileOps()

// Keep performing bootstraps until none pending and no error returned.
Expand Down
33 changes: 2 additions & 31 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,30 +677,6 @@ func (s *commitLogSource) bootstrapShardSnapshots(
blockSize time.Duration,
mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile,
) error {
// NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm.
// We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a
// once warm block start after a node has been shut down for a long time. We consider all block starts we
// haven't flushed data for yet a warm block start.
fsOpts := s.opts.CommitLogOptions().FilesystemOptions()
readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard,
fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions())
shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{})
for _, result := range readInfoFilesResults {
if err := result.Err.Error(); err != nil {
// If we couldn't read the info files then keep going to be consistent
// with the way the db shard updates its flush states in UpdateFlushStates().
s.log.Error("unable to read info files in commit log bootstrap",
zap.Uint32("shard", shard),
zap.Stringer("namespace", ns.ID()),
zap.String("filepath", result.Err.Filepath()),
zap.Error(err))
continue
}
info := result.Info
at := xtime.FromNanoseconds(info.BlockStart)
shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{}
}

rangeIter := shardTimeRanges.Iter()
for rangeIter.Next() {
var (
Expand Down Expand Up @@ -733,13 +709,9 @@ func (s *commitLogSource) bootstrapShardSnapshots(
continue
}

writeType := series.WarmWrite
if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok {
writeType = series.ColdWrite
}
if err := s.bootstrapShardBlockSnapshot(
ns, accumulator, shard, blockStart, blockSize,
mostRecentCompleteSnapshotForShardBlock, writeType); err != nil {
mostRecentCompleteSnapshotForShardBlock); err != nil {
return err
}
}
Expand All @@ -755,7 +727,6 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
blockStart time.Time,
blockSize time.Duration,
mostRecentCompleteSnapshot fs.FileSetFile,
writeType series.WriteType,
) error {
var (
bOpts = s.opts.ResultOptions()
Expand Down Expand Up @@ -835,7 +806,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
}

// Load into series.
if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil {
if err := ref.Series.LoadBlock(dbBlock, series.WarmWrite); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {
db.EXPECT().OwnedNamespaces().Return(namespaces, nil)

m := NewMockdatabaseMediator(ctrl)
m.EXPECT().DisableFileOpsAndWait()
m.EXPECT().DisableFileOps()
m.EXPECT().EnableFileOps().AnyTimes()

bsm := newBootstrapManager(db, m, opts).(*bootstrapManager)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) {
}))

m := NewMockdatabaseMediator(ctrl)
m.EXPECT().DisableFileOpsAndWait()
m.EXPECT().DisableFileOps()
m.EXPECT().EnableFileOps().AnyTimes()

db := NewMockdatabase(ctrl)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) {
}))

m := NewMockdatabaseMediator(ctrl)
m.EXPECT().DisableFileOpsAndWait()
m.EXPECT().DisableFileOps()
m.EXPECT().EnableFileOps().AnyTimes()

db := NewMockdatabase(ctrl)
Expand Down
75 changes: 20 additions & 55 deletions src/dbnode/storage/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,13 @@ type cleanupManager struct {

deleteFilesFn deleteFilesFn
deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn
warmFlushCleanupInProgress bool
coldFlushCleanupInProgress bool
cleanupInProgress bool
metrics cleanupManagerMetrics
logger *zap.Logger
}

type cleanupManagerMetrics struct {
warmFlushCleanupStatus tally.Gauge
coldFlushCleanupStatus tally.Gauge
status tally.Gauge
corruptCommitlogFile tally.Counter
corruptSnapshotFile tally.Counter
corruptSnapshotMetadataFile tally.Counter
Expand All @@ -92,8 +90,7 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics {
sScope := scope.SubScope("snapshot")
smScope := scope.SubScope("snapshot-metadata")
return cleanupManagerMetrics{
warmFlushCleanupStatus: scope.Gauge("warm-flush-cleanup"),
coldFlushCleanupStatus: scope.Gauge("cold-flush-cleanup"),
status: scope.Gauge("cleanup"),
corruptCommitlogFile: clScope.Counter("corrupt"),
corruptSnapshotFile: sScope.Counter("corrupt"),
corruptSnapshotMetadataFile: smScope.Counter("corrupt"),
Expand Down Expand Up @@ -127,20 +124,20 @@ func newCleanupManager(
}
}

func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error {
func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error {
// Don't perform any cleanup if we are not boostrapped yet.
if !isBootstrapped {
m.logger.Debug("database is still bootstrapping, terminating cleanup")
return nil
}

m.Lock()
m.warmFlushCleanupInProgress = true
m.cleanupInProgress = true
m.Unlock()

defer func() {
m.Lock()
m.warmFlushCleanupInProgress = false
m.cleanupInProgress = false
m.Unlock()
}()

Expand All @@ -150,6 +147,11 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
}

multiErr := xerrors.NewMultiError()
if err := m.cleanupDataFiles(t, namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up data files for %v: %v", t, err))
}

if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up index files for %v: %v", t, err))
Expand All @@ -160,6 +162,11 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
"encountered errors when cleaning up index files for %v: %v", t, err))
}

if err := m.deleteInactiveDataFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive data files for %v: %v", t, err))
}

if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive snapshot files for %v: %v", t, err))
Expand All @@ -178,57 +185,15 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
return multiErr.FinalError()
}

func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
// Don't perform any cleanup if we are not boostrapped yet.
if !isBootstrapped {
m.logger.Debug("database is still bootstrapping, terminating cleanup")
return nil
}

m.Lock()
m.coldFlushCleanupInProgress = true
m.Unlock()

defer func() {
m.Lock()
m.coldFlushCleanupInProgress = false
m.Unlock()
}()

namespaces, err := m.database.OwnedNamespaces()
if err != nil {
return err
}

multiErr := xerrors.NewMultiError()
if err := m.cleanupDataFiles(t, namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up data files for %v: %v", t, err))
}

if err := m.deleteInactiveDataFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive data files for %v: %v", t, err))
}

return multiErr.FinalError()
}
func (m *cleanupManager) Report() {
m.RLock()
coldFlushCleanupInProgress := m.coldFlushCleanupInProgress
warmFlushCleanupInProgress := m.warmFlushCleanupInProgress
cleanupInProgress := m.cleanupInProgress
m.RUnlock()

if coldFlushCleanupInProgress {
m.metrics.coldFlushCleanupStatus.Update(1)
} else {
m.metrics.coldFlushCleanupStatus.Update(0)
}

if warmFlushCleanupInProgress {
m.metrics.warmFlushCleanupStatus.Update(1)
if cleanupInProgress {
m.metrics.status.Update(1)
} else {
m.metrics.warmFlushCleanupStatus.Update(0)
m.metrics.status.Update(0)
}
}

Expand Down
Loading

0 comments on commit 6830a8c

Please sign in to comment.