Skip to content

Commit

Permalink
Merge #94750
Browse files Browse the repository at this point in the history
94750: contention: refactor to collect in sql stats r=j82w a=j82w

1. All the stats and events are collected in the same place which makes it easier to troubleshoot and maintain.
2. It improves performance by having the contention events processing being done in the same place.
3. The waiting statement id and waiting statement fingerprint are available higher in stack where the stats are collected. This will make it easier for users to root cause when they have large transactions with many statements.

closes: #94749

Release note: none

Co-authored-by: j82w <jwilley@cockroachlabs.com>
  • Loading branch information
craig[bot] and j82w committed Jan 9, 2023
2 parents f0d4f4e + bb1b8a1 commit 45df230
Show file tree
Hide file tree
Showing 21 changed files with 32 additions and 82 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func distBackup(
noTxn, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func distRestore(
noTxn, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (e *Evaluator) executePlan(
nil,
nil, /* clockUpdater */
&sql.SessionTracing{},
e.execCfg.ContentionRegistry,
)

// Start execution.
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func startDistChangefeed(
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
execCtx.ExecCfg().ContentionRegistry,
)
defer recv.Release()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func distStreamIngest(
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func runPlanInsidePlan(
params.p.Txn(),
params.ExecCfg().Clock,
params.p.extendedEvalCtx.Tracing,
params.p.ExecCfg().ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,6 @@ func (sc *SchemaChanger) distIndexBackfill(
nil, /* txn - the flow does not run wholly in a txn */
sc.clock,
evalCtx.Tracing,
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down Expand Up @@ -1320,7 +1319,6 @@ func (sc *SchemaChanger) distColumnBackfill(
nil, /* txn - the flow does not run wholly in a txn */
sc.clock,
evalCtx.Tracing,
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,6 @@ func (ex *connExecutor) execWithDistSQLEngine(
planner.txn,
ex.server.cfg.Clock,
&ex.sessionTracing,
ex.server.cfg.ContentionRegistry,
)
recv.progressAtomic = progressAtomic
if ex.server.cfg.TestingKnobs.DistSQLReceiverPushCallbackFactory != nil {
Expand Down Expand Up @@ -2414,6 +2413,7 @@ func (ex *connExecutor) recordTransactionFinish(

if contentionDuration := ex.extraTxnState.accumulatedStats.ContentionTime.Nanoseconds(); contentionDuration > 0 {
ex.metrics.EngineMetrics.SQLContendedTxns.Inc(1)
ex.planner.DistSQLPlanner().distSQLSrv.Metrics.ContendedQueriesCount.Inc(1)
}

ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/contentionpb/contention.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package cockroach.sql.contentionpb;
option go_package = "contentionpb";

import "roachpb/api.proto";

import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
Expand Down Expand Up @@ -167,8 +166,8 @@ message ExtendedContentionEvent {
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID"
];

google.protobuf.Timestamp collection_ts = 5 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true
];
google.protobuf.Timestamp collection_ts = 5 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true
];
}
1 change: 0 additions & 1 deletion pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) {
nil, /* txn */
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)

replicas := []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsql_plan_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func TestCdcExpressionExecution(t *testing.T) {
nil,
nil, /* clockUpdater */
planner.extendedEvalCtx.Tracing,
planner.execCfg.ContentionRegistry,
)
defer r.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats(
txn,
evalCtx.ExecCfg.Clock,
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
70 changes: 16 additions & 54 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -49,15 +47,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pbtypes "github.com/gogo/protobuf/types"
)

var settingDistSQLNumRunners = settings.RegisterIntSetting(
Expand Down Expand Up @@ -761,7 +757,6 @@ func (dsp *DistSQLPlanner) Run(
defer dsp.distSQLSrv.ServerConfig.Metrics.QueryStop()

recv.outputTypes = plan.GetResultTypes()
recv.contendedQueryMetric = dsp.distSQLSrv.Metrics.ContendedQueriesCount
if multitenant.TenantRUEstimateEnabled.Get(&dsp.st.SV) &&
dsp.distSQLSrv.TenantCostController != nil && planCtx.planner != nil {
if instrumentation := planCtx.planner.curPlan.instrumentation; instrumentation != nil {
Expand Down Expand Up @@ -909,12 +904,6 @@ type DistSQLReceiver struct {
expectedRowsRead int64
progressAtomic *uint64

// contendedQueryMetric is a Counter that is incremented at most once if the
// query produces at least one contention event.
contendedQueryMetric *metric.Counter
// contentionRegistry is a Registry that contention events are added to.
contentionRegistry *contention.Registry

testingKnobs struct {
// pushCallback, if set, will be called every time DistSQLReceiver.Push
// is called, with the same arguments.
Expand Down Expand Up @@ -1129,7 +1118,6 @@ func MakeDistSQLReceiver(
txn *kv.Txn,
clockUpdater clockUpdater,
tracing *SessionTracing,
contentionRegistry *contention.Registry,
) *DistSQLReceiver {
consumeCtx, cleanup := tracing.TraceExecConsume(ctx)
r := receiverSyncPool.Get().(*DistSQLReceiver)
Expand All @@ -1142,15 +1130,14 @@ func MakeDistSQLReceiver(
}
}
*r = DistSQLReceiver{
ctx: consumeCtx,
cleanup: cleanup,
rangeCache: rangeCache,
txn: txn,
clockUpdater: clockUpdater,
stats: &topLevelQueryStats{},
stmtType: stmtType,
tracing: tracing,
contentionRegistry: contentionRegistry,
ctx: consumeCtx,
cleanup: cleanup,
rangeCache: rangeCache,
txn: txn,
clockUpdater: clockUpdater,
stats: &topLevelQueryStats{},
stmtType: stmtType,
tracing: tracing,
}
r.resultWriterMu.row = resultWriter
r.resultWriterMu.batch = batchWriter
Expand All @@ -1169,15 +1156,14 @@ func (r *DistSQLReceiver) Release() {
func (r *DistSQLReceiver) clone() *DistSQLReceiver {
ret := receiverSyncPool.Get().(*DistSQLReceiver)
*ret = DistSQLReceiver{
ctx: r.ctx,
cleanup: func() {},
rangeCache: r.rangeCache,
txn: r.txn,
clockUpdater: r.clockUpdater,
stats: r.stats,
stmtType: tree.Rows,
tracing: r.tracing,
contentionRegistry: r.contentionRegistry,
ctx: r.ctx,
cleanup: func() {},
rangeCache: r.rangeCache,
txn: r.txn,
clockUpdater: r.clockUpdater,
stats: r.stats,
stmtType: tree.Rows,
tracing: r.tracing,
}
return ret
}
Expand Down Expand Up @@ -1291,30 +1277,6 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
if span := tracing.SpanFromContext(r.ctx); span != nil {
span.ImportRemoteRecording(meta.TraceData)
}
var ev roachpb.ContentionEvent
for i := range meta.TraceData {
meta.TraceData[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
return
}
if r.contendedQueryMetric != nil {
// Increment the contended query metric at most once
// if the query sees at least one contention event.
r.contendedQueryMetric.Inc(1)
r.contendedQueryMetric = nil
}
contentionEvent := contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
}
if r.txn != nil {
contentionEvent.WaitingTxnID = r.txn.ID()
}
r.contentionRegistry.AddContentionEvent(contentionEvent)
})
}
}
if meta.Metrics != nil {
r.stats.bytesRead += meta.Metrics.BytesRead
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)

// We need to re-plan every time, since the plan is closed automatically
Expand Down Expand Up @@ -223,7 +222,6 @@ func TestDistSQLReceiverErrorRanking(t *testing.T) {
txn,
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)

retryErr := roachpb.NewErrorWithTxn(
Expand Down Expand Up @@ -367,7 +365,6 @@ func TestDistSQLReceiverDrainsOnError(t *testing.T) {
nil, /* txn */
nil, /* clockUpdater */
&SessionTracing{},
nil, /* contentionRegistry */
)
status := recv.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.New("some error")})
require.Equal(t, execinfra.DrainRequested, status)
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -185,7 +186,7 @@ func (ex *connExecutor) recordStatementSummary(
}
recordedStmtStats := sqlstats.RecordedStmtStats{
SessionID: ex.sessionID,
StatementID: planner.stmt.QueryID,
StatementID: stmt.QueryID,
AutoRetryCount: automaticRetryCount,
AutoRetryReason: ex.state.mu.autoRetryReason,
RowsAffected: rowsAffected,
Expand Down Expand Up @@ -226,6 +227,15 @@ func (ex *connExecutor) recordStatementSummary(
// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if queryLevelStatsOk {
for _, ev := range queryLevelStats.ContentionEvents {
contentionEvent := contentionpb.ExtendedContentionEvent{
BlockingEvent: ev,
WaitingTxnID: planner.txn.ID(),
}

ex.server.cfg.ContentionRegistry.AddContentionEvent(contentionEvent)
}

err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, *queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/importer/import_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func distImport(
nil, /* txn - the flow does not read or write the database */
nil, /* clockUpdater */
evalCtx.Tracing,
evalCtx.ExecCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (ib *IndexBackfillPlanner) plan(
nil, /* txn - the flow does not run wholly in a txn */
ib.execCfg.Clock,
evalCtx.Tracing,
ib.execCfg.ContentionRegistry,
)
defer recv.Release()
evalCtxCopy := evalCtx
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (im *IndexBackfillerMergePlanner) plan(
nil, /* txn - the flow does not run wholly in a txn */
im.execCfg.Clock,
evalCtx.Tracing,
im.execCfg.ContentionRegistry,
)
defer recv.Release()
evalCtxCopy := evalCtx
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
// because it sets "enabled: false" and thus none of the
// other fields are used.
&SessionTracing{},
sc.execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (dsp *DistSQLPlanner) Exec(
p.txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down Expand Up @@ -174,7 +173,6 @@ func (dsp *DistSQLPlanner) ExecLocalAll(
p.txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
execCfg.ContentionRegistry,
)
defer recv.Release()

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
nil, /* txn */
nil, /* clockUpdater */
evalCtx.Tracing,
execCfg.ContentionRegistry,
)
defer distSQLReceiver.Release()

Expand Down

0 comments on commit 45df230

Please sign in to comment.