This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Speed up DDL execution by sending DDL concurrently #377

Merged
merged 14 commits on Jul 6, 2020
63 changes: 54 additions & 9 deletions pkg/restore/client.go
@@ -333,7 +333,7 @@ func (rc *Client) CreateTables(
for i, t := range tables {
tbMapping[t.Info.Name.String()] = i
}
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh)
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh)
for et := range dataCh {
rules := et.RewriteRule
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
@@ -355,11 +355,24 @@ func (rc *Client) CreateTables(
return rewriteRules, newTables, nil
}

func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) {
func (rc *Client) createTable(
ctx context.Context,
db *DB,
dom *domain.Domain,
table *utils.Table,
newTS uint64,
) (CreatedTable, error) {
if db == nil {
db = rc.db
}
Comment on lines +374 to +376
Collaborator

can we remove rc.db, or make the DB pool []*DB a field of rc *Client?

Collaborator Author

rc.db is still used for other DDLs (e.g. CREATE DATABASE, ALTER TABLE SET TIFLASH_REPLICA = x, incremental DDL execution, etc.); if we want to remove it, we must make many changes. (BTW, it seems a little strange to use db like rc.dbs[n] outside GoCreateTables: there is an array, but we only need one of them at any time, since those DDLs are executed sequentially.)

Maybe doing that in another refactoring PR would be better?

Collaborator

we could have a rc.GetDB() and rotate the db list on every call 🙃.

anyway, since we've decided to refactor this in the future, please add a TODO on the db field.

Collaborator Author

@YuJuncen, Jun 30, 2020

But how should GetDB() be implemented? If we trivially return rc.dbs[0], it seems we gain nothing from it... Because we still need to inject some 'id' into every call if we want to use the other dbs (hence we can't remove the extra parameter of createTable), and we only use one db at a time (hence we can't speed up execution outside restoreTables).

I think maybe we need further discussion to decide whether we need to prune the db field... 🤔

Or is there any better idea?

Collaborator

@YuJuncen round-robin?

db := rc.dbs[rc.curDB]
rc.curDB++
if rc.curDB >= len(rc.dbs) {
    rc.curDB = 0
}
return db

Collaborator Author

@YuJuncen, Jun 30, 2020

Yes, that's a good idea, but there would still be some risk if we use this algorithm to allocate sessions for concurrently running processes, that is, when some session gets stuck:

Assume we have two sessions, and two processes.

(Process 0) |(takes dbs[0]) -------------------------------------------------------------------->(done)|
(Process 1)      |(takes dbs[1])----->(done)|    |(takes dbs[0], boom, race condition!)*

Maybe this doesn't buy us much... Our questions remain: we cannot remove the extra parameter of createTable, and outside createTable we cannot benefit from multiple dbs either, since those DDLs are executed sequentially.
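
(A minimal sketch, not part of the PR thread, of the hazard described above: a naive round-robin GetDB with no notion of "in use" can hand a session out again while an earlier caller is still running a DDL on it. The Client fields below are assumptions for illustration only.)

type Client struct {
	dbs   []*DB
	curDB int
}

// naive rotation: nothing tracks whether the returned session is still busy
func (rc *Client) GetDB() *DB {
	db := rc.dbs[rc.curDB]
	rc.curDB = (rc.curDB + 1) % len(rc.dbs)
	return db
}

// With two sessions and two concurrent workers:
//   worker A: GetDB() -> dbs[0]   (long-running CREATE TABLE)
//   worker B: GetDB() -> dbs[1]   (finishes quickly)
//   worker B: GetDB() -> dbs[0]   (still held by A, so two workers share one session)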


if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
err := rc.db.CreateTable(rc.ctx, table)
// don't use rc.ctx here...
// removing the ctx field of Client would be a great improvement,
// but we just take a small step here :<
err := db.CreateTable(ctx, table)
if err != nil {
return CreatedTable{}, err
}
@@ -378,22 +391,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint
}

// GoCreateTables create tables, and generate their information.
// this function will use as many workers as the number of sessions in dbPool;
// leave dbPool nil to send DDLs sequentially.
func (rc *Client) GoCreateTables(
ctx context.Context,
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
dbPool []*DB,
errCh chan<- error,
) <-chan CreatedTable {
// Could we have a smaller size of tables?
outCh := make(chan CreatedTable, len(tables))
createOneTable := func(t *utils.Table) error {
createOneTable := func(db *DB, t *utils.Table) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rt, err := rc.createTable(dom, t, newTS)
rt, err := rc.createTable(ctx, db, dom, t, newTS)
if err != nil {
log.Error("create table failed",
zap.Error(err),
@@ -408,16 +424,45 @@ func (rc *Client) GoCreateTables(
outCh <- rt
return nil
}
startWork := func(t *utils.Table, done func()) {
defer done()
if err := createOneTable(nil, t); err != nil {
errCh <- err
return
}
}
if len(dbPool) > 0 {
workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers")
startWork = func(t *utils.Table, done func()) {
workers.ApplyWithID(func(id uint64) {
defer done()
selectedDB := int(id) % len(dbPool)
if err := createOneTable(dbPool[selectedDB], t); err != nil {
errCh <- err
return
}
})
}
}

go func() {
wg := new(sync.WaitGroup)
defer close(outCh)
defer log.Info("all tables created")
defer func() {
if len(dbPool) > 0 {
for _, db := range dbPool {
db.Close()
}
}
}()

for _, table := range tables {
if err := createOneTable(table); err != nil {
errCh <- err
return
}
tbl := table
wg.Add(1)
startWork(tbl, wg.Done)
}
wg.Wait()
}()
return outCh
}
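The merged GoCreateTables avoids the session-sharing concern discussed above by sizing the worker pool to len(dbPool) and binding each worker ID to a fixed session, so a session never runs two DDLs at once. A rough caller-side sketch of that mapping (ctx, newDB, and tables are placeholders, not identifiers from the PR):

dbPool, _ := restore.MakeDBPool(4, newDB) // newDB: placeholder factory returning (*restore.DB, error)
workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers")
var wg sync.WaitGroup
for _, t := range tables { // tables: placeholder []*utils.Table
	tbl := t
	wg.Add(1)
	workers.ApplyWithID(func(id uint64) {
		defer wg.Done()
		db := dbPool[int(id)%len(dbPool)] // a given worker ID always resolves to the same session,
		_ = db.CreateTable(ctx, tbl)      // so at most one DDL is in flight per session
	})
}
wg.Wait()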
13 changes: 13 additions & 0 deletions pkg/restore/util.go
@@ -142,6 +142,19 @@ func GetSSTMetaFromFile(
}
}

// MakeDBPool makes a session pool with the specified size using dbFactory.
func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
dbPool := make([]*DB, 0, size)
for i := uint(0); i < size; i++ {
db, e := dbFactory()
if e != nil {
return dbPool, e
}
dbPool = append(dbPool, db)
}
return dbPool, nil
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backup.File) int {
result := 0
17 changes: 16 additions & 1 deletion pkg/task/restore.go
@@ -30,6 +30,7 @@ const (

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 256
defaultDDLConcurrency = 16
)

var (
@@ -188,7 +189,21 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

// We make bigger errCh so we won't block on multi-part failed.
errCh := make(chan error, 32)
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh)
// Maybe allowing the user to modify the DDL concurrency isn't necessary,
// because executing DDL is really I/O bound (or, algorithm bound?),
// and we spend most of the time waiting for DDL jobs to be enqueued.
// So these jobs won't get faster or slower as the machine gets faster or slower;
// hence a fixed value would be fine.
dbPool, err := restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) {
return restore.NewDB(g, mgr.GetTiKV())
})
if err != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(err),
zap.Int("sessionCount", len(dbPool)),
)
}
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh)
if len(files) == 0 {
log.Info("no files, empty databases and tables are restored")
summary.SetSuccessStatus(true)
8 changes: 7 additions & 1 deletion pkg/utils/worker.go
@@ -20,6 +20,7 @@ type Worker struct {
}

type taskFunc func()
type identifiedTaskFunc func(uint64)

// NewWorkerPool returns a WorkPool.
func NewWorkerPool(limit uint, name string) *WorkerPool {
@@ -36,6 +37,11 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {

// Apply executes a task.
func (pool *WorkerPool) Apply(fn taskFunc) {
pool.ApplyWithID(func(_ uint64) { fn() })
}

// ApplyWithID executes a task and provides it with the worker ID.
func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) {
var worker *Worker
select {
case worker = <-pool.workers:
@@ -44,7 +50,7 @@ func (pool *WorkerPool) Apply(fn taskFunc) {
worker = <-pool.workers
}
go func() {
fn()
fn(worker.ID)
pool.recycle(worker)
}()
}
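
A small usage sketch of the new ApplyWithID (illustrative only; it relies on worker IDs staying stable for the pool's lifetime, which is what fn(worker.ID) above provides):

pool := utils.NewWorkerPool(3, "demo")
var (
	wg             sync.WaitGroup
	mu             sync.Mutex
	tasksPerWorker = map[uint64]int{}
)
for i := 0; i < 9; i++ {
	wg.Add(1)
	pool.ApplyWithID(func(id uint64) { // blocks until a worker is free, then runs fn in a goroutine
		defer wg.Done()
		mu.Lock()
		tasksPerWorker[id]++ // each task can tell which of the 3 workers executed it
		mu.Unlock()
	})
}
wg.Wait() // afterwards, tasksPerWorker has at most 3 distinct keys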