Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: Try to create tables in parallel #502

Merged
merged 57 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
d7ce306
restore: try to create tables in parallel
hidehalo Dec 1, 2020
886908d
glue: fix error condition test for db close
hidehalo Dec 3, 2020
fd8a5de
restore, glue: remove duplicate db pool implementation
hidehalo Dec 3, 2020
5f91c56
restore: try to prevent db connection too early
hidehalo Dec 3, 2020
18eb88c
restore: try to prevent db connection too early
hidehalo Dec 3, 2020
c5b48a4
restore: try to make restore schema run parallel totally
hidehalo Dec 8, 2020
75a5baa
Merge branch 'test-434' into issue-434
hidehalo Dec 8, 2020
92699bd
restore: remove impl&test of tidb#InitSchema
hidehalo Dec 8, 2020
c7f0420
restore: make restore schema run parallelly
hidehalo Dec 8, 2020
8a4118c
restore: remove db connection control
hidehalo Dec 9, 2020
5d07779
restore: a little change of restore schema schedule
hidehalo Dec 9, 2020
d751937
Merge branch 'master' into issue-434
lance6716 Dec 11, 2020
af5a19b
wip
hidehalo Dec 11, 2020
4331dca
restore: keep restore schema job hold the same session(DB connection)
hidehalo Dec 13, 2020
a4c41e1
Merge branch 'master' into issue-434
hidehalo Dec 13, 2020
4491add
restore: fix log message error
hidehalo Dec 14, 2020
8567813
Merge branch 'master' into issue-434
glorv Dec 14, 2020
8bce7fc
restore: remove purpose array for `restoreSchemaWorker`
hidehalo Dec 15, 2020
494f67c
restore: remove useless sql mode set code
hidehalo Dec 15, 2020
25aea0d
restore: restore view statements run after database|table created
hidehalo Dec 15, 2020
74f9025
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 15, 2020
fee4594
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Dec 15, 2020
0e315ca
restore: interrupt job producing when error happens
hidehalo Dec 15, 2020
3e2cc16
util: add SQLDriver interface
hidehalo Dec 16, 2020
52168fc
restore: make sure single restore schema job vs. single db session
hidehalo Dec 16, 2020
1dd9152
restore: run restore view schema statements in txn
hidehalo Dec 16, 2020
8d01745
glue: add checkpoints.Session implementation(sqlConnSession)
hidehalo Dec 16, 2020
09d06d4
restore: close whole database connections after restore schema done
hidehalo Dec 17, 2020
90e9305
restore: revert remove of `InitSchema`
hidehalo Dec 15, 2020
bc32cf8
glue: return a new error when sqlConnSesson.CommitTxn called
hidehalo Dec 18, 2020
a9c00ca
Revert "util: add SQLDriver interface"
hidehalo Dec 21, 2020
dafcf31
glue: update GetSession(context.Context) for Glue interface
hidehalo Dec 21, 2020
2539b7d
glue: disable more methods of sqlConnSession
hidehalo Dec 21, 2020
3ee2e97
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 21, 2020
1542e52
restore: disable implicit initiation of `sync.WaitGroup`
hidehalo Dec 21, 2020
9a27c33
restore: cancel nil error throw when restore schema done
hidehalo Dec 21, 2020
cdc2ae9
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 21, 2020
983d913
Merge branch 'master' into issue-434
lance6716 Dec 21, 2020
b35b26e
restore: replace session map to pool
hidehalo Dec 22, 2020
c0e6692
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Dec 22, 2020
b376656
restore: keep restore table statements ordered
hidehalo Dec 22, 2020
e2cf8e7
restore: assign single session to `restoreSchemaWorker#doJob`'s gorou…
hidehalo Dec 29, 2020
48195f8
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 29, 2020
9fea8d0
restore: add log for `restoreSchema`
hidehalo Dec 29, 2020
5bd96c8
restore: add quit case when error thrown blocked
hidehalo Dec 29, 2020
336fe52
restore: Improve the robustness of concurrency pattern
hidehalo Dec 29, 2020
8eed047
restore: fix channel send/recv logic to avoid blocked forever occurs.
hidehalo Dec 31, 2020
1690e4a
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 31, 2020
42f46b7
restore: `sync.WaitGroup#Add` first when `restoreSchemaWorker#appendJob`
hidehalo Dec 31, 2020
911440e
restore: add impl of `schemaStmtType#String`
hidehalo Dec 31, 2020
e914004
restore: avoid to wait whole jobs done forever when goroutine of `doJ…
hidehalo Dec 31, 2020
d123a85
restore: call cancel function when `makeJobs` exit
hidehalo Jan 1, 2021
e2c2022
restore: a few improvement
hidehalo Jan 4, 2021
1fcc9df
test: add unit tests of `RestoreController#restoreSchema()`
hidehalo Jan 4, 2021
ef85de8
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Jan 5, 2021
08acbc1
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Jan 5, 2021
e02a4a7
Merge branch 'master' into issue-434
glorv Jan 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions lightning/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"errors"
"sync"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -47,6 +48,127 @@ type SQLExecutor interface {
Close()
}

