Skip to content

Commit

Permalink
cherry pick pingcap#25317 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
jyz0309 authored and ti-srebot committed Jun 10, 2021
1 parent 57fa70a commit 9a2003a
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 1 deletion.
39 changes: 39 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ func FormatSQL(sql string, pps variable.PreparedParams) stringutil.StringerFunc
var (
sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal)
sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral)
<<<<<<< HEAD
=======
totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK)
>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
)

// CloseRecordSet will finish the execution of current statement and do some record work
Expand All @@ -816,7 +820,12 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
// 2. record summary statement.
// 3. record execute duration metric.
// 4. update the `PrevStmt` in session variable.
<<<<<<< HEAD
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults bool) {
=======
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
Expand All @@ -835,15 +844,25 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo
// Only record the read keys in write statement which affect row more than 0.
a.Ctx.GetTxnWriteThroughputSLI().AddReadKeys(execDetail.ScanDetail.ProcessedKeys)
}
succ := err == nil
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
<<<<<<< HEAD
prevStmt := a.GetTextToLog()
if sessVars.EnableRedactLog {
sessVars.PrevStmt = FormatSQL(prevStmt, nil)
} else {
pps := types.CloneRow(sessVars.PreparedParams)
sessVars.PrevStmt = FormatSQL(prevStmt, pps)
=======
if sessVars.StmtCtx.IsTiFlash.Load() {
if succ {
totalTiFlashQuerySuccCounter.Inc()
} else {
metrics.TiFlashQueryTotalCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), metrics.LblError).Inc()
}
>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
}

executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
Expand All @@ -852,6 +871,26 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo
} else {
sessionExecuteRunDurationGeneral.Observe(executeDuration.Seconds())
}
<<<<<<< HEAD
=======
// Reset DurationParse due to the next statement may not need to be parsed (not a text protocol query).
sessVars.DurationParse = 0
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
// Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker
if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil {
if stmtCtx.DiskTracker != nil {
stmtCtx.DiskTracker.DetachFromGlobalTracker()
}
if stmtCtx.MemTracker != nil {
stmtCtx.MemTracker.DetachFromGlobalTracker()
}
}
>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
}

// LogSlowQuery is used to print the slow query in the log files.
Expand Down
11 changes: 11 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ var (
Name: "status",
Help: "Status of the TiDB server configurations.",
}, []string{LblType})
<<<<<<< HEAD
=======

TiFlashQueryTotalCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_query_total",
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})
>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
)

// ExecuteErrorToLabel converts an execute error to label.
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, lastStm
if handled {
execStmt := cc.ctx.Value(session.ExecStmtVarKey)
if execStmt != nil {
execStmt.(*executor.ExecStmt).FinishExecuteStmt(0, err == nil, false)
execStmt.(*executor.ExecStmt).FinishExecuteStmt(0, err, false)
}

}
Expand Down
168 changes: 168 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,174 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
return recordSet, nil
}

<<<<<<< HEAD
=======
func (s *session) validateStatementReadOnlyInStaleness(stmtNode ast.StmtNode) error {
vars := s.GetSessionVars()
if !vars.TxnCtx.IsStaleness && vars.TxnReadTS.PeakTxnReadTS() == 0 {
return nil
}
errMsg := "only support read-only statement during read-only staleness transactions"
node := stmtNode.(ast.Node)
switch node.(type) {
case *ast.SplitRegionStmt:
return nil
case *ast.SelectStmt, *ast.ExplainStmt, *ast.DoStmt, *ast.ShowStmt, *ast.SetOprStmt, *ast.ExecuteStmt, *ast.SetOprSelectList:
if !planner.IsReadOnly(stmtNode, vars) {
return errors.New(errMsg)
}
return nil
default:
}
// covered DeleteStmt/InsertStmt/UpdateStmt/CallStmt/LoadDataStmt
if _, ok := stmtNode.(ast.DMLNode); ok {
return errors.New(errMsg)
}
return nil
}

