Skip to content

Commit

Permalink
br: retry to flush checkpoint (#56427)
Browse files Browse the repository at this point in the history
close #56394
  • Loading branch information
Leavrth authored Oct 8, 2024
1 parent df3a0fb commit 940e29c
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 20 deletions.
3 changes: 2 additions & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 8,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand All @@ -54,6 +54,7 @@ go_test(
"//br/pkg/utiltest",
"//pkg/meta/model",
"//pkg/parser/model",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_stretchr_testify//require",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/checkpoint/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func StartCheckpointBackupRunnerForTest(
runner := newCheckpointRunner[BackupKeyType, BackupValueType](
checkpointStorage, cipher, valueMarshalerForBackup)

runner.startCheckpointMainLoop(ctx, tick, tick, tick)
runner.startCheckpointMainLoop(ctx, tick, tick, tick, tick)
return runner, nil
}

Expand All @@ -84,8 +84,9 @@ func StartCheckpointRunnerForBackup(
runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush,
defaultTckDurationForChecksum,
defaultTickDurationForChecksum,
defaultTickDurationForLock,
defaultRetryDuration,
)
return runner, nil
}
Expand Down
93 changes: 81 additions & 12 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ const MaxChecksumTotalCost float64 = 60.0

const defaultTickDurationForFlush = 30 * time.Second

const defaultTckDurationForChecksum = 5 * time.Second
const defaultTickDurationForChecksum = 5 * time.Second

const defaultTickDurationForLock = 4 * time.Minute

const defaultRetryDuration = 3 * time.Second

const lockTimeToLive = 5 * time.Minute

type KeyType interface {
Expand Down Expand Up @@ -305,12 +307,80 @@ func (r *CheckpointRunner[K, V]) setLock(ctx context.Context, errCh chan error)
return nil
}

type flusher[K KeyType, V ValueType] struct {
incompleteMetas []map[K]*RangeGroup[K, V]
incompleteChecksums []ChecksumItems
}

func newFlusher[K KeyType, V ValueType]() *flusher[K, V] {
return &flusher[K, V]{
incompleteMetas: make([]map[K]*RangeGroup[K, V], 0),
incompleteChecksums: make([]ChecksumItems, 0),
}
}

func (f *flusher[K, V]) doFlush(ctx context.Context, r *CheckpointRunner[K, V], meta map[K]*RangeGroup[K, V]) {
if err := r.doFlush(ctx, meta); err != nil {
log.Warn("failed to flush checkpoint data", zap.Error(err))
f.incompleteMetas = append(f.incompleteMetas, meta)
}
}

func (f *flusher[K, V]) doChecksumFlush(ctx context.Context, r *CheckpointRunner[K, V], checksums ChecksumItems) {
if err := r.doChecksumFlush(ctx, checksums); err != nil {
log.Warn("failed to flush checkpoint checksum", zap.Error(err))
f.incompleteChecksums = append(f.incompleteChecksums, checksums)
}
}

func (f *flusher[K, V]) flushOneIncomplete(ctx context.Context, r *CheckpointRunner[K, V]) {
// retry the last item to avoid frequent changes to the slice capacity
if len(f.incompleteMetas) > 0 {
lastIdx := len(f.incompleteMetas) - 1
if err := r.doFlush(ctx, f.incompleteMetas[lastIdx]); err != nil {
log.Warn("failed to retry to flush checkpoint data", zap.Error(err))
return
}
f.incompleteMetas = f.incompleteMetas[:lastIdx]
} else if len(f.incompleteChecksums) > 0 {
lastIdx := len(f.incompleteChecksums) - 1
if err := r.doChecksumFlush(ctx, f.incompleteChecksums[lastIdx]); err != nil {
log.Warn("failed to retry to flush checkpoint checksum", zap.Error(err))
return
}
f.incompleteChecksums = f.incompleteChecksums[:lastIdx]
}
}

func (f *flusher[K, V]) flushAllIncompleteMeta(ctx context.Context, r *CheckpointRunner[K, V]) {
for _, meta := range f.incompleteMetas {
if err := r.doFlush(ctx, meta); err != nil {
log.Warn("failed to retry to flush checkpoint data", zap.Error(err))
}
}
}

func (f *flusher[K, V]) flushAllIncompleteChecksum(ctx context.Context, r *CheckpointRunner[K, V]) {
for _, checksums := range f.incompleteChecksums {
if err := r.doChecksumFlush(ctx, checksums); err != nil {
log.Warn("failed to retry to flush checkpoint checksum", zap.Error(err))
}
}
}

