Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader: fix concurrent usage of checkpoint's DBconn. (#552) #554

Merged
merged 1 commit into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -69,6 +70,10 @@ type CheckPoint interface {
// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
// it's not thread-safe
type RemoteCheckPoint struct {
// used to protect database operation with `conn`.
// if more operations need to be protected, add another mutex or rename this one.
connMutex sync.Mutex

db *conn.BaseDB
conn *DBConn
id string
Expand Down Expand Up @@ -118,7 +123,9 @@ func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error {

// createSchema issues a `CREATE SCHEMA IF NOT EXISTS` statement for cp.schema
// through the checkpoint's connection. connMutex is held only for the duration
// of the database call; any resulting error is tagged with the downstream scope.
func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error {
	stmts := []string{fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema)}

	cp.connMutex.Lock()
	execErr := cp.conn.executeSQL(tctx, stmts)
	cp.connMutex.Unlock()

	return terror.WithScope(execErr, terror.ScopeDownstream)
}

Expand All @@ -137,7 +144,9 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error {
);
`
sql2 := fmt.Sprintf(createTable, tableName)
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

Expand All @@ -149,7 +158,9 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
}()

query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table)
cp.connMutex.Lock()
rows, err := cp.conn.querySQL(tctx, query, cp.id)
cp.connMutex.Unlock()
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down Expand Up @@ -277,7 +288,7 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos

// fields[0] -> db name, fields[1] -> table name
sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table)
cp.logCtx.L().Debug("initial checkpoint record",
cp.logCtx.L().Info("initial checkpoint record",
zap.String("sql", sql2),
zap.String("id", cp.id),
zap.String("filename", filename),
Expand All @@ -286,7 +297,9 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
zap.Int64("offset", 0),
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
cp.connMutex.Unlock()
if err != nil {
if isErrDupEntry(err) {
cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
Expand All @@ -299,6 +312,8 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos

// ResetConn implements CheckPoint.ResetConn.
// It resets the underlying checkpoint connection while holding connMutex, so
// the reset cannot interleave with other operations guarded by the same mutex.
func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error {
	cp.connMutex.Lock()
	defer cp.connMutex.Unlock()

	resetErr := cp.conn.resetConn(tctx)
	return resetErr
}

Expand All @@ -320,14 +335,18 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
// Clear implements CheckPoint.Clear.
//
// It deletes the rows of the checkpoint table (`cp.schema`.`cp.table`) whose
// `id` column equals cp.id. The id is passed as a bound parameter rather than
// being formatted into the SQL text, matching how Count binds the same value;
// an id containing a quote character therefore cannot break or alter the
// statement.
//
// connMutex is held only around the database call. Any error returned by the
// connection is tagged with the downstream scope before being returned.
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
	sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table)
	// one argument slice per statement, same call shape as the INSERT in Init.
	args := []interface{}{cp.id}
	cp.connMutex.Lock()
	err := cp.conn.executeSQL(tctx, []string{sql2}, args)
	cp.connMutex.Unlock()
	return terror.WithScope(err, terror.ScopeDownstream)
}

// Count implements CheckPoint.Count
func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table)
cp.connMutex.Lock()
rows, err := cp.conn.querySQL(tctx, query, cp.id)
cp.connMutex.Unlock()
if err != nil {
return 0, terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down
6 changes: 3 additions & 3 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
baseFile := filepath.Base(file)
err = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size())
if err != nil {
w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), log.ShortError(err))
w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err))
return err
}

Expand All @@ -273,7 +273,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
for {
select {
case <-ctx.Done():
w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file))
w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file), zap.Int64("offset", offset))
return nil
default:
// do nothing
Expand All @@ -282,7 +282,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
cur += int64(len(line))

if err == io.EOF {
w.tctx.L().Info("data are scanned finished.", zap.String("data file", file))
w.tctx.L().Info("data are scanned finished.", zap.String("data file", file), zap.Int64("offset", offset))
break
}

Expand Down