From 019df3fbb2f3edd56e74df08b96bbb6a4eb68b48 Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 25 Sep 2019 17:57:16 +0800 Subject: [PATCH] *: log each `com_stmt_fetch` separately (#11987) # Conflicts: # server/conn.go # session/tidb.go --- executor/adapter.go | 35 +++++++++++++++---------- server/conn.go | 5 +++- server/conn_stmt.go | 5 ++++ server/driver.go | 7 +++++ server/driver_tidb.go | 7 +++++ session/tidb.go | 2 +- sessionctx/variable/session.go | 40 +++++++++++++++++++---------- sessionctx/variable/session_test.go | 30 +++++++++++++--------- 8 files changed, 90 insertions(+), 41 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index b37c85e5c86fa..33f149c9edebb 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -125,7 +125,7 @@ func (a *recordSet) NewChunk() *chunk.Chunk { func (a *recordSet) Close() error { err := a.executor.Close() - a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil) + a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, false) sessVars := a.stmt.Ctx.GetSessionVars() pps := types.CopyRow(sessVars.PreparedParams) sessVars.PrevStmt = FormatSQL(a.stmt.OriginText(), pps) @@ -133,6 +133,11 @@ func (a *recordSet) Close() error { return errors.Trace(err) } +// OnFetchReturned implements commandLifeCycle#OnFetchReturned +func (a *recordSet) OnFetchReturned() { + a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, true) +} + // ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement. type ExecStmt struct { // InfoSchema stores a reference to the schema information. @@ -411,7 +416,7 @@ func FormatSQL(sql string, pps variable.PreparedParams) stringutil.StringerFunc } // LogSlowQuery is used to print the slow query in the log files. -func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) { +func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { sessVars := a.Ctx.GetSessionVars() level := log.GetLevel() cfg := config.GetGlobalConfig() @@ -435,18 +440,20 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) { memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() _, digest := sessVars.StmtCtx.SQLDigest() slowItems := &variable.SlowQueryLogItems{ - TxnTS: txnTS, - SQL: sql.String(), - Digest: digest, - TimeTotal: costTime, - TimeParse: a.Ctx.GetSessionVars().DurationParse, - TimeCompile: a.Ctx.GetSessionVars().DurationCompile, - IndexNames: indexNames, - StatsInfos: statsInfos, - CopTasks: copTaskInfo, - ExecDetail: execDetail, - MemMax: memMax, - Succ: succ, + TxnTS: txnTS, + SQL: sql.String(), + Digest: digest, + TimeTotal: costTime, + TimeParse: a.Ctx.GetSessionVars().DurationParse, + TimeCompile: a.Ctx.GetSessionVars().DurationCompile, + IndexNames: indexNames, + StatsInfos: statsInfos, + CopTasks: copTaskInfo, + ExecDetail: execDetail, + MemMax: memMax, + Succ: succ, + Prepared: a.isPreparedStmt, + HasMoreResults: hasMoreResults, } if _, ok := a.StmtNode.(*ast.CommitStmt); ok { slowItems.PrevStmt = sessVars.PrevStmt.String() diff --git a/server/conn.go b/server/conn.go index e04f4d3861346..4a836d3066ce0 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1171,7 +1171,10 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet return errors.Trace(err) } } - return errors.Trace(cc.writeEOF(serverStatus)) + if cl, ok := rs.(fetchNotifier); ok { + cl.OnFetchReturned() + } + return cc.writeEOF(serverStatus) } func (cc *clientConn) writeMultiResultset(ctx context.Context, rss []ResultSet, binary bool) error { diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 06f22837b724c..7743b540c798d 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -196,6 +196,9 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e if err != nil { return errors.Trace(err) } + if cl, ok := rs.(fetchNotifier); ok { + cl.OnFetchReturned() + } // explicitly flush columnInfo to client. return errors.Trace(cc.flush()) } @@ -208,6 +211,7 @@ const ( ) func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) { + cc.ctx.GetSessionVars().StartTime = time.Now() stmtID, fetchSize, err := parseStmtFetchCmd(data) if err != nil { @@ -540,6 +544,7 @@ func (cc *clientConn) handleStmtReset(data []byte) (err error) { strconv.Itoa(stmtID), "stmt_reset") } stmt.Reset() + stmt.StoreResultSet(nil) return cc.writeOK() } diff --git a/server/driver.go b/server/driver.go index 1313fbdbcb2e6..619dfb8b6258c 100644 --- a/server/driver.go +++ b/server/driver.go @@ -139,3 +139,10 @@ type ResultSet interface { GetFetchedRows() []chunk.Row Close() error } + +// fetchNotifier represents notifier will be called in COM_FETCH. +type fetchNotifier interface { + // OnFetchReturned be called when COM_FETCH returns. + // it will be used in server-side cursor. + OnFetchReturned() +} diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 1c9f3ddb3c284..3e9f09b408fc2 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -379,6 +379,13 @@ func (trs *tidbResultSet) Close() error { return trs.recordSet.Close() } +// OnFetchReturned implements fetchNotifier#OnFetchReturned +func (trs *tidbResultSet) OnFetchReturned() { + if cl, ok := trs.recordSet.(fetchNotifier); ok { + cl.OnFetchReturned() + } +} + func (trs *tidbResultSet) Columns() []*ColumnInfo { if trs.columns == nil { fields := trs.recordSet.Fields() diff --git a/session/tidb.go b/session/tidb.go index 8815122abc910..8ae236d7a91f9 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -204,7 +204,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) // If it is not a select statement, we record its slow log here, // then it could include the transaction commit time. if rs == nil { - s.(*executor.ExecStmt).LogSlowQuery(origTxnCtx.StartTS, err == nil) + s.(*executor.ExecStmt).LogSlowQuery(origTxnCtx.StartTS, err == nil, false) pps := types.CopyRow(sessVars.PreparedParams) sessVars.PrevStmt = executor.FormatSQL(s.OriginText(), pps) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 05d896de2b1cc..e668c97a01b50 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -816,6 +816,10 @@ const ( SlowLogConnIDStr = "Conn_ID" // SlowLogQueryTimeStr is slow log field name. SlowLogQueryTimeStr = "Query_time" + // SlowLogParseTimeStr is the parse sql time. + SlowLogParseTimeStr = "Parse_time" + // SlowLogCompileTimeStr is the compile plan time. + SlowLogCompileTimeStr = "Compile_time" // SlowLogDBStr is slow log field name. SlowLogDBStr = "DB" // SlowLogIsInternalStr is slow log field name. @@ -844,6 +848,10 @@ const ( SlowLogCopWaitMax = "Cop_wait_max" // SlowLogMemMax is the max number bytes of memory used in this statement. SlowLogMemMax = "Mem_max" + // SlowLogPrepared is used to indicate whether this sql execute in prepare. + SlowLogPrepared = "Prepared" + // SlowLogHasMoreResults is used to indicate whether this sql has more following results. + SlowLogHasMoreResults = "Has_more_results" // SlowLogSucc is used to indicate whether this sql execute successfully. SlowLogSucc = "Succ" // SlowLogPrevStmt is used to show the previous executed statement. @@ -855,19 +863,21 @@ const ( // SlowQueryLogItems is a collection of items that should be included in the // slow query log. type SlowQueryLogItems struct { - TxnTS uint64 - SQL string - Digest string - TimeTotal time.Duration - TimeParse time.Duration - TimeCompile time.Duration - IndexNames string - StatsInfos map[string]uint64 - CopTasks *stmtctx.CopTasksDetails - ExecDetail execdetails.ExecDetails - MemMax int64 - Succ bool - PrevStmt string + TxnTS uint64 + SQL string + Digest string + TimeTotal time.Duration + TimeParse time.Duration + TimeCompile time.Duration + IndexNames string + StatsInfos map[string]uint64 + CopTasks *stmtctx.CopTasksDetails + ExecDetail execdetails.ExecDetails + MemMax int64 + Succ bool + Prepared bool + HasMoreResults bool + PrevStmt string } // SlowLogFormat uses for formatting slow log. @@ -901,6 +911,8 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string { writeSlowLogItem(&buf, SlowLogConnIDStr, strconv.FormatUint(s.ConnectionID, 10)) } writeSlowLogItem(&buf, SlowLogQueryTimeStr, strconv.FormatFloat(logItems.TimeTotal.Seconds(), 'f', -1, 64)) + writeSlowLogItem(&buf, SlowLogParseTimeStr, strconv.FormatFloat(logItems.TimeParse.Seconds(), 'f', -1, 64)) + writeSlowLogItem(&buf, SlowLogCompileTimeStr, strconv.FormatFloat(logItems.TimeCompile.Seconds(), 'f', -1, 64)) if execDetailStr := logItems.ExecDetail.String(); len(execDetailStr) > 0 { buf.WriteString(SlowLogPrefixStr + execDetailStr + "\n") @@ -952,6 +964,8 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string { writeSlowLogItem(&buf, SlowLogMemMax, strconv.FormatInt(logItems.MemMax, 10)) } + writeSlowLogItem(&buf, SlowLogPrepared, strconv.FormatBool(logItems.Prepared)) + writeSlowLogItem(&buf, SlowLogHasMoreResults, strconv.FormatBool(logItems.HasMoreResults)) writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ)) if logItems.PrevStmt != "" { diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 849b5f321f010..82d283c9137c7 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -94,6 +94,8 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) { # User: root@192.168.0.1 # Conn_ID: 1 # Query_time: 1 +# Parse_time: 0.00000001 +# Compile_time: 0.00000001 # Process_time: 2 Wait_time: 60 Backoff_time: 0.001 Request_count: 2 Total_keys: 10000 Process_keys: 20001 # DB: test # Index_names: [t1:a,t2:b] @@ -104,23 +106,27 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) { # Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 # Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 # Mem_max: 2333 +# Prepared: true +# Has_more_results: true # Succ: true select * from t;` sql := "select * from t" digest := parser.DigestHash(sql) logString := seVar.SlowLogFormat(&variable.SlowQueryLogItems{ - TxnTS: txnTS, - SQL: sql, - Digest: digest, - TimeTotal: costTime, - TimeParse: time.Duration(10), - TimeCompile: time.Duration(10), - IndexNames: "[t1:a,t2:b]", - StatsInfos: statsInfos, - CopTasks: copTasks, - ExecDetail: execDetail, - MemMax: memMax, - Succ: true, + TxnTS: txnTS, + SQL: sql, + Digest: digest, + TimeTotal: costTime, + TimeParse: time.Duration(10), + TimeCompile: time.Duration(10), + IndexNames: "[t1:a,t2:b]", + StatsInfos: statsInfos, + CopTasks: copTasks, + ExecDetail: execDetail, + MemMax: memMax, + Prepared: true, + HasMoreResults: true, + Succ: true, }) c.Assert(logString, Equals, resultString) }