sql: introduce sql.statement_stats.sample_rate to sample execution stats #59132

Merged 3 commits on Jan 28, 2021
14 changes: 12 additions & 2 deletions pkg/roachpb/app_stats.go
@@ -129,7 +129,16 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.OverheadLat.Add(other.OverheadLat, s.Count, other.Count)
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, s.Count, other.Count)

// Execution stats collected using a sampling approach.
statCollectionCount := s.ExecStatCollectionCount
if statCollectionCount == 0 && other.ExecStatCollectionCount == 0 {
// If both are zero, artificially set the receiver's count to one to avoid
// division by zero in Add.
statCollectionCount = 1
}
s.BytesSentOverNetwork.Add(other.BytesSentOverNetwork, statCollectionCount, other.ExecStatCollectionCount)
s.MaxMemUsage.Add(other.MaxMemUsage, statCollectionCount, other.ExecStatCollectionCount)

if other.SensitiveInfo.LastErr != "" {
s.SensitiveInfo.LastErr = other.SensitiveInfo.LastErr
@@ -157,5 +166,6 @@ func (s *StatementStatistics) AlmostEqual(other *StatementStatistics, eps float64) bool {
s.SensitiveInfo.Equal(other.SensitiveInfo) &&
s.BytesRead.AlmostEqual(other.BytesRead, eps) &&
s.RowsRead.AlmostEqual(other.RowsRead, eps) &&
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps)
s.BytesSentOverNetwork.AlmostEqual(other.BytesSentOverNetwork, eps) &&
s.MaxMemUsage.AlmostEqual(other.MaxMemUsage, eps)
}
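The guard above matters because NumericStat.Add folds two running means together weighted by their sample counts; with both counts at zero the combined weight would be zero. A minimal, standalone sketch of that arithmetic follows (illustrative only, not the repo's NumericStat code; mergeMean is a hypothetical helper):

package main

import "fmt"

// mergeMean combines two running means weighted by how many samples produced
// each of them, mirroring what Add does for the sampled stats above.
func mergeMean(meanA float64, countA int64, meanB float64, countB int64) float64 {
	total := countA + countB
	if total == 0 {
		// This is the division-by-zero case the guard in Add avoids by
		// artificially bumping the receiver's count to one.
		return 0
	}
	return (meanA*float64(countA) + meanB*float64(countB)) / float64(total)
}

func main() {
	// Receiver bucket sampled 3 executions, the other bucket sampled 1.
	fmt.Println(mergeMean(2048, 3, 4096, 1)) // 2560
}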
294 changes: 188 additions & 106 deletions pkg/roachpb/app_stats.pb.go

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion pkg/roachpb/app_stats.proto
@@ -15,6 +15,9 @@ option go_package = "roachpb";
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";

// StatementStatistics represent the statement statistics sent to the DB
// Console for a given statement fingerprint. Note that these stats are cleared
// every diagnostics.sql_stat_reset_interval.
// N.B. When fields are added to this struct, make sure to update
// (*StatementStatistics).Add and (*StatementStatistics).AlmostEqual
// in app_stats.go.
@@ -89,7 +92,19 @@ message StatementStatistics {
// BytesSentOverNetwork collects the number of bytes sent over the network.
optional NumericStat bytes_sent_over_network = 17 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!
// MaxMemUsage collects the maximum memory usage that occurred on a node.
optional NumericStat max_mem_usage = 18 [(gogoproto.nullable) = false];

// ExecStatCollectionCount keeps track of how many times execution stats were
// recorded for this statement. Since this collection follows a sampling
// approach, this number is not necessarily equal to Count. Used to calculate
// the mean of the following NumericStat values:
// bytes_sent_over_network
// max_mem_usage
optional int64 exec_stat_collection_count = 19 [(gogoproto.nullable) = false];

// Note: be sure to update `sql/app_stats.go` and the comment above
// exec_stat_collection_count when adding/removing fields here!
}

message TransactionStatistics {
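Because bytes_sent_over_network and max_mem_usage are only recorded for sampled executions, their means are maintained over exec_stat_collection_count rather than count. A rough sketch of how such a running mean/squared-diffs pair can be updated per sampled execution (a Welford-style update; the struct and method names here are hypothetical, not the repo's NumericStat code):

package statssketch

// numericStat mirrors the shape of NumericStat in app_stats.proto: a running
// mean plus the running sum of squared differences from the mean.
type numericStat struct {
	Mean         float64
	SquaredDiffs float64
}

// record folds one sampled observation into the stat. n is the number of
// sampled executions so far, i.e. the new exec_stat_collection_count.
func (s *numericStat) record(n int64, val float64) {
	delta := val - s.Mean
	s.Mean += delta / float64(n)
	s.SquaredDiffs += delta * (val - s.Mean)
}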
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -417,6 +417,7 @@ go_test(
"explain_test.go",
"explain_tree_test.go",
"indexbackfiller_test.go",
"instrumentation_test.go",
"internal_test.go",
"main_test.go",
"materialized_view_test.go",
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
@@ -177,7 +177,7 @@ func (f *vectorizedFlow) Setup(
log.Infof(ctx, "setting up vectorize flow %s", f.ID.Short())
}
recordingStats := false
if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsVerbose() {
if execinfra.ShouldCollectStats(ctx, &f.FlowCtx) {
recordingStats = true
}
helper := newVectorizedFlowCreatorHelper(f.FlowBase)
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"strings"
"sync/atomic"
"time"
@@ -595,6 +596,7 @@ func (s *Server) newConnExecutor(
// ctxHolder will be reset at the start of run(). We only define
// it here so that an early call to close() doesn't panic.
ctxHolder: ctxHolder{connCtx: ctx},
rng: rand.New(rand.NewSource(timeutil.Now().UnixNano())),
executorType: executorTypeExec,
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
@@ -1077,6 +1079,10 @@ type connExecutor struct {
// transactions.
statsCollector *sqlStatsCollector

// rng is used to generate random numbers. An example usage is to determine
// whether to sample execution stats.
rng *rand.Rand

// mu contains all elements of the struct that can be changed
// after initialization, and may be accessed from another thread.
mu struct {
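The new rng feeds the per-statement sampling decision that gives the PR its name. A hedged sketch of what that decision could look like (the helper name and wiring are illustrative, not the PR's actual code; the setting name comes from the PR title):

package connsketch

import "math/rand"

// shouldSampleExecStats decides whether to collect execution stats for one
// statement. sampleRate would come from the sql.statement_stats.sample_rate
// cluster setting; alreadyTracing covers statements that are being traced
// anyway (e.g. for a diagnostics bundle), where stats come for free.
func shouldSampleExecStats(rng *rand.Rand, sampleRate float64, alreadyTracing bool) bool {
	if alreadyTracing {
		return true
	}
	return sampleRate > 0 && rng.Float64() < sampleRate
}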
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_exec.go
@@ -383,7 +383,7 @@ func (ex *connExecutor) execStmtInOpenState(
var needFinish bool
ctx, needFinish = ih.Setup(
ctx, ex.server.cfg, ex.appStats, p, ex.stmtDiagnosticsRecorder,
stmt.AnonymizedStr, os.ImplicitTxn.Get(),
stmt.AnonymizedStr, os.ImplicitTxn.Get(), ex.rng,
)
if needFinish {
sql := stmt.SQL
@@ -975,6 +975,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.traceMetadata = planner.instrumentation.traceMetadata
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func() *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
4 changes: 3 additions & 1 deletion pkg/sql/distsql/server.go
@@ -314,7 +314,7 @@ func (ds *ServerImpl) setupFlow(

// Create the FlowCtx for the flow.
flowCtx := ds.NewFlowContext(
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
)

// req always contains the desired vectorize mode, regardless of whether we
@@ -400,6 +400,7 @@ func (ds *ServerImpl) NewFlowContext(
id execinfrapb.FlowID,
evalCtx *tree.EvalContext,
traceKV bool,
collectStats bool,
localState LocalState,
isGatewayNode bool,
) execinfra.FlowCtx {
@@ -411,6 +412,7 @@
EvalCtx: evalCtx,
NodeID: ds.ServerConfig.NodeID,
TraceKV: traceKV,
CollectStats: collectStats,
Local: localState.IsLocal,
Gateway: isGatewayNode,
}
3 changes: 3 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -627,6 +627,9 @@ type PlanningCtx struct {
// If set, we will record the mapping from planNode to tracing metadata to
// later allow associating statistics with the planNode.
traceMetadata execNodeTraceMetadata

// If set, statement execution stats should be collected.
collectExecStats bool
}

var _ physicalplan.ExprContext = &PlanningCtx{}
8 changes: 7 additions & 1 deletion pkg/sql/distsql_running.go
@@ -125,6 +125,7 @@ func (dsp *DistSQLPlanner) setupFlows(
recv *DistSQLReceiver,
localState distsql.LocalState,
vectorizeThresholdMet bool,
collectStats bool,
) (context.Context, flowinfra.Flow, error) {
thisNodeID := dsp.gatewayNodeID
_, ok := flows[thisNodeID]
@@ -141,6 +142,7 @@
Version: execinfra.Version,
EvalContext: evalCtxProto,
TraceKV: evalCtx.Tracing.KVTracingEnabled(),
CollectStats: collectStats,
}

// Start all the flows except the flow on this node (there is always a flow on
@@ -333,7 +335,9 @@ func (dsp *DistSQLPlanner) Run(
localState.IsLocal = true
}

ctx, flow, err := dsp.setupFlows(ctx, evalCtx, leafInputState, flows, recv, localState, vectorizedThresholdMet)
ctx, flow, err := dsp.setupFlows(
ctx, evalCtx, leafInputState, flows, recv, localState, vectorizedThresholdMet, planCtx.collectExecStats,
)
if err != nil {
recv.SetError(err)
return func() {}
@@ -834,6 +838,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryPlanCtx.saveFlows = subqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
}
subqueryPlanCtx.traceMetadata = planner.instrumentation.traceMetadata
subqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
// Don't close the top-level plan from subqueries - someone else will handle
// that.
subqueryPlanCtx.ignoreClose = true
@@ -1126,6 +1131,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
postqueryPlanCtx.saveFlows = postqueryPlanCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}
postqueryPlanCtx.traceMetadata = planner.instrumentation.traceMetadata
postqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

postqueryPhysPlan, err := dsp.createPhysPlan(postqueryPlanCtx, postqueryPlan)
if err != nil {
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"readerbase.go",
"scanbase.go",
"server_config.go",
"stats.go",
"testutils.go",
"version.go",
":gen-consumerstatus-stringer", # keep
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/flow_context.go
@@ -59,6 +59,9 @@ type FlowCtx struct {
// TraceKV is true if KV tracing was requested by the session.
TraceKV bool

// CollectStats is true if execution stats collection was requested.
CollectStats bool

// Local is true if this flow is being run as part of a local-only query.
Local bool

25 changes: 25 additions & 0 deletions pkg/sql/execinfra/stats.go
@@ -0,0 +1,25 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfra

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// ShouldCollectStats is a helper function used to determine if a processor
// should collect stats. The two requirements are that tracing must be enabled
// (to be able to output the stats somewhere), and that the flowCtx.CollectStats
// flag was set by the gateway node.
func ShouldCollectStats(ctx context.Context, flowCtx *FlowCtx) bool {
return tracing.SpanFromContext(ctx) != nil && flowCtx.CollectStats
}