diff --git a/loader/checkpoint.go b/loader/checkpoint.go index d34698c8d2..1b54821036 100644 --- a/loader/checkpoint.go +++ b/loader/checkpoint.go @@ -17,6 +17,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/pingcap/dm/dm/config" @@ -69,6 +70,10 @@ type CheckPoint interface { // RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB. // it's not thread-safe type RemoteCheckPoint struct { + // used to protect database operation with `conn`. + // if more operations need to be protected, add another mutex or rename this one. + connMutex sync.Mutex + db *conn.BaseDB conn *DBConn id string @@ -118,7 +123,9 @@ func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error { func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error { sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema) + cp.connMutex.Lock() err := cp.conn.executeSQL(tctx, []string{sql2}) + cp.connMutex.Unlock() return terror.WithScope(err, terror.ScopeDownstream) } @@ -137,7 +144,9 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error { ); ` sql2 := fmt.Sprintf(createTable, tableName) + cp.connMutex.Lock() err := cp.conn.executeSQL(tctx, []string{sql2}) + cp.connMutex.Unlock() return terror.WithScope(err, terror.ScopeDownstream) } @@ -149,7 +158,9 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { }() query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table) + cp.connMutex.Lock() rows, err := cp.conn.querySQL(tctx, query, cp.id) + cp.connMutex.Unlock() if err != nil { return terror.WithScope(err, terror.ScopeDownstream) } @@ -277,7 +288,7 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos // fields[0] -> db name, fields[1] -> table name sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table) - cp.logCtx.L().Debug("initial checkpoint record", + cp.logCtx.L().Info("initial checkpoint record", zap.String("sql", sql2), zap.String("id", cp.id), zap.String("filename", filename), @@ -286,7 +297,9 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos zap.Int64("offset", 0), zap.Int64("end position", endPos)) args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos} + cp.connMutex.Lock() err := cp.conn.executeSQL(tctx, []string{sql2}, args) + cp.connMutex.Unlock() if err != nil { if isErrDupEntry(err) { cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename)) @@ -299,6 +312,8 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos // ResetConn implements CheckPoint.ResetConn func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error { + cp.connMutex.Lock() + defer cp.connMutex.Unlock() return cp.conn.resetConn(tctx) } @@ -320,14 +335,18 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string { // Clear implements CheckPoint.Clear func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id) + cp.connMutex.Lock() err := cp.conn.executeSQL(tctx, []string{sql2}) + cp.connMutex.Unlock() return terror.WithScope(err, terror.ScopeDownstream) } // Count implements CheckPoint.Count func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) { query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table) + cp.connMutex.Lock() rows, err := cp.conn.querySQL(tctx, query, cp.id) + cp.connMutex.Unlock() if err != nil { return 0, terror.WithScope(err, terror.ScopeDownstream) } diff --git a/loader/loader.go b/loader/loader.go index af668749ad..905b7d1f29 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -256,7 +256,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab baseFile := filepath.Base(file) err = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size()) if err != nil { - w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), log.ShortError(err)) + w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err)) return err } @@ -273,7 +273,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab for { select { case <-ctx.Done(): - w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file)) + w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file), zap.Int64("offset", offset)) return nil default: // do nothing @@ -282,7 +282,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab cur += int64(len(line)) if err == io.EOF { - w.tctx.L().Info("data are scanned finished.", zap.String("data file", file)) + w.tctx.L().Info("data are scanned finished.", zap.String("data file", file), zap.Int64("offset", offset)) break }