diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index 58bc3771a..29e992416 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -59,9 +59,18 @@ type Client struct {
 	workerPool *utils.WorkerPool
 	tlsConf    *tls.Config
 
-	databases  map[string]*utils.Database
-	ddlJobs    []*model.Job
-	backupMeta *backup.BackupMeta
+	databases  map[string]*utils.Database
+	ddlJobs    []*model.Job
+	backupMeta *backup.BackupMeta
+	// TODO: remove this field or replace it with a []*DB,
+	// since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution.
+	// For now, we must inject a pool of DBs into `Client.GoCreateTables`, otherwise there would be a race condition.
+	// This is dirty: why do we need DBs from different sources?
+	// By replacing it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTables`,
+	// along with its counterparts in some private functions.
+	// Before you do that, first read the discussion at
+	// https://github.com/pingcap/br/pull/377#discussion_r446594501;
+	// this probably isn't as easy as it seems (however, not hard, too :D).
 	db        *DB
 	rateLimit uint64
 	isOnline  bool
@@ -337,7 +346,7 @@ func (rc *Client) CreateTables(
 	for i, t := range tables {
 		tbMapping[t.Info.Name.String()] = i
 	}
-	dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh)
+	dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh)
 	for et := range dataCh {
 		rules := et.RewriteRule
 		rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
@@ -359,11 +368,24 @@
 	return rewriteRules, newTables, nil
 }
 
-func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) {
+func (rc *Client) createTable(
+	ctx context.Context,
+	db *DB,
+	dom *domain.Domain,
+	table *utils.Table,
+	newTS uint64,
+) (CreatedTable, error) {
+	if db == nil {
+		db = rc.db
+	}
+
 	if rc.IsSkipCreateSQL() {
 		log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
 	} else {
-		err := rc.db.CreateTable(rc.ctx, table)
+		// don't use rc.ctx here...
+		// removing the ctx field of Client would be a worthwhile cleanup,
+		// we just take a small step here :<
+		err := db.CreateTable(ctx, table)
 		if err != nil {
 			return CreatedTable{}, err
 		}
@@ -382,22 +404,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint
 }
 
 // GoCreateTables create tables, and generate their information.
+// This function uses as many DDL workers as there are DBs in dbPool;
+// leave dbPool nil to send DDLs sequentially.
 func (rc *Client) GoCreateTables(
 	ctx context.Context,
 	dom *domain.Domain,
 	tables []*utils.Table,
 	newTS uint64,
+	dbPool []*DB,
 	errCh chan<- error,
 ) <-chan CreatedTable {
 	// Could we have a smaller size of tables?
 	outCh := make(chan CreatedTable, len(tables))
-	createOneTable := func(t *utils.Table) error {
+	createOneTable := func(db *DB, t *utils.Table) error {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
-		rt, err := rc.createTable(dom, t, newTS)
+		rt, err := rc.createTable(ctx, db, dom, t, newTS)
 		if err != nil {
 			log.Error("create table failed",
 				zap.Error(err),
@@ -412,16 +437,46 @@ func (rc *Client) GoCreateTables(
 		outCh <- rt
 		return nil
 	}
+	startWork := func(t *utils.Table, done func()) {
+		defer done()
+		if err := createOneTable(nil, t); err != nil {
+			errCh <- err
+			return
+		}
+	}
+	if len(dbPool) > 0 {
+		workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers")
+		startWork = func(t *utils.Table, done func()) {
+			workers.ApplyWithID(func(id uint64) {
+				defer done()
+				selectedDB := int(id) % len(dbPool)
+				if err := createOneTable(dbPool[selectedDB], t); err != nil {
+					errCh <- err
+					return
+				}
+			})
+		}
+	}
+
 	go func() {
+		// TODO: replace this with an errgroup.
+		wg := new(sync.WaitGroup)
 		defer close(outCh)
 		defer log.Info("all tables created")
+		defer func() {
+			if len(dbPool) > 0 {
+				for _, db := range dbPool {
+					db.Close()
+				}
+			}
+		}()
 
 		for _, table := range tables {
-			if err := createOneTable(table); err != nil {
-				errCh <- err
-				return
-			}
+			tbl := table
+			wg.Add(1)
+			startWork(tbl, wg.Done)
 		}
+		wg.Wait()
 	}()
 	return outCh
 }
diff --git a/pkg/restore/util.go b/pkg/restore/util.go
index b11db57f2..cfb991459 100644
--- a/pkg/restore/util.go
+++ b/pkg/restore/util.go
@@ -141,6 +141,19 @@ func GetSSTMetaFromFile(
 	}
 }
 
+// MakeDBPool makes a pool of DBs with the specified size, created by dbFactory.
+func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) {
+	dbPool := make([]*DB, 0, size)
+	for i := uint(0); i < size; i++ {
+		db, e := dbFactory()
+		if e != nil {
+			return dbPool, e
+		}
+		dbPool = append(dbPool, db)
+	}
+	return dbPool, nil
+}
+
 // EstimateRangeSize estimates the total range count by file.
 func EstimateRangeSize(files []*backup.File) int {
 	result := 0
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index 65fcfabbd..3bc4ec3eb 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -30,6 +30,7 @@ const (
 
 	defaultRestoreConcurrency = 128
 	maxRestoreBatchSizeLimit  = 256
+	defaultDDLConcurrency     = 16
 )
 
 var (
@@ -188,7 +189,21 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	// We make bigger errCh so we won't block on multi-part failed.
 	errCh := make(chan error, 32)
-	tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh)
+	// Letting the user modify the DDL concurrency probably isn't necessary,
+	// because executing DDL is really I/O bound (or, algorithm bound?),
+	// and most of the time is spent waiting for DDL jobs to be enqueued.
+	// So these jobs won't get faster or slower as the machine does,
+	// hence a fixed value should be fine.
+	dbPool, err := restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) {
+		return restore.NewDB(g, mgr.GetTiKV())
+	})
+	if err != nil {
+		log.Warn("failed to create the session pool, DDLs will only be sent through the sessions already created",
+			zap.Error(err),
+			zap.Int("sessionCount", len(dbPool)),
+		)
+	}
+	tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh)
 
 	if len(files) == 0 {
 		log.Info("no files, empty databases and tables are restored")
 		summary.SetSuccessStatus(true)
diff --git a/pkg/utils/worker.go b/pkg/utils/worker.go
index 4c5bcf684..67e9d0dcb 100644
--- a/pkg/utils/worker.go
+++ b/pkg/utils/worker.go
@@ -21,7 +21,7 @@ type Worker struct {
 }
 
 type taskFunc func()
-type identifiedTaskFunc func(id uint64)
+type identifiedTaskFunc func(uint64)
 
 // NewWorkerPool returns a WorkPool.
 func NewWorkerPool(limit uint, name string) *WorkerPool {
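
A note on the `utils.WorkerPool` used in GoCreateTables: the diff calls workers.ApplyWithID but does not show its implementation. The following is a minimal, self-contained sketch of what a semaphore-style pool with such a method might look like; it is a guess at the shape of the mechanism for exposition, not the actual pkg/utils code. The stable per-worker ID is what lets the diff map each in-flight task to exactly one session via dbPool[int(id) % len(dbPool)], so no *DB is ever shared by two concurrent DDLs.

package main

import (
	"fmt"
	"sync"
)

// identifiedTaskFunc mirrors the type in pkg/utils/worker.go.
type identifiedTaskFunc func(uint64)

// WorkerPool is a hypothetical stand-in for utils.WorkerPool.
type WorkerPool struct {
	ids  chan uint64 // buffered channel acting as a semaphore of worker IDs
	name string
	wg   sync.WaitGroup
}

func NewWorkerPool(limit uint, name string) *WorkerPool {
	ids := make(chan uint64, limit)
	for i := uint64(0); i < uint64(limit); i++ {
		ids <- i
	}
	return &WorkerPool{ids: ids, name: name}
}

// ApplyWithID blocks until a worker is free, then runs fn asynchronously,
// passing the acquired worker's ID so callers can pin per-worker resources.
func (p *WorkerPool) ApplyWithID(fn identifiedTaskFunc) {
	id := <-p.ids // acquire (blocks while all workers are busy)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { p.ids <- id }() // release the worker ID
		fn(id)
	}()
}

// Wait blocks until every submitted task has finished.
func (p *WorkerPool) Wait() { p.wg.Wait() }

func main() {
	pool := NewWorkerPool(4, "DDL workers")
	for i := 0; i < 8; i++ {
		task := i // copy the loop variable before capturing it
		pool.ApplyWithID(func(id uint64) {
			fmt.Printf("task %d ran on worker %d\n", task, id)
		})
	}
	pool.Wait()
}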
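
The seemingly redundant `tbl := table` line in GoCreateTables is load-bearing: startWork runs asynchronously, and Go versions prior to 1.22 reuse a single loop variable across iterations, so capturing `table` directly could leave every closure observing the last table. A tiny, runnable demonstration of the idiom:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, name := range []string{"a", "b", "c"} {
		n := name // per-iteration copy, the same idiom as `tbl := table`
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Without the copy, Go versions before 1.22 share one `name`
			// variable across iterations, so all three goroutines could
			// print the final element instead of their own.
			fmt.Println(n)
		}()
	}
	wg.Wait()
}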
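
Finally, the `TODO: replace this with an errgroup` comment hints at a tidier shape for the fan-out. Below is a hedged sketch of that refactor using golang.org/x/sync/errgroup, which propagates the first error and cancels the shared context, replacing the manual WaitGroup and errCh plumbing. Table, DB, and createOneTable here are stand-ins for the corresponding pieces of the diff; this is one possible design, not what the PR implements.

package restoresketch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Table and DB are hypothetical stand-ins for utils.Table and restore.DB.
type Table struct{ Name string }
type DB struct{}

// createOneTable stands in for the closure of the same name in GoCreateTables.
func createOneTable(ctx context.Context, db *DB, t *Table) error {
	// ... send the CREATE TABLE DDL through db, honoring ctx cancellation ...
	return nil
}

// createAllTables is one possible errgroup-based rewrite of the fan-out.
// A buffered channel doles out sessions; receiving from it both pins one
// session per in-flight task and bounds concurrency to len(dbPool).
// It assumes a non-empty dbPool; the diff falls back to sequential DDL otherwise.
func createAllTables(ctx context.Context, tables []*Table, dbPool []*DB) error {
	eg, ectx := errgroup.WithContext(ctx)
	sessions := make(chan *DB, len(dbPool))
	for _, db := range dbPool {
		sessions <- db
	}
	for _, table := range tables {
		tbl := table // pre-Go 1.22 loop-variable copy, as in the diff
		eg.Go(func() error {
			db := <-sessions // acquire a session (blocks while all are busy)
			defer func() { sessions <- db }()
			return createOneTable(ectx, db, tbl)
		})
	}
	// Wait returns the first non-nil error once all goroutines finish;
	// ectx is canceled as soon as any task fails, stopping stragglers early.
	return eg.Wait()
}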