This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

mydump,restore: Make 1 Chunk = 1 File and Use Coarse Row ID Assignment #109

Merged: 1 commit, Dec 26, 2018
126 changes: 23 additions & 103 deletions lightning/mydump/region.go
@@ -3,11 +3,7 @@ package mydump
import (
"fmt"
"os"
"runtime"
"sort"
"sync"

"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pkg/errors"
)

@@ -18,8 +14,7 @@ type TableRegion struct {
Table string
File string

Columns []byte
Chunk Chunk
Chunk Chunk
}

func (reg *TableRegion) Name() string {
@@ -57,107 +52,32 @@ func (rs regionSlice) Less(i, j int) bool {

////////////////////////////////////////////////////////////////

type RegionFounder struct {
processors chan int
minRegionSize int64
}

func NewRegionFounder(minRegionSize int64) *RegionFounder {
concurrency := runtime.NumCPU() >> 1
if concurrency == 0 {
concurrency = 1
}

processors := make(chan int, concurrency)
for i := 0; i < concurrency; i++ {
processors <- i
}

return &RegionFounder{
processors: processors,
minRegionSize: minRegionSize,
}
}

func (f *RegionFounder) MakeTableRegions(meta *MDTableMeta) ([]*TableRegion, error) {
var lock sync.Mutex
var wg sync.WaitGroup

db := meta.DB
table := meta.Name
processors := f.processors
minRegionSize := f.minRegionSize

var chunkErr error

func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) {
// Split files into regions
filesRegions := make(regionSlice, 0, len(meta.DataFiles))
for _, dataFile := range meta.DataFiles {
wg.Add(1)
go func(pid int, file string) {
common.AppLogger.Debugf("[%s] loading file's region (%s) ...", table, file)

chunks, err := splitExactChunks(db, table, file, minRegionSize)
lock.Lock()
if err == nil {
filesRegions = append(filesRegions, chunks...)
} else {
chunkErr = errors.Annotatef(err, "%s", file)
common.AppLogger.Errorf("failed to extract chunks from file: %v", chunkErr)
}
lock.Unlock()

processors <- pid
wg.Done()
}(<-processors, dataFile)
}
wg.Wait()

if chunkErr != nil {
return nil, chunkErr
}

// Setup files' regions
sort.Sort(filesRegions) // ps : sort region by - (fileName, fileOffset)
var totalRowCount int64
for i, region := range filesRegions {
region.ID = i

// Every chunk's PrevRowIDMax was uninitialized (set to 0). We need to
// re-adjust the row IDs so they won't be overlapping.
chunkRowCount := region.Chunk.RowIDMax - region.Chunk.PrevRowIDMax
region.Chunk.PrevRowIDMax = totalRowCount
totalRowCount += chunkRowCount
region.Chunk.RowIDMax = totalRowCount
}

return filesRegions, nil
}

func splitExactChunks(db string, table string, file string, minChunkSize int64) ([]*TableRegion, error) {
reader, err := os.Open(file)
if err != nil {
return nil, errors.Trace(err)
}
defer reader.Close()

parser := NewChunkParser(reader)
chunks, err := parser.ReadChunks(minChunkSize)
if err != nil {
return nil, errors.Trace(err)
}

annotatedChunks := make([]*TableRegion, len(chunks))
for i, chunk := range chunks {
annotatedChunks[i] = &TableRegion{
ID: -1,
DB: db,
Table: table,
File: file,
Columns: parser.Columns,
Chunk: chunk,
prevRowIDMax := int64(0)
for i, dataFile := range meta.DataFiles {
dataFileInfo, err := os.Stat(dataFile)
if err != nil {
return nil, errors.Annotatef(err, "cannot stat %s", dataFile)
}
dataFileSize := dataFileInfo.Size()
rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
filesRegions = append(filesRegions, &TableRegion{
ID: i,
DB: meta.DB,
Table: meta.Name,
File: dataFile,
Chunk: Chunk{
Offset: 0,
EndOffset: dataFileSize,
PrevRowIDMax: prevRowIDMax,
RowIDMax: rowIDMax,
},
})
prevRowIDMax = rowIDMax
}

return annotatedChunks, nil
return filesRegions, nil
}
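
For reference, the new MakeTableRegions assigns each data file exactly one region and derives its row ID range from the file size alone. Below is a standalone sketch of that coarse assignment; it assumes size/(columns+2) is intended as a cheap upper bound on the number of rows a dump file can hold (each value needs at least one byte plus surrounding punctuation), and the dataFile type and the example file sizes are illustrative only, not part of this change.

package main

import "fmt"

// region mirrors the Chunk bookkeeping above: one region per data file,
// covering the byte range [0, size) and a non-overlapping row ID range.
type region struct {
	File         string
	EndOffset    int64
	PrevRowIDMax int64
	RowIDMax     int64
}

// dataFile is a stand-in for the entries of MDTableMeta.DataFiles.
type dataFile struct {
	Name string
	Size int64
}

// coarseRegions assigns row ID ranges the same way as the new
// MakeTableRegions: each file reserves size/(columns+2) IDs on top of the
// previous file's maximum, so the ranges never overlap even though the real
// row counts are unknown until the files are actually parsed.
func coarseRegions(files []dataFile, columns int) []region {
	regions := make([]region, 0, len(files))
	prev := int64(0)
	for _, f := range files {
		max := prev + f.Size/(int64(columns)+2)
		regions = append(regions, region{
			File:         f.Name,
			EndOffset:    f.Size,
			PrevRowIDMax: prev,
			RowIDMax:     max,
		})
		prev = max
	}
	return regions
}

func main() {
	// A 1 MiB file with 3 columns reserves 1048576/5 = 209715 row IDs,
	// at least as many as the rows the file can actually contain.
	files := []dataFile{
		{Name: "t.000000001.sql", Size: 1 << 20},
		{Name: "t.000000002.sql", Size: 512 << 10},
	}
	for _, r := range coarseRegions(files, 3) {
		fmt.Printf("%s: rows (%d, %d]\n", r.File, r.PrevRowIDMax, r.RowIDMax)
	}
}

Because the estimate is only an upper bound, the allocated row IDs can leave gaps, which is presumably why the tests below stop asserting exact row counts and an exact AUTO_INCREMENT value.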
27 changes: 12 additions & 15 deletions lightning/mydump/region_test.go
@@ -36,10 +36,9 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
loader, _ := NewMyDumpLoader(cfg)
dbMeta := loader.GetDatabases()[0]
founder := NewRegionFounder(defMinRegionSize)

for _, meta := range dbMeta.Tables {
regions, err := founder.MakeTableRegions(meta)
regions, err := MakeTableRegions(meta, 1)
c.Assert(err, IsNil)

table := meta.Name
@@ -61,19 +60,18 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
c.Assert(err, IsNil)
tolFileSize += fileSize
}
// var tolRegionSize int64 = 0
// for _, region := range regions {
// tolRegionSize += region.Size()
// }
// c.Assert(tolRegionSize, Equals, tolFileSize)
// (The size will not be equal since the comments at the end are omitted)

// check - rows num
var tolRows int64 = 0
var tolRegionSize int64 = 0
for _, region := range regions {
tolRows += region.Rows()
tolRegionSize += region.Size()
}
c.Assert(tolRows, Equals, expectedTuplesCount[table])
c.Assert(tolRegionSize, Equals, tolFileSize)

// // check - rows num
// var tolRows int64 = 0
// for _, region := range regions {
// tolRows += region.Rows()
Member: do we need to update the name of Rows? It is no longer a real row count.

Collaborator: we will remove the unused code later; all of the chunk-related code is unused now.

// }
// c.Assert(tolRows, Equals, expectedTuplesCount[table])

// check - range
regionNum := len(regions)
@@ -98,10 +96,9 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
loader, _ := NewMyDumpLoader(cfg)
dbMeta := loader.GetDatabases()[0]
founder := NewRegionFounder(defMinRegionSize)

for _, meta := range dbMeta.Tables {
regions, err := founder.MakeTableRegions(meta)
regions, err := MakeTableRegions(meta, 1)
c.Assert(err, IsNil)

tolValTuples := 0
76 changes: 42 additions & 34 deletions lightning/restore/restore.go
@@ -493,7 +493,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
if len(cp.Chunks) > 0 {
common.AppLogger.Infof("[%s] reusing %d chunks from checkpoint", t.tableName, len(cp.Chunks))
} else if cp.Status < CheckpointStatusAllWritten {
if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp, t.tableInfo); err != nil {
if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp); err != nil {
return nil, errors.Trace(err)
}
if err := rc.checkpointsDB.InsertChunkCheckpoints(ctx, t.tableName, cp.Chunks); err != nil {
@@ -973,56 +973,58 @@ func (tr *TableRestore) Close() {

var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))

func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint, tableInfo *TidbTableInfo) error {
func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint) error {
common.AppLogger.Infof("[%s] load chunks", t.tableName)
timer := time.Now()

founder := mydump.NewRegionFounder(minChunkSize)
chunks, err := founder.MakeTableRegions(t.tableMeta)
chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns)
if err != nil {
return errors.Trace(err)
}

cp.Chunks = make([]*ChunkCheckpoint, 0, len(chunks))

for _, chunk := range chunks {
columns := chunk.Columns

shouldIncludeRowID := !tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
if shouldIncludeRowID {
// we need to inject the _tidb_rowid column
if len(columns) != 0 {
// column listing already exists, just append the new column.
columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
} else {
// we need to recreate the columns
var buf bytes.Buffer
buf.WriteString("(`")
for _, columnInfo := range tableInfo.core.Columns {
buf.WriteString(columnInfo.Name.String())
buf.WriteString("`,`")
}
buf.WriteString(model.ExtraHandleName.String())
buf.WriteString("`)")
columns = buf.Bytes()
}
}

cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{
Key: ChunkCheckpointKey{
Path: chunk.File,
Offset: chunk.Chunk.Offset,
},
Columns: columns,
ShouldIncludeRowID: shouldIncludeRowID,
Chunk: chunk.Chunk,
Columns: nil,
Chunk: chunk.Chunk,
})
}

common.AppLogger.Infof("[%s] load %d chunks takes %v", t.tableName, len(chunks), time.Since(timer))
return nil
}

func (t *TableRestore) initializeColumns(columns []byte, ccp *ChunkCheckpoint) {
shouldIncludeRowID := !t.tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
if shouldIncludeRowID {
// we need to inject the _tidb_rowid column
if len(columns) != 0 {
// column listing already exists, just append the new column.
columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
} else {
// we need to recreate the columns
var buf bytes.Buffer
buf.WriteString("(`")
for _, columnInfo := range t.tableInfo.core.Columns {
buf.WriteString(columnInfo.Name.String())
buf.WriteString("`,`")
}
buf.WriteString(model.ExtraHandleName.String())
buf.WriteString("`)")
columns = buf.Bytes()
}
} else if columns == nil {
columns = []byte{}
}
ccp.Columns = columns
ccp.ShouldIncludeRowID = shouldIncludeRowID
}

func (tr *TableRestore) restoreTableMeta(ctx context.Context, db *sql.DB) error {
timer := time.Now()

@@ -1350,18 +1352,23 @@ func (cr *chunkRestore) restore(
buffer.Reset()
start := time.Now()

buffer.WriteString("INSERT INTO ")
buffer.WriteString(t.tableName)
buffer.Write(cr.chunk.Columns)
buffer.WriteString(" VALUES")
var sep byte = ' '
readLoop:
for cr.parser.Pos() < endOffset {
err := cr.parser.ReadRow()
switch errors.Cause(err) {
case nil:
buffer.WriteByte(sep)
sep = ','
if sep == ' ' {
buffer.WriteString("INSERT INTO ")
buffer.WriteString(t.tableName)
if cr.chunk.Columns == nil {
t.initializeColumns(cr.parser.Columns, cr.chunk)
}
buffer.Write(cr.chunk.Columns)
buffer.WriteString(" VALUES ")
sep = ','
}
lastRow := cr.parser.LastRow()
if cr.chunk.ShouldIncludeRowID {
buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
@@ -1370,6 +1377,7 @@ func (cr *chunkRestore) restore(
buffer.Write(lastRow.Row)
}
case io.EOF:
cr.chunk.Chunk.EndOffset = cr.parser.Pos()
break readLoop
default:
return errors.Trace(err)
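
On the restore side, the column list and the INSERT prefix are now resolved lazily, on the first row of a chunk, because column metadata is no longer captured when regions are created. The sketch below shows the pattern under simplifying assumptions: injectRowIDColumn stands in for initializeColumns, plain strings stand in for the parser state and the chunk checkpoint, and the sequential row IDs are illustrative; none of these names are the actual API.

package main

import (
	"fmt"
	"strings"
)

// injectRowIDColumn plays the role of initializeColumns above: when the
// table's primary key is not the row handle and the dump's column listing
// does not already mention _tidb_rowid, the extra column is appended, e.g.
// "(`a`,`b`)" becomes "(`a`,`b`,`_tidb_rowid`)". The real code also records
// the decision as ShouldIncludeRowID on the chunk checkpoint, and rebuilds
// the listing from the table schema when the dump omits it entirely.
func injectRowIDColumn(columns string, pkIsHandle bool) string {
	if pkIsHandle || columns == "" || strings.Contains(columns, "_tidb_rowid") {
		return columns
	}
	return columns[:len(columns)-1] + ",`_tidb_rowid`)"
}

// buildInsert emits the "INSERT INTO t (cols) VALUES" header only when the
// first row arrives, so an empty chunk produces no statement at all and the
// column list can be settled after the first row has been parsed.
func buildInsert(table, dumpColumns string, pkIsHandle bool, firstRowID int64, rows []string) string {
	var buf strings.Builder
	includeRowID := false
	sep := byte(' ')
	for i, row := range rows {
		if sep == ' ' {
			// First row only: resolve the column list (from the parser in
			// the real code) and write the statement header.
			columns := injectRowIDColumn(dumpColumns, pkIsHandle)
			includeRowID = columns != dumpColumns
			buf.WriteString("INSERT INTO ")
			buf.WriteString(table)
			buf.WriteString(columns)
			buf.WriteString(" VALUES ")
			sep = ','
		} else {
			buf.WriteByte(sep)
		}
		if includeRowID {
			// Mirror the ShouldIncludeRowID branch: splice an allocated
			// row ID into the tuple before its closing parenthesis.
			fmt.Fprintf(&buf, "%s,%d)", row[:len(row)-1], firstRowID+int64(i))
		} else {
			buf.WriteString(row)
		}
	}
	return buf.String()
}

func main() {
	rows := []string{"(1,2)", "(3,4)"}
	fmt.Println(buildInsert("`db`.`t`", "(`a`,`b`)", false, 209716, rows))
	// An empty chunk writes nothing, so no dangling "INSERT INTO ... VALUES"
	// is ever sent to TiDB.
	fmt.Printf("%q\n", buildInsert("`db`.`t`", "(`a`,`b`)", false, 209716, nil))
}

The EndOffset update at io.EOF in the diff appears to serve a related purpose: once the parser reaches the end of the file, the chunk's recorded end offset is pulled back to the actual parsed position instead of the raw file size used when the region was created.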
2 changes: 0 additions & 2 deletions tests/checkpoint_chunks/run.sh
@@ -43,8 +43,6 @@ check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))"
check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status >= 200"
check_contains "count(*): 1"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.chunk_v3 WHERE pos = end_offset"
check_contains "count(*): $CHUNK_COUNT"

# Repeat, but using the file checkpoint
run_sql 'DROP DATABASE IF EXISTS cpch_tsr'
4 changes: 2 additions & 2 deletions tests/examples/run.sh
@@ -46,9 +46,9 @@ check_contains 'sum(crc32(name)): 21388950023608'

# Ensure the AUTO_INCREMENT value is properly defined
run_sql "insert into mocker_test.tbl_autoid (name) values ('new');"
run_sql "select id from mocker_test.tbl_autoid where name = 'new';"
run_sql "select id > 10000 from mocker_test.tbl_autoid where name = 'new';"
check_not_contains '* 2. row *'
check_contains 'id: 10001'
check_contains 'id > 10000: 1'

run_sql 'select count(*), avg(age), max(name), min(name), sum(crc32(name)) from mocker_test.tbl_multi_index;'
check_contains 'count(*): 10000'