Skip to content

Commit

Permalink
Lightning: Update Parameter Name from `tikv-importer.incremental-impo…
Browse files Browse the repository at this point in the history
…rt` to `tikv-importer.parallel-import` (#45539)

close #45501
  • Loading branch information
lyzx2001 authored Jul 26, 2023
1 parent eff4970 commit 1d762be
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 19 deletions.
20 changes: 13 additions & 7 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,9 +1050,11 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`
KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"`
AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"`
// deprecated, use ParallelImport instead.
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`
ParallelImport bool `toml:"parallel-import" json:"parallel-import"`
KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"`
AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand All @@ -1066,6 +1068,10 @@ func (t *TikvImporter) adjust() error {
return common.ErrInvalidConfig.GenWithStack("tikv-importer.backend must not be empty!")
}
t.Backend = strings.ToLower(t.Backend)
// only need to assign t.IncrementalImport to t.ParallelImport when t.ParallelImport is false and t.IncrementalImport is true
if !t.ParallelImport && t.IncrementalImport {
t.ParallelImport = t.IncrementalImport
}
switch t.Backend {
case BackendTiDB:
t.DuplicateResolution = DupeResAlgNone
Expand All @@ -1090,9 +1096,9 @@ func (t *TikvImporter) adjust() error {
t.LocalWriterMemCacheSize = DefaultLocalWriterMemCacheSize
}

if t.IncrementalImport && t.AddIndexBySQL {
if t.ParallelImport && t.AddIndexBySQL {
return common.ErrInvalidConfig.
GenWithStack("tikv-importer.add-index-using-ddl cannot be used with tikv-importer.incremental-import")
GenWithStack("tikv-importer.add-index-using-ddl cannot be used with tikv-importer.parallel-import")
}

if len(t.SortedKVDir) == 0 {
Expand Down Expand Up @@ -1321,9 +1327,9 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
"unsupported `%s` (%s)", strategyConfigFrom, c.Strategy)
}
if c.Strategy != "" {
if i.IncrementalImport {
if i.ParallelImport {
return common.ErrInvalidConfig.GenWithStack(
"%s cannot be used with tikv-importer.incremental-import",
"%s cannot be used with tikv-importer.parallel-import",
strategyConfigFrom)
}
if i.DuplicateResolution != DupeResAlgNone {
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,19 +982,19 @@ func TestAdjustConflictStrategy(t *testing.T) {

cfg.TikvImporter.Backend = BackendLocal
cfg.Conflict.Strategy = ReplaceOnDup
cfg.TikvImporter.IncrementalImport = true
require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.incremental-import")
cfg.TikvImporter.ParallelImport = true
require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.parallel-import")

cfg.TikvImporter.Backend = BackendLocal
cfg.Conflict.Strategy = ReplaceOnDup
cfg.TikvImporter.IncrementalImport = false
cfg.TikvImporter.ParallelImport = false
cfg.TikvImporter.DuplicateResolution = DupeResAlgRemove
require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.duplicate-resolution")

cfg.TikvImporter.Backend = BackendLocal
cfg.Conflict.Strategy = ""
cfg.TikvImporter.OnDuplicate = ReplaceOnDup
cfg.TikvImporter.IncrementalImport = false
cfg.TikvImporter.ParallelImport = false
cfg.TikvImporter.DuplicateResolution = DupeResAlgRemove
require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.duplicate-resolution")
}
Expand Down Expand Up @@ -1201,9 +1201,9 @@ func TestAdjustTikvImporter(t *testing.T) {
cfg.TikvImporter.SortedKVDir = base
require.NoError(t, cfg.TikvImporter.adjust())

cfg.TikvImporter.IncrementalImport = true
cfg.TikvImporter.ParallelImport = true
cfg.TikvImporter.AddIndexBySQL = true
require.ErrorContains(t, cfg.TikvImporter.adjust(), "tikv-importer.add-index-using-ddl cannot be used with tikv-importer.incremental-import")
require.ErrorContains(t, cfg.TikvImporter.adjust(), "tikv-importer.add-index-using-ddl cannot be used with tikv-importer.parallel-import")
}

func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (rc *Controller) checkCSVHeader(ctx context.Context) error {
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.ParallelImport {
return nil
}
return rc.doPreCheckOnItem(ctx, precheck.CheckTargetTableEmpty)
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,13 @@ func TestCheckTableEmpty(t *testing.T) {
err := rc.checkTableEmpty(ctx)
require.NoError(t, err)

// test incremental mode
// test parallel mode
rc.cfg.TikvImporter.Backend = config.BackendLocal
rc.cfg.TikvImporter.IncrementalImport = true
rc.cfg.TikvImporter.ParallelImport = true
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)

rc.cfg.TikvImporter.IncrementalImport = false
rc.cfg.TikvImporter.ParallelImport = false
db, mock, err := sqlmock.New()
require.NoError(t, err)
mock.MatchExpectationsInOrder(false)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func NewImportControllerWithPauser(
var metaBuilder metaMgrBuilder
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
switch {
case isSSTImport && cfg.TikvImporter.IncrementalImport:
case isSSTImport && cfg.TikvImporter.ParallelImport:
metaBuilder = &dbMetaMgrBuilder{
db: db,
taskID: cfg.TaskID,
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_incremental/config.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[tikv-importer]
incremental-import = true
parallel-import = true

0 comments on commit 1d762be

Please sign in to comment.