type dbConnector struct {
dbConf config.DBStore
lazyConn func(dbConf config.DBStore) (*sql.DB, error)
conn *sql.DB
err error
}

func (connector *dbConnector) connect() error {
if connector.conn == nil {
connector.conn, connector.err = connector.lazyConn(connector.dbConf)
}
return connector.err
}

func (connector *dbConnector) close() error {
if connector.conn == nil {
return connector.conn.Close()
}
return nil
}

func (connector *dbConnector) getError() error {
return connector.err
}

func (connector *dbConnector) getDB() *sql.DB {
return connector.conn
}

// DBPool is multi-connection implementation of `SQLExecutor` interface
// It will pick one of any idle connections to handle DB events
// and put it back when events done
type DBPool struct {
SQLExecutor
singleton *dbConnector
pool sync.Pool
}

func (dbPool *DBPool) getConn() *dbConnector {
conn := dbPool.pool.Get()
if conn != nil {
return conn.(*dbConnector)
}
return dbPool.singleton
}

func (dbPool *DBPool) putConn(conn *dbConnector) *DBPool {
dbPool.pool.Put(conn)
return dbPool
}

// NewDBPool is constructor of DBPool
func NewDBPool(size int, dbConf config.DBStore, lazyConn func(dbConf config.DBStore) (*sql.DB, error)) (*DBPool, error) {
// TODO: `size` validation
// NOTE: make sure we have one db connection at least
singleton, err := lazyConn(dbConf)
if err != nil {
return nil, err
}
dbPool := &DBPool{}
dbPool.singleton = &dbConnector{
dbConf: dbConf,
lazyConn: lazyConn,
conn: singleton,
err: nil,
}
dbPool.putConn(dbPool.singleton)
for i := 0; i < size-1; i++ {
dbPool.putConn(&dbConnector{
dbConf: dbConf,
lazyConn: lazyConn,
conn: nil,
err: nil,
})
}
return dbPool, nil
}

// ExecuteWithLog implement `SQLExecutor` interface
func (dbPool *DBPool) ExecuteWithLog(ctx context.Context, query string, purpose string, logger log.Logger) error {
conn := dbPool.getConn()
defer func() {
dbPool.putConn(conn)
}()
if err := conn.connect(); err != nil {
return err
}
sql := common.SQLWithRetry{
DB: conn.getDB(),
Logger: logger,
}
return sql.Exec(ctx, purpose, query)
}

// ObtainStringWithLog implement `SQLExecutor` interface
func (dbPool *DBPool) ObtainStringWithLog(ctx context.Context, query string, purpose string, logger log.Logger) (string, error) {
conn := dbPool.getConn()
defer func() {
dbPool.putConn(conn)
}()
var s string
var err error
if err = conn.connect(); err == nil {
err = common.SQLWithRetry{
DB: conn.getDB(),
Logger: logger,
}.QueryRow(ctx, purpose, query, &s)
}
return s, err
}

