Skip to content

Commit

Permalink
lightning: convert pebbleDB block size to default when user passes in…
Browse files Browse the repository at this point in the history
… 0 (#50690)

ref #45037
  • Loading branch information
lyzx2001 authored Jan 25, 2024
1 parent cf320b2 commit c852515
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE status <= ?)
`, cpdb.schema, CheckpointTableNameEngine, CheckpointTableNameTable)
deleteTableQuery = common.SprintfWithIdentifiers(`
DELETE FROM %s.%s status <= ?
DELETE FROM %s.%s WHERE status <= ?
`, cpdb.schema, CheckpointTableNameTable)
args = []any{CheckpointStatusMaxInvalid}
} else {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
],
embed = [":config"],
flaky = True,
shard_count = 47,
shard_count = 48,
deps = [
"//br/pkg/lightning/common",
"@com_github_burntsushi_toml//:toml",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (

DefaultEngineMemCacheSize = 512 * units.MiB
DefaultLocalWriterMemCacheSize = 128 * units.MiB
DefaultBlockSize = 16 * units.KiB

defaultCSVDataCharacterSet = "binary"
defaultCSVDataInvalidCharReplace = utf8.RuneError
Expand Down Expand Up @@ -1115,6 +1116,9 @@ func (t *TikvImporter) adjust() error {
if t.LocalWriterMemCacheSize == 0 {
t.LocalWriterMemCacheSize = DefaultLocalWriterMemCacheSize
}
if t.BlockSize == 0 {
t.BlockSize = DefaultBlockSize
}

if t.ParallelImport && t.AddIndexBySQL {
return common.ErrInvalidConfig.
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,3 +1299,16 @@ func TestAdjustConflict(t *testing.T) {
cfg.Conflict.MaxRecordRows = 1
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`)
}

func TestAdjustBlockSize(t *testing.T) {
cfg := NewConfig()
cfg.TikvImporter.Backend = BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TiDB.DistSQLScanConcurrency = 1
cfg.Mydumper.SourceDir = "."
cfg.TikvImporter.BlockSize = 0

err := cfg.Adjust(context.Background())
require.Error(t, err)
require.Equal(t, ByteSize(16384), cfg.TikvImporter.BlockSize)
}
1 change: 1 addition & 0 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,7 @@ func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.Ba
KeyspaceName: tidb.GetGlobalKeyspaceName(),
PausePDSchedulerScope: config.PausePDSchedulerScopeTable,
DisableAutomaticCompactions: true,
BlockSize: config.DefaultBlockSize,
}
if e.IsRaftKV2 {
backendConfig.RaftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval
Expand Down

0 comments on commit c852515

Please sign in to comment.