Skip to content

Commit

Permalink
*: record previous statement when commit is slow (pingcap#11908) (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and sre-bot committed Nov 7, 2019
1 parent ecb16a2 commit d2ec98f
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 50 deletions.
67 changes: 32 additions & 35 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (a *recordSet) NewChunk() *chunk.Chunk {
func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil)
a.stmt.Ctx.GetSessionVars().PrevStmt = a.stmt.OriginText()
a.stmt.logAudit()
a.stmt.SummaryStmt()
return err
Expand Down Expand Up @@ -615,6 +616,16 @@ func (a *ExecStmt) logAudit() {
}
}

// FormatSQL is used to format the original SQL, e.g. truncating long SQL, appending prepared arguments.
func FormatSQL(sql string, sessVars *variable.SessionVars) string {
cfg := config.GetGlobalConfig()
length := len(sql)
if maxQueryLen := atomic.LoadUint64(&cfg.Log.QueryLogMaxLen); uint64(length) > maxQueryLen {
sql = fmt.Sprintf("%.*q(len:%d)", maxQueryLen, sql, length)
}
return QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo()
}

// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
sessVars := a.Ctx.GetSessionVars()
Expand All @@ -625,11 +636,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
if costTime < threshold && level > zapcore.DebugLevel {
return
}
sql := a.Text
if maxQueryLen := atomic.LoadUint64(&cfg.Log.QueryLogMaxLen); uint64(len(sql)) > maxQueryLen {
sql = fmt.Sprintf("%.*q(len:%d)", maxQueryLen, sql, len(a.Text))
}
sql = QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo()
sql := FormatSQL(a.Text, sessVars)

var tableIDs, indexNames string
if len(sessVars.StmtCtx.TableIDs) > 0 {
Expand All @@ -642,38 +649,28 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
_, digest := sessVars.StmtCtx.SQLDigest()
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql,
Digest: digest,
TimeTotal: costTime,
TimeParse: a.Ctx.GetSessionVars().DurationParse,
TimeCompile: a.Ctx.GetSessionVars().DurationCompile,
IndexNames: indexNames,
StatsInfos: statsInfos,
CopTasks: copTaskInfo,
ExecDetail: execDetail,
MemMax: memMax,
Succ: succ,
}
if _, ok := a.StmtNode.(*ast.CommitStmt); ok {
slowItems.PrevStmt = FormatSQL(sessVars.PrevStmt, sessVars)
}
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(&variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql,
Digest: digest,
TimeTotal: costTime,
TimeParse: a.Ctx.GetSessionVars().DurationParse,
TimeCompile: a.Ctx.GetSessionVars().DurationCompile,
IndexNames: indexNames,
StatsInfos: statsInfos,
CopTasks: copTaskInfo,
ExecDetail: execDetail,
MemMax: memMax,
Succ: succ,
}))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(slowItems))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(&variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql,
Digest: digest,
TimeTotal: costTime,
TimeParse: a.Ctx.GetSessionVars().DurationParse,
TimeCompile: a.Ctx.GetSessionVars().DurationCompile,
IndexNames: indexNames,
StatsInfos: statsInfos,
CopTasks: copTaskInfo,
ExecDetail: execDetail,
MemMax: memMax,
Succ: succ,
}))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(slowItems))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
29 changes: 18 additions & 11 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var slowQueryCols = []columnInfo{
{variable.SlowLogCopWaitAddr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogPrevStmt, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
}

Expand Down Expand Up @@ -113,15 +114,19 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
// Parse slow log field.
if strings.HasPrefix(line, variable.SlowLogRowPrefixStr) {
line = line[len(variable.SlowLogRowPrefixStr):]
fieldValues := strings.Split(line, " ")
for i := 0; i < len(fieldValues)-1; i += 2 {
field := fieldValues[i]
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
err = st.setFieldValue(tz, field, fieldValues[i+1])
if err != nil {
return rows, err
if strings.HasPrefix(line, variable.SlowLogPrevStmtPrefix) {
st.prevStmt = line[len(variable.SlowLogPrevStmtPrefix):]
} else {
fieldValues := strings.Split(line, " ")
for i := 0; i < len(fieldValues)-1; i += 2 {
field := fieldValues[i]
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
err = st.setFieldValue(tz, field, fieldValues[i+1])
if err != nil {
return rows, err
}
}
}
} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
Expand Down Expand Up @@ -185,7 +190,6 @@ type slowQueryTuple struct {
processKeys uint64
db string
indexIDs string
isInternal bool
digest string
statsInfo string
avgProcessTime float64
Expand All @@ -197,8 +201,10 @@ type slowQueryTuple struct {
maxWaitTime float64
maxWaitAddress string
memMax int64
succ bool
prevStmt string
sql string
isInternal bool
succ bool
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
Expand Down Expand Up @@ -314,6 +320,7 @@ func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
} else {
record = append(record, types.NewIntDatum(0))
}
record = append(record, types.NewStringDatum(st.prevStmt))
record = append(record, types.NewStringDatum(st.sql))
return record
}
Expand Down
3 changes: 2 additions & 1 deletion infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
# Succ: false
# Prev_stmt: update t set i = 1;
select * from t;`)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
Expand All @@ -54,7 +55,7 @@ select * from t;`)
}
recordString += str
}
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,0,select * from t;"
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,0,update t set i = 1;,select * from t;"
c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
Expand Down
5 changes: 3 additions & 2 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func (s *testTableSuite) TestSlowQuery(c *C) {
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
# Succ: true
# Prev_stmt: update t set i = 2;
select * from t_slim;`))
c.Assert(f.Sync(), IsNil)
c.Assert(err, IsNil)
Expand All @@ -483,10 +484,10 @@ select * from t_slim;`))
tk.MustExec("set time_zone = '+08:00';")
re := tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|",
"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))
"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|update t set i = 2;|select * from t_slim;"))
tk.MustExec("set time_zone = '+00:00';")
re = tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|update t set i = 2;|select * from t_slim;"))