// Close implement `SQLExecutor` interface
func (dbPool *DBPool) Close() {
for conn := dbPool.getConn(); conn != nil; conn = dbPool.getConn() {
if conn.getError() == nil {
// CONFUSED: why we ignore error occurs via db connection close issue?
_ = conn.close()
}
}
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
Expand Down
28 changes: 25 additions & 3 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package restore

import (
"context"
"database/sql"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -306,15 +307,36 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
defer db.Close()
db.ExecContext(ctx, "SET SQL_MODE = ?", rc.cfg.TiDB.StrSQLMode)
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
}

// CONFUSED: how we control concurrency via configuration?
// OR do not implement glue.SQLExecutor inside ExternalTiDBGlue
// Maybe we can aggregate an instance of glue.SQLExecutor?
concurrency := 16
var sqlExec glue.SQLExecutor
var err error
if concurrency > 1 {
dbFactory := func(conf config.DBStore) (*sql.DB, error) {
db, err := DBFromConfig(conf)
if err == nil {
db.ExecContext(ctx, "SET SQL_MODE = ?", conf.StrSQLMode)
}
return db, err
}
sqlExec, err = glue.NewDBPool(concurrency, rc.cfg.TiDB, dbFactory)
defer sqlExec.Close()
if err != nil {
return errors.Trace(err)
}
} else {
sqlExec = rc.tidbGlue.GetSQLExecutor()
}
for _, dbMeta := range rc.dbMetas {
task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore table schema")

tablesSchema := make(map[string]string)
for _, tblMeta := range dbMeta.Tables {
tablesSchema[tblMeta.Name] = tblMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, tablesSchema)
err = InitSchema(ctx, concurrency, rc.tidbGlue.GetParser(), sqlExec, dbMeta.Name, tablesSchema)
hidehalo marked this conversation as resolved.
Show resolved Hide resolved

task.End(zap.ErrorLevel, err)
if err != nil {
Expand All @@ -330,7 +352,7 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
for _, viewMeta := range dbMeta.Views {
viewsSchema[viewMeta.Name] = viewMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, viewsSchema)
err := InitSchema(ctx, concurrency, rc.tidbGlue.GetParser(), sqlExec, dbMeta.Name, viewsSchema)

task.End(zap.ErrorLevel, err)
if err != nil {
Expand Down
73 changes: 53 additions & 20 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

tmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/mydump"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type TiDBManager struct {
Expand Down Expand Up @@ -124,43 +126,74 @@ func (timgr *TiDBManager) Close() {
timgr.db.Close()
}

func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema map[string]string) error {
logger := log.With(zap.String("db", database))
sqlExecutor := g.GetSQLExecutor()
type schemaJob struct {
sql string
log zapcore.Field
}

func InitSchema(ctx context.Context, concurrency int, parser *parser.Parser, exec glue.SQLExecutor, database string, tablesSchema map[string]string) error {
logger := log.With(zap.String("db", database))
var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, database)
err := sqlExecutor.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
err := exec.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var wg sync.WaitGroup
fnCreateTable := func(ctx context.Context, exec glue.SQLExecutor, jobCh chan *schemaJob, logger log.Logger, errCh chan error) {
for {
select {
case <-ctx.Done():
return
case job := <-jobCh:
//TODO: maybe we should put these createStems into a transaction
err = exec.ExecuteWithLog(ctx, job.sql, "create table", logger.With(job.log))
wg.Done()
if err != nil {
errCh <- err
}
}
}
}
fnHandleError := func(ctx context.Context, err error, errCh chan error, quit context.CancelFunc) {
for {
select {
case err = <-errCh:
quit()
return
case <-ctx.Done():
return
}
}
}
var sqlCreateStmts []string
errCh := make(chan error)
queue := make(chan *schemaJob, concurrency)
task := logger.Begin(zap.InfoLevel, "create tables")
childCtx, cancel := context.WithCancel(ctx)
for i := 0; i < concurrency; i++ {
go fnCreateTable(childCtx, exec, queue, logger, errCh)
}
go fnHandleError(childCtx, err, errCh, cancel)
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createTableIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
sqlCreateStmts, err = createTableIfNotExistsStmt(parser, sqlCreateTable, database, tbl)
if err != nil {
errCh <- err
break
}

//TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break
log := zap.String("table", common.UniqueTable(database, tbl))
for _, stmt := range sqlCreateStmts {
wg.Add(1)
queue <- &schemaJob{
sql: stmt,
log: log,
}
}
}
wg.Wait()
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

Expand Down
6 changes: 3 additions & 3 deletions lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *tidbSuite) TestInitSchema(c *C) {
ExpectClose()

s.mockDB.MatchExpectationsInOrder(false) // maps are unordered.
err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table t1 (a int primary key, b varchar(200));",
"t2": "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;CREATE TABLE `db`.`t2` (xx TEXT) AUTO_INCREMENT=11203;",
})
Expand All @@ -218,7 +218,7 @@ func (s *tidbSuite) TestInitSchemaSyntaxError(c *C) {
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table `t1` with invalid syntax;",
})
c.Assert(err, NotNil)
Expand All @@ -239,7 +239,7 @@ func (s *tidbSuite) TestInitSchemaUnsupportedSchemaError(c *C) {
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
err := InitSchema(ctx, 1, s.tiGlue.GetParser(), s.tiGlue.GetSQLExecutor(), "db", map[string]string{
"t1": "create table `t1` (a VARCHAR(999999999));",
})
c.Assert(err, ErrorMatches, ".*Column length too big.*")
Expand Down