// querySpecialKeys contains the keys of special query, the special query will handled by handleQuerySpecial method.
var querySpecialKeys = []fmt.Stringer{
executor.LoadDataVarKey,
executor.LoadStatsVarKey,
executor.IndexAdviseVarKey,
}

func (s *session) hasQuerySpecial() bool {
found := false
s.mu.RLock()
for _, k := range querySpecialKeys {
v := s.mu.values[k]
if v != nil {
found = true
break
}
}
s.mu.RUnlock()
return found
}

// runStmt executes the sqlexec.Statement and commit or rollback the current transaction.
func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.RecordSet, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("session.runStmt", opentracing.ChildOf(span.Context()))
span1.LogKV("sql", s.OriginText())
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
se.SetValue(sessionctx.QueryString, s.OriginText())
if _, ok := s.(*executor.ExecStmt).StmtNode.(ast.DDLNode); ok {
se.SetValue(sessionctx.LastExecuteDDL, true)
} else {
se.ClearValue(sessionctx.LastExecuteDDL)
}

sessVars := se.sessionVars

// Record diagnostic information for DML statements
if _, ok := s.(*executor.ExecStmt).StmtNode.(ast.DMLNode); ok {
defer func() {
sessVars.LastQueryInfo = variable.QueryInfo{
TxnScope: sessVars.CheckAndGetTxnScope(),
StartTS: sessVars.TxnCtx.StartTS,
ForUpdateTS: sessVars.TxnCtx.GetForUpdateTS(),
}
if err != nil {
sessVars.LastQueryInfo.ErrMsg = err.Error()
}
}()
}

// Save origTxnCtx here to avoid it reset in the transaction retry.
origTxnCtx := sessVars.TxnCtx
err = se.checkTxnAborted(s)
if err != nil {
return nil, err
}
rs, err = s.Exec(ctx)
se.updateTelemetryMetric(s.(*executor.ExecStmt))
sessVars.TxnCtx.StatementCount++
if rs != nil {
return &execStmtResult{
RecordSet: rs,
sql: s,
se: se,
}, err
}

err = finishStmt(ctx, se, err, s)
if se.hasQuerySpecial() {
// The special query will be handled later in handleQuerySpecial,
// then should call the ExecStmt.FinishExecuteStmt to finish this statement.
se.SetValue(ExecStmtVarKey, s.(*executor.ExecStmt))
} else {
// If it is not a select statement or special query, we record its slow log here,
// then it could include the transaction commit time.
s.(*executor.ExecStmt).FinishExecuteStmt(origTxnCtx.StartTS, err, false)
}
return nil, err
}

// ExecStmtVarKeyType is a dummy type to avoid naming collision in context.
type ExecStmtVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k ExecStmtVarKeyType) String() string {
return "exec_stmt_var_key"
}

// ExecStmtVarKey is a variable key for ExecStmt.
const ExecStmtVarKey ExecStmtVarKeyType = 0

// execStmtResult is the return value of ExecuteStmt and it implements the sqlexec.RecordSet interface.
// Why we need a struct to wrap a RecordSet and provide another RecordSet?
// This is because there are so many session state related things that definitely not belongs to the original
// RecordSet, so this struct exists and RecordSet.Close() is overrided handle that.
type execStmtResult struct {
sqlexec.RecordSet
se *session
sql sqlexec.Statement
}

func (rs *execStmtResult) Close() error {
se := rs.se
if err := resetCTEStorageMap(se); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
}

func resetCTEStorageMap(se *session) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*executor.CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
// No need to lock IterInTbl.
v.ResTbl.Lock()
defer v.ResTbl.Unlock()
err1 := v.ResTbl.DerefAndClose()
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

>>>>>>> d37062fe5... metrics: Add err label for TiFlashQueryTotalCounter (#25317)
// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down

0 comments on commit 9a2003a

Please sign in to comment.