// start a goroutine to flush the meta, which is sent from `checkpoint looper`, to the external storage
func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, wg *sync.WaitGroup) chan error {
func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(
ctx context.Context,
wg *sync.WaitGroup,
retryDuration time.Duration,
) chan error {
errCh := make(chan error, 1)
wg.Add(1)
flushWorker := func(ctx context.Context, errCh chan error) {
defer wg.Done()
flusher := newFlusher[K, V]()
retryTicker := time.NewTicker(retryDuration)
defer retryTicker.Stop()
for {
select {
case <-ctx.Done():
Expand All @@ -320,22 +390,18 @@ func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, w
return
case meta, ok := <-r.metaCh:
if !ok {
flusher.flushAllIncompleteMeta(ctx, r)
log.Info("stop checkpoint flush worker")
return
}
if err := r.doFlush(ctx, meta); err != nil {
errCh <- errors.Annotate(err, "failed to flush checkpoint data.")
return
}
flusher.doFlush(ctx, r, meta)
case checksums, ok := <-r.checksumMetaCh:
if !ok {
flusher.flushAllIncompleteChecksum(ctx, r)
log.Info("stop checkpoint flush worker")
return
}
if err := r.doChecksumFlush(ctx, checksums); err != nil {
errCh <- errors.Annotate(err, "failed to flush checkpoint checksum.")
return
}
flusher.doChecksumFlush(ctx, r, checksums)
case _, ok := <-r.lockCh:
if !ok {
log.Info("stop checkpoint flush worker")
Expand All @@ -345,6 +411,8 @@ func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, w
errCh <- errors.Annotate(err, "failed to update checkpoint lock.")
return
}
case <-retryTicker.C:
flusher.flushOneIncomplete(ctx, r)
}
}
}
Expand All @@ -370,7 +438,8 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
ctx context.Context,
tickDurationForFlush,
tickDurationForChecksum,
tickDurationForLock time.Duration,
tickDurationForLock,
retryDuration time.Duration,
) {
failpoint.Inject("checkpoint-more-quickly-flush", func(_ failpoint.Value) {
tickDurationForChecksum = 1 * time.Second
Expand All @@ -390,7 +459,7 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
errCh := r.startCheckpointFlushLoop(cctx, &wg)
errCh := r.startCheckpointFlushLoop(cctx, &wg, retryDuration)
flushTicker := time.NewTicker(tickDurationForFlush)
defer flushTicker.Stop()
checksumTicker := time.NewTicker(tickDurationForChecksum)
Expand Down
89 changes: 88 additions & 1 deletion br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package checkpoint_test
import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/checkpoint"
Expand Down Expand Up @@ -278,7 +280,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
require.NoError(t, err)

data := map[string]struct {
Expand Down Expand Up @@ -360,6 +362,91 @@ func TestCheckpointRestoreRunner(t *testing.T) {
require.False(t, exists)
}

func TestCheckpointRunnerRetry(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
require.NoError(t, err)
defer func() {
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
}()
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
time.Sleep(time.Second)
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
require.NoError(t, err)
checkpointRunner.WaitForFinish(ctx, true)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
})
require.NoError(t, err)
require.LessOrEqual(t, 1, recordSet["1_{123}"])
require.LessOrEqual(t, 1, recordSet["2_{456}"])
require.LessOrEqual(t, 1, recordSet["3_{789}"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2")
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[3].Crc64xor, items[3].TotalBytes, items[3].TotalKvs), "3_3_3")
}

func TestCheckpointRunnerNoRetry(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
require.NoError(t, err)
time.Sleep(time.Second)
checkpointRunner.WaitForFinish(ctx, true)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
})
require.NoError(t, err)
require.Equal(t, 1, recordSet["1_{123}"])
require.Equal(t, 1, recordSet["2_{456}"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2")
}

func TestCheckpointLogRestoreRunner(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func StartCheckpointLogRestoreRunnerForTest(
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

runner.startCheckpointMainLoop(ctx, tick, tick, 0)
runner.startCheckpointMainLoop(ctx, tick, tick, 0, defaultRetryDuration)
return runner, nil
}

Expand All @@ -129,7 +129,9 @@ func StartCheckpointRunnerForLogRestore(
nil, valueMarshalerForLogRestore)

// for restore, no need to set lock
runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTckDurationForChecksum, 0)
runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
return runner, nil
}

Expand Down
7 changes: 5 additions & 2 deletions br/pkg/checkpoint/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ func StartCheckpointRestoreRunnerForTest(
ctx context.Context,
se glue.Session,
tick time.Duration,
retryDuration time.Duration,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
nil, valueMarshalerForRestore)

runner.startCheckpointMainLoop(ctx, tick, tick, 0)
runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration)
return runner, nil
}

Expand All @@ -64,7 +65,9 @@ func StartCheckpointRunnerForRestore(
nil, valueMarshalerForRestore)

// for restore, no need to set lock
runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTckDurationForChecksum, 0)
runner.startCheckpointMainLoop(
ctx,
defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
return runner, nil
}

Expand Down

0 comments on commit 940e29c

Please sign in to comment.