From 783ef82229a4d67bb408f5959df711d42a4f79f1 Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Sun, 26 Dec 2021 18:45:19 +0800 Subject: [PATCH 1/6] add init Signed-off-by: jyz0309 <45495947@qq.com> --- server/http_status.go | 2 +- server/sql_replayer.go | 11 ++- session/session.go | 4 +- util/logutil/record_replayer.go | 125 ++++++++++++++++++++++++-------- 4 files changed, 107 insertions(+), 35 deletions(-) diff --git a/server/http_status.go b/server/http_status.go index cbbdf03fb46ec..dd3d08b74440e 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -259,7 +259,7 @@ func (s *Server) startHTTPServer() { fmt.Println("register replay sql") // HTTP path for start record and replay sql. router.Handle("/record/{filename}/{status}/{startTS}", s.newSQLRecorderHandler()) - router.Handle("/replay/{filename}", s.newSQLRecorderHandler()) + router.Handle("/replay/{filename}", s.newSQLReplayHandler(tikvHandlerTool.Store)) // HTTP path for web UI. if host, port, err := net.SplitHostPort(s.statusAddr); err == nil { if host == "" { diff --git a/server/sql_replayer.go b/server/sql_replayer.go index 9ff348194f5bd..edf57d22c41a0 100644 --- a/server/sql_replayer.go +++ b/server/sql_replayer.go @@ -16,6 +16,7 @@ package server import ( "fmt" + "github.com/pingcap/tidb/kv" "net/http" "strconv" @@ -66,10 +67,12 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) } } -type SQLReplayHandler struct{} +type SQLReplayHandler struct{ + Store kv.Storage +} -func (s *Server) newSQLReplayHandler() *SQLReplayHandler { - prh := &SQLReplayHandler{} +func (s *Server) newSQLReplayHandler(store kv.Storage) *SQLReplayHandler { + prh := &SQLReplayHandler{store} return prh } @@ -77,7 +80,7 @@ func (h SQLReplayHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { params := mux.Vars(req) if status, ok := params[pReplayStatus]; ok { if status == "on" { - logutil.StartReplay(params[pFileName]) + logutil.StartReplay(params[pFileName], h.Store) } else { logutil.StopReplay() } diff --git a/session/session.go b/session/session.go index e9e961ca39840..eafc2a75083af 100644 --- a/session/session.go +++ b/session/session.go @@ -2870,8 +2870,10 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { var builder strings.Builder if intxn { builder.WriteString(id) - builder.WriteString(" ") + } else { + builder.WriteString("0") } + builder.WriteString(" ") // Logic TS ts := strconv.FormatInt(s.sessionVars.StartTime.Unix()-cfg.ReplayMetaTS, 10) builder.WriteString(ts) diff --git a/util/logutil/record_replayer.go b/util/logutil/record_replayer.go index e2f8a9d315f5b..cbbd265ed03b5 100644 --- a/util/logutil/record_replayer.go +++ b/util/logutil/record_replayer.go @@ -2,17 +2,27 @@ package logutil import ( "bufio" + "context" "fmt" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" "os" + "strconv" "strings" + "time" ) +const TxnChannelSize = 10000 + // RecordReplayer is used to replay sql var RecordReplayer *recordReplayer +var TxnChannel = make(chan TxnRecord, TxnChannelSize) // StartReplay starts replay -func StartReplay(filename string) { - RecordReplayer = newRecordPlayer(filename) +func StartReplay(filename string, store kv.Storage) { + RecordReplayer = newRecordPlayer(filename, store) RecordReplayer.start() } @@ -21,18 +31,25 @@ func StopReplay() { RecordReplayer.close <- struct{}{} } -func newRecordPlayer(filename string) *recordReplayer { +func newRecordPlayer(filename string, store kv.Storage) *recordReplayer { + se, err := session.CreateSession(store) + if err != nil { + log.Info("init recordPlayer fail") + return nil + } r := &recordReplayer{ + Se: se, fileName: filename, - close: make(chan struct{}), + close: make(chan struct{}), } return r } type recordReplayer struct { - close chan struct{} - fileName string - scanner *bufio.Scanner + Se session.Session + close chan struct{} + fileName string + scanner *bufio.Scanner } func (r *recordReplayer) start() { @@ -44,7 +61,7 @@ func (r *recordReplayer) start() { } r.scanner = bufio.NewScanner(f) - txns := make(map[string][]string) + Txns := make(map[string]*TxnRecord) for r.scanner.Scan() { select { case <-r.close: @@ -52,44 +69,94 @@ func (r *recordReplayer) start() { default: } text := r.scanner.Text() - s := strings.Split(text, " ") - if len(s) < 2 { - fmt.Printf("invalid sql log %v\n", s) + record := strings.SplitN(text, " ", 2) + if len(record) < 3 { + fmt.Printf("invalid sql log %v\n", record) continue } - inTxn := len(s) == 3 + inTxn := record[0] != "0" if inTxn { - txnID := s[0] - sql := s[2] - txn := txns[txnID] - txn = append(txn, sql) - if sql == strings.ToLower("commit") { - tr := &txnReplayer{ - close: make(chan struct{}), - sqls: txn, + txnID := record[0] + ts := record[1] + sql := record[2] + if txn, ok := Txns[txnID]; !ok { + Txns[record[0]] = &TxnRecord{ + StartTS: ts, + Sqls: make([]string, 10), } - go tr.replay() } else { - // todo - // replay single sql + txn.Sqls = append(txn.Sqls, sql) + if sql == strings.ToLower("commit") { + Txns[txnID].CommitTS = ts + TxnChannel <- *Txns[txnID] + delete(Txns, txnID) + } else { + // todo: create a session + // replay single sql + } } + + } else { + // replay not txn } } } -type txnReplayer struct { +type TxnRecord struct { + StartTS string + CommitTS string + Sqls []string +} + +type TxnReplayer struct { + LogicTs time.Time + Se session.Session close chan struct{} - sqls []string } -func (tr *txnReplayer) replay() { - for _, sql := range tr.sqls { +func (tr *TxnReplayer) replay() { + for { select { + case record := <-TxnChannel: + startTs, _ := strconv.ParseFloat(record.StartTS,2) + if sleepTime := startTs-time.Since(tr.LogicTs).Seconds();sleepTime>0 { + time.Sleep(time.Duration(sleepTime) * time.Second) + } + go tr.TxnExecute("", record) case <-tr.close: break default: - // todo - // replay single sql } } } + +func (tr *TxnReplayer) TxnExecute(sql string, record TxnRecord) error { + + ctx := context.Background() + stmts, err := tr.Se.Parse(ctx, sql) + if err != nil { + return err + } + for _, stmt := range stmts { + _, err := tr.Se.ExecuteStmt(ctx, stmt) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func ReplayExecuteSQL(sql string, s session.Session) error { + ctx := context.Background() + stmts, err := s.Parse(ctx, sql) + if err != nil { + return err + } + for _, stmt := range stmts { + _, err := s.ExecuteStmt(ctx, stmt) + if err != nil { + return errors.Trace(err) + } + } + return nil +} From e9023ef6d62a547dc82212f54657d78eea2bbedb Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Sun, 26 Dec 2021 23:33:30 +0800 Subject: [PATCH 2/6] add TODO Signed-off-by: jyz0309 <45495947@qq.com> --- session/session.go | 1 + session/session_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/session/session.go b/session/session.go index eafc2a75083af..656d2c79b893d 100644 --- a/session/session.go +++ b/session/session.go @@ -2867,6 +2867,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { vars := s.GetSessionVars() if !s.isInternal() && cfg.EnableReplaySQL.Load() { go func(sql, id string, intxn bool) { + //TODO: We need to add a client col also. var builder strings.Builder if intxn { builder.WriteString(id) diff --git a/session/session_test.go b/session/session_test.go index acbc889102fe1..4eb92e6dacd08 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -27,6 +27,7 @@ import ( "strings" "sync" "sync/atomic" + "testing" "time" "github.com/docker/go-units" From d3a5fb5458c64f9151bbc715a92c100f20687feb Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Tue, 28 Dec 2021 21:33:44 +0800 Subject: [PATCH 3/6] add connection id Signed-off-by: jyz0309 <45495947@qq.com> --- server/conn.go | 1 - session/session.go | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/conn.go b/server/conn.go index 2cd15c8e835e0..4bfdb76ee0c7e 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1030,7 +1030,6 @@ func (cc *clientConn) Run(ctx context.Context) { terror.Log(err) } }() - // Usually, client connection status changes between [dispatching] <=> [reading]. // When some event happens, server may notify this client connection by setting // the status to special values, for example: kill or graceful shutdown. diff --git a/session/session.go b/session/session.go index 656d2c79b893d..423d96709b8cf 100644 --- a/session/session.go +++ b/session/session.go @@ -2875,6 +2875,8 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { builder.WriteString("0") } builder.WriteString(" ") + builder.WriteString(fmt.Sprintf("%v", vars.ConnectionID)) + builder.WriteString(" ") // Logic TS ts := strconv.FormatInt(s.sessionVars.StartTime.Unix()-cfg.ReplayMetaTS, 10) builder.WriteString(ts) From 5f7a4f580f9b0d82ce51fb7f67e01aaba6ca1fd1 Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Tue, 28 Dec 2021 21:52:20 +0800 Subject: [PATCH 4/6] use connection as split Signed-off-by: jyz0309 <45495947@qq.com> --- session/session.go | 6 --- util/logutil/record_replayer.go | 92 +++++---------------------------- 2 files changed, 13 insertions(+), 85 deletions(-) diff --git a/session/session.go b/session/session.go index 423d96709b8cf..394db67671615 100644 --- a/session/session.go +++ b/session/session.go @@ -2869,12 +2869,6 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { go func(sql, id string, intxn bool) { //TODO: We need to add a client col also. var builder strings.Builder - if intxn { - builder.WriteString(id) - } else { - builder.WriteString("0") - } - builder.WriteString(" ") builder.WriteString(fmt.Sprintf("%v", vars.ConnectionID)) builder.WriteString(" ") // Logic TS diff --git a/util/logutil/record_replayer.go b/util/logutil/record_replayer.go index cbbd265ed03b5..3707c6c3ac33d 100644 --- a/util/logutil/record_replayer.go +++ b/util/logutil/record_replayer.go @@ -9,16 +9,12 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "os" - "strconv" "strings" - "time" ) -const TxnChannelSize = 10000 - // RecordReplayer is used to replay sql var RecordReplayer *recordReplayer -var TxnChannel = make(chan TxnRecord, TxnChannelSize) +var Sessions map[string]session.Session // StartReplay starts replay func StartReplay(filename string, store kv.Storage) { @@ -32,13 +28,8 @@ func StopReplay() { } func newRecordPlayer(filename string, store kv.Storage) *recordReplayer { - se, err := session.CreateSession(store) - if err != nil { - log.Info("init recordPlayer fail") - return nil - } + r := &recordReplayer{ - Se: se, fileName: filename, close: make(chan struct{}), } @@ -46,7 +37,7 @@ func newRecordPlayer(filename string, store kv.Storage) *recordReplayer { } type recordReplayer struct { - Se session.Session + store kv.Storage close chan struct{} fileName string scanner *bufio.Scanner @@ -61,7 +52,7 @@ func (r *recordReplayer) start() { } r.scanner = bufio.NewScanner(f) - Txns := make(map[string]*TxnRecord) + Sessions = make(map[string]session.Session) for r.scanner.Scan() { select { case <-r.close: @@ -74,78 +65,21 @@ func (r *recordReplayer) start() { fmt.Printf("invalid sql log %v\n", record) continue } - inTxn := record[0] != "0" - if inTxn { - txnID := record[0] - ts := record[1] - sql := record[2] - if txn, ok := Txns[txnID]; !ok { - Txns[record[0]] = &TxnRecord{ - StartTS: ts, - Sqls: make([]string, 10), - } - } else { - txn.Sqls = append(txn.Sqls, sql) - if sql == strings.ToLower("commit") { - Txns[txnID].CommitTS = ts - TxnChannel <- *Txns[txnID] - delete(Txns, txnID) - } else { - // todo: create a session - // replay single sql - } + // fake code + if s, exist := Sessions[record[0]]; !exist { + se, err := session.CreateSession(r.store) + if err != nil { + log.Info("init recordPlayer fail") + return } - + Sessions[record[0]] = se + go ReplayExecuteSQL(record[2], s) } else { - // replay not txn - } - } -} - -type TxnRecord struct { - StartTS string - CommitTS string - Sqls []string -} - -type TxnReplayer struct { - LogicTs time.Time - Se session.Session - close chan struct{} -} - -func (tr *TxnReplayer) replay() { - for { - select { - case record := <-TxnChannel: - startTs, _ := strconv.ParseFloat(record.StartTS,2) - if sleepTime := startTs-time.Since(tr.LogicTs).Seconds();sleepTime>0 { - time.Sleep(time.Duration(sleepTime) * time.Second) - } - go tr.TxnExecute("", record) - case <-tr.close: - break - default: + go ReplayExecuteSQL(record[2], s) } } } -func (tr *TxnReplayer) TxnExecute(sql string, record TxnRecord) error { - - ctx := context.Background() - stmts, err := tr.Se.Parse(ctx, sql) - if err != nil { - return err - } - for _, stmt := range stmts { - _, err := tr.Se.ExecuteStmt(ctx, stmt) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - func ReplayExecuteSQL(sql string, s session.Session) error { ctx := context.Background() stmts, err := s.Parse(ctx, sql) From aac0a43536bd4b0ab47e4dfc9fdf127072fb880b Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Tue, 28 Dec 2021 22:04:44 +0800 Subject: [PATCH 5/6] sleep Signed-off-by: jyz0309 <45495947@qq.com> --- session/session.go | 4 ++-- util/logutil/record_replayer.go | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/session/session.go b/session/session.go index 394db67671615..fbd0ab1cf228c 100644 --- a/session/session.go +++ b/session/session.go @@ -2866,7 +2866,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { cfg := config.GetGlobalConfig() vars := s.GetSessionVars() if !s.isInternal() && cfg.EnableReplaySQL.Load() { - go func(sql, id string, intxn bool) { + go func(sql, id string) { //TODO: We need to add a client col also. var builder strings.Builder builder.WriteString(fmt.Sprintf("%v", vars.ConnectionID)) @@ -2879,7 +2879,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { builder.WriteString(text) builder.WriteString("\n") logutil.PutRecordOrDrop(builder.String()) - }(vars.SQL, vars.TxnCtx.ID.String(), vars.InTxn()) + }(vars.SQL, vars.TxnCtx.ID.String()) } if variable.ProcessGeneralLog.Load() && !vars.InRestrictedSQL { diff --git a/util/logutil/record_replayer.go b/util/logutil/record_replayer.go index 3707c6c3ac33d..c7e9b3f57a91f 100644 --- a/util/logutil/record_replayer.go +++ b/util/logutil/record_replayer.go @@ -9,7 +9,9 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "os" + "strconv" "strings" + "time" ) // RecordReplayer is used to replay sql @@ -19,7 +21,7 @@ var Sessions map[string]session.Session // StartReplay starts replay func StartReplay(filename string, store kv.Storage) { RecordReplayer = newRecordPlayer(filename, store) - RecordReplayer.start() + go RecordReplayer.start() } // StopReplay stops replay @@ -28,8 +30,8 @@ func StopReplay() { } func newRecordPlayer(filename string, store kv.Storage) *recordReplayer { - r := &recordReplayer{ + store: store, fileName: filename, close: make(chan struct{}), } @@ -53,6 +55,7 @@ func (r *recordReplayer) start() { r.scanner = bufio.NewScanner(f) Sessions = make(map[string]session.Session) + start := time.Now() for r.scanner.Scan() { select { case <-r.close: @@ -65,11 +68,14 @@ func (r *recordReplayer) start() { fmt.Printf("invalid sql log %v\n", record) continue } - // fake code + ts, _ := strconv.ParseFloat(record[1], 10) + if sleepTime := time.Since(start).Seconds() - ts; sleepTime > 0 { + time.Sleep(time.Duration(sleepTime) * time.Second) + } if s, exist := Sessions[record[0]]; !exist { se, err := session.CreateSession(r.store) if err != nil { - log.Info("init recordPlayer fail") + log.Info("init replay session fail") return } Sessions[record[0]] = se From 9848a0edc3185b569a1ce2cfbe93c73137fe6c76 Mon Sep 17 00:00:00 2001 From: jyz0309 <45495947@qq.com> Date: Wed, 29 Dec 2021 21:42:51 +0800 Subject: [PATCH 6/6] add test Signed-off-by: jyz0309 <45495947@qq.com> --- replay_file | 8 ++++++ server/http_status.go | 4 +-- server/sql_replayer.go | 21 +++++++------- session/session.go | 1 + util/logutil/record_logger.go | 1 + .../record_replayer.go | 28 +++++++++++-------- 6 files changed, 39 insertions(+), 24 deletions(-) create mode 100644 replay_file rename util/{logutil => replayutil}/record_replayer.go (78%) diff --git a/replay_file b/replay_file new file mode 100644 index 0000000000000..6cad38f954890 --- /dev/null +++ b/replay_file @@ -0,0 +1,8 @@ +5 3 test begin +3 4 test begin +5 5 test insert into t values(5) +3 6 test insert into t values(6) +5 7 test commit +5 8 test select * from t +3 8 test commit +5 8 test commit diff --git a/server/http_status.go b/server/http_status.go index dd3d08b74440e..08b567aaf875d 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -258,8 +258,8 @@ func (s *Server) startHTTPServer() { router.Handle("/metrics/profile", profileHandler{tikvHandlerTool}) fmt.Println("register replay sql") // HTTP path for start record and replay sql. - router.Handle("/record/{filename}/{status}/{startTS}", s.newSQLRecorderHandler()) - router.Handle("/replay/{filename}", s.newSQLReplayHandler(tikvHandlerTool.Store)) + router.Handle("/record/{filename}/{recordStatus}/{startTS}", s.newSQLRecorderHandler()) + router.Handle("/replay/{filename}/{replayStatus}", s.newSQLReplayHandler(tikvHandlerTool.Store)) // HTTP path for web UI. if host, port, err := net.SplitHostPort(s.statusAddr); err == nil { if host == "" { diff --git a/server/sql_replayer.go b/server/sql_replayer.go index edf57d22c41a0..1cf7a8466625b 100644 --- a/server/sql_replayer.go +++ b/server/sql_replayer.go @@ -16,14 +16,14 @@ package server import ( "fmt" - "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/replayutil" "net/http" "strconv" - "github.com/pingcap/tidb/util/logutil" - "github.com/gorilla/mux" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/logutil" ) // SQLRecorderHandler is the handler for dumping plan replayer file. @@ -46,6 +46,7 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) cfg := config.GetGlobalConfig() params := mux.Vars(req) if status, ok := params[pRecordStatus]; ok { + fmt.Println() if status == "on" { // set replay meta TS first. cfg.ReplayMetaTS, err = strconv.ParseInt(params[pStartTS], 10, 64) @@ -62,12 +63,13 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) } w.WriteHeader(http.StatusOK) return - } else { - w.WriteHeader(http.StatusBadRequest) } + w.WriteHeader(http.StatusBadRequest) + } -type SQLReplayHandler struct{ +// SQLReplayHandler Replay handler +type SQLReplayHandler struct { Store kv.Storage } @@ -80,13 +82,12 @@ func (h SQLReplayHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { params := mux.Vars(req) if status, ok := params[pReplayStatus]; ok { if status == "on" { - logutil.StartReplay(params[pFileName], h.Store) + replayutil.StartReplay(params[pFileName], h.Store) } else { - logutil.StopReplay() + replayutil.StopReplay() } w.WriteHeader(http.StatusOK) return - } else { - w.WriteHeader(http.StatusBadRequest) } + w.WriteHeader(http.StatusBadRequest) } diff --git a/session/session.go b/session/session.go index fbd0ab1cf228c..5eb614ad79456 100644 --- a/session/session.go +++ b/session/session.go @@ -2866,6 +2866,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { cfg := config.GetGlobalConfig() vars := s.GetSessionVars() if !s.isInternal() && cfg.EnableReplaySQL.Load() { + fmt.Println("print sql") go func(sql, id string) { //TODO: We need to add a client col also. var builder strings.Builder diff --git a/util/logutil/record_logger.go b/util/logutil/record_logger.go index 29f7a8ff8ba2c..346c3ef6d3a6b 100644 --- a/util/logutil/record_logger.go +++ b/util/logutil/record_logger.go @@ -22,6 +22,7 @@ func StopRecord() { RecordLogger.stopLogWorker() } +// PutRecordOrDrop put record func PutRecordOrDrop(record string) { select { case RecordLogger.recordChan <- record: diff --git a/util/logutil/record_replayer.go b/util/replayutil/record_replayer.go similarity index 78% rename from util/logutil/record_replayer.go rename to util/replayutil/record_replayer.go index c7e9b3f57a91f..304882097403f 100644 --- a/util/logutil/record_replayer.go +++ b/util/replayutil/record_replayer.go @@ -1,21 +1,23 @@ -package logutil +package replayutil import ( "bufio" "context" "fmt" - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/session" "os" "strconv" "strings" "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" ) // RecordReplayer is used to replay sql var RecordReplayer *recordReplayer +// Sessions is a map var Sessions map[string]session.Session // StartReplay starts replay @@ -63,30 +65,32 @@ func (r *recordReplayer) start() { default: } text := r.scanner.Text() - record := strings.SplitN(text, " ", 2) - if len(record) < 3 { - fmt.Printf("invalid sql log %v\n", record) + record := strings.SplitN(text, " ", 4) + if len(record) < 4 { + fmt.Printf("invalid sql log %v, len:%d\n", record, len(record)) continue } ts, _ := strconv.ParseFloat(record[1], 10) - if sleepTime := time.Since(start).Seconds() - ts; sleepTime > 0 { + if sleepTime := ts - time.Since(start).Seconds(); sleepTime > 0 { + fmt.Printf("sleep time:%v\n", sleepTime) time.Sleep(time.Duration(sleepTime) * time.Second) } if s, exist := Sessions[record[0]]; !exist { se, err := session.CreateSession(r.store) + se.GetSessionVars().CurrentDB = record[2] if err != nil { log.Info("init replay session fail") return } Sessions[record[0]] = se - go ReplayExecuteSQL(record[2], s) + go replayExecuteSQL(record[3], se, record[0]) } else { - go ReplayExecuteSQL(record[2], s) + go replayExecuteSQL(record[3], s, record[0]) } } } -func ReplayExecuteSQL(sql string, s session.Session) error { +func replayExecuteSQL(sql string, s session.Session, connection string) error { ctx := context.Background() stmts, err := s.Parse(ctx, sql) if err != nil {