Skip to content

Commit

Permalink
sql: emit max memory usage on the gateway node
Browse files Browse the repository at this point in the history
Previously, the max memory usage execution stat was only emitted by the last
outbox to drain metadata in a flow. However, there are cases where no outbox
is present in the flow (e.g. single-node queries). This commit adds a Gateway
field to the FlowCtx which specifies whether this flow is running on the
gateway node or not and changes outboxes to only emit this stat if they are
not run on the gateway node. Gateway node max memory usage is now emitted in
the trace during flow.Cleanup.

This commit also adds a NodeID field to the ComponentID in ComponentStats used
to uniquely describe a flow-level ComponentStat. The FlowID was used
previously, which is not unique across nodes.

Release note: None
  • Loading branch information
asubiotto committed Jan 25, 2021
1 parent 8efa63d commit 5fd21c2
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 92 deletions.
13 changes: 6 additions & 7 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (f *vectorizedFlow) Setup(
helper,
vectorizedRemoteComponentCreator{},
recordingStats,
f.Gateway,
f.GetWaitGroup(),
f.GetSyncFlowConsumer(),
flowCtx.Cfg.NodeDialer,
Expand Down Expand Up @@ -438,6 +439,7 @@ type vectorizedFlowCreator struct {
streamIDToInputOp map[execinfrapb.StreamID]opDAGWithMetaSources
streamIDToSpecIdx map[execinfrapb.StreamID]int
recordingStats bool
isGatewayNode bool
vectorizedStatsCollectorsQueue []colexec.VectorizedStatsCollector
waitGroup *sync.WaitGroup
syncFlowConsumer execinfra.RowReceiver
Expand Down Expand Up @@ -505,6 +507,7 @@ func newVectorizedFlowCreator(
helper flowCreatorHelper,
componentCreator remoteComponentCreator,
recordingStats bool,
isGatewayNode bool,
waitGroup *sync.WaitGroup,
syncFlowConsumer execinfra.RowReceiver,
nodeDialer *nodedialer.Dialer,
Expand Down Expand Up @@ -948,17 +951,13 @@ func (s *vectorizedFlowCreator) setupOutput(
// Start a separate recording so that GetRecording will return
// the recordings for only the child spans containing stats.
ctx, span := tracing.ChildSpanRemote(ctx, "")
if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) {
if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode {
// At the last outbox, we can accurately retrieve stats for the
// whole flow from parent monitors. These stats are added to a
// flow-level span.
span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID)
span.SetSpanStats(&execinfrapb.ComponentStats{
Component: execinfrapb.ComponentID{
Type: execinfrapb.ComponentID_FLOW,
FlowID: flowCtx.ID,
// TODO(radu): the node ID should be part of the ComponentID.
},
Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())),
},
Expand Down Expand Up @@ -1369,7 +1368,7 @@ func ConvertToVecTree(
fuseOpt = flowinfra.FuseAggressively
}
creator := newVectorizedFlowCreator(
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false,
newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false,
nil, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn),
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
}
var wg sync.WaitGroup
vfc := newVectorizedFlowCreator(
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{},
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{},
nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
nil /* fdSemaphore */, descs.DistSQLTypeResolver{},
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) {
}

streamID := execinfrapb.StreamID(1)
outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */)
outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */)
outbox.Init(rowenc.OneIntCol)

// WaitGroup for the outbox and inbound stream. If the WaitGroup is done, no
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (ds *ServerImpl) setupFlow(
}

// Create the FlowCtx for the flow.
flowCtx := ds.NewFlowContext(ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState)
flowCtx := ds.NewFlowContext(
ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()),
)

// req always contains the desired vectorize mode, regardless of whether we
// have non-nil localState.EvalContext. We don't want to update EvalContext
Expand Down Expand Up @@ -392,6 +394,7 @@ func (ds *ServerImpl) NewFlowContext(
evalCtx *tree.EvalContext,
traceKV bool,
localState LocalState,
isGatewayNode bool,
) execinfra.FlowCtx {
// TODO(radu): we should sanity check some of these fields.
flowCtx := execinfra.FlowCtx{
Expand All @@ -402,6 +405,7 @@ func (ds *ServerImpl) NewFlowContext(
NodeID: ds.ServerConfig.NodeID,
TraceKV: traceKV,
Local: localState.IsLocal,
Gateway: isGatewayNode,
}

if localState.IsLocal && localState.Collection != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type FlowCtx struct {
// Local is true if this flow is being run as part of a local-only query.
Local bool

// Gateway is true if this flow is being run on the gateway node.
Gateway bool

// TypeResolverFactory is used to construct transaction bound TypeResolvers
// to resolve type references during flow setup. It is not safe for concurrent
// use and is intended to be used only during flow setup and initialization.
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -42,6 +43,15 @@ func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID {
}
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
NodeID: nodeID,
}
}

// FlowIDTagKey is the key used for flow id tags in tracing spans.
const (
FlowIDTagKey = tracing.TagPrefix + "flowid"
Expand Down
Loading

0 comments on commit 5fd21c2

Please sign in to comment.