Skip to content

Commit

Permalink
Merge pull request pingcap#4 from ti2sky/add-replay
Browse files Browse the repository at this point in the history
add init
  • Loading branch information
jyz0309 authored Jan 1, 2022
2 parents b9159cf + 9848a0e commit 48af694
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 115 deletions.
8 changes: 8 additions & 0 deletions replay_file
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
5 3 test begin
3 4 test begin
5 5 test insert into t values(5)
3 6 test insert into t values(6)
5 7 test commit
5 8 test select * from t
3 8 test commit
5 8 test commit
1 change: 0 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,6 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
}
}()

// Usually, client connection status changes between [dispatching] <=> [reading].
// When some event happens, server may notify this client connection by setting
// the status to special values, for example: kill or graceful shutdown.
Expand Down
4 changes: 2 additions & 2 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ func (s *Server) startHTTPServer() {
router.Handle("/metrics/profile", profileHandler{tikvHandlerTool})
fmt.Println("register replay sql")
// HTTP path for start record and replay sql.
router.Handle("/record/{filename}/{status}/{startTS}", s.newSQLRecorderHandler())
router.Handle("/replay/{filename}", s.newSQLRecorderHandler())
router.Handle("/record/{filename}/{recordStatus}/{startTS}", s.newSQLRecorderHandler())
router.Handle("/replay/{filename}/{replayStatus}", s.newSQLReplayHandler(tikvHandlerTool.Store))
// HTTP path for web UI.
if host, port, err := net.SplitHostPort(s.statusAddr); err == nil {
if host == "" {
Expand Down
26 changes: 15 additions & 11 deletions server/sql_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package server

import (
"fmt"
"github.com/pingcap/tidb/util/replayutil"
"net/http"
"strconv"

"github.com/pingcap/tidb/util/logutil"

"github.com/gorilla/mux"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
)

// SQLRecorderHandler is the handler for dumping plan replayer file.
Expand All @@ -45,6 +46,7 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
cfg := config.GetGlobalConfig()
params := mux.Vars(req)
if status, ok := params[pRecordStatus]; ok {
fmt.Println()
if status == "on" {
// set replay meta TS first.
cfg.ReplayMetaTS, err = strconv.ParseInt(params[pStartTS], 10, 64)
Expand All @@ -61,29 +63,31 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
}
w.WriteHeader(http.StatusOK)
return
} else {
w.WriteHeader(http.StatusBadRequest)
}
w.WriteHeader(http.StatusBadRequest)

}

type SQLReplayHandler struct{}
// SQLReplayHandler Replay handler
type SQLReplayHandler struct {
Store kv.Storage
}

func (s *Server) newSQLReplayHandler() *SQLReplayHandler {
prh := &SQLReplayHandler{}
func (s *Server) newSQLReplayHandler(store kv.Storage) *SQLReplayHandler {
prh := &SQLReplayHandler{store}
return prh
}

func (h SQLReplayHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
params := mux.Vars(req)
if status, ok := params[pReplayStatus]; ok {
if status == "on" {
logutil.StartReplay(params[pFileName])
replayutil.StartReplay(params[pFileName], h.Store)
} else {
logutil.StopReplay()
replayutil.StopReplay()
}
w.WriteHeader(http.StatusOK)
return
} else {
w.WriteHeader(http.StatusBadRequest)
}
w.WriteHeader(http.StatusBadRequest)
}
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2866,12 +2866,12 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) {
cfg := config.GetGlobalConfig()
vars := s.GetSessionVars()
if !s.isInternal() && cfg.EnableReplaySQL.Load() {
go func(sql, id string, intxn bool) {
fmt.Println("print sql")
go func(sql, id string) {
//TODO: We need to add a client col also.
var builder strings.Builder
if intxn {
builder.WriteString(id)
builder.WriteString(" ")
}
builder.WriteString(fmt.Sprintf("%v", vars.ConnectionID))
builder.WriteString(" ")
// Logic TS
ts := strconv.FormatInt(s.sessionVars.StartTime.Unix()-cfg.ReplayMetaTS, 10)
builder.WriteString(ts)
Expand All @@ -2880,7 +2880,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) {
builder.WriteString(text)
builder.WriteString("\n")
logutil.PutRecordOrDrop(builder.String())
}(vars.SQL, vars.TxnCtx.ID.String(), vars.InTxn())
}(vars.SQL, vars.TxnCtx.ID.String())
}

if variable.ProcessGeneralLog.Load() && !vars.InRestrictedSQL {
Expand Down
1 change: 1 addition & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/docker/go-units"
Expand Down
1 change: 1 addition & 0 deletions util/logutil/record_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func StopRecord() {
RecordLogger.stopLogWorker()
}

// PutRecordOrDrop put record
func PutRecordOrDrop(record string) {
select {
case RecordLogger.recordChan <- record:
Expand Down
95 changes: 0 additions & 95 deletions util/logutil/record_replayer.go

This file was deleted.

106 changes: 106 additions & 0 deletions util/replayutil/record_replayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package replayutil

import (
"bufio"
"context"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
)

// RecordReplayer is used to replay sql
var RecordReplayer *recordReplayer
// Sessions is a map
var Sessions map[string]session.Session

// StartReplay starts replay
func StartReplay(filename string, store kv.Storage) {
RecordReplayer = newRecordPlayer(filename, store)
go RecordReplayer.start()
}

// StopReplay stops replay
func StopReplay() {
RecordReplayer.close <- struct{}{}
}

func newRecordPlayer(filename string, store kv.Storage) *recordReplayer {
r := &recordReplayer{
store: store,
fileName: filename,
close: make(chan struct{}),
}
return r
}

type recordReplayer struct {
store kv.Storage
close chan struct{}
fileName string
scanner *bufio.Scanner
}

func (r *recordReplayer) start() {
f, err := os.OpenFile(r.fileName, os.O_RDONLY, os.ModePerm)
defer f.Close()
if err != nil {
fmt.Printf("Open file error %s\n", err.Error())
return
}

r.scanner = bufio.NewScanner(f)
Sessions = make(map[string]session.Session)
start := time.Now()
for r.scanner.Scan() {
select {
case <-r.close:
break
default:
}
text := r.scanner.Text()
record := strings.SplitN(text, " ", 4)
if len(record) < 4 {
fmt.Printf("invalid sql log %v, len:%d\n", record, len(record))
continue
}
ts, _ := strconv.ParseFloat(record[1], 10)
if sleepTime := ts - time.Since(start).Seconds(); sleepTime > 0 {
fmt.Printf("sleep time:%v\n", sleepTime)
time.Sleep(time.Duration(sleepTime) * time.Second)
}
if s, exist := Sessions[record[0]]; !exist {
se, err := session.CreateSession(r.store)
se.GetSessionVars().CurrentDB = record[2]
if err != nil {
log.Info("init replay session fail")
return
}
Sessions[record[0]] = se
go replayExecuteSQL(record[3], se, record[0])
} else {
go replayExecuteSQL(record[3], s, record[0])
}
}
}

func replayExecuteSQL(sql string, s session.Session, connection string) error {
ctx := context.Background()
stmts, err := s.Parse(ctx, sql)
if err != nil {
return err
}
for _, stmt := range stmts {
_, err := s.ExecuteStmt(ctx, stmt)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

0 comments on commit 48af694

Please sign in to comment.