From 6af92ed6d21a032ad7ddf83d1106eb27da79647e Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 23 Jun 2020 16:45:28 +0800 Subject: [PATCH 1/8] task, restore: send DDLs parallelly --- pkg/restore/client.go | 58 ++++++++++++++++++++++++++++++++++++------- pkg/restore/db.go | 58 ++++++++++++++++++++++++++++++------------- pkg/task/restore.go | 14 ++++++++++- pkg/utils/worker.go | 8 +++++- 4 files changed, 110 insertions(+), 28 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 4e1f855c0..fc302b94c 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -333,7 +333,7 @@ func (rc *Client) CreateTables( for i, t := range tables { tbMapping[t.Info.Name.String()] = i } - dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh) + dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh) for et := range dataCh { rules := et.RewriteRule rewriteRules.Table = append(rewriteRules.Table, rules.Table...) @@ -355,11 +355,19 @@ func (rc *Client) CreateTables( return rewriteRules, newTables, nil } -func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) { +func (rc *Client) createTable( + ctx context.Context, + dom *domain.Domain, + table *utils.Table, + newTS uint64, +) (CreatedTable, error) { if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { - err := rc.db.CreateTable(rc.ctx, table) + // don't use rc.ctx here... + // remove the ctx field of Client would be a great work, + // we just take a small step here :< + err := rc.db.CreateTable(ctx, table) if err != nil { return CreatedTable{}, err } @@ -378,22 +386,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint } // GoCreateTables create tables, and generate their information. +// this function will use workers as the same number of sessionPool, +// leave sessionPool nil to send DDLs sequential. func (rc *Client) GoCreateTables( ctx context.Context, dom *domain.Domain, tables []*utils.Table, newTS uint64, + sessionPool []glue.Session, errCh chan<- error, ) <-chan CreatedTable { // Could we have a smaller size of tables? outCh := make(chan CreatedTable, len(tables)) - createOneTable := func(t *utils.Table) error { + createOneTable := func(ctx context.Context, t *utils.Table) error { select { case <-ctx.Done(): return ctx.Err() default: } - rt, err := rc.createTable(dom, t, newTS) + rt, err := rc.createTable(ctx, dom, t, newTS) if err != nil { log.Error("create table failed", zap.Error(err), @@ -408,16 +419,45 @@ func (rc *Client) GoCreateTables( outCh <- rt return nil } + startWork := func(t *utils.Table, done func()) { + defer done() + if err := createOneTable(ctx, t); err != nil { + errCh <- err + return + } + } + if len(sessionPool) > 0 { + workers := utils.NewWorkerPool(uint(len(sessionPool)), "DDL workers") + startWork = func(t *utils.Table, done func()) { + workers.ApplyWithID(func(id uint64) { + defer done() + vctx := context.WithValue(ctx, SessionInContext, sessionPool[id]) + if err := createOneTable(vctx, t); err != nil { + errCh <- err + return + } + }) + } + } + go func() { + wg := new(sync.WaitGroup) defer close(outCh) defer log.Info("all tables created") + defer func() { + if len(sessionPool) > 0 { + for _, se := range sessionPool { + se.Close() + } + } + }() for _, table := range tables { - if err := createOneTable(table); err != nil { - errCh <- err - return - } + tbl := table + wg.Add(1) + startWork(tbl, wg.Done) } + wg.Wait() }() return outCh } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index dc862d53e..28683f442 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -17,13 +17,20 @@ import ( "github.com/pingcap/br/pkg/utils" ) +// DBContextKey is the key type of the context value this file uses. +type DBContextKey int + +// SessionInContext can get the value of current contextual ID from context. +const SessionInContext DBContextKey = iota + // DB is a TiDB instance, not thread-safe. +// If you want share it between goroutines, +// please put the `SessionInContext` value at each goroutine. type DB struct { - se glue.Session + globalSession glue.Session } -// NewDB returns a new DB. -func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { +func makeSession(g glue.Glue, store kv.Storage) (glue.Session, error) { se, err := g.CreateSession(store) if err != nil { return nil, errors.Trace(err) @@ -37,8 +44,25 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { if err != nil { return nil, errors.Trace(err) } + return se, nil +} + +func (db *DB) contextualSession(ctx context.Context) glue.Session { + if session, ok := ctx.Value(SessionInContext).(glue.Session); ok { + return session + } + return db.globalSession +} + +// NewDB returns a new DB. +func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { + se, err := makeSession(g, store) + if err != nil { + return nil, err + } + return &DB{ - se: se, + globalSession: se, }, nil } @@ -49,13 +73,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { dbInfo := ddlJob.BinlogInfo.DBInfo switch ddlJob.Type { case model.ActionCreateSchema: - err = db.se.CreateDatabase(ctx, dbInfo) + err = db.contextualSession(ctx).CreateDatabase(ctx, dbInfo) if err != nil { log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err)) } return errors.Trace(err) case model.ActionCreateTable: - err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) + err = db.contextualSession(ctx).CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) if err != nil { log.Error("create table failed", zap.Stringer("db", dbInfo.Name), @@ -67,7 +91,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { if tableInfo != nil { switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) - err = db.se.Execute(ctx, switchDbSQL) + err = db.contextualSession(ctx).Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("query", switchDbSQL), @@ -76,7 +100,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } } - err = db.se.Execute(ctx, ddlJob.Query) + err = db.contextualSession(ctx).Execute(ctx, ddlJob.Query) if err != nil { log.Error("execute ddl query failed", zap.String("query", ddlJob.Query), @@ -89,7 +113,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - err := db.se.CreateDatabase(ctx, schema) + err := db.contextualSession(ctx).CreateDatabase(ctx, schema) if err != nil { log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err)) } @@ -98,7 +122,7 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { // CreateTable executes a CREATE TABLE SQL. func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { - err := db.se.CreateTable(ctx, table.Db.Name, table.Info) + err := db.contextualSession(ctx).CreateTable(ctx, table.Db.Name, table.Info) if err != nil { log.Error("create table failed", zap.Stringer("db", table.Db.Name), @@ -128,7 +152,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { } else { setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) } - err = db.se.Execute(ctx, setValSQL) + err = db.contextualSession(ctx).Execute(ctx, setValSQL) if err != nil { log.Error("restore meta sql failed", zap.String("query", setValSQL), @@ -139,7 +163,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { } // trigger cycle round > 0 - err = db.se.Execute(ctx, nextSeqSQL) + err = db.contextualSession(ctx).Execute(ctx, nextSeqSQL) if err != nil { log.Error("restore meta sql failed", zap.String("query", nextSeqSQL), @@ -165,7 +189,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { table.Info.AutoIncID) } - err = db.se.Execute(ctx, restoreMetaSQL) + err = db.contextualSession(ctx).Execute(ctx, restoreMetaSQL) if err != nil { log.Error("restore meta sql failed", zap.String("query", restoreMetaSQL), @@ -185,7 +209,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { utils.EncloseName(table.Info.Name.O), table.Info.AutoRandID) - err = db.se.Execute(ctx, alterAutoRandIDSQL) + err = db.contextualSession(ctx).Execute(ctx, alterAutoRandIDSQL) if err != nil { log.Error("alter AutoRandID failed", zap.String("query", alterAutoRandIDSQL), @@ -201,7 +225,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { // AlterTiflashReplica alters the replica count of tiflash. func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count int) error { switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(table.Db.Name.O)) - err := db.se.Execute(ctx, switchDbSQL) + err := db.contextualSession(ctx).Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("SQL", switchDbSQL), @@ -214,7 +238,7 @@ func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count utils.EncloseName(table.Info.Name.O), count, ) - err = db.se.Execute(ctx, alterTiFlashSQL) + err = db.contextualSession(ctx).Execute(ctx, alterTiFlashSQL) if err != nil { log.Error("alter tiflash replica failed", zap.String("query", alterTiFlashSQL), @@ -233,7 +257,7 @@ func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count // Close closes the connection. func (db *DB) Close() { - db.se.Close() + db.globalSession.Close() } // FilterDDLJobs filters ddl jobs. diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 684665135..560ef60ce 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -188,7 +188,19 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // We make bigger errCh so we won't block on multi-part failed. errCh := make(chan error, 32) - tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh) + sessionPool := make([]glue.Session, 0, cfg.Concurrency) + for i := uint32(0); i < cfg.Concurrency; i++ { + session, e := g.CreateSession(mgr.GetTiKV()) + if e != nil { + log.Warn("create session pool failed, we will send DDLs only by created sessions", + zap.Error(e), + zap.Int("sessionCount", len(sessionPool)), + ) + break + } + sessionPool = append(sessionPool, session) + } + tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, sessionPool, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") summary.SetSuccessStatus(true) diff --git a/pkg/utils/worker.go b/pkg/utils/worker.go index 635748314..a04f59886 100644 --- a/pkg/utils/worker.go +++ b/pkg/utils/worker.go @@ -20,6 +20,7 @@ type Worker struct { } type taskFunc func() +type identifiedTaskFunc func(uint64) // NewWorkerPool returns a WorkPool. func NewWorkerPool(limit uint, name string) *WorkerPool { @@ -36,6 +37,11 @@ func NewWorkerPool(limit uint, name string) *WorkerPool { // Apply executes a task. func (pool *WorkerPool) Apply(fn taskFunc) { + pool.ApplyWithID(func(_ uint64) { fn() }) +} + +// ApplyWithID execute a task and provides it with the worker ID. +func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) { var worker *Worker select { case worker = <-pool.workers: @@ -44,7 +50,7 @@ func (pool *WorkerPool) Apply(fn taskFunc) { worker = <-pool.workers } go func() { - fn() + fn(worker.ID) pool.recycle(worker) }() } From e7035139d0057795dd915883bf56dcb31c182a00 Mon Sep 17 00:00:00 2001 From: Hillium Date: Tue, 23 Jun 2020 17:15:22 +0800 Subject: [PATCH 2/8] restore: use moded id to index sessionpool --- pkg/restore/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index fc302b94c..a1e4c687e 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -431,7 +431,8 @@ func (rc *Client) GoCreateTables( startWork = func(t *utils.Table, done func()) { workers.ApplyWithID(func(id uint64) { defer done() - vctx := context.WithValue(ctx, SessionInContext, sessionPool[id]) + selectedSession := int(id) % len(sessionPool) + vctx := context.WithValue(ctx, SessionInContext, sessionPool[selectedSession]) if err := createOneTable(vctx, t); err != nil { errCh <- err return From cecdece513eac994b72b2c3a11c5afa3867cf880 Mon Sep 17 00:00:00 2001 From: yujuncen Date: Tue, 23 Jun 2020 18:21:02 +0800 Subject: [PATCH 3/8] store: return nil DB when no TiDB session provided --- pkg/restore/db.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/restore/db.go b/pkg/restore/db.go index 28683f442..50fdbfa7e 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -27,7 +27,7 @@ const SessionInContext DBContextKey = iota // If you want share it between goroutines, // please put the `SessionInContext` value at each goroutine. type DB struct { - globalSession glue.Session + se glue.Session } func makeSession(g glue.Glue, store kv.Storage) (glue.Session, error) { @@ -51,18 +51,18 @@ func (db *DB) contextualSession(ctx context.Context) glue.Session { if session, ok := ctx.Value(SessionInContext).(glue.Session); ok { return session } - return db.globalSession + return db.se } // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := makeSession(g, store) - if err != nil { + if err != nil || se == nil { return nil, err } return &DB{ - globalSession: se, + se: se, }, nil } @@ -257,7 +257,7 @@ func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count // Close closes the connection. func (db *DB) Close() { - db.globalSession.Close() + db.se.Close() } // FilterDDLJobs filters ddl jobs. From 9c5ec46fd0d045b82e95d50f122ecc5ae4d0ec9f Mon Sep 17 00:00:00 2001 From: yujuncen Date: Sun, 28 Jun 2020 10:39:53 +0800 Subject: [PATCH 4/8] *: use isolated DB instead of Session --- pkg/restore/client.go | 18 ++++++---- pkg/restore/db.go | 78 ++++++++++--------------------------------- pkg/restore/util.go | 13 ++++++++ pkg/task/restore.go | 18 ++++++++-- 4 files changed, 58 insertions(+), 69 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index a1e4c687e..0dd133421 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -357,17 +357,22 @@ func (rc *Client) CreateTables( func (rc *Client) createTable( ctx context.Context, + db *DB, dom *domain.Domain, table *utils.Table, newTS uint64, ) (CreatedTable, error) { + if db == nil { + db = rc.db + } + if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { // don't use rc.ctx here... // remove the ctx field of Client would be a great work, // we just take a small step here :< - err := rc.db.CreateTable(ctx, table) + err := db.CreateTable(ctx, table) if err != nil { return CreatedTable{}, err } @@ -393,18 +398,18 @@ func (rc *Client) GoCreateTables( dom *domain.Domain, tables []*utils.Table, newTS uint64, - sessionPool []glue.Session, + sessionPool []*DB, errCh chan<- error, ) <-chan CreatedTable { // Could we have a smaller size of tables? outCh := make(chan CreatedTable, len(tables)) - createOneTable := func(ctx context.Context, t *utils.Table) error { + createOneTable := func(db *DB, t *utils.Table) error { select { case <-ctx.Done(): return ctx.Err() default: } - rt, err := rc.createTable(ctx, dom, t, newTS) + rt, err := rc.createTable(ctx, db, dom, t, newTS) if err != nil { log.Error("create table failed", zap.Error(err), @@ -421,7 +426,7 @@ func (rc *Client) GoCreateTables( } startWork := func(t *utils.Table, done func()) { defer done() - if err := createOneTable(ctx, t); err != nil { + if err := createOneTable(nil, t); err != nil { errCh <- err return } @@ -432,8 +437,7 @@ func (rc *Client) GoCreateTables( workers.ApplyWithID(func(id uint64) { defer done() selectedSession := int(id) % len(sessionPool) - vctx := context.WithValue(ctx, SessionInContext, sessionPool[selectedSession]) - if err := createOneTable(vctx, t); err != nil { + if err := createOneTable(sessionPool[selectedSession], t); err != nil { errCh <- err return } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index 50fdbfa7e..70a250a15 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -17,20 +17,13 @@ import ( "github.com/pingcap/br/pkg/utils" ) -// DBContextKey is the key type of the context value this file uses. -type DBContextKey int - -// SessionInContext can get the value of current contextual ID from context. -const SessionInContext DBContextKey = iota - // DB is a TiDB instance, not thread-safe. -// If you want share it between goroutines, -// please put the `SessionInContext` value at each goroutine. type DB struct { se glue.Session } -func makeSession(g glue.Glue, store kv.Storage) (glue.Session, error) { +// NewDB returns a new DB. +func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := g.CreateSession(store) if err != nil { return nil, errors.Trace(err) @@ -44,23 +37,6 @@ func makeSession(g glue.Glue, store kv.Storage) (glue.Session, error) { if err != nil { return nil, errors.Trace(err) } - return se, nil -} - -func (db *DB) contextualSession(ctx context.Context) glue.Session { - if session, ok := ctx.Value(SessionInContext).(glue.Session); ok { - return session - } - return db.se -} - -// NewDB returns a new DB. -func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { - se, err := makeSession(g, store) - if err != nil || se == nil { - return nil, err - } - return &DB{ se: se, }, nil @@ -73,13 +49,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { dbInfo := ddlJob.BinlogInfo.DBInfo switch ddlJob.Type { case model.ActionCreateSchema: - err = db.contextualSession(ctx).CreateDatabase(ctx, dbInfo) + err = db.se.CreateDatabase(ctx, dbInfo) if err != nil { log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err)) } return errors.Trace(err) case model.ActionCreateTable: - err = db.contextualSession(ctx).CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) + err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) if err != nil { log.Error("create table failed", zap.Stringer("db", dbInfo.Name), @@ -91,7 +67,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { if tableInfo != nil { switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) - err = db.contextualSession(ctx).Execute(ctx, switchDbSQL) + err = db.se.Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("query", switchDbSQL), @@ -100,7 +76,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } } - err = db.contextualSession(ctx).Execute(ctx, ddlJob.Query) + err = db.se.Execute(ctx, ddlJob.Query) if err != nil { log.Error("execute ddl query failed", zap.String("query", ddlJob.Query), @@ -113,7 +89,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - err := db.contextualSession(ctx).CreateDatabase(ctx, schema) + err := db.se.CreateDatabase(ctx, schema) if err != nil { log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err)) } @@ -122,7 +98,7 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { // CreateTable executes a CREATE TABLE SQL. func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { - err := db.contextualSession(ctx).CreateTable(ctx, table.Db.Name, table.Info) + err := db.se.CreateTable(ctx, table.Db.Name, table.Info) if err != nil { log.Error("create table failed", zap.Stringer("db", table.Db.Name), @@ -146,34 +122,17 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", utils.EncloseName(table.Db.Name.O), utils.EncloseName(table.Info.Name.O)) - var setValSQL string if increment < 0 { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) + restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) } else { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) + restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) } - err = db.contextualSession(ctx).Execute(ctx, setValSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", setValSQL), - zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - // trigger cycle round > 0 - err = db.contextualSession(ctx).Execute(ctx, nextSeqSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", nextSeqSQL), - zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } + restoreMetaSQL += nextSeqSQL + restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.AutoIncID) + } else { + restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) } - restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) } else { var alterAutoIncIDFormat string switch { @@ -189,14 +148,13 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { table.Info.AutoIncID) } - err = db.contextualSession(ctx).Execute(ctx, restoreMetaSQL) + err = db.se.Execute(ctx, restoreMetaSQL) if err != nil { log.Error("restore meta sql failed", zap.String("query", restoreMetaSQL), zap.Stringer("db", table.Db.Name), zap.Stringer("table", table.Info.Name), zap.Error(err)) - return errors.Trace(err) } if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { // this table has auto random id, we need rebase it @@ -209,7 +167,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { utils.EncloseName(table.Info.Name.O), table.Info.AutoRandID) - err = db.contextualSession(ctx).Execute(ctx, alterAutoRandIDSQL) + err = db.se.Execute(ctx, alterAutoRandIDSQL) if err != nil { log.Error("alter AutoRandID failed", zap.String("query", alterAutoRandIDSQL), @@ -225,7 +183,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { // AlterTiflashReplica alters the replica count of tiflash. func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count int) error { switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(table.Db.Name.O)) - err := db.contextualSession(ctx).Execute(ctx, switchDbSQL) + err := db.se.Execute(ctx, switchDbSQL) if err != nil { log.Error("switch db failed", zap.String("SQL", switchDbSQL), @@ -238,7 +196,7 @@ func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count utils.EncloseName(table.Info.Name.O), count, ) - err = db.contextualSession(ctx).Execute(ctx, alterTiFlashSQL) + err = db.se.Execute(ctx, alterTiFlashSQL) if err != nil { log.Error("alter tiflash replica failed", zap.String("query", alterTiFlashSQL), diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 382750f98..a728a1036 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -142,6 +142,19 @@ func GetSSTMetaFromFile( } } +// MakeSessionPool makes a session pool with specficated size by sessionFactory. +func MakeSessionPool(size uint, sessionFactory func() (*DB, error)) ([]*DB, error) { + sessionPool := make([]*DB, 0, size) + for i := uint(0); i < size; i++ { + session, e := sessionFactory() + if e != nil { + return nil, e + } + sessionPool = append(sessionPool, session) + } + return sessionPool, nil +} + // EstimateRangeSize estimates the total range count by file. func EstimateRangeSize(files []*backup.File) int { result := 0 diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 560ef60ce..2f405b743 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -30,6 +30,7 @@ const ( defaultRestoreConcurrency = 128 maxRestoreBatchSizeLimit = 256 + defaultDDLConcurrency = 16 ) var ( @@ -188,9 +189,22 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // We make bigger errCh so we won't block on multi-part failed. errCh := make(chan error, 32) - sessionPool := make([]glue.Session, 0, cfg.Concurrency) + // Maybe allow user modify the DDL concurrency isn't necessary, + // because executing DDL is really I/O bound (or, algorithm bound?), + // and we cost most of time at waiting DDL jobs be enqueued. + // So these jobs won't be faster or slower when machine become faster or slower, + // hence make it a fixed value would be fine. + sessionPool, err := restore.MakeSessionPool(defaultDDLConcurrency, func() (*restore.DB, error) { + return restore.NewDB(g, mgr.GetTiKV()) + }) + if err != nil { + log.Warn("create session pool failed, we will send DDLs only by created sessions", + zap.Error(err), + zap.Int("sessionCount", len(sessionPool)), + ) + } for i := uint32(0); i < cfg.Concurrency; i++ { - session, e := g.CreateSession(mgr.GetTiKV()) + session, e := restore.NewDB(g, mgr.GetTiKV()) if e != nil { log.Warn("create session pool failed, we will send DDLs only by created sessions", zap.Error(e), From 95324967049b38201c19000900820acce3c1d8a0 Mon Sep 17 00:00:00 2001 From: yujuncen Date: Sun, 28 Jun 2020 10:47:58 +0800 Subject: [PATCH 5/8] *: fix some mis-pushed files :| --- pkg/restore/db.go | 30 ++++++++++++++++++++++++------ pkg/task/restore.go | 11 ----------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pkg/restore/db.go b/pkg/restore/db.go index 70a250a15..dc862d53e 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -122,17 +122,34 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", utils.EncloseName(table.Db.Name.O), utils.EncloseName(table.Info.Name.O)) + var setValSQL string if increment < 0 { - restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) + setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) } else { - restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) + setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) } + err = db.se.Execute(ctx, setValSQL) + if err != nil { + log.Error("restore meta sql failed", + zap.String("query", setValSQL), + zap.Stringer("db", table.Db.Name), + zap.Stringer("table", table.Info.Name), + zap.Error(err)) + return errors.Trace(err) + } + // trigger cycle round > 0 - restoreMetaSQL += nextSeqSQL - restoreMetaSQL += fmt.Sprintf(setValFormat, table.Info.AutoIncID) - } else { - restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) + err = db.se.Execute(ctx, nextSeqSQL) + if err != nil { + log.Error("restore meta sql failed", + zap.String("query", nextSeqSQL), + zap.Stringer("db", table.Db.Name), + zap.Stringer("table", table.Info.Name), + zap.Error(err)) + return errors.Trace(err) + } } + restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) } else { var alterAutoIncIDFormat string switch { @@ -155,6 +172,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { zap.Stringer("db", table.Db.Name), zap.Stringer("table", table.Info.Name), zap.Error(err)) + return errors.Trace(err) } if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { // this table has auto random id, we need rebase it diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 2f405b743..732fe9ea1 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -203,17 +203,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf zap.Int("sessionCount", len(sessionPool)), ) } - for i := uint32(0); i < cfg.Concurrency; i++ { - session, e := restore.NewDB(g, mgr.GetTiKV()) - if e != nil { - log.Warn("create session pool failed, we will send DDLs only by created sessions", - zap.Error(e), - zap.Int("sessionCount", len(sessionPool)), - ) - break - } - sessionPool = append(sessionPool, session) - } tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, sessionPool, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") From 4cdb7587aef752803932bfe981a587501f78369c Mon Sep 17 00:00:00 2001 From: yujuncen Date: Sun, 28 Jun 2020 13:35:15 +0800 Subject: [PATCH 6/8] *: rename session pool to DB pool --- pkg/restore/client.go | 16 ++++++++-------- pkg/restore/util.go | 4 ++-- pkg/task/restore.go | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 0dd133421..3713f77f1 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -398,7 +398,7 @@ func (rc *Client) GoCreateTables( dom *domain.Domain, tables []*utils.Table, newTS uint64, - sessionPool []*DB, + dbPool []*DB, errCh chan<- error, ) <-chan CreatedTable { // Could we have a smaller size of tables? @@ -431,13 +431,13 @@ func (rc *Client) GoCreateTables( return } } - if len(sessionPool) > 0 { - workers := utils.NewWorkerPool(uint(len(sessionPool)), "DDL workers") + if len(dbPool) > 0 { + workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers") startWork = func(t *utils.Table, done func()) { workers.ApplyWithID(func(id uint64) { defer done() - selectedSession := int(id) % len(sessionPool) - if err := createOneTable(sessionPool[selectedSession], t); err != nil { + selectedDB := int(id) % len(dbPool) + if err := createOneTable(dbPool[selectedDB], t); err != nil { errCh <- err return } @@ -450,9 +450,9 @@ func (rc *Client) GoCreateTables( defer close(outCh) defer log.Info("all tables created") defer func() { - if len(sessionPool) > 0 { - for _, se := range sessionPool { - se.Close() + if len(dbPool) > 0 { + for _, db := range dbPool { + db.Close() } } }() diff --git a/pkg/restore/util.go b/pkg/restore/util.go index a728a1036..9865fde56 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -142,8 +142,8 @@ func GetSSTMetaFromFile( } } -// MakeSessionPool makes a session pool with specficated size by sessionFactory. -func MakeSessionPool(size uint, sessionFactory func() (*DB, error)) ([]*DB, error) { +// MakeDBPool makes a session pool with specficated size by sessionFactory. +func MakeDBPool(size uint, sessionFactory func() (*DB, error)) ([]*DB, error) { sessionPool := make([]*DB, 0, size) for i := uint(0); i < size; i++ { session, e := sessionFactory() diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 732fe9ea1..e87ed3430 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -194,16 +194,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // and we cost most of time at waiting DDL jobs be enqueued. // So these jobs won't be faster or slower when machine become faster or slower, // hence make it a fixed value would be fine. - sessionPool, err := restore.MakeSessionPool(defaultDDLConcurrency, func() (*restore.DB, error) { + dbPool, err := restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) { return restore.NewDB(g, mgr.GetTiKV()) }) if err != nil { log.Warn("create session pool failed, we will send DDLs only by created sessions", zap.Error(err), - zap.Int("sessionCount", len(sessionPool)), + zap.Int("sessionCount", len(dbPool)), ) } - tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, sessionPool, errCh) + tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") summary.SetSuccessStatus(true) From 3e910abc9575d7c89aeac08a15b766e6da4b03aa Mon Sep 17 00:00:00 2001 From: yujuncen Date: Tue, 30 Jun 2020 11:01:56 +0800 Subject: [PATCH 7/8] restore: rename some sessions to dbs --- pkg/restore/util.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 9865fde56..a7d8c5e33 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -143,16 +143,16 @@ func GetSSTMetaFromFile( } // MakeDBPool makes a session pool with specficated size by sessionFactory. -func MakeDBPool(size uint, sessionFactory func() (*DB, error)) ([]*DB, error) { - sessionPool := make([]*DB, 0, size) +func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { + dbPool := make([]*DB, 0, size) for i := uint(0); i < size; i++ { - session, e := sessionFactory() + db, e := dbFactory() if e != nil { - return nil, e + return dbPool, e } - sessionPool = append(sessionPool, session) + dbPool = append(dbPool, db) } - return sessionPool, nil + return dbPool, nil } // EstimateRangeSize estimates the total range count by file. From 539ae21150a0c19de5653cb43cad64e27cc98cfe Mon Sep 17 00:00:00 2001 From: yujuncen Date: Tue, 30 Jun 2020 21:52:18 +0800 Subject: [PATCH 8/8] restore: add some todos --- pkg/restore/client.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 522da8508..d5e5d073d 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -58,9 +58,18 @@ type Client struct { workerPool *utils.WorkerPool tlsConf *tls.Config - databases map[string]*utils.Database - ddlJobs []*model.Job - backupMeta *backup.BackupMeta + databases map[string]*utils.Database + ddlJobs []*model.Job + backupMeta *backup.BackupMeta + // TODO Remove this field or replace it with a []*DB, + // since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution. + // And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition. + // Which is dirty: why we need DBs from different sources? + // By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`, + // along with them in some private functions. + // Before you do it, you can firstly read discussions at + // https://github.com/pingcap/br/pull/377#discussion_r446594501, + // this probably isn't as easy and it seems like (however, not hard, too :D) db *DB rateLimit uint64 isOnline bool @@ -446,6 +455,7 @@ func (rc *Client) GoCreateTables( } go func() { + // TODO replace it with an errgroup wg := new(sync.WaitGroup) defer close(outCh) defer log.Info("all tables created")