// Test for long query.
_, err = f.Write([]byte(`
Expand Down
1 change: 1 addition & 0 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
if rs == nil {
s.(*executor.ExecStmt).LogSlowQuery(origTxnCtx.StartTS, err == nil)
s.(*executor.ExecStmt).SummaryStmt()
sessVars.PrevStmt = s.OriginText()
}
}()

Expand Down
15 changes: 14 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,14 @@ type SessionVars struct {
// StartTime is the start time of the last query.
StartTime time.Time

// DurationParse is the duration of pasing SQL string to AST of the last query.
// DurationParse is the duration of parsing SQL string to AST of the last query.
DurationParse time.Duration

// DurationCompile is the duration of compiling AST to execution plan of the last query.
DurationCompile time.Duration

// PrevStmt is used to store the previous executed statement in the current session.
PrevStmt string
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -1037,6 +1040,10 @@ const (
SlowLogMemMax = "Mem_max"
// SlowLogSucc is used to indicate whether this sql execute successfully.
SlowLogSucc = "Succ"
// SlowLogPrevStmt is used to show the previous executed statement.
SlowLogPrevStmt = "Prev_stmt"
// SlowLogPrevStmtPrefix is the prefix of Prev_stmt in slow log file.
SlowLogPrevStmtPrefix = SlowLogPrevStmt + SlowLogSpaceMarkStr
)

// SlowQueryLogItems is a collection of items that should be included in the
Expand All @@ -1054,6 +1061,7 @@ type SlowQueryLogItems struct {
ExecDetail execdetails.ExecDetails
MemMax int64
Succ bool
PrevStmt string
}

// SlowLogFormat uses for formatting slow log.
Expand All @@ -1074,6 +1082,7 @@ type SlowQueryLogItems struct {
// # Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
// # Memory_max: 4096
// # Succ: true
// # Prev_stmt: begin;
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
var buf bytes.Buffer
Expand Down Expand Up @@ -1141,6 +1150,10 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {

writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))

if logItems.PrevStmt != "" {
writeSlowLogItem(&buf, SlowLogPrevStmt, logItems.PrevStmt)
}

buf.WriteString(logItems.SQL)
if len(logItems.SQL) == 0 || logItems.SQL[len(logItems.SQL)-1] != ';' {
buf.WriteString(";")
Expand Down
2 changes: 2 additions & 0 deletions util/signal/signal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package signal

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

Expand Down

0 comments on commit d2ec98f

Please sign in to comment.