Merge branch 'master' into plan-cache-cacheable
qw4990 authored Jan 3, 2023
2 parents 508eee5 + 25a2479 commit 72d055d
Showing 65 changed files with 668 additions and 2,099 deletions.
4 changes: 0 additions & 4 deletions Makefile
@@ -408,10 +408,6 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
 		--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover \
 		-- //... -//cmd/... -//tests/graceshutdown/... \
 		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
-	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
-		--build_event_json_file=bazel_2.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
-		-- //... -//cmd/... -//tests/graceshutdown/... \
-		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
 
 bazel_build: bazel_ci_prepare
 	mkdir -p bin
1 change: 0 additions & 1 deletion br/pkg/conn/conn.go
@@ -191,7 +191,6 @@ func NewMgr(
 		return nil, errors.Trace(err)
 	}
 	// we must check tidb(tikv version) any time after concurrent ddl feature implemented in v6.2.
-	// when tidb < 6.2 we need set EnableConcurrentDDL false to make ddl works.
 	// we will keep this check until 7.0, which allow the breaking changes.
 	// NOTE: must call it after domain created!
 	// FIXME: remove this check in v7.0
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -103,6 +103,7 @@ go_test(
         "//br/pkg/lightning/glue",
         "//br/pkg/lightning/log",
         "//br/pkg/lightning/mydump",
+        "//br/pkg/lightning/worker",
         "//br/pkg/membuf",
         "//br/pkg/mock",
         "//br/pkg/pdutil",
17 changes: 11 additions & 6 deletions br/pkg/lightning/backend/local/engine_test.go
@@ -31,8 +31,17 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestIngestSSTWithClosedEngine(t *testing.T) {
+func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
 	dir := t.TempDir()
+	db, err := pebble.Open(path.Join(dir, "test"), opt)
+	require.NoError(t, err)
+	tmpPath := filepath.Join(dir, "test.sst")
+	err = os.Mkdir(tmpPath, 0o755)
+	require.NoError(t, err)
+	return db, tmpPath
+}
+
+func TestIngestSSTWithClosedEngine(t *testing.T) {
 	opt := &pebble.Options{
 		MemTableSize: 1024 * 1024,
 		MaxConcurrentCompactions: 16,
@@ -41,11 +50,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		DisableWAL: true,
 		ReadOnly: false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
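The engine_test.go change above extracts the shared Pebble setup into a makePebbleDB helper so other tests in the file can reuse it. A minimal sketch of how another test in the same file might call the helper (the test name and assertions are hypothetical, not part of this commit):

func TestMakePebbleDBSetup(t *testing.T) {
	// Same Pebble options as TestIngestSSTWithClosedEngine above.
	opt := &pebble.Options{
		MemTableSize:             1024 * 1024,
		MaxConcurrentCompactions: 16,
		DisableWAL:               true,
		ReadOnly:                 false,
	}
	// makePebbleDB opens a Pebble DB in a temp dir and pre-creates the
	// "test.sst" staging directory whose path it returns.
	db, tmpPath := makePebbleDB(t, opt)
	require.NotNil(t, db)
	require.DirExists(t, tmpPath)
	require.NoError(t, db.Close())
}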
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -91,6 +91,7 @@ const (
 	gRPCKeepAliveTime = 10 * time.Minute
 	gRPCKeepAliveTimeout = 5 * time.Minute
 	gRPCBackOffMaxDelay = 10 * time.Minute
+	writeStallSleepTime = 10 * time.Second
 
 	// The max ranges count in a batch to split and scatter.
 	maxBatchSplitRanges = 4096
@@ -381,6 +382,12 @@ type local struct {
 
 	encBuilder backend.EncodingBuilder
 	targetInfoGetter backend.TargetInfoGetter
+
+	// When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
+	// To avoid this, we should check write stall before ingesting SSTs. Note that, we
+	// must check both leader node and followers in client side, because followers will
+	// not check write stall as long as ingest command is accepted by leader.
+	shouldCheckWriteStall bool
 }
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -503,6 +510,7 @@ func NewLocalBackend(
 		logger: log.FromContext(ctx),
 		encBuilder: NewEncodingBuilder(ctx),
 		targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
+		shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
 		local.metrics = m
@@ -1146,6 +1154,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 		return resp, errors.Trace(err)
 	}
 
+	if local.shouldCheckWriteStall {
+		for {
+			maybeWriteStall, err := local.checkWriteStall(ctx, region)
+			if err != nil {
+				return nil, err
+			}
+			if !maybeWriteStall {
+				break
+			}
+			log.FromContext(ctx).Warn("ingest maybe cause write stall, sleep and retry",
+				zap.Duration("duration", writeStallSleepTime))
+			select {
+			case <-time.After(writeStallSleepTime):
+			case <-ctx.Done():
+				return nil, errors.Trace(ctx.Err())
+			}
+		}
+	}
+
 	req := &sst.MultiIngestRequest{
 		Context: reqCtx,
 		Ssts: metas,
@@ -1154,6 +1181,23 @@
 	return resp, errors.Trace(err)
 }
 
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+	for _, peer := range region.Region.GetPeers() {
+		cli, err := local.getImportClient(ctx, peer.StoreId)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		if resp.Error != nil && resp.Error.ServerIsBusy != nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
 	ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
 	curSize := uint64(0)
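Taken together, the local.go changes make Ingest probe every peer store of the target region with an empty MultiIngest request and sleep writeStallSleepTime while any store reports ServerIsBusy, only then sending the real request. Below is a standalone sketch of that probe-and-back-off pattern; the names ingestWithBackoff, probe, and send are illustrative and not part of the commit:

package sketch

import (
	"context"
	"time"
)

// ingestWithBackoff is a simplified stand-in for the new logic in
// local.Ingest: probe reports whether any store still looks busy (as
// checkWriteStall does via empty MultiIngest requests), and send performs
// the real ingest once no store reports ServerIsBusy.
func ingestWithBackoff(
	ctx context.Context,
	probe func(context.Context) (bool, error),
	send func(context.Context) error,
	sleep time.Duration,
) error {
	for {
		busy, err := probe(ctx)
		if err != nil {
			return err
		}
		if !busy {
			break
		}
		// A store reported ServerIsBusy: wait before probing again, but
		// stop early if the caller cancels the context.
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return send(ctx)
}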
