diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 2de0f6d144c75..0fc5ae66e28ef 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -1050,9 +1050,11 @@ type TikvImporter struct { DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"` - IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` - KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"` - AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"` + // deprecated, use ParallelImport instead. + IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` + ParallelImport bool `toml:"parallel-import" json:"parallel-import"` + KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"` + AddIndexBySQL bool `toml:"add-index-by-sql" json:"add-index-by-sql"` EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` @@ -1066,6 +1068,10 @@ func (t *TikvImporter) adjust() error { return common.ErrInvalidConfig.GenWithStack("tikv-importer.backend must not be empty!") } t.Backend = strings.ToLower(t.Backend) + // only need to assign t.IncrementalImport to t.ParallelImport when t.ParallelImport is false and t.IncrementalImport is true + if !t.ParallelImport && t.IncrementalImport { + t.ParallelImport = t.IncrementalImport + } switch t.Backend { case BackendTiDB: t.DuplicateResolution = DupeResAlgNone @@ -1090,9 +1096,9 @@ func (t *TikvImporter) adjust() error { t.LocalWriterMemCacheSize = DefaultLocalWriterMemCacheSize } - if t.IncrementalImport && t.AddIndexBySQL { + if t.ParallelImport && t.AddIndexBySQL { return common.ErrInvalidConfig. - GenWithStack("tikv-importer.add-index-using-ddl cannot be used with tikv-importer.incremental-import") + GenWithStack("tikv-importer.add-index-using-ddl cannot be used with tikv-importer.parallel-import") } if len(t.SortedKVDir) == 0 { @@ -1321,9 +1327,9 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error { "unsupported `%s` (%s)", strategyConfigFrom, c.Strategy) } if c.Strategy != "" { - if i.IncrementalImport { + if i.ParallelImport { return common.ErrInvalidConfig.GenWithStack( - "%s cannot be used with tikv-importer.incremental-import", + "%s cannot be used with tikv-importer.parallel-import", strategyConfigFrom) } if i.DuplicateResolution != DupeResAlgNone { diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index fa6cc28f2d103..5094203c2cbac 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -982,19 +982,19 @@ func TestAdjustConflictStrategy(t *testing.T) { cfg.TikvImporter.Backend = BackendLocal cfg.Conflict.Strategy = ReplaceOnDup - cfg.TikvImporter.IncrementalImport = true - require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.incremental-import") + cfg.TikvImporter.ParallelImport = true + require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.parallel-import") cfg.TikvImporter.Backend = BackendLocal cfg.Conflict.Strategy = ReplaceOnDup - cfg.TikvImporter.IncrementalImport = false + cfg.TikvImporter.ParallelImport = false cfg.TikvImporter.DuplicateResolution = DupeResAlgRemove require.ErrorContains(t, cfg.Adjust(ctx), "conflict.strategy cannot be used with tikv-importer.duplicate-resolution") cfg.TikvImporter.Backend = BackendLocal cfg.Conflict.Strategy = "" cfg.TikvImporter.OnDuplicate = ReplaceOnDup - cfg.TikvImporter.IncrementalImport = false + cfg.TikvImporter.ParallelImport = false cfg.TikvImporter.DuplicateResolution = DupeResAlgRemove require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.duplicate-resolution") } @@ -1201,9 +1201,9 @@ func TestAdjustTikvImporter(t *testing.T) { cfg.TikvImporter.SortedKVDir = base require.NoError(t, cfg.TikvImporter.adjust()) - cfg.TikvImporter.IncrementalImport = true + cfg.TikvImporter.ParallelImport = true cfg.TikvImporter.AddIndexBySQL = true - require.ErrorContains(t, cfg.TikvImporter.adjust(), "tikv-importer.add-index-using-ddl cannot be used with tikv-importer.incremental-import") + require.ErrorContains(t, cfg.TikvImporter.adjust(), "tikv-importer.add-index-using-ddl cannot be used with tikv-importer.parallel-import") } func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) { diff --git a/br/pkg/lightning/importer/check_info.go b/br/pkg/lightning/importer/check_info.go index aa321b09a087c..1dd5f784409e0 100644 --- a/br/pkg/lightning/importer/check_info.go +++ b/br/pkg/lightning/importer/check_info.go @@ -137,7 +137,7 @@ func (rc *Controller) checkCSVHeader(ctx context.Context) error { } func (rc *Controller) checkTableEmpty(ctx context.Context) error { - if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport { + if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.ParallelImport { return nil } return rc.doPreCheckOnItem(ctx, precheck.CheckTargetTableEmpty) diff --git a/br/pkg/lightning/importer/check_info_test.go b/br/pkg/lightning/importer/check_info_test.go index be55454b88fbf..27105aeeb536c 100644 --- a/br/pkg/lightning/importer/check_info_test.go +++ b/br/pkg/lightning/importer/check_info_test.go @@ -482,13 +482,13 @@ func TestCheckTableEmpty(t *testing.T) { err := rc.checkTableEmpty(ctx) require.NoError(t, err) - // test incremental mode + // test parallel mode rc.cfg.TikvImporter.Backend = config.BackendLocal - rc.cfg.TikvImporter.IncrementalImport = true + rc.cfg.TikvImporter.ParallelImport = true err = rc.checkTableEmpty(ctx) require.NoError(t, err) - rc.cfg.TikvImporter.IncrementalImport = false + rc.cfg.TikvImporter.ParallelImport = false db, mock, err := sqlmock.New() require.NoError(t, err) mock.MatchExpectationsInOrder(false) diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index fc83a63f601f4..486d9ed40b049 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -378,7 +378,7 @@ func NewImportControllerWithPauser( var metaBuilder metaMgrBuilder isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal switch { - case isSSTImport && cfg.TikvImporter.IncrementalImport: + case isSSTImport && cfg.TikvImporter.ParallelImport: metaBuilder = &dbMetaMgrBuilder{ db: db, taskID: cfg.TaskID, diff --git a/br/tests/lightning_incremental/config.toml b/br/tests/lightning_incremental/config.toml index 761e60b91b804..1d3a437e21dd0 100644 --- a/br/tests/lightning_incremental/config.toml +++ b/br/tests/lightning_incremental/config.toml @@ -1,2 +1,2 @@ [tikv-importer] -incremental-import = true +parallel-import = true