Speed up DDL execution by sending DDLs concurrently (#377)
* task, restore: send DDLs in parallel

* restore: use ID mod pool size to index the session pool

* store: return nil DB when no TiDB session is provided

* *: use isolated DB instead of Session

* *: fix some mis-pushed files :|

* *: rename session pool to DB pool

* restore: rename some sessions to dbs

* restore: add some todos

Co-authored-by: 3pointer <luancheng@pingcap.com>
YuJuncen and 3pointer authored Jul 6, 2020
1 parent 9da7b91 commit ef92332
Showing 4 changed files with 103 additions and 14 deletions.
79 changes: 67 additions & 12 deletions pkg/restore/client.go
@@ -58,9 +58,18 @@ type Client struct {
workerPool *utils.WorkerPool
tlsConf *tls.Config

-databases map[string]*utils.Database
-ddlJobs []*model.Job
-backupMeta *backup.BackupMeta
+databases map[string]*utils.Database
+ddlJobs []*model.Job
+backupMeta *backup.BackupMeta
+// TODO: remove this field or replace it with a []*DB,
+// since https://github.com/pingcap/br/pull/377 needs multiple DBs to speed up DDL execution.
+// For now we must inject a pool of DBs into `Client.GoCreateTables`, otherwise there would be a race condition.
+// This is dirty: why do we need DBs from different sources?
+// By replacing the field with a []*DB, we can remove the extra parameter of `Client.GoCreateTables`,
+// along with its counterparts in some private functions.
+// Before doing so, first read the discussion at
+// https://github.com/pingcap/br/pull/377#discussion_r446594501;
+// it probably isn't as easy as it looks (though not hard either :D).
db *DB
rateLimit uint64
isOnline bool
@@ -333,7 +342,7 @@ func (rc *Client) CreateTables(
for i, t := range tables {
tbMapping[t.Info.Name.String()] = i
}
-dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh)
+dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh)
for et := range dataCh {
rules := et.RewriteRule
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
@@ -355,11 +364,24 @@
return rewriteRules, newTables, nil
}

-func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) {
+func (rc *Client) createTable(
+ctx context.Context,
+db *DB,
+dom *domain.Domain,
+table *utils.Table,
+newTS uint64,
+) (CreatedTable, error) {
+if db == nil {
+db = rc.db
+}

if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
-err := rc.db.CreateTable(rc.ctx, table)
+// Don't use rc.ctx here...
+// Removing the ctx field of Client would be a great improvement;
+// we just take a small step here :<
+err := db.CreateTable(ctx, table)
if err != nil {
return CreatedTable{}, err
}
@@ -378,22 +400,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint
}

// GoCreateTables creates tables and generates their information.
+// This function uses as many workers as there are DBs in dbPool;
+// leave dbPool nil to send DDLs sequentially.
func (rc *Client) GoCreateTables(
ctx context.Context,
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
+dbPool []*DB,
errCh chan<- error,
) <-chan CreatedTable {
// Could we use a smaller buffer than len(tables) here?
outCh := make(chan CreatedTable, len(tables))
-createOneTable := func(t *utils.Table) error {
+createOneTable := func(db *DB, t *utils.Table) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
-rt, err := rc.createTable(dom, t, newTS)
+rt, err := rc.createTable(ctx, db, dom, t, newTS)
if err != nil {
log.Error("create table failed",
zap.Error(err),
@@ -408,16 +433,46 @@ func (rc *Client) GoCreateTables(
outCh <- rt
return nil
}
+startWork := func(t *utils.Table, done func()) {
+defer done()
+if err := createOneTable(nil, t); err != nil {
+errCh <- err
+return
+}
+}
+if len(dbPool) > 0 {
+workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers")
+startWork = func(t *utils.Table, done func()) {
+workers.ApplyWithID(func(id uint64) {
+defer done()
+selectedDB := int(id) % len(dbPool)
+if err := createOneTable(dbPool[selectedDB], t); err != nil {
+errCh <- err
+return
+}
+})
+}
+}

go func() {
+// TODO replace it with an errgroup
+wg := new(sync.WaitGroup)
defer close(outCh)
+defer log.Info("all tables created")
+defer func() {
+if len(dbPool) > 0 {
+for _, db := range dbPool {
+db.Close()
+}
+}
+}()

for _, table := range tables {
-if err := createOneTable(table); err != nil {
-errCh <- err
-return
-}
+tbl := table
+wg.Add(1)
+startWork(tbl, wg.Done)
}
+wg.Wait()
}()
return outCh
}
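The scheduling pattern added above — a bounded worker pool in which each worker picks a database connection by ID modulo the pool size — is compact enough to show in isolation. Below is a minimal, self-contained Go sketch of the same idea using only the standard library; conn and its createTable method are illustrative stand-ins for this diff's *DB and createOneTable, not part of the actual change.

package main

import (
	"fmt"
	"sync"
)

// conn is an illustrative stand-in for the diff's *DB handle.
type conn struct{ id int }

func (c *conn) createTable(name string) {
	fmt.Printf("conn %d: CREATE TABLE %s\n", c.id, name)
}

func main() {
	pool := []*conn{{0}, {1}, {2}, {3}}
	tables := []string{"t1", "t2", "t3", "t4", "t5", "t6"}

	// One token per connection bounds the concurrency, as the WorkerPool does.
	tokens := make(chan int, len(pool))
	for i := range pool {
		tokens <- i
	}

	var wg sync.WaitGroup
	for _, table := range tables {
		tbl := table // capture per iteration, like the diff's `tbl := table`
		wg.Add(1)
		id := <-tokens // block until a worker ID is free
		go func() {
			defer wg.Done()
			pool[id%len(pool)].createTable(tbl) // modulo ID selects the DB
			tokens <- id                        // recycle the worker
		}()
	}
	wg.Wait()
}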
13 changes: 13 additions & 0 deletions pkg/restore/util.go
@@ -142,6 +142,19 @@ func GetSSTMetaFromFile(
}
}

+// MakeDBPool makes a pool of DBs with the specified size, each created by dbFactory.
+func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
+dbPool := make([]*DB, 0, size)
+for i := uint(0); i < size; i++ {
+db, e := dbFactory()
+if e != nil {
+return dbPool, e
+}
+dbPool = append(dbPool, db)
+}
+return dbPool, nil
+}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backup.File) int {
result := 0
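Note that MakeDBPool returns the partially built pool together with the error, which is what lets RunRestore (in pkg/task/restore.go below) log a warning and degrade to fewer sessions instead of aborting. Here is a hedged, self-contained sketch of that contract; db and makePool are pure-Go stand-ins for restore.DB and MakeDBPool, and the factory's failure mode is invented for illustration.

package main

import (
	"errors"
	"fmt"
)

// db stands in for restore.DB; makePool mirrors MakeDBPool's contract:
// on a factory failure, it returns the pool built so far along with the error.
type db struct{ id int }

func makePool(size uint, factory func() (*db, error)) ([]*db, error) {
	pool := make([]*db, 0, size)
	for i := uint(0); i < size; i++ {
		d, err := factory()
		if err != nil {
			return pool, err
		}
		pool = append(pool, d)
	}
	return pool, nil
}

func main() {
	made := 0
	pool, err := makePool(16, func() (*db, error) {
		if made == 2 {
			return nil, errors.New("session limit reached")
		}
		made++
		return &db{id: made}, nil
	})
	// Prints "built 2 of 16: session limit reached"; the caller can still use
	// (or close) the partial pool, as RunRestore's warning path does below.
	fmt.Printf("built %d of 16: %v\n", len(pool), err)
}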
17 changes: 16 additions & 1 deletion pkg/task/restore.go
@@ -30,6 +30,7 @@ const (

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
+defaultDDLConcurrency = 16
)

var (
@@ -188,7 +189,21 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

// We make errCh bigger so we won't block on multi-part failures.
errCh := make(chan error, 32)
-tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh)
+// Letting the user modify the DDL concurrency probably isn't necessary,
+// because executing DDLs is really I/O bound (or, algorithm bound?),
+// and most of the time is spent waiting for DDL jobs to be enqueued.
+// These jobs won't get faster or slower as the machine gets faster or slower,
+// so a fixed value should be fine.
+dbPool, err := restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) {
+return restore.NewDB(g, mgr.GetTiKV())
+})
+if err != nil {
+log.Warn("failed to create the session pool; DDLs will be sent only through already-created sessions",
+zap.Error(err),
+zap.Int("sessionCount", len(dbPool)),
+)
+}
+tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh)
if len(files) == 0 {
log.Info("no files, empty databases and tables are restored")
summary.SetSuccessStatus(true)
8 changes: 7 additions & 1 deletion pkg/utils/worker.go
@@ -20,6 +20,7 @@ type Worker struct {
}

type taskFunc func()
+type identifiedTaskFunc func(uint64)

// NewWorkerPool returns a WorkPool.
func NewWorkerPool(limit uint, name string) *WorkerPool {
@@ -36,6 +37,11 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {

// Apply executes a task.
func (pool *WorkerPool) Apply(fn taskFunc) {
+pool.ApplyWithID(func(_ uint64) { fn() })
+}
+
+// ApplyWithID executes a task and provides it with the worker ID.
+func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) {
var worker *Worker
select {
case worker = <-pool.workers:
@@ -44,7 +50,7 @@ func (pool *WorkerPool) Apply(fn taskFunc) {
worker = <-pool.workers
}
go func() {
-fn()
+fn(worker.ID)
pool.recycle(worker)
}()
}
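Apply now delegates to ApplyWithID by wrapping the plain task in a closure that discards the worker ID, so existing callers compile unchanged. A small usage sketch of both entry points follows; it assumes the fmt, sync, and pkg/utils imports, and the pool size, name, and task bodies are illustrative only.

var wg sync.WaitGroup
pool := utils.NewWorkerPool(2, "demo")

wg.Add(2)
// Old-style callers keep working; the worker ID is silently discarded.
pool.Apply(func() {
	defer wg.Done()
	fmt.Println("anonymous task")
})
// New-style callers can shard state by worker ID, e.g. picking a DB the way
// GoCreateTables does with dbPool[int(id)%len(dbPool)].
pool.ApplyWithID(func(id uint64) {
	defer wg.Done()
	fmt.Printf("task ran on worker %d\n", id)
})
wg.Wait() // both variants run their tasks asynchronously, so wait here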
