diff --git a/lightning/config/config.go b/lightning/config/config.go
index a4496fab8..ca4fbba5e 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -84,11 +84,12 @@ type PostRestore struct {
 }
 
 type MydumperRuntime struct {
-	ReadBlockSize int64  `toml:"read-block-size" json:"read-block-size"`
-	BatchSize     int64  `toml:"batch-size" json:"batch-size"`
-	SourceDir     string `toml:"data-source-dir" json:"data-source-dir"`
-	NoSchema      bool   `toml:"no-schema" json:"no-schema"`
-	CharacterSet  string `toml:"character-set" json:"character-set"`
+	ReadBlockSize    int64   `toml:"read-block-size" json:"read-block-size"`
+	BatchSize        int64   `toml:"batch-size" json:"batch-size"`
+	BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
+	SourceDir        string  `toml:"data-source-dir" json:"data-source-dir"`
+	NoSchema         bool    `toml:"no-schema" json:"no-schema"`
+	CharacterSet     string  `toml:"character-set" json:"character-set"`
 }
 
 type TikvImporter struct {
@@ -187,7 +188,10 @@ func (cfg *Config) Load() error {
 
 	// handle mydumper
 	if cfg.Mydumper.BatchSize <= 0 {
-		cfg.Mydumper.BatchSize = 10 * _G
+		cfg.Mydumper.BatchSize = 100 * _G
+	}
+	if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
+		cfg.Mydumper.BatchImportRatio = 0.75
 	}
 	if cfg.Mydumper.ReadBlockSize <= 0 {
 		cfg.Mydumper.ReadBlockSize = ReadBlockSize
diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index f70e0fea8..78aba94d6 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -1,6 +1,7 @@
 package mydump
 
 import (
+	"math"
 	"os"
 
 	"github.com/pkg/errors"
@@ -46,13 +47,89 @@ func (rs regionSlice) Less(i, j int) bool {
 
 ////////////////////////////////////////////////////////////////
 
-func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*TableRegion, error) {
+func AllocateEngineIDs(
+	filesRegions []*TableRegion,
+	dataFileSizes []float64,
+	batchSize float64,
+	batchImportRatio float64,
+	tableConcurrency float64,
+) {
+	totalDataFileSize := 0.0
+	for _, dataFileSize := range dataFileSizes {
+		totalDataFileSize += dataFileSize
+	}
+
+	// No need to batch if the size is too small :)
+	if totalDataFileSize <= batchSize {
+		return
+	}
+
+	curEngineID := 0
+	curEngineSize := 0.0
+	curBatchSize := batchSize
+
+	// The import() step will not be concurrent.
+	// If multiple batches finish writing at nearly the same time, their
+	// imports will be serialized. We need to use a non-uniform batch size to create a pipeline effect.
+	// Here we calculate the total number of engines, which is needed to compute the scale-up:
+	//
+	//     Total/B1 = 1/(1-R) * (N - 1/beta(N, R))
+	//              ≲ N/(1-R)
+	//
+	// We use a simple brute force search since the search space is extremely small.
+	ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize
+	n := math.Ceil(ratio)
+	logGammaNPlusR, _ := math.Lgamma(n + batchImportRatio)
+	logGammaN, _ := math.Lgamma(n)
+	logGammaR, _ := math.Lgamma(batchImportRatio)
+	invBetaNR := math.Exp(logGammaNPlusR - logGammaN - logGammaR) // 1/B(N, R) = Γ(N+R)/(Γ(N)·Γ(R))
+	for {
+		if n <= 0 || n > tableConcurrency {
+			n = tableConcurrency
+			break
+		}
+		realRatio := n - invBetaNR
+		if realRatio >= ratio {
+			// We don't have enough engines; reduce the batch size to keep the pipeline smooth.
+			curBatchSize = totalDataFileSize * (1 - batchImportRatio) / realRatio
+			break
+		}
+		invBetaNR *= 1 + batchImportRatio/n // Γ(X+1) = X * Γ(X)
+		n += 1.0
+	}
+
+	for i, dataFileSize := range dataFileSizes {
+		filesRegions[i].EngineID = curEngineID
+		curEngineSize += dataFileSize
+
+		if curEngineSize >= curBatchSize {
+			curEngineSize = 0
+			curEngineID++
+
+			i := float64(curEngineID)
+			// calculate the non-uniform batch size
+			if i >= n {
+				curBatchSize = batchSize
+			} else {
+				// B_(i+1) = B_i * (R/(N-i) + 1), where R = import/write duration ratio
+				curBatchSize *= batchImportRatio/(n-i) + 1.0
+			}
+		}
+	}
+}
+
+func MakeTableRegions(
+	meta *MDTableMeta,
+	columns int,
+	batchSize int64,
+	batchImportRatio float64,
+	tableConcurrency int,
+) ([]*TableRegion, error) {
 	// Split files into regions
 	filesRegions := make(regionSlice, 0, len(meta.DataFiles))
+	dataFileSizes := make([]float64, 0, len(meta.DataFiles))
 
 	prevRowIDMax := int64(0)
-	curEngineID := 0
-	curEngineSize := int64(0)
 	for _, dataFile := range meta.DataFiles {
 		dataFileInfo, err := os.Stat(dataFile)
 		if err != nil {
@@ -61,10 +138,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
 		dataFileSize := dataFileInfo.Size()
 		rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
 		filesRegions = append(filesRegions, &TableRegion{
-			EngineID: curEngineID,
-			DB:       meta.DB,
-			Table:    meta.Name,
-			File:     dataFile,
+			DB:    meta.DB,
+			Table: meta.Name,
+			File:  dataFile,
 			Chunk: Chunk{
 				Offset:    0,
 				EndOffset: dataFileSize,
@@ -73,13 +149,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
 			},
 		})
 		prevRowIDMax = rowIDMax
-
-		curEngineSize += dataFileSize
-		if curEngineSize > batchSize {
-			curEngineSize = 0
-			curEngineID++
-		}
+		dataFileSizes = append(dataFileSizes, float64(dataFileSize))
 	}
+	AllocateEngineIDs(filesRegions, dataFileSizes, float64(batchSize), batchImportRatio, float64(tableConcurrency))
 	return filesRegions, nil
 }
 
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index 31fca02c5..274c68001 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
 	dbMeta := loader.GetDatabases()[0]
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := MakeTableRegions(meta, 1, 1)
+		regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
 		c.Assert(err, IsNil)
 
 		table := meta.Name
@@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
 	dbMeta := loader.GetDatabases()[0]
 
 	for _, meta := range dbMeta.Tables {
-		regions, err := MakeTableRegions(meta, 1, 1)
+		regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
 		c.Assert(err, IsNil)
 
 		tolValTuples := 0
@@ -116,3 +116,84 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
 
 	return
 }
+
+func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) {
+	dataFileSizes := make([]float64, 700)
+	for i := range dataFileSizes {
+		dataFileSizes[i] = 1.0
+	}
+	filesRegions := make([]*TableRegion, 0, len(dataFileSizes))
+	for range dataFileSizes {
+		filesRegions = append(filesRegions, new(TableRegion))
+	}
+
+	checkEngineSizes := func(what string, expected map[int]int) {
+		actual := make(map[int]int)
+		for _, region := range filesRegions {
+			actual[region.EngineID]++
+		}
+		c.Assert(actual, DeepEquals, expected, Commentf("%s", what))
+	}
+
+	// Batch size > Total size => Everything in the zero batch.
+	AllocateEngineIDs(filesRegions, dataFileSizes, 1000, 0.5, 1000)
+	checkEngineSizes("no batching", map[int]int{
+		0: 700,
+	})
+
+	// Allocate 3 engines.
+	AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.5, 1000)
+	checkEngineSizes("batch size = 200", map[int]int{
+		0: 170,
+		1: 213,
+		2: 317,
+	})
+
+	// Allocate 3 engines with an alternative ratio
+	AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.6, 1000)
+	checkEngineSizes("batch size = 200, ratio = 0.6", map[int]int{
+		0: 160,
+		1: 208,
+		2: 332,
+	})
+
+	// Allocate 5 engines.
+	AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000)
+	checkEngineSizes("batch size = 100", map[int]int{
+		0: 93,
+		1: 105,
+		2: 122,
+		3: 153,
+		4: 227,
+	})
+
+	// Number of engines > table concurrency
+	AllocateEngineIDs(filesRegions, dataFileSizes, 50, 0.5, 4)
+	checkEngineSizes("batch size = 50, limit table conc = 4", map[int]int{
+		0:  50,
+		1:  59,
+		2:  73,
+		3:  110,
+		4:  50,
+		5:  50,
+		6:  50,
+		7:  50,
+		8:  50,
+		9:  50,
+		10: 50,
+		11: 50,
+		12: 8,
+	})
+
+	// Zero ratio = Uniform
+	AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.0, 1000)
+	checkEngineSizes("batch size = 100, ratio = 0", map[int]int{
+		0: 100,
+		1: 100,
+		2: 100,
+		3: 100,
+		4: 100,
+		5: 100,
+		6: 100,
+	})
+}
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index cc1e4c0c2..10493d4bf 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
 	if len(cp.Engines) > 0 {
 		common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
 	} else if cp.Status < CheckpointStatusAllWritten {
-		if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, cp); err != nil {
+		if err := t.populateChunks(rc.cfg, cp); err != nil {
 			return errors.Trace(err)
 		}
 		if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
@@ -611,7 +611,17 @@ func (t *TableRestore) restoreEngine(
 	}
 
 	wg.Wait()
-	common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v", t.tableName, engineID, time.Since(timer))
+	dur := time.Since(timer)
+
+	// Report some statistics into the log for debugging.
+	totalKVSize := uint64(0)
+	totalSQLSize := int64(0)
+	for _, chunk := range cp.Chunks {
+		totalKVSize += chunk.Checksum.SumSize()
+		totalSQLSize += chunk.Chunk.EndOffset
+	}
+
+	common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v (read %d, written %d)", t.tableName, engineID, dur, totalSQLSize, totalKVSize)
 	err = chunkErr.Get()
 	rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
 	if err != nil {
@@ -972,11 +982,11 @@ func (tr *TableRestore) Close() {
 
 var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))
 
-func (t *TableRestore) populateChunks(batchSize int64, cp *TableCheckpoint) error {
+func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) error {
 	common.AppLogger.Infof("[%s] load chunks", t.tableName)
 	timer := time.Now()
 
-	chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize)
+	chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, cfg.Mydumper.BatchSize, cfg.Mydumper.BatchImportRatio, cfg.App.TableConcurrency)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/tests/checkpoint_engines/run.sh b/tests/checkpoint_engines/run.sh
index 5b2bbf9bd..aabeae4b7 100755
--- a/tests/checkpoint_engines/run.sh
+++ b/tests/checkpoint_engines/run.sh
@@ -29,7 +29,7 @@ run_sql 'DROP DATABASE cpeng;'
 export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)'
 
 set +e
-for i in $(seq 4); do
+for i in $(seq "$OPEN_ENGINES_COUNT"); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning 2> /dev/null
     [ $? -ne 0 ] || exit 1
@@ -52,7 +52,7 @@ check_contains 'sum(c): 46'
 run_sql 'DROP DATABASE cpeng;'
 
 set +e
-for i in $(seq 4); do
+for i in $(seq "$OPEN_ENGINES_COUNT"); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning mysql 2> /dev/null
    [ $? -ne 0 ] || exit 1
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 988ee6fdc..3f4fcf8b6 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -56,7 +56,18 @@ addr = "127.0.0.1:8808"
 read-block-size = 65536 # Byte (default = 64 KB)
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
-batch-size = 10_737_418_240 # Byte (default = 10 GB)
+batch-size = 107_374_182_400 # Byte (default = 100 GiB)
+
+# Engine files need to be imported sequentially. Due to table-concurrency, multiple engines will be
+# imported at nearly the same time, and this will create a queue and waste resources. Therefore,
+# Lightning will slightly increase the size of the first few batches to properly distribute
+# resources. The scale-up is controlled by this parameter, which expresses the ratio of the
+# "import" duration to the "write" duration under full concurrency. It can be measured as the
+# ratio (import duration / write duration) of a single table of around 1 GB in size; the exact
+# timings can be found in the log. The faster "import" is, the smaller the adjustment becomes, and
+# a ratio of zero means a uniform batch size. This value should be in the range (0 <= batch-import-ratio < 1).
+batch-import-ratio = 0.75
+
 # mydumper local source data directory
 data-source-dir = "/tmp/export-20180328-200751"
 # if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.
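
For reviewers who want to see what schedule the new allocator produces, here is a standalone sketch that replays the same computation as AllocateEngineIDs in the patch above. It is not part of the patch; the helper name batchSizes and the main driver exist only for illustration. It brute-force searches for the engine count N exactly as the patch does, shrinks the first batch if needed, and then grows each subsequent batch by the factor R/(N-i) + 1 from the recurrence in the code:

package main

import (
	"fmt"
	"math"
)

// batchSizes mirrors the sizing logic of AllocateEngineIDs: it estimates the
// number of engines N, shrinks the first batch when there are not enough
// engines, and then grows each following batch by (R/(N-i) + 1) until the
// base batch size is reached.
func batchSizes(totalSize, batchSize, ratio, tableConcurrency float64) []float64 {
	if totalSize <= batchSize {
		return []float64{totalSize} // no batching needed
	}

	// Brute-force search for N, as in the patch:
	//     Total/B1 = 1/(1-R) * (N - 1/beta(N, R))
	target := totalSize * (1 - ratio) / batchSize
	n := math.Ceil(target)
	lgNR, _ := math.Lgamma(n + ratio)
	lgN, _ := math.Lgamma(n)
	lgR, _ := math.Lgamma(ratio)
	invBetaNR := math.Exp(lgNR - lgN - lgR) // 1/B(N, R) = Γ(N+R)/(Γ(N)·Γ(R))
	cur := batchSize
	for {
		if n <= 0 || n > tableConcurrency {
			n = tableConcurrency
			break
		}
		realRatio := n - invBetaNR
		if realRatio >= target {
			cur = totalSize * (1 - ratio) / realRatio
			break
		}
		invBetaNR *= 1 + ratio/n // Γ(X+1) = X·Γ(X)
		n += 1.0
	}

	// Walk the recurrence B_(i+1) = B_i * (R/(N-i) + 1) until the table is covered.
	var sizes []float64
	covered := 0.0
	for i := 0.0; covered < totalSize; i++ {
		sizes = append(sizes, cur)
		covered += cur
		if i+1 >= n {
			cur = batchSize
		} else {
			cur *= ratio/(n-i-1) + 1.0
		}
	}
	return sizes
}

func main() {
	// Same parameters as the "batch size = 100" test case above:
	// 700 units of data, base batch size 100, ratio 0.5, ample table concurrency.
	for i, s := range batchSizes(700, 100, 0.5, 1000) {
		fmt.Printf("engine %d: planned batch size ≈ %.1f\n", i, s)
	}
}

With those parameters the planned sizes come out to roughly 92.9, 104.5, 121.9, 152.3 and 228.5, which is why the test expects the 93/105/122/153/227 file split: each engine keeps taking whole unit-sized files until its planned size is reached, and the last engine takes whatever remains.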