From 451015d5c8987980b75862e306ca571d88a1505c Mon Sep 17 00:00:00 2001
From: Lonng
Date: Wed, 9 Jan 2019 11:29:14 +0800
Subject: [PATCH 1/8] mydump: non-uniform batch size

---
 lightning/mydump/region.go      | 15 ++++++++++++++-
 tests/checkpoint_engines/run.sh |  6 +++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index f70e0fea8..1c0ffd72d 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -50,9 +50,17 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
     // Split files into regions
     filesRegions := make(regionSlice, 0, len(meta.DataFiles))
 
+    // import() step will not be concurrent.
+    // If multiple Batch end times are close, it will result in multiple
+    // Batch import serials. `batchSizeScale` used to implement non-uniform
+    // batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
+    // TODO: make it configurable
+    const batchSizeScale = 20
+
     prevRowIDMax := int64(0)
     curEngineID := 0
     curEngineSize := int64(0)
+    curBatchSize := batchSize / batchSizeScale
     for _, dataFile := range meta.DataFiles {
         dataFileInfo, err := os.Stat(dataFile)
         if err != nil {
@@ -75,9 +83,14 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
         prevRowIDMax = rowIDMax
 
         curEngineSize += dataFileSize
-        if curEngineSize > batchSize {
+        if curEngineSize > curBatchSize {
             curEngineSize = 0
             curEngineID++
+            if curEngineID < batchSizeScale {
+                curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
+            } else {
+                curBatchSize = batchSize
+            }
         }
     }
 
diff --git a/tests/checkpoint_engines/run.sh b/tests/checkpoint_engines/run.sh
index 5b2bbf9bd..0f7f7921c 100755
--- a/tests/checkpoint_engines/run.sh
+++ b/tests/checkpoint_engines/run.sh
@@ -12,7 +12,7 @@ run_lightning
 # Check that we have indeed opened 4 engines
 OPEN_ENGINES_COUNT=$(grep 'open engine' "$TEST_DIR/lightning-checkpoint-engines.log" | wc -l)
 echo "Number of open engines: $OPEN_ENGINES_COUNT"
-[ "$OPEN_ENGINES_COUNT" -eq 4 ]
+[ "$OPEN_ENGINES_COUNT" -eq 5 ]
 
 # Check that everything is correctly imported
 run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
@@ -29,7 +29,7 @@ run_sql 'DROP DATABASE cpeng;'
 export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)'
 
 set +e
-for i in $(seq 4); do
+for i in $(seq 5); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning 2> /dev/null
     [ $? -ne 0 ] || exit 1
@@ -52,7 +52,7 @@ check_contains 'sum(c): 46'
 run_sql 'DROP DATABASE cpeng;'
 
 set +e
-for i in $(seq 4); do
+for i in $(seq 5); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning mysql 2> /dev/null
     [ $? -ne 0 ] || exit 1

From 9d121ba8425a51d7383098a91ccce9a9a406d141 Mon Sep 17 00:00:00 2001
From: Lonng
Date: Wed, 9 Jan 2019 14:27:53 +0800
Subject: [PATCH 2/8] *: make the `batch-size-scale` configurable

---
 lightning/config/config.go      | 14 +++++++++-----
 lightning/mydump/region.go      | 18 ++++++++----------
 lightning/mydump/region_test.go |  4 ++--
 lightning/restore/restore.go    |  6 +++---
 tidb-lightning.toml             |  5 +++++
 5 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/lightning/config/config.go b/lightning/config/config.go
index a4496fab8..fd4a57972 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -84,11 +84,12 @@ type PostRestore struct {
 }
 
 type MydumperRuntime struct {
-    ReadBlockSize int64  `toml:"read-block-size" json:"read-block-size"`
-    BatchSize     int64  `toml:"batch-size" json:"batch-size"`
-    SourceDir     string `toml:"data-source-dir" json:"data-source-dir"`
-    NoSchema      bool   `toml:"no-schema" json:"no-schema"`
-    CharacterSet  string `toml:"character-set" json:"character-set"`
+    ReadBlockSize  int64  `toml:"read-block-size" json:"read-block-size"`
+    BatchSize      int64  `toml:"batch-size" json:"batch-size"`
+    BatchSizeScale int64  `toml:"batch-size-scale" json:"batch-size-scale"`
+    SourceDir      string `toml:"data-source-dir" json:"data-source-dir"`
+    NoSchema       bool   `toml:"no-schema" json:"no-schema"`
+    CharacterSet   string `toml:"character-set" json:"character-set"`
 }
 
 type TikvImporter struct {
@@ -189,6 +190,9 @@ func (cfg *Config) Load() error {
     if cfg.Mydumper.BatchSize <= 0 {
         cfg.Mydumper.BatchSize = 10 * _G
     }
+    if cfg.Mydumper.BatchSizeScale <= 0 {
+        cfg.Mydumper.BatchSizeScale = 5
+    }
     if cfg.Mydumper.ReadBlockSize <= 0 {
         cfg.Mydumper.ReadBlockSize = ReadBlockSize
     }
diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 1c0ffd72d..399c3b3c8 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -46,17 +46,10 @@ func (rs regionSlice) Less(i, j int) bool {
 
 ////////////////////////////////////////////////////////////////
 
-func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*TableRegion, error) {
+func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale int64) ([]*TableRegion, error) {
     // Split files into regions
     filesRegions := make(regionSlice, 0, len(meta.DataFiles))
 
-    // import() step will not be concurrent.
-    // If multiple Batch end times are close, it will result in multiple
-    // Batch import serials. `batchSizeScale` used to implement non-uniform
-    // batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
-    // TODO: make it configurable
-    const batchSizeScale = 20
-
     prevRowIDMax := int64(0)
     curEngineID := 0
     curEngineSize := int64(0)
@@ -86,8 +79,13 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
         if curEngineSize > curBatchSize {
             curEngineSize = 0
             curEngineID++
-            if curEngineID < batchSizeScale {
-                curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
+
+            // import() step will not be concurrent.
+            // If multiple Batch end times are close, it will result in multiple
+            // Batch import serials. `batchSizeScale` used to implement non-uniform
+            // batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
+            if int64(curEngineID) < batchSizeScale {
+                curBatchSize = batchSize / (batchSizeScale - int64(curEngineID))
             } else {
                 curBatchSize = batchSize
             }
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index 31fca02c5..0d04fe4f0 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
     dbMeta := loader.GetDatabases()[0]
 
     for _, meta := range dbMeta.Tables {
-        regions, err := MakeTableRegions(meta, 1, 1)
+        regions, err := MakeTableRegions(meta, 1, 1, 1)
         c.Assert(err, IsNil)
 
         table := meta.Name
@@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
     dbMeta := loader.GetDatabases()[0]
 
     for _, meta := range dbMeta.Tables {
-        regions, err := MakeTableRegions(meta, 1, 1)
+        regions, err := MakeTableRegions(meta, 1, 1, 1)
         c.Assert(err, IsNil)
 
         tolValTuples := 0
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index cc1e4c0c2..0a66c9cc2 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
     if len(cp.Engines) > 0 {
         common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
     } else if cp.Status < CheckpointStatusAllWritten {
-        if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, cp); err != nil {
+        if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, rc.cfg.Mydumper.BatchSizeScale, cp); err != nil {
             return errors.Trace(err)
         }
         if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
@@ -972,11 +972,11 @@ func (tr *TableRestore) Close() {
 
 var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))
 
-func (t *TableRestore) populateChunks(batchSize int64, cp *TableCheckpoint) error {
+func (t *TableRestore) populateChunks(batchSize, batchSizeScale int64, cp *TableCheckpoint) error {
     common.AppLogger.Infof("[%s] load chunks", t.tableName)
     timer := time.Now()
 
-    chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize)
+    chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize, batchSizeScale)
     if err != nil {
         return errors.Trace(err)
     }
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 988ee6fdc..1e3f66d62 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -57,6 +57,11 @@ read-block-size = 65536 # Byte (default = 64 KB)
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
 batch-size = 10_737_418_240 # Byte (default = 10 GB)
+# The step import to tikv will execute serially if all batches have the same size
+# and some of them complete in close time. `batch-size-scale` makes the batch
+# size between `batch-size / batch-size-scale` and `batch-size` to avoid this situation.
+# `batch-size-scale = 1` means all batch have the same size (`batch-size`)
+batch-size-scale = 5
 # mydumper local source data directory
 data-source-dir = "/tmp/export-20180328-200751"
 # if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.
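For reference, a minimal standalone sketch (not part of either patch above) of the batch-size schedule the `batch-size-scale` logic produces, assuming the defaults these two patches use (`batch-size` = 10 GB, `batch-size-scale` = 5). The program and its names are illustrative only; the loop merely mirrors the update rule in `MakeTableRegions`:

package main

import "fmt"

func main() {
    const batchSize int64 = 10 << 30   // assumed: the 10 GB default batch-size
    const batchSizeScale int64 = 5     // assumed: the default from patch 2

    // engine 0 starts with batchSize/scale; each later engine gets a larger batch
    curBatchSize := batchSize / batchSizeScale
    for engineID := int64(0); engineID < 8; engineID++ {
        fmt.Printf("engine %d: batch ≈ %.1f GiB\n", engineID, float64(curBatchSize)/float64(1<<30))
        // same update rule as in MakeTableRegions after curEngineID++
        next := engineID + 1
        if next < batchSizeScale {
            curBatchSize = batchSize / (batchSizeScale - next)
        } else {
            curBatchSize = batchSize
        }
    }
}

With these inputs the schedule is roughly 2, 2.5, 3.3, 5 and then 10 GiB for every later engine, which is what staggers the import() completion times of the first few batches.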
From 5e68de646559fabd6fcb69d5c685c75019c9aee6 Mon Sep 17 00:00:00 2001
From: kennytm
Date: Thu, 10 Jan 2019 03:48:54 +0800
Subject: [PATCH 3/8] *: implemented the optimized non-uniform strategy

---
 lightning/config/config.go      |  16 ++---
 lightning/mydump/region.go      | 103 ++++++++++++++++++++++++--------
 lightning/mydump/region_test.go |  69 ++++++++++++++++++++-
 lightning/restore/restore.go    |   6 +++---
 tidb-lightning.toml             |  14 +++--
 5 files changed, 166 insertions(+), 42 deletions(-)

diff --git a/lightning/config/config.go b/lightning/config/config.go
index fd4a57972..218cebc99 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -84,12 +84,12 @@ type PostRestore struct {
 }
 
 type MydumperRuntime struct {
-    ReadBlockSize  int64  `toml:"read-block-size" json:"read-block-size"`
-    BatchSize      int64  `toml:"batch-size" json:"batch-size"`
-    BatchSizeScale int64  `toml:"batch-size-scale" json:"batch-size-scale"`
-    SourceDir      string `toml:"data-source-dir" json:"data-source-dir"`
-    NoSchema       bool   `toml:"no-schema" json:"no-schema"`
-    CharacterSet   string `toml:"character-set" json:"character-set"`
+    ReadBlockSize    int64   `toml:"read-block-size" json:"read-block-size"`
+    BatchSize        int64   `toml:"batch-size" json:"batch-size"`
+    BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
+    SourceDir        string  `toml:"data-source-dir" json:"data-source-dir"`
+    NoSchema         bool    `toml:"no-schema" json:"no-schema"`
+    CharacterSet     string  `toml:"character-set" json:"character-set"`
 }
 
 type TikvImporter struct {
@@ -190,8 +190,8 @@ func (cfg *Config) Load() error {
     if cfg.Mydumper.BatchSize <= 0 {
         cfg.Mydumper.BatchSize = 10 * _G
     }
-    if cfg.Mydumper.BatchSizeScale <= 0 {
-        cfg.Mydumper.BatchSizeScale = 5
+    if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
+        cfg.Mydumper.BatchImportRatio = 0.5
     }
     if cfg.Mydumper.ReadBlockSize <= 0 {
         cfg.Mydumper.ReadBlockSize = ReadBlockSize
diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 399c3b3c8..23fea6588 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -1,6 +1,7 @@
 package mydump
 
 import (
+    "math"
     "os"
 
     "github.com/pkg/errors"
@@ -46,14 +47,83 @@ func (rs regionSlice) Less(i, j int) bool {
 
 ////////////////////////////////////////////////////////////////
 
-func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale int64) ([]*TableRegion, error) {
+func AllocateEngineIDs(
+    filesRegions []*TableRegion,
+    dataFileSizes []float64,
+    batchSize float64,
+    batchImportRatio float64,
+    tableConcurrency float64,
+) {
+    totalDataFileSize := 0.0
+    for _, dataFileSize := range dataFileSizes {
+        totalDataFileSize += dataFileSize
+    }
+
+    // No need to batch if the size is too small :)
+    if totalDataFileSize <= batchSize {
+        return
+    }
+
+    // import() step will not be concurrent.
+    // If multiple Batch end times are close, it will result in multiple
+    // Batch import serials. We need use a non-uniform batch size to create a pipeline effect.
+    // Here we calculate the total number of engines, which is needed to compute the scale up
+    //
+    //     Total/B1 = 1/(1-R) * (N - 1/beta(N, R))
+    //              ≈ 1/(1-R) * (N - N^R/gamma(R))
+    //              ≲ N/(1-R)
+    //
+    // We use a simple brute force search since the search space is extremely small.
+    ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize
+    invGammaR := 1.0 / math.Gamma(batchImportRatio)
+    n := math.Ceil(ratio)
+    for {
+        if n <= 0 || n > tableConcurrency {
+            n = tableConcurrency
+            break
+        }
+        realRatio := n - math.Pow(n, batchImportRatio)*invGammaR
+        if realRatio > ratio {
+            break
+        }
+        n += 1.0
+    }
+
+    curEngineID := 0
+    curEngineSize := 0.0
+    curBatchSize := batchSize
+    for i, dataFileSize := range dataFileSizes {
+        filesRegions[i].EngineID = curEngineID
+        curEngineSize += dataFileSize
+
+        if curEngineSize >= curBatchSize {
+            curEngineSize = 0
+            curEngineID++
+
+            i := float64(curEngineID)
+            // calculate the non-uniform batch size
+            if i >= n {
+                curBatchSize = batchSize
+            } else {
+                // B_(i+1) = B_i * (I/W/(N-i) + 1)
+                curBatchSize *= batchImportRatio/(n-i) + 1.0
+            }
+        }
+    }
+}
+
+func MakeTableRegions(
+    meta *MDTableMeta,
+    columns int,
+    batchSize int64,
+    batchImportRatio float64,
+    tableConcurrency int,
+) ([]*TableRegion, error) {
     // Split files into regions
     filesRegions := make(regionSlice, 0, len(meta.DataFiles))
+    dataFileSizes := make([]float64, 0, len(meta.DataFiles))
 
     prevRowIDMax := int64(0)
-    curEngineID := 0
-    curEngineSize := int64(0)
-    curBatchSize := batchSize / batchSizeScale
     for _, dataFile := range meta.DataFiles {
         dataFileInfo, err := os.Stat(dataFile)
         if err != nil {
@@ -62,10 +132,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale
         dataFileSize := dataFileInfo.Size()
         rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
         filesRegions = append(filesRegions, &TableRegion{
-            EngineID: curEngineID,
-            DB:       meta.DB,
-            Table:    meta.Name,
-            File:     dataFile,
+            DB:    meta.DB,
+            Table: meta.Name,
+            File:  dataFile,
             Chunk: Chunk{
                 Offset:    0,
                 EndOffset: dataFileSize,
@@ -74,23 +143,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale
             },
         })
         prevRowIDMax = rowIDMax
-
-        curEngineSize += dataFileSize
-        if curEngineSize > curBatchSize {
-            curEngineSize = 0
-            curEngineID++
-
-            // import() step will not be concurrent.
-            // If multiple Batch end times are close, it will result in multiple
-            // Batch import serials. `batchSizeScale` used to implement non-uniform
-            // batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
-            if int64(curEngineID) < batchSizeScale {
-                curBatchSize = batchSize / (batchSizeScale - int64(curEngineID))
-            } else {
-                curBatchSize = batchSize
-            }
-        }
+        dataFileSizes = append(dataFileSizes, float64(dataFileSize))
     }
+    AllocateEngineIDs(filesRegions, dataFileSizes, float64(batchSize), batchImportRatio, float64(tableConcurrency))
 
     return filesRegions, nil
 }
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index 0d04fe4f0..e0cf9ef49 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
     dbMeta := loader.GetDatabases()[0]
 
     for _, meta := range dbMeta.Tables {
-        regions, err := MakeTableRegions(meta, 1, 1, 1)
+        regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
         c.Assert(err, IsNil)
 
         table := meta.Name
@@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
     dbMeta := loader.GetDatabases()[0]
 
     for _, meta := range dbMeta.Tables {
-        regions, err := MakeTableRegions(meta, 1, 1, 1)
+        regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
         c.Assert(err, IsNil)
 
         tolValTuples := 0
@@ -116,3 +116,68 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
 
     return
 }
+
+func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) {
+    dataFileSizes := make([]float64, 700)
+    for i := range dataFileSizes {
+        dataFileSizes[i] = 1.0
+    }
+    filesRegions := make([]*TableRegion, 0, len(dataFileSizes))
+    for range dataFileSizes {
+        filesRegions = append(filesRegions, new(TableRegion))
+    }
+
+    checkEngineSizes := func(what string, expected map[int]int) {
+        actual := make(map[int]int)
+        for _, region := range filesRegions {
+            actual[region.EngineID]++
+        }
+        c.Assert(actual, DeepEquals, expected, Commentf("%s", what))
+    }
+
+    // Batch size > Total size => Everything in the zero batch.
+    AllocateEngineIDs(filesRegions, dataFileSizes, 1000, 0.5, 1000)
+    checkEngineSizes("no batching", map[int]int{
+        0: 700,
+    })
+
+    // Allocate 5 engines.
+    AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000)
+    checkEngineSizes("batch size = 100", map[int]int{
+        0: 100,
+        1: 113,
+        2: 132,
+        3: 165,
+        4: 190,
+    })
+
+    // Number of engines > table concurrency
+    AllocateEngineIDs(filesRegions, dataFileSizes, 50, 0.5, 4)
+    checkEngineSizes("batch size = 50, limit table conc = 4", map[int]int{
+        0:  50,
+        1:  59,
+        2:  73,
+        3:  110,
+        4:  50,
+        5:  50,
+        6:  50,
+        7:  50,
+        8:  50,
+        9:  50,
+        10: 50,
+        11: 50,
+        12: 8,
+    })
+
+    // Zero ratio = Uniform
+    AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.0, 1000)
+    checkEngineSizes("batch size = 100, ratio = 0", map[int]int{
+        0: 100,
+        1: 100,
+        2: 100,
+        3: 100,
+        4: 100,
+        5: 100,
+        6: 100,
+    })
+}
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 0a66c9cc2..dffc47387 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
     if len(cp.Engines) > 0 {
         common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
     } else if cp.Status < CheckpointStatusAllWritten {
-        if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, rc.cfg.Mydumper.BatchSizeScale, cp); err != nil {
+        if err := t.populateChunks(rc.cfg, cp); err != nil {
             return errors.Trace(err)
         }
         if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
@@ -972,11 +972,11 @@ func (tr *TableRestore) Close() {
 
 var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))
 
-func (t *TableRestore) populateChunks(batchSize, batchSizeScale int64, cp *TableCheckpoint) error {
+func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) error {
     common.AppLogger.Infof("[%s] load chunks", t.tableName)
     timer := time.Now()
 
-    chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize, batchSizeScale)
+    chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, cfg.Mydumper.BatchSize, cfg.Mydumper.BatchImportRatio, cfg.App.TableConcurrency)
     if err != nil {
         return errors.Trace(err)
     }
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 1e3f66d62..63d3e0500 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -57,11 +57,15 @@ read-block-size = 65536 # Byte (default = 64 KB)
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
 batch-size = 10_737_418_240 # Byte (default = 10 GB)
-# The step import to tikv will execute serially if all batches have the same size
-# and some of them complete in close time. `batch-size-scale` makes the batch
-# size between `batch-size / batch-size-scale` and `batch-size` to avoid this situation.
-# `batch-size-scale = 1` means all batch have the same size (`batch-size`)
-batch-size-scale = 5
+
+# Engine file needs to be imported sequentially. Due to table-concurrency, multiple engines will be
+# imported nearly the same time, and this will create a queue and this wastes resources. Therefore,
+# Lightning will slightly increase the size of the first few batches to properly distribute
+# resources. The scale up is controlled by this parameter, which expresses the speed ratio between
+# the "import" and "write" steps. If "import" is faster, the batch size anomaly is smaller, and
+# zero means uniform batch size. This value should be in the range (0 <= batch-import-ratio < 1).
+batch-import-ratio = 0.5
+
 # mydumper local source data directory
 data-source-dir = "/tmp/export-20180328-200751"
 # if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.

From 38f82b7af7c28e125368ef51de9b641b5920daf3 Mon Sep 17 00:00:00 2001
From: kennytm
Date: Thu, 10 Jan 2019 03:58:30 +0800
Subject: [PATCH 4/8] tests: due to change of strategy, checkpoint_engines count becomes 4 again

---
 tests/checkpoint_engines/run.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/checkpoint_engines/run.sh b/tests/checkpoint_engines/run.sh
index 0f7f7921c..aabeae4b7 100755
--- a/tests/checkpoint_engines/run.sh
+++ b/tests/checkpoint_engines/run.sh
@@ -12,7 +12,7 @@ run_lightning
 # Check that we have indeed opened 4 engines
 OPEN_ENGINES_COUNT=$(grep 'open engine' "$TEST_DIR/lightning-checkpoint-engines.log" | wc -l)
 echo "Number of open engines: $OPEN_ENGINES_COUNT"
-[ "$OPEN_ENGINES_COUNT" -eq 5 ]
+[ "$OPEN_ENGINES_COUNT" -eq 4 ]
 
 # Check that everything is correctly imported
 run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
@@ -29,7 +29,7 @@ run_sql 'DROP DATABASE cpeng;'
 export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)'
 
 set +e
-for i in $(seq 5); do
+for i in $(seq "$OPEN_ENGINES_COUNT"); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning 2> /dev/null
     [ $? -ne 0 ] || exit 1
@@ -52,7 +52,7 @@ check_contains 'sum(c): 46'
 run_sql 'DROP DATABASE cpeng;'
 
 set +e
-for i in $(seq 5); do
+for i in $(seq "$OPEN_ENGINES_COUNT"); do
     echo "******** Importing Table Now (step $i/4) ********"
     run_lightning mysql 2> /dev/null
     [ $? -ne 0 ] || exit 1

From 7e517bd213293e851127a01646d0c8b22ec21e6c Mon Sep 17 00:00:00 2001
From: kennytm
Date: Thu, 10 Jan 2019 14:45:27 +0800
Subject: [PATCH 5/8] mydump/region: slightly adjust the batch size computation

* Use the exact result of 1/Beta(N, R) instead of an approximation
* When the number of engines is small and the total engine size of the
  first (table-concurrency) batches exceeds the table size, the last
  batch was truncated, disrupting the pipeline. Now in this case we
  reduce the batch size to avoid this disruption.

---
 lightning/mydump/region.go      | 20 +++++++++++++-------
 lightning/mydump/region_test.go | 26 +++++++++++++++++++++-----
 2 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 23fea6588..78aba94d6 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -64,34 +64,40 @@ func AllocateEngineIDs(
         return
     }
 
+    curEngineID := 0
+    curEngineSize := 0.0
+    curBatchSize := batchSize
+
     // import() step will not be concurrent.
     // If multiple Batch end times are close, it will result in multiple
     // Batch import serials. We need use a non-uniform batch size to create a pipeline effect.
     // Here we calculate the total number of engines, which is needed to compute the scale up
     //
     //     Total/B1 = 1/(1-R) * (N - 1/beta(N, R))
-    //              ≈ 1/(1-R) * (N - N^R/gamma(R))
     //              ≲ N/(1-R)
     //
     // We use a simple brute force search since the search space is extremely small.
     ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize
-    invGammaR := 1.0 / math.Gamma(batchImportRatio)
     n := math.Ceil(ratio)
+    logGammaNPlusR, _ := math.Lgamma(n + batchImportRatio)
+    logGammaN, _ := math.Lgamma(n)
+    logGammaR, _ := math.Lgamma(batchImportRatio)
+    invBetaNR := math.Exp(logGammaNPlusR - logGammaN - logGammaR) // 1/B(N, R) = Γ(N+R)/Γ(N)Γ(R)
     for {
         if n <= 0 || n > tableConcurrency {
             n = tableConcurrency
             break
         }
-        realRatio := n - math.Pow(n, batchImportRatio)*invGammaR
-        if realRatio > ratio {
+        realRatio := n - invBetaNR
+        if realRatio >= ratio {
+            // we don't have enough engines. reduce the batch size to keep the pipeline smooth.
+            curBatchSize = totalDataFileSize * (1 - batchImportRatio) / realRatio
             break
         }
+        invBetaNR *= 1 + batchImportRatio/n // Γ(X+1) = X * Γ(X)
         n += 1.0
     }
 
-    curEngineID := 0
-    curEngineSize := 0.0
-    curBatchSize := batchSize
     for i, dataFileSize := range dataFileSizes {
         filesRegions[i].EngineID = curEngineID
         curEngineSize += dataFileSize
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index e0cf9ef49..274c68001 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -141,14 +141,30 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) {
         0: 700,
     })
 
+    // Allocate 3 engines.
+    AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.5, 1000)
+    checkEngineSizes("batch size = 200", map[int]int{
+        0: 170,
+        1: 213,
+        2: 317,
+    })
+
+    // Allocate 3 engines with an alternative ratio
+    AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.6, 1000)
+    checkEngineSizes("batch size = 200, ratio = 0.6", map[int]int{
+        0: 160,
+        1: 208,
+        2: 332,
+    })
+
     // Allocate 5 engines.
     AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000)
     checkEngineSizes("batch size = 100", map[int]int{
-        0: 100,
-        1: 113,
-        2: 132,
-        3: 165,
-        4: 190,
+        0: 93,
+        1: 105,
+        2: 122,
+        3: 153,
+        4: 227,
     })
 
     // Number of engines > table concurrency

From cd82c4cd6073c1db7518f69f5d203e1600b78999 Mon Sep 17 00:00:00 2001
From: kennytm
Date: Thu, 10 Jan 2019 14:52:17 +0800
Subject: [PATCH 6/8] restore: log the SQL size and KV size of each engine for debugging

---
 lightning/restore/restore.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index dffc47387..10493d4bf 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -611,7 +611,17 @@ func (t *TableRestore) restoreEngine(
     }
 
     wg.Wait()
-    common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v", t.tableName, engineID, time.Since(timer))
+    dur := time.Since(timer)
+
+    // Report some statistics into the log for debugging.
+    totalKVSize := uint64(0)
+    totalSQLSize := int64(0)
+    for _, chunk := range cp.Chunks {
+        totalKVSize += chunk.Checksum.SumSize()
+        totalSQLSize += chunk.Chunk.EndOffset
+    }
+
+    common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v (read %d, written %d)", t.tableName, engineID, dur, totalSQLSize, totalKVSize)
     err = chunkErr.Get()
     rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
     if err != nil {

From 62d8ee3cb3e4a6a21ce8641354c897c6fc04527d Mon Sep 17 00:00:00 2001
From: kennytm
Date: Sat, 12 Jan 2019 16:00:20 +0800
Subject: [PATCH 7/8] config: change default batch size and ratio given experiment result

---
 lightning/config/config.go | 4 ++--
 tidb-lightning.toml        | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/lightning/config/config.go b/lightning/config/config.go
index 218cebc99..ca4fbba5e 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -188,10 +188,10 @@ func (cfg *Config) Load() error {
 
     // handle mydumper
     if cfg.Mydumper.BatchSize <= 0 {
-        cfg.Mydumper.BatchSize = 10 * _G
+        cfg.Mydumper.BatchSize = 100 * _G
     }
     if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
-        cfg.Mydumper.BatchImportRatio = 0.5
+        cfg.Mydumper.BatchImportRatio = 0.75
     }
     if cfg.Mydumper.ReadBlockSize <= 0 {
         cfg.Mydumper.ReadBlockSize = ReadBlockSize
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 63d3e0500..4cbacc357 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -56,7 +56,7 @@ addr = "127.0.0.1:8808"
 read-block-size = 65536 # Byte (default = 64 KB)
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
-batch-size = 10_737_418_240 # Byte (default = 10 GB)
+batch-size = 107_374_182_400 # Byte (default = 100 GiB)
 
 # Engine file needs to be imported sequentially. Due to table-concurrency, multiple engines will be
 # imported nearly the same time, and this will create a queue and this wastes resources. Therefore,
@@ -64,7 +64,7 @@ batch-size = 10_737_418_240 # Byte (default = 10 GB)
 # resources. The scale up is controlled by this parameter, which expresses the speed ratio between
 # the "import" and "write" steps. If "import" is faster, the batch size anomaly is smaller, and
 # zero means uniform batch size. This value should be in the range (0 <= batch-import-ratio < 1).
-batch-import-ratio = 0.5
+batch-import-ratio = 0.75
 
 # mydumper local source data directory
 data-source-dir = "/tmp/export-20180328-200751"

From cd63d6d7abe58566e70390c1a2d8257d7df4af5f Mon Sep 17 00:00:00 2001
From: kennytm
Date: Mon, 14 Jan 2019 00:10:58 +0800
Subject: [PATCH 8/8] config: added more explanation about batch-import-ratio

---
 tidb-lightning.toml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 4cbacc357..3f4fcf8b6 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -61,8 +61,10 @@ batch-size = 107_374_182_400 # Byte (default = 100 GiB)
 # Engine file needs to be imported sequentially. Due to table-concurrency, multiple engines will be
 # imported nearly the same time, and this will create a queue and this wastes resources. Therefore,
 # Lightning will slightly increase the size of the first few batches to properly distribute
-# resources. The scale up is controlled by this parameter, which expresses the speed ratio between
-# the "import" and "write" steps. If "import" is faster, the batch size anomaly is smaller, and
+# resources. The scale up is controlled by this parameter, which expresses the ratio of duration
+# between the "import" and "write" steps with full concurrency. This can be calculated as the ratio
+# (import duration / write duration) of a single table of size around 1 GB. The exact timing can be
+# found in the log. If "import" is faster, the batch size anomaly is smaller, and a ratio of
 # zero means uniform batch size. This value should be in the range (0 <= batch-import-ratio < 1).
 batch-import-ratio = 0.75