This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Support non-uniform batch size #114

Merged
merged 8 commits on Jan 14, 2019
16 changes: 10 additions & 6 deletions lightning/config/config.go
@@ -84,11 +84,12 @@ type PostRestore struct {
}

type MydumperRuntime struct {
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
}

type TikvImporter struct {
@@ -187,7 +188,10 @@ func (cfg *Config) Load() error {

// handle mydumper
if cfg.Mydumper.BatchSize <= 0 {
cfg.Mydumper.BatchSize = 10 * _G
cfg.Mydumper.BatchSize = 100 * _G
}
if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
cfg.Mydumper.BatchImportRatio = 0.75
}
if cfg.Mydumper.ReadBlockSize <= 0 {
cfg.Mydumper.ReadBlockSize = ReadBlockSize
98 changes: 85 additions & 13 deletions lightning/mydump/region.go
@@ -1,6 +1,7 @@
package mydump

import (
"math"
"os"

"github.com/pkg/errors"
@@ -46,13 +47,89 @@ func (rs regionSlice) Less(i, j int) bool {

////////////////////////////////////////////////////////////////

func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*TableRegion, error) {
func AllocateEngineIDs(
filesRegions []*TableRegion,
dataFileSizes []float64,
batchSize float64,
batchImportRatio float64,
tableConcurrency float64,
) {
totalDataFileSize := 0.0
for _, dataFileSize := range dataFileSizes {
totalDataFileSize += dataFileSize
}

// No need to batch if the size is too small :)
if totalDataFileSize <= batchSize {
return
}

curEngineID := 0
curEngineSize := 0.0
curBatchSize := batchSize

// The import() step will not be concurrent.
// If several batches end at close times, their imports will be serialized.
// We use a non-uniform batch size to create a pipeline effect.
// Here we calculate the total number of engines N, which is needed to compute the scale-up.
//
// Total/B1 = 1/(1-R) * (N - 1/beta(N, R))
//          ≲ N/(1-R)
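// (Sanity check at N = 2: B2 = B1*(1+R), so Total/B1 = 2+R; and indeed
// 1/beta(2, R) = R*(1+R), so (2 - R - R^2)/(1-R) = 2+R.)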
//
// We use a simple brute force search since the search space is extremely small.
ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize
Collaborator: Why not compute N here directly, and just try to reduce the batch size?

Collaborator: Because there's no simple formula to solve for N in X = N - 1/beta(N, R) 😅.

Collaborator: N lies within a limited range of values, so maybe we could use a heuristic approach.

n := math.Ceil(ratio)
logGammaNPlusR, _ := math.Lgamma(n + batchImportRatio)
logGammaN, _ := math.Lgamma(n)
logGammaR, _ := math.Lgamma(batchImportRatio)
invBetaNR := math.Exp(logGammaNPlusR - logGammaN - logGammaR) // 1/B(N, R) = Γ(N+R)/(Γ(N)·Γ(R))
for {
if n <= 0 || n > tableConcurrency {
n = tableConcurrency
Collaborator: Should we check whether the user has set an unreasonable table concurrency?

Collaborator: In addition, it may be better to compute n from the given table concurrency.

Collaborator: We can't directly set n = tableConcurrency, as this may produce an engine size that is too large or too small.

Example of too large: an 8T table with table-conc = 8, forcing each batch to be ~1T and putting pressure on importer.
Example of too small: a 200G table with table-conc = 8, forcing each batch to be ~25G, making the data sent to TiKV less sorted and increasing compaction costs.

Collaborator (@IANTHEREAL, Jan 14, 2019): I mean that we compute n from the given batchImportRatio and batchSize. What would happen if batchSize is unreasonable (e.g. too small) and the table concurrency is also small? Would the algorithm degenerate into a nearly serial import?

break
}
realRatio := n - invBetaNR
if realRatio >= ratio {
// we don't have enough engines. reduce the batch size to keep the pipeline smooth.
curBatchSize = totalDataFileSize * (1 - batchImportRatio) / realRatio
break
}
invBetaNR *= 1 + batchImportRatio/n // 1/B(n+1, R) = 1/B(n, R) * (n+R)/n, by Γ(X+1) = X·Γ(X)
n += 1.0
}

for i, dataFileSize := range dataFileSizes {
filesRegions[i].EngineID = curEngineID
curEngineSize += dataFileSize

if curEngineSize >= curBatchSize {
curEngineSize = 0
curEngineID++

i := float64(curEngineID)
// calculate the non-uniform batch size
if i >= n {
curBatchSize = batchSize
} else {
// B_(i+1) = B_i * (R/(N-i) + 1), where R = batchImportRatio (import/write duration ratio)
curBatchSize *= batchImportRatio/(n-i) + 1.0
}
}
}
}
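
As a concrete illustration of the search, here is a standalone sketch (not part of this PR; the variable names are local to the sketch) that replays the computation for the scenario exercised in region_test.go below: 700 unit-size files, batchSize = 100, batchImportRatio = 0.5, and no concurrency cap. It prints batch sizes of roughly 92.9, 104.5, 121.9, 152.3 and 228.5, which correspond to the engine sizes 93/105/122/153/227 asserted in the test.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Scenario from region_test.go: 700 unit-size files, batch size 100,
	// import/write ratio 0.5; the tableConcurrency cap is ignored here.
	total, batchSize, r := 700.0, 100.0, 0.5

	ratio := total * (1 - r) / batchSize // 3.5
	n := math.Ceil(ratio)                // start the search at N = 4
	lgNR, _ := math.Lgamma(n + r)
	lgN, _ := math.Lgamma(n)
	lgR, _ := math.Lgamma(r)
	invBeta := math.Exp(lgNR - lgN - lgR) // 1/beta(4, 0.5) ≈ 1.094
	for n-invBeta < ratio {
		invBeta *= 1 + r/n // 1/beta(n+1, R) = 1/beta(n, R) * (n+R)/n
		n++
	}

	// Now n = 5 and n-invBeta ≈ 3.77 >= 3.5, so the first batch shrinks.
	b := total * (1 - r) / (n - invBeta) // ≈ 92.85

	for i := 0; i < int(n); i++ {
		fmt.Printf("B_%d ≈ %.1f\n", i, b)
		if next := float64(i + 1); next < n {
			b *= 1 + r/(n-next) // B_(i+1) = B_i * (R/(N-i-1) + 1)
		}
	}
}

When the computed n would exceed table-concurrency, the real implementation caps n instead and falls back to the uniform batchSize for every engine past the scaled prefix.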

func MakeTableRegions(
meta *MDTableMeta,
columns int,
batchSize int64,
batchImportRatio float64,
tableConcurrency int,
) ([]*TableRegion, error) {
// Split files into regions
filesRegions := make(regionSlice, 0, len(meta.DataFiles))
dataFileSizes := make([]float64, 0, len(meta.DataFiles))

prevRowIDMax := int64(0)
curEngineID := 0
curEngineSize := int64(0)
for _, dataFile := range meta.DataFiles {
dataFileInfo, err := os.Stat(dataFile)
if err != nil {
@@ -61,10 +138,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
dataFileSize := dataFileInfo.Size()
rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
filesRegions = append(filesRegions, &TableRegion{
EngineID: curEngineID,
DB: meta.DB,
Table: meta.Name,
File: dataFile,
DB: meta.DB,
Table: meta.Name,
File: dataFile,
Chunk: Chunk{
Offset: 0,
EndOffset: dataFileSize,
@@ -73,13 +149,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
},
})
prevRowIDMax = rowIDMax

curEngineSize += dataFileSize
if curEngineSize > batchSize {
curEngineSize = 0
curEngineID++
}
dataFileSizes = append(dataFileSizes, float64(dataFileSize))
}

AllocateEngineIDs(filesRegions, dataFileSizes, float64(batchSize), batchImportRatio, float64(tableConcurrency))
return filesRegions, nil
}
85 changes: 83 additions & 2 deletions lightning/mydump/region_test.go
@@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
dbMeta := loader.GetDatabases()[0]

for _, meta := range dbMeta.Tables {
regions, err := MakeTableRegions(meta, 1, 1)
regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
c.Assert(err, IsNil)

table := meta.Name
@@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
dbMeta := loader.GetDatabases()[0]

for _, meta := range dbMeta.Tables {
regions, err := MakeTableRegions(meta, 1, 1)
regions, err := MakeTableRegions(meta, 1, 1, 0, 1)
c.Assert(err, IsNil)

tolValTuples := 0
@@ -116,3 +116,84 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {

return
}

func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) {
dataFileSizes := make([]float64, 700)
for i := range dataFileSizes {
dataFileSizes[i] = 1.0
}
filesRegions := make([]*TableRegion, 0, len(dataFileSizes))
for range dataFileSizes {
filesRegions = append(filesRegions, new(TableRegion))
}

checkEngineSizes := func(what string, expected map[int]int) {
actual := make(map[int]int)
for _, region := range filesRegions {
actual[region.EngineID]++
}
c.Assert(actual, DeepEquals, expected, Commentf("%s", what))
}

// Batch size > Total size => Everything in the zero batch.
AllocateEngineIDs(filesRegions, dataFileSizes, 1000, 0.5, 1000)
checkEngineSizes("no batching", map[int]int{
0: 700,
})

// Allocate 3 engines.
AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.5, 1000)
checkEngineSizes("batch size = 200", map[int]int{
0: 170,
1: 213,
2: 317,
})

// Allocate 3 engines with an alternative ratio
AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.6, 1000)
checkEngineSizes("batch size = 200, ratio = 0.6", map[int]int{
0: 160,
1: 208,
2: 332,
})

// Allocate 5 engines.
AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000)
checkEngineSizes("batch size = 100", map[int]int{
0: 93,
1: 105,
2: 122,
3: 153,
4: 227,
})

// Number of engines > table concurrency
AllocateEngineIDs(filesRegions, dataFileSizes, 50, 0.5, 4)
checkEngineSizes("batch size = 50, limit table conc = 4", map[int]int{
0: 50,
1: 59,
2: 73,
3: 110,
4: 50,
5: 50,
6: 50,
7: 50,
8: 50,
9: 50,
10: 50,
11: 50,
12: 8,
})
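// Note: once curEngineID reaches n (capped at 4 by tableConcurrency here),
// AllocateEngineIDs falls back to the uniform batchSize, so engines 4 onward
// each take 50 files and the final engine holds the 8-file remainder.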

// Zero ratio = Uniform
AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.0, 1000)
checkEngineSizes("batch size = 100, ratio = 0", map[int]int{
0: 100,
1: 100,
2: 100,
3: 100,
4: 100,
5: 100,
6: 100,
})
}
18 changes: 14 additions & 4 deletions lightning/restore/restore.go
@@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
if len(cp.Engines) > 0 {
common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
} else if cp.Status < CheckpointStatusAllWritten {
if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, cp); err != nil {
if err := t.populateChunks(rc.cfg, cp); err != nil {
return errors.Trace(err)
}
if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
@@ -611,7 +611,17 @@ func (t *TableRestore) restoreEngine(
}

wg.Wait()
common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v", t.tableName, engineID, time.Since(timer))
dur := time.Since(timer)

// Report some statistics into the log for debugging.
totalKVSize := uint64(0)
totalSQLSize := int64(0)
for _, chunk := range cp.Chunks {
totalKVSize += chunk.Checksum.SumSize()
totalSQLSize += chunk.Chunk.EndOffset
}

common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v (read %d, written %d)", t.tableName, engineID, dur, totalSQLSize, totalKVSize)
err = chunkErr.Get()
rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
if err != nil {
@@ -972,11 +982,11 @@ func (tr *TableRestore) Close() {

var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))

func (t *TableRestore) populateChunks(batchSize int64, cp *TableCheckpoint) error {
func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) error {
common.AppLogger.Infof("[%s] load chunks", t.tableName)
timer := time.Now()

chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize)
chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, cfg.Mydumper.BatchSize, cfg.Mydumper.BatchImportRatio, cfg.App.TableConcurrency)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions tests/checkpoint_engines/run.sh
@@ -29,7 +29,7 @@ run_sql 'DROP DATABASE cpeng;'

export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)'
set +e
for i in $(seq 4); do
for i in $(seq "$OPEN_ENGINES_COUNT"); do
echo "******** Importing Table Now (step $i/4) ********"
run_lightning 2> /dev/null
[ $? -ne 0 ] || exit 1
@@ -52,7 +52,7 @@ check_contains 'sum(c): 46'
run_sql 'DROP DATABASE cpeng;'

set +e
for i in $(seq 4); do
for i in $(seq "$OPEN_ENGINES_COUNT"); do
echo "******** Importing Table Now (step $i/4) ********"
run_lightning mysql 2> /dev/null
[ $? -ne 0 ] || exit 1
13 changes: 12 additions & 1 deletion tidb-lightning.toml
@@ -56,7 +56,18 @@ addr = "127.0.0.1:8808"
read-block-size = 65536 # Byte (default = 64 KB)
# minimum size (in terms of source data file) of each batch of import.
# Lightning will split a large table into multiple engine files according to this size.
batch-size = 10_737_418_240 # Byte (default = 10 GB)
batch-size = 107_374_182_400 # Byte (default = 100 GiB)

# Engine files need to be imported sequentially. Due to table-concurrency, multiple engines will be
# imported at nearly the same time; this creates a queue and wastes resources. Therefore,
# Lightning will slightly increase the size of the first few batches to properly distribute
# resources. The scale-up is controlled by this parameter, which expresses the ratio of durations
# between the "import" and "write" steps at full concurrency. This can be calculated as the ratio
# (import duration / write duration) of a single table of size around 1 GB. The exact timings can be
# found in the log. If "import" is faster, the batch size skew is smaller, and a ratio of
# zero means uniform batch sizes. This value should be in the range 0 <= batch-import-ratio < 1.
Collaborator: How do we get the import speed? Should we provide a constant value?

Collaborator: I've expanded the comment a bit. This can be calculated as the ratio (import duration / write duration) of a single small table (e.g. ~1 GB).

I do suspect that the ratio is not a constant. It could be affected by the table structure, for instance. But for the 3 tables we've tested, the ratio does approach this value. We could optimize the calculation later.

Collaborator (@IANTHEREAL, Jan 14, 2019): I just mean giving the import duration a reference value. But what is the import duration, and how does a user get it? They would have to run an import first and then find it in the log.

Member: Will we give users a series of recommended values for different deployments later?

Collaborator (@kennytm, Jan 14, 2019): We can explain how to choose the best value in the docs and to OPS. But we don't expect users to change these values unless they want to do some heavy optimization.

batch-import-ratio = 0.75
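
# For a concrete illustration (reusing the unit-size scenario from region_test.go; not a
# recommended setting): with batch-size = 100, batch-import-ratio = 0.5 and a table of 700
# units, the engines come out as 93, 105, 122, 153 and 227 units. Each batch is roughly
# 1.1x-1.5x the previous one, so earlier engines finish their "write" step sooner and the
# serialized "import" steps overlap with later "write" steps instead of queueing.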

# mydumper local source data directory
data-source-dir = "/tmp/export-20180328-200751"
# if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.