Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
*: make the batch-size-scale configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Jan 9, 2019
1 parent 451015d commit 9d121ba
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 20 deletions.
14 changes: 9 additions & 5 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ type PostRestore struct {
}

// MydumperRuntime groups the data-source settings; each field is read from
// the toml/json key named in its struct tag.
type MydumperRuntime struct {
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
// BatchSizeScale is passed on to mydump.MakeTableRegions, where it is the
// divisor used to derive non-uniform batch sizes from BatchSize.
// Config.Load replaces any value <= 0 with 5.
BatchSizeScale int64 `toml:"batch-size-scale" json:"batch-size-scale"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
}

type TikvImporter struct {
Expand Down Expand Up @@ -189,6 +190,9 @@ func (cfg *Config) Load() error {
if cfg.Mydumper.BatchSize <= 0 {
cfg.Mydumper.BatchSize = 10 * _G
}
if cfg.Mydumper.BatchSizeScale <= 0 {
cfg.Mydumper.BatchSizeScale = 5
}
if cfg.Mydumper.ReadBlockSize <= 0 {
cfg.Mydumper.ReadBlockSize = ReadBlockSize
}
Expand Down
18 changes: 8 additions & 10 deletions lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,10 @@ func (rs regionSlice) Less(i, j int) bool {

////////////////////////////////////////////////////////////////

func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*TableRegion, error) {
func MakeTableRegions(meta *MDTableMeta, columns int, batchSize, batchSizeScale int64) ([]*TableRegion, error) {
// Split files into regions
filesRegions := make(regionSlice, 0, len(meta.DataFiles))

// The import() step is not concurrent.
// If several batches finish at close to the same time, their imports
// run one after another. `batchSizeScale` is used to implement a non-uniform
// batch size: curBatchSize = batchSize / int64(batchSizeScale-curEngineID)
// TODO: make it configurable
const batchSizeScale = 20

prevRowIDMax := int64(0)
curEngineID := 0
curEngineSize := int64(0)
Expand Down Expand Up @@ -86,8 +79,13 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
if curEngineSize > curBatchSize {
curEngineSize = 0
curEngineID++
if curEngineID < batchSizeScale {
curBatchSize = batchSize / int64(batchSizeScale-curEngineID)

// The import() step is not concurrent.
// If several batches finish at close to the same time, their imports
// run one after another. `batchSizeScale` is used to implement a non-uniform
// batch size: curBatchSize = batchSize / (batchSizeScale - int64(curEngineID))
if int64(curEngineID) < batchSizeScale {
curBatchSize = batchSize / (batchSizeScale - int64(curEngineID))
} else {
curBatchSize = batchSize
}
Expand Down
4 changes: 2 additions & 2 deletions lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
dbMeta := loader.GetDatabases()[0]

for _, meta := range dbMeta.Tables {
regions, err := MakeTableRegions(meta, 1, 1)
regions, err := MakeTableRegions(meta, 1, 1, 1)
c.Assert(err, IsNil)

table := meta.Name
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
dbMeta := loader.GetDatabases()[0]

for _, meta := range dbMeta.Tables {
regions, err := MakeTableRegions(meta, 1, 1)
regions, err := MakeTableRegions(meta, 1, 1, 1)
c.Assert(err, IsNil)

tolValTuples := 0
Expand Down
6 changes: 3 additions & 3 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (t *TableRestore) restoreTable(
if len(cp.Engines) > 0 {
common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks())
} else if cp.Status < CheckpointStatusAllWritten {
if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, cp); err != nil {
if err := t.populateChunks(rc.cfg.Mydumper.BatchSize, rc.cfg.Mydumper.BatchSizeScale, cp); err != nil {
return errors.Trace(err)
}
if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
Expand Down Expand Up @@ -972,11 +972,11 @@ func (tr *TableRestore) Close() {

// tidbRowIDColumnRegex matches model.ExtraHandleName either wrapped in
// backquotes (exact case) or as a case-insensitive whole word.
// NOTE(review): the name is interpolated without regexp.QuoteMeta, so this
// presumably relies on it containing no regexp metacharacters — confirm.
var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))

func (t *TableRestore) populateChunks(batchSize int64, cp *TableCheckpoint) error {
func (t *TableRestore) populateChunks(batchSize, batchSizeScale int64, cp *TableCheckpoint) error {
common.AppLogger.Infof("[%s] load chunks", t.tableName)
timer := time.Now()

chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize)
chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, batchSize, batchSizeScale)
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 5 additions & 0 deletions tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ read-block-size = 65536 # Byte (default = 64 KB)
# minimum size (in terms of source data file) of each batch of import.
# Lightning will split a large table into multiple engine files according to this size.
batch-size = 10_737_418_240 # Byte (default = 10 GB)
# The step that imports data into TiKV runs serially, so if all batches have the
# same size and several of them finish at nearly the same time, their imports queue up.
# `batch-size-scale` varies the batch size between `batch-size / batch-size-scale`
# and `batch-size` to avoid this situation.
# `batch-size-scale = 1` means all batches have the same size (`batch-size`).
batch-size-scale = 5
# mydumper local source data directory
data-source-dir = "/tmp/export-20180328-200751"
# if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.
Expand Down

0 comments on commit 9d121ba

Please sign in to comment.