diff --git a/lightning/config/config.go b/lightning/config/config.go
index a4496fab8..fd4a57972 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -84,11 +84,12 @@ type PostRestore struct {
 }
 
 type MydumperRuntime struct {
-	ReadBlockSize int64  `toml:"read-block-size" json:"read-block-size"`
-	BatchSize     int64  `toml:"batch-size" json:"batch-size"`
-	SourceDir     string `toml:"data-source-dir" json:"data-source-dir"`
-	NoSchema      bool   `toml:"no-schema" json:"no-schema"`
-	CharacterSet  string `toml:"character-set" json:"character-set"`
+	ReadBlockSize  int64  `toml:"read-block-size" json:"read-block-size"`
+	BatchSize      int64  `toml:"batch-size" json:"batch-size"`
+	BatchSizeScale int64  `toml:"batch-size-scale" json:"batch-size-scale"`
+	SourceDir      string `toml:"data-source-dir" json:"data-source-dir"`
+	NoSchema       bool   `toml:"no-schema" json:"no-schema"`
+	CharacterSet   string `toml:"character-set" json:"character-set"`
 }
 
 type TikvImporter struct {
@@ -189,6 +190,9 @@ func (cfg *Config) Load() error {
 	if cfg.Mydumper.BatchSize <= 0 {
 		cfg.Mydumper.BatchSize = 10 * _G
 	}
+	if cfg.Mydumper.BatchSizeScale <= 0 {
+		cfg.Mydumper.BatchSizeScale = 5
+	}
 	if cfg.Mydumper.ReadBlockSize <= 0 {
 		cfg.Mydumper.ReadBlockSize = ReadBlockSize
 	}
diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 1c0ffd72d..399c3b3c8 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -46,17 +46,10 @@ func (rs regionSlice) Less(i, j int) bool {
 
 ////////////////////////////////////////////////////////////////
 
-func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*TableRegion, error) {
+func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale int64) ([]*TableRegion, error) {
 	// Split files into regions
 	filesRegions := make(regionSlice, 0, len(meta.DataFiles))
 
-	// import() step will not be concurrent.
-	// If multiple Batch end times are close, it will result in multiple
-	// Batch import serials. `batchSizeScale` used to implement non-uniform
-	// batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
-	// TODO: make it configurable
-	const batchSizeScale = 20
-
 	prevRowIDMax := int64(0)
 	curEngineID := 0
 	curEngineSize := int64(0)
@@ -86,8 +79,13 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
 		if curEngineSize > curBatchSize {
 			curEngineSize = 0
 			curEngineID++
-			if curEngineID < batchSizeScale {
-				curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
+
+			// The import() step is not concurrent. If several batches finish
+			// at nearly the same time, their imports are serialized.
+			// `batchSizeScale` implements non-uniform batch sizes to stagger them:
+			// curBatchSize = batchSize / (batchSizeScale - curEngineID)
+			if int64(curEngineID) < batchSizeScale {
+				curBatchSize = batchSize / (batchSizeScale - int64(curEngineID))
 			} else {
 				curBatchSize = batchSize
 			}
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index 31fca02c5..0d04fe4f0 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
 	dbMeta := loader.GetDatabases()[0]
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := MakeTableRegions(meta, 1, 1)
+		regions, err := MakeTableRegions(meta, 1, 1, 1)
 		c.Assert(err, IsNil)
 
 		table := meta.Name
@@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
 	dbMeta := loader.GetDatabases()[0]
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := MakeTableRegions(meta, 1, 1)
+		regions, err := MakeTableRegions(meta, 1, 1, 1)
 		c.Assert(err, IsNil)
 
 		tolValTuples := 0
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index cc1e4c0c2..0a66c9cc2 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
 	if len(cp.Engines) > 0 {
 		common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
 	} else if cp.Status < CheckpointStatusAllWritten {
-		if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, cp); err != nil {
+		if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, rc.cfg.Mydumper.BatchSizeScale, cp); err != nil {
 			return errors.Trace(err)
 		}
 		if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
@@ -972,11 +972,11 @@ func (tr *TableRestore) Close() {
 
 var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))
 
-func (t *TableRestore) populateChunks(batchSize int64, cp *TableCheckpoint) error {
+func (t *TableRestore) populateChunks(batchSize, batchSizeScale int64, cp *TableCheckpoint) error {
 	common.AppLogger.Infof("[%s] load chunks", t.tableName)
 	timer := time.Now()
 
-	chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize)
+	chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize, batchSizeScale)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 988ee6fdc..1e3f66d62 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -57,6 +57,11 @@ read-block-size = 65536 # Byte (default = 64 KB)
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
 batch-size = 10_737_418_240 # Byte (default = 10 GB)
+# The import step to TiKV executes serially when all batches have the same size
+# and several of them finish at nearly the same time. `batch-size-scale` spreads
+# the batch sizes between `batch-size / batch-size-scale` and `batch-size` to avoid this.
+# `batch-size-scale = 1` means all batches have the same size (`batch-size`).
+batch-size-scale = 5
 # mydumper local source data directory
 data-source-dir = "/tmp/export-20180328-200751"
 # if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.
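
For reference, the sizing rule applied by the patched `MakeTableRegions` can be sketched as a small standalone Go program. This is only an illustration: the helper name `batchSizes`, and the assumption that engine 0 also starts from `batchSize / batchSizeScale`, are not part of the patch.

```go
package main

import "fmt"

// batchSizes mirrors the rule introduced above: engines whose ID is below the
// scale get a reduced batch (batchSize / (scale - engineID)) so that their
// import() phases finish at staggered times; every later engine uses the
// full batchSize. (Illustrative helper, not the patched function itself.)
func batchSizes(batchSize, scale int64, engines int) []int64 {
	sizes := make([]int64, 0, engines)
	for id := int64(0); id < int64(engines); id++ {
		if id < scale {
			sizes = append(sizes, batchSize/(scale-id))
		} else {
			sizes = append(sizes, batchSize)
		}
	}
	return sizes
}

func main() {
	const gb = int64(1) << 30
	// Defaults from this patch: batch-size = 10 GB, batch-size-scale = 5.
	for id, size := range batchSizes(10*gb, 5, 7) {
		fmt.Printf("engine %d: %.1f GB\n", id, float64(size)/float64(gb))
	}
}
```

With those defaults the first five engines get roughly 2.0, 2.5, 3.3, 5.0 and 10.0 GB batches, and every engine after that gets the full 10.0 GB, so consecutive batches are unlikely to finish (and queue for import) at the same moment; `batch-size-scale = 1` gives uniform batches of `batch-size`.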