Skip to content

Commit

Permalink
*: update util/tracing and remove repeated code (#40825)
Browse files Browse the repository at this point in the history
close #40809
  • Loading branch information
tiancaiamao authored Feb 6, 2023
1 parent 1915035 commit 00d48f9
Show file tree
Hide file tree
Showing 33 changed files with 163 additions and 238 deletions.
2 changes: 1 addition & 1 deletion distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ go_library(
"//util/logutil",
"//util/memory",
"//util/ranger",
"//util/tracing",
"//util/trxevents",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
9 changes: 3 additions & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strconv"
"unsafe"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand Down Expand Up @@ -64,11 +64,8 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "distsql.Select")
defer r.End()

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ go_library(
"//util/tls",
"//util/topsql",
"//util/topsql/state",
"//util/tracing",
"@com_github_burntsushi_toml//:toml",
"@com_github_gogo_protobuf//proto",
"@com_github_ngaut_pools//:pools",
Expand Down
19 changes: 8 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand Down Expand Up @@ -65,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
"github.com/prometheus/client_golang/prometheus"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -283,12 +283,12 @@ func (a *ExecStmt) GetStmtNode() ast.StmtNode {

// PointGet short path for point exec directly from plan, keep only necessary steps
func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context()))
span1.LogKV("sql", a.OriginText())
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
r, ctx := tracing.StartRegionEx(ctx, "ExecStmt.PointGet")
defer r.End()
if r.Span != nil {
r.Span.LogKV("sql", a.OriginText())
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
Expand Down Expand Up @@ -921,11 +921,8 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.handleNoDelayExecutor")
defer r.End()

var err error
defer func() {
Expand Down
10 changes: 4 additions & 6 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"strings"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/zap"
)

Expand All @@ -56,11 +56,9 @@ type Compiler struct {

// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecStmt, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.Compile")
defer r.End()

defer func() {
r := recover()
if r == nil {
Expand Down
21 changes: 7 additions & 14 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -68,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -314,14 +314,10 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if atomic.LoadUint32(&sessVars.Killed) == 1 {
return ErrQueryInterrupted
}
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("%T.Next", e), opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
if trace.IsEnabled() {
defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End()
}

r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()

if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
registerSQLAndPlanInExecForTopSQL(sessVars)
}
Expand Down Expand Up @@ -1527,11 +1523,8 @@ func init() {
s.RewritePhaseInfo.DurationPreprocessSubQuery += time.Since(begin)
}(time.Now())

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.EvalSubQuery", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.EvalSubQuery")
defer r.End()

e := newExecutorBuilder(sctx, is, nil)
exec := e.build(p)
Expand Down
22 changes: 6 additions & 16 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"runtime/trace"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -116,11 +116,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "prefetchUniqueIndices")
defer r.End()

nKeys := 0
for _, r := range rows {
Expand Down Expand Up @@ -148,11 +145,8 @@ func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeC
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "prefetchConflictedOldRows")
defer r.End()

batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
Expand Down Expand Up @@ -182,11 +176,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "prefetchDataCache").End()
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
Expand Down
8 changes: 2 additions & 6 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/expression"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1142,11 +1142,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
replace bool) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
start := time.Now()
// Get keys need to be checked.
toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
Expand Down
54 changes: 13 additions & 41 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package executor
import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/tracing"
)

type memReader interface {
Expand Down Expand Up @@ -65,11 +65,7 @@ type memIndexReader struct {
}

func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexReader").End()
kvRanges := idxReader.kvRanges
outputOffset := make([]int, 0, len(us.columns))
for _, col := range idxReader.outputColumns {
Expand All @@ -90,11 +86,7 @@ func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *Inde
}

func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "memIndexReader.getMemRows").End()
tps := make([]*types.FieldType, 0, len(m.index.Columns)+1)
cols := m.table.Columns
for _, col := range m.index.Columns {
Expand Down Expand Up @@ -190,11 +182,7 @@ type allocBuf struct {
}

func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemTableReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemTableReader").End()
colIDs := make(map[int64]int, len(us.columns))
for i, col := range us.columns {
colIDs[col.ID] = i
Expand Down Expand Up @@ -235,11 +223,7 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *Tabl

// TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row.
func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memTableReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "memTableReader.getMemRows").End()
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
resultRows := make([]types.Datum, len(m.columns))
m.offsets = make([]int, len(m.columns))
Expand Down Expand Up @@ -490,11 +474,8 @@ type memIndexLookUpReader struct {
}

func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexLookUpReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexLookUpReader").End()

kvRanges := idxLookUpReader.kvRanges
outputOffset := []int{len(idxLookUpReader.index.Columns)}
memIdxReader := &memIndexReader{
Expand Down Expand Up @@ -527,11 +508,9 @@ func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUp
}

func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexLookUpReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "memIndexLookUpReader.getMemRows")
defer r.End()

kvRanges := [][]kv.KeyRange{m.idxReader.kvRanges}
tbls := []table.Table{m.table}
if m.partitionMode {
Expand Down Expand Up @@ -604,11 +583,7 @@ type memIndexMergeReader struct {
}

func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("buildMemIndexMergeReader", opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(ctx, span1)
}
defer tracing.StartRegion(ctx, "buildMemIndexMergeReader").End()
indexCount := len(indexMergeReader.indexes)
memReaders := make([]memReader, 0, indexCount)
for i := 0; i < indexCount; i++ {
Expand Down Expand Up @@ -661,11 +636,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
}

func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("memIndexMergeReader.getMemRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "memIndexMergeReader.getMemRows")
defer r.End()
tbls := []table.Table{m.table}
// [partNum][indexNum][rangeNum]
var kvRanges [][][]kv.KeyRange
Expand Down
Loading

0 comments on commit 00d48f9

Please sign in to comment.