diff --git a/replay_file b/replay_file new file mode 100644 index 0000000000000..6cad38f954890 --- /dev/null +++ b/replay_file @@ -0,0 +1,8 @@ +5 3 test begin +3 4 test begin +5 5 test insert into t values(5) +3 6 test insert into t values(6) +5 7 test commit +5 8 test select * from t +3 8 test commit +5 8 test commit diff --git a/server/conn.go b/server/conn.go index 2cd15c8e835e0..4bfdb76ee0c7e 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1030,7 +1030,6 @@ func (cc *clientConn) Run(ctx context.Context) { terror.Log(err) } }() - // Usually, client connection status changes between [dispatching] <=> [reading]. // When some event happens, server may notify this client connection by setting // the status to special values, for example: kill or graceful shutdown. diff --git a/server/http_status.go b/server/http_status.go index cbbdf03fb46ec..08b567aaf875d 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -258,8 +258,8 @@ func (s *Server) startHTTPServer() { router.Handle("/metrics/profile", profileHandler{tikvHandlerTool}) fmt.Println("register replay sql") // HTTP path for start record and replay sql. - router.Handle("/record/{filename}/{status}/{startTS}", s.newSQLRecorderHandler()) - router.Handle("/replay/{filename}", s.newSQLRecorderHandler()) + router.Handle("/record/{filename}/{recordStatus}/{startTS}", s.newSQLRecorderHandler()) + router.Handle("/replay/{filename}/{replayStatus}", s.newSQLReplayHandler(tikvHandlerTool.Store)) // HTTP path for web UI. if host, port, err := net.SplitHostPort(s.statusAddr); err == nil { if host == "" { diff --git a/server/sql_replayer.go b/server/sql_replayer.go index 9ff348194f5bd..1cf7a8466625b 100644 --- a/server/sql_replayer.go +++ b/server/sql_replayer.go @@ -16,13 +16,14 @@ package server import ( "fmt" + "github.com/pingcap/tidb/util/replayutil" "net/http" "strconv" - "github.com/pingcap/tidb/util/logutil" - "github.com/gorilla/mux" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/logutil" ) // SQLRecorderHandler is the handler for dumping plan replayer file. @@ -45,6 +46,7 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) cfg := config.GetGlobalConfig() params := mux.Vars(req) if status, ok := params[pRecordStatus]; ok { + fmt.Println() if status == "on" { // set replay meta TS first. cfg.ReplayMetaTS, err = strconv.ParseInt(params[pStartTS], 10, 64) @@ -61,15 +63,18 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) } w.WriteHeader(http.StatusOK) return - } else { - w.WriteHeader(http.StatusBadRequest) } + w.WriteHeader(http.StatusBadRequest) + } -type SQLReplayHandler struct{} +// SQLReplayHandler Replay handler +type SQLReplayHandler struct { + Store kv.Storage +} -func (s *Server) newSQLReplayHandler() *SQLReplayHandler { - prh := &SQLReplayHandler{} +func (s *Server) newSQLReplayHandler(store kv.Storage) *SQLReplayHandler { + prh := &SQLReplayHandler{store} return prh } @@ -77,13 +82,12 @@ func (h SQLReplayHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { params := mux.Vars(req) if status, ok := params[pReplayStatus]; ok { if status == "on" { - logutil.StartReplay(params[pFileName]) + replayutil.StartReplay(params[pFileName], h.Store) } else { - logutil.StopReplay() + replayutil.StopReplay() } w.WriteHeader(http.StatusOK) return - } else { - w.WriteHeader(http.StatusBadRequest) } + w.WriteHeader(http.StatusBadRequest) } diff --git a/session/session.go b/session/session.go index e9e961ca39840..5eb614ad79456 100644 --- a/session/session.go +++ b/session/session.go @@ -2866,12 +2866,12 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { cfg := config.GetGlobalConfig() vars := s.GetSessionVars() if !s.isInternal() && cfg.EnableReplaySQL.Load() { - go func(sql, id string, intxn bool) { + fmt.Println("print sql") + go func(sql, id string) { + //TODO: We need to add a client col also. var builder strings.Builder - if intxn { - builder.WriteString(id) - builder.WriteString(" ") - } + builder.WriteString(fmt.Sprintf("%v", vars.ConnectionID)) + builder.WriteString(" ") // Logic TS ts := strconv.FormatInt(s.sessionVars.StartTime.Unix()-cfg.ReplayMetaTS, 10) builder.WriteString(ts) @@ -2880,7 +2880,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { builder.WriteString(text) builder.WriteString("\n") logutil.PutRecordOrDrop(builder.String()) - }(vars.SQL, vars.TxnCtx.ID.String(), vars.InTxn()) + }(vars.SQL, vars.TxnCtx.ID.String()) } if variable.ProcessGeneralLog.Load() && !vars.InRestrictedSQL { diff --git a/session/session_test.go b/session/session_test.go index acbc889102fe1..4eb92e6dacd08 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -27,6 +27,7 @@ import ( "strings" "sync" "sync/atomic" + "testing" "time" "github.com/docker/go-units" diff --git a/util/logutil/record_logger.go b/util/logutil/record_logger.go index 29f7a8ff8ba2c..346c3ef6d3a6b 100644 --- a/util/logutil/record_logger.go +++ b/util/logutil/record_logger.go @@ -22,6 +22,7 @@ func StopRecord() { RecordLogger.stopLogWorker() } +// PutRecordOrDrop put record func PutRecordOrDrop(record string) { select { case RecordLogger.recordChan <- record: diff --git a/util/logutil/record_replayer.go b/util/logutil/record_replayer.go deleted file mode 100644 index e2f8a9d315f5b..0000000000000 --- a/util/logutil/record_replayer.go +++ /dev/null @@ -1,95 +0,0 @@ -package logutil - -import ( - "bufio" - "fmt" - "os" - "strings" -) - -// RecordReplayer is used to replay sql -var RecordReplayer *recordReplayer - -// StartReplay starts replay -func StartReplay(filename string) { - RecordReplayer = newRecordPlayer(filename) - RecordReplayer.start() -} - -// StopReplay stops replay -func StopReplay() { - RecordReplayer.close <- struct{}{} -} - -func newRecordPlayer(filename string) *recordReplayer { - r := &recordReplayer{ - fileName: filename, - close: make(chan struct{}), - } - return r -} - -type recordReplayer struct { - close chan struct{} - fileName string - scanner *bufio.Scanner -} - -func (r *recordReplayer) start() { - f, err := os.OpenFile(r.fileName, os.O_RDONLY, os.ModePerm) - defer f.Close() - if err != nil { - fmt.Printf("Open file error %s\n", err.Error()) - return - } - - r.scanner = bufio.NewScanner(f) - txns := make(map[string][]string) - for r.scanner.Scan() { - select { - case <-r.close: - break - default: - } - text := r.scanner.Text() - s := strings.Split(text, " ") - if len(s) < 2 { - fmt.Printf("invalid sql log %v\n", s) - continue - } - inTxn := len(s) == 3 - if inTxn { - txnID := s[0] - sql := s[2] - txn := txns[txnID] - txn = append(txn, sql) - if sql == strings.ToLower("commit") { - tr := &txnReplayer{ - close: make(chan struct{}), - sqls: txn, - } - go tr.replay() - } else { - // todo - // replay single sql - } - } - } -} - -type txnReplayer struct { - close chan struct{} - sqls []string -} - -func (tr *txnReplayer) replay() { - for _, sql := range tr.sqls { - select { - case <-tr.close: - break - default: - // todo - // replay single sql - } - } -} diff --git a/util/replayutil/record_replayer.go b/util/replayutil/record_replayer.go new file mode 100644 index 0000000000000..304882097403f --- /dev/null +++ b/util/replayutil/record_replayer.go @@ -0,0 +1,106 @@ +package replayutil + +import ( + "bufio" + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" +) + +// RecordReplayer is used to replay sql +var RecordReplayer *recordReplayer +// Sessions is a map +var Sessions map[string]session.Session + +// StartReplay starts replay +func StartReplay(filename string, store kv.Storage) { + RecordReplayer = newRecordPlayer(filename, store) + go RecordReplayer.start() +} + +// StopReplay stops replay +func StopReplay() { + RecordReplayer.close <- struct{}{} +} + +func newRecordPlayer(filename string, store kv.Storage) *recordReplayer { + r := &recordReplayer{ + store: store, + fileName: filename, + close: make(chan struct{}), + } + return r +} + +type recordReplayer struct { + store kv.Storage + close chan struct{} + fileName string + scanner *bufio.Scanner +} + +func (r *recordReplayer) start() { + f, err := os.OpenFile(r.fileName, os.O_RDONLY, os.ModePerm) + defer f.Close() + if err != nil { + fmt.Printf("Open file error %s\n", err.Error()) + return + } + + r.scanner = bufio.NewScanner(f) + Sessions = make(map[string]session.Session) + start := time.Now() + for r.scanner.Scan() { + select { + case <-r.close: + break + default: + } + text := r.scanner.Text() + record := strings.SplitN(text, " ", 4) + if len(record) < 4 { + fmt.Printf("invalid sql log %v, len:%d\n", record, len(record)) + continue + } + ts, _ := strconv.ParseFloat(record[1], 10) + if sleepTime := ts - time.Since(start).Seconds(); sleepTime > 0 { + fmt.Printf("sleep time:%v\n", sleepTime) + time.Sleep(time.Duration(sleepTime) * time.Second) + } + if s, exist := Sessions[record[0]]; !exist { + se, err := session.CreateSession(r.store) + se.GetSessionVars().CurrentDB = record[2] + if err != nil { + log.Info("init replay session fail") + return + } + Sessions[record[0]] = se + go replayExecuteSQL(record[3], se, record[0]) + } else { + go replayExecuteSQL(record[3], s, record[0]) + } + } +} + +func replayExecuteSQL(sql string, s session.Session, connection string) error { + ctx := context.Background() + stmts, err := s.Parse(ctx, sql) + if err != nil { + return err + } + for _, stmt := range stmts { + _, err := s.ExecuteStmt(ctx, stmt) + if err != nil { + return errors.Trace(err) + } + } + return nil +}