From 5fd21c24926fd0da43f74f21e5175eb59acd1c12 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Mon, 25 Jan 2021 16:35:59 +0100 Subject: [PATCH] sql: emit max memory usage on the gateway node Previously, the max memory usage execution stat was only emitted by the last outbox to drain metadata in a flow. However, there are cases where no outbox is present in the flow (e.g. single-node queries). This commit adds a Gateway field to the FlowCtx which specifies whether this flow is running on the gateway node or not and changes outboxes to only emit this stat if they are not run on the gateway node. Gateway node max memory usage is now emitted in the trace during flow.Cleanup. This commit also adds a NodeID field to the ComponentID in ComponentStats used to uniquely describe a flow-level ComponentStat. The FlowID was used previously, which is not unique across nodes. Release note: None --- pkg/sql/colflow/vectorized_flow.go | 13 +- pkg/sql/colflow/vectorized_flow_test.go | 2 +- pkg/sql/distsql/inbound_test.go | 2 +- pkg/sql/distsql/server.go | 6 +- pkg/sql/execinfra/flow_context.go | 3 + pkg/sql/execinfrapb/component_stats.go | 10 ++ pkg/sql/execinfrapb/component_stats.pb.go | 179 +++++++++++++--------- pkg/sql/execinfrapb/component_stats.proto | 8 + pkg/sql/flowinfra/BUILD.bazel | 1 + pkg/sql/flowinfra/flow.go | 16 ++ pkg/sql/flowinfra/outbox.go | 10 +- pkg/sql/flowinfra/outbox_test.go | 12 +- pkg/sql/rowflow/row_based_flow.go | 2 +- 13 files changed, 172 insertions(+), 92 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index d26980c9f2f..8f448a09231 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -195,6 +195,7 @@ func (f *vectorizedFlow) Setup( helper, vectorizedRemoteComponentCreator{}, recordingStats, + f.Gateway, f.GetWaitGroup(), f.GetSyncFlowConsumer(), flowCtx.Cfg.NodeDialer, @@ -438,6 +439,7 @@ type vectorizedFlowCreator struct { streamIDToInputOp map[execinfrapb.StreamID]opDAGWithMetaSources streamIDToSpecIdx map[execinfrapb.StreamID]int recordingStats bool + isGatewayNode bool vectorizedStatsCollectorsQueue []colexec.VectorizedStatsCollector waitGroup *sync.WaitGroup syncFlowConsumer execinfra.RowReceiver @@ -505,6 +507,7 @@ func newVectorizedFlowCreator( helper flowCreatorHelper, componentCreator remoteComponentCreator, recordingStats bool, + isGatewayNode bool, waitGroup *sync.WaitGroup, syncFlowConsumer execinfra.RowReceiver, nodeDialer *nodedialer.Dialer, @@ -948,17 +951,13 @@ func (s *vectorizedFlowCreator) setupOutput( // Start a separate recording so that GetRecording will return // the recordings for only the child spans containing stats. ctx, span := tracing.ChildSpanRemote(ctx, "") - if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) { + if atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode { // At the last outbox, we can accurately retrieve stats for the // whole flow from parent monitors. These stats are added to a // flow-level span. span.SetTag(execinfrapb.FlowIDTagKey, flowCtx.ID) span.SetSpanStats(&execinfrapb.ComponentStats{ - Component: execinfrapb.ComponentID{ - Type: execinfrapb.ComponentID_FLOW, - FlowID: flowCtx.ID, - // TODO(radu): the node ID should be part of the ComponentID. - }, + Component: execinfrapb.FlowComponentID(outputStream.OriginNodeID, flowCtx.ID), FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())), }, @@ -1369,7 +1368,7 @@ func ConvertToVecTree( fuseOpt = flowinfra.FuseAggressively } creator := newVectorizedFlowCreator( - newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, + newNoopFlowCreatorHelper(), vectorizedRemoteComponentCreator{}, false, false, nil, &execinfra.RowChannel{}, nil, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, flowCtx.Cfg.VecFDSemaphore, flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.EvalCtx.Txn), ) diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 86f19f3ed04..22aaddfcf64 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -222,7 +222,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { } var wg sync.WaitGroup vfc := newVectorizedFlowCreator( - &vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, + &vectorizedFlowCreatorHelper{f: f}, componentCreator, false, false, &wg, &execinfra.RowChannel{}, nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, ) diff --git a/pkg/sql/distsql/inbound_test.go b/pkg/sql/distsql/inbound_test.go index 468125d852a..412194b224e 100644 --- a/pkg/sql/distsql/inbound_test.go +++ b/pkg/sql/distsql/inbound_test.go @@ -95,7 +95,7 @@ func TestOutboxInboundStreamIntegration(t *testing.T) { } streamID := execinfrapb.StreamID(1) - outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox := flowinfra.NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.OneIntCol) // WaitGroup for the outbox and inbound stream. If the WaitGroup is done, no diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 2aa49f4d3be..76689f70000 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -306,7 +306,9 @@ func (ds *ServerImpl) setupFlow( } // Create the FlowCtx for the flow. - flowCtx := ds.NewFlowContext(ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState) + flowCtx := ds.NewFlowContext( + ctx, req.Flow.FlowID, evalCtx, req.TraceKV, localState, req.Flow.Gateway == roachpb.NodeID(ds.NodeID.SQLInstanceID()), + ) // req always contains the desired vectorize mode, regardless of whether we // have non-nil localState.EvalContext. We don't want to update EvalContext @@ -392,6 +394,7 @@ func (ds *ServerImpl) NewFlowContext( evalCtx *tree.EvalContext, traceKV bool, localState LocalState, + isGatewayNode bool, ) execinfra.FlowCtx { // TODO(radu): we should sanity check some of these fields. flowCtx := execinfra.FlowCtx{ @@ -402,6 +405,7 @@ func (ds *ServerImpl) NewFlowContext( NodeID: ds.ServerConfig.NodeID, TraceKV: traceKV, Local: localState.IsLocal, + Gateway: isGatewayNode, } if localState.IsLocal && localState.Collection != nil { diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index b5584a970ec..7aa499f5df3 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -62,6 +62,9 @@ type FlowCtx struct { // Local is true if this flow is being run as part of a local-only query. Local bool + // Gateway is true if this flow is being run on the gateway node. + Gateway bool + // TypeResolverFactory is used to construct transaction bound TypeResolvers // to resolve type references during flow setup. It is not safe for concurrent // use and is intended to be used only during flow setup and initialization. diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index a2d8925d254..4f7b554d5a5 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -42,6 +43,15 @@ func StreamComponentID(flowID FlowID, streamID StreamID) ComponentID { } } +// FlowComponentID returns a ComponentID for the given flow. +func FlowComponentID(nodeID roachpb.NodeID, flowID FlowID) ComponentID { + return ComponentID{ + FlowID: flowID, + Type: ComponentID_FLOW, + NodeID: nodeID, + } +} + // FlowIDTagKey is the key used for flow id tags in tracing spans. const ( FlowIDTagKey = tracing.TagPrefix + "flowid" diff --git a/pkg/sql/execinfrapb/component_stats.pb.go b/pkg/sql/execinfrapb/component_stats.pb.go index cb7413a2a16..5ae32c27e5b 100644 --- a/pkg/sql/execinfrapb/component_stats.pb.go +++ b/pkg/sql/execinfrapb/component_stats.pb.go @@ -8,6 +8,8 @@ import fmt "fmt" import math "math" import optional "github.com/cockroachdb/cockroach/pkg/util/optional" +import github_com_cockroachdb_cockroach_pkg_roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + import io "io" // Reference imports to suppress errors if they are not otherwise used. @@ -67,7 +69,7 @@ func (x *ComponentID_Type) UnmarshalJSON(data []byte) error { return nil } func (ComponentID_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{0, 0} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0, 0} } // ComponentID identifies a component in a flow. There are multiple types of @@ -79,13 +81,18 @@ type ComponentID struct { // Identifier of this component, within the domain of components of the same // type. ID int32 `protobuf:"varint,3,opt,name=id" json:"id"` + // NodeID of the node this component ran on. + // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe + // a flow (since flows on different nodes still have the same FlowID). Use + // this for processors/streams as well. + NodeID github_com_cockroachdb_cockroach_pkg_roachpb.NodeID `protobuf:"varint,4,opt,name=node_id,json=nodeId,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" json:"node_id"` } func (m *ComponentID) Reset() { *m = ComponentID{} } func (m *ComponentID) String() string { return proto.CompactTextString(m) } func (*ComponentID) ProtoMessage() {} func (*ComponentID) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{0} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{0} } func (m *ComponentID) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -132,7 +139,7 @@ func (m *ComponentStats) Reset() { *m = ComponentStats{} } func (m *ComponentStats) String() string { return proto.CompactTextString(m) } func (*ComponentStats) ProtoMessage() {} func (*ComponentStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{1} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{1} } func (m *ComponentStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -169,7 +176,7 @@ func (m *InputStats) Reset() { *m = InputStats{} } func (m *InputStats) String() string { return proto.CompactTextString(m) } func (*InputStats) ProtoMessage() {} func (*InputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{2} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{2} } func (m *InputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -213,7 +220,7 @@ func (m *NetworkRxStats) Reset() { *m = NetworkRxStats{} } func (m *NetworkRxStats) String() string { return proto.CompactTextString(m) } func (*NetworkRxStats) ProtoMessage() {} func (*NetworkRxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{3} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{3} } func (m *NetworkRxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -251,7 +258,7 @@ func (m *NetworkTxStats) Reset() { *m = NetworkTxStats{} } func (m *NetworkTxStats) String() string { return proto.CompactTextString(m) } func (*NetworkTxStats) ProtoMessage() {} func (*NetworkTxStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{4} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{4} } func (m *NetworkTxStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -292,7 +299,7 @@ func (m *KVStats) Reset() { *m = KVStats{} } func (m *KVStats) String() string { return proto.CompactTextString(m) } func (*KVStats) ProtoMessage() {} func (*KVStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{5} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{5} } func (m *KVStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -331,7 +338,7 @@ func (m *ExecStats) Reset() { *m = ExecStats{} } func (m *ExecStats) String() string { return proto.CompactTextString(m) } func (*ExecStats) ProtoMessage() {} func (*ExecStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{6} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{6} } func (m *ExecStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -368,7 +375,7 @@ func (m *OutputStats) Reset() { *m = OutputStats{} } func (m *OutputStats) String() string { return proto.CompactTextString(m) } func (*OutputStats) ProtoMessage() {} func (*OutputStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{7} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{7} } func (m *OutputStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -402,7 +409,7 @@ func (m *FlowStats) Reset() { *m = FlowStats{} } func (m *FlowStats) String() string { return proto.CompactTextString(m) } func (*FlowStats) ProtoMessage() {} func (*FlowStats) Descriptor() ([]byte, []int) { - return fileDescriptor_component_stats_b759f3ec168a9421, []int{8} + return fileDescriptor_component_stats_1e4b47cb6b511195, []int{8} } func (m *FlowStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -468,6 +475,9 @@ func (m *ComponentID) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x18 i++ i = encodeVarintComponentStats(dAtA, i, uint64(m.ID)) + dAtA[i] = 0x20 + i++ + i = encodeVarintComponentStats(dAtA, i, uint64(m.NodeID)) return i, nil } @@ -870,6 +880,7 @@ func (m *ComponentID) Size() (n int) { n += 1 + l + sovComponentStats(uint64(l)) n += 1 + sovComponentStats(uint64(m.Type)) n += 1 + sovComponentStats(uint64(m.ID)) + n += 1 + sovComponentStats(uint64(m.NodeID)) return n } @@ -1117,6 +1128,25 @@ func (m *ComponentID) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + } + m.NodeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowComponentStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NodeID |= (github_com_cockroachdb_cockroach_pkg_roachpb.NodeID(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipComponentStats(dAtA[iNdEx:]) @@ -2515,67 +2545,70 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_b759f3ec168a9421) -} - -var fileDescriptor_component_stats_b759f3ec168a9421 = []byte{ - // 916 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0xc7, 0x45, 0x4a, 0xa2, 0xac, 0x51, 0x2c, 0x2b, 0xdb, 0x1c, 0x08, 0xa3, 0x95, 0x1d, 0xb6, - 0x01, 0x84, 0x02, 0x95, 0x51, 0x1f, 0x7a, 0xcb, 0xc1, 0xb2, 0x94, 0x46, 0x71, 0x1c, 0xb9, 0x94, - 0xec, 0x02, 0xb9, 0x10, 0x34, 0xb9, 0x76, 0x16, 0xe2, 0x97, 0xc9, 0xa5, 0x4c, 0xf5, 0x29, 0x7a, - 0x6a, 0x9f, 0xa3, 0xd7, 0x3e, 0x81, 0x8f, 0x39, 0x06, 0x39, 0x18, 0x8d, 0xdc, 0x57, 0xe8, 0xbd, - 0xd8, 0xe5, 0x87, 0x18, 0x17, 0x42, 0xc4, 0x22, 0xb7, 0x85, 0x3c, 0xff, 0xdf, 0xce, 0xcc, 0xfe, - 0x67, 0x68, 0x78, 0x12, 0x5c, 0x59, 0x7b, 0x38, 0xc2, 0x06, 0x71, 0x2e, 0x7c, 0xdd, 0x3b, 0xdf, - 0x33, 0x5c, 0xdb, 0x73, 0x1d, 0xec, 0x50, 0x2d, 0xa0, 0x3a, 0x0d, 0xba, 0x9e, 0xef, 0x52, 0x17, - 0xc9, 0x86, 0x6b, 0x4c, 0x7d, 0x57, 0x37, 0xde, 0x74, 0x83, 0x2b, 0xab, 0x6b, 0x92, 0x80, 0x06, - 0x57, 0x96, 0x1f, 0x3a, 0xdb, 0x8f, 0x2e, 0xdd, 0x4b, 0x97, 0x07, 0xed, 0xb1, 0x53, 0x1c, 0xbf, - 0xfd, 0x65, 0x48, 0x89, 0xb5, 0xe7, 0x7a, 0x94, 0xb8, 0x8e, 0xbe, 0x3c, 0xc4, 0x7f, 0x55, 0xde, - 0x0b, 0xd0, 0x38, 0x4c, 0xef, 0x19, 0xf6, 0xd1, 0xf7, 0x50, 0xbb, 0xb0, 0xdc, 0x6b, 0x8d, 0x98, - 0xb2, 0xb0, 0x2b, 0x74, 0x1e, 0xf4, 0xe4, 0x9b, 0xdb, 0x9d, 0xd2, 0xfb, 0xdb, 0x1d, 0xe9, 0x99, - 0xe5, 0x5e, 0x0f, 0xfb, 0x8b, 0xec, 0xa4, 0x4a, 0x2c, 0x70, 0x68, 0xa2, 0x3e, 0x54, 0xe8, 0xdc, - 0xc3, 0xb2, 0xb8, 0x2b, 0x74, 0x9a, 0xfb, 0xdf, 0x76, 0x57, 0xe5, 0xd7, 0xcd, 0xdd, 0xd3, 0x9d, - 0xcc, 0x3d, 0xdc, 0xab, 0x30, 0xb6, 0xca, 0xd5, 0x68, 0x1b, 0x44, 0x62, 0xca, 0xe5, 0x5d, 0xa1, - 0x53, 0xed, 0x01, 0xfb, 0x7d, 0x71, 0xbb, 0x23, 0x0e, 0xfb, 0xaa, 0x48, 0x4c, 0xe5, 0x07, 0xa8, - 0xb0, 0x78, 0x54, 0x87, 0xea, 0xe9, 0xab, 0xf1, 0x60, 0xd2, 0x2a, 0xa1, 0x4d, 0xa8, 0x9f, 0xa8, - 0xa3, 0xc3, 0xc1, 0x78, 0x3c, 0x52, 0x5b, 0x02, 0x02, 0x90, 0xc6, 0x13, 0x75, 0x70, 0x70, 0xdc, - 0x12, 0xd1, 0x06, 0x54, 0x9e, 0xbd, 0x1c, 0xfd, 0xdc, 0x2a, 0x2b, 0x7f, 0x56, 0xa0, 0x99, 0x5d, - 0x3a, 0x66, 0x3d, 0x44, 0x43, 0xa8, 0x67, 0x6d, 0xe5, 0x15, 0x36, 0xf6, 0x9f, 0xac, 0x95, 0x71, - 0x92, 0xec, 0x52, 0x8d, 0x06, 0x20, 0x39, 0x98, 0x6a, 0x7e, 0xc4, 0x2b, 0x6f, 0xec, 0x77, 0x56, - 0x73, 0x5e, 0x61, 0x7a, 0xed, 0xfa, 0x53, 0x35, 0xe2, 0x49, 0x24, 0xa8, 0xaa, 0x83, 0xa9, 0x1a, - 0xa5, 0x18, 0x1a, 0xf1, 0xe2, 0xd7, 0xc1, 0x4c, 0xfe, 0x83, 0x99, 0x44, 0xe8, 0x29, 0x88, 0xd3, - 0x99, 0x5c, 0xe1, 0x88, 0xc7, 0xab, 0x11, 0x47, 0x67, 0xb1, 0x36, 0x6b, 0xf1, 0xd1, 0x99, 0x2a, - 0x4e, 0x67, 0xe8, 0x29, 0x54, 0x98, 0xf5, 0xe4, 0x2a, 0x07, 0x7c, 0xbd, 0x1a, 0x30, 0x88, 0xb0, - 0x91, 0xbf, 0x9e, 0xcb, 0xd0, 0x21, 0x48, 0x6e, 0x48, 0xbd, 0x90, 0xca, 0xd2, 0xa7, 0x7a, 0x3a, - 0xe2, 0x71, 0x79, 0x44, 0x22, 0x45, 0x3d, 0x90, 0x88, 0xe3, 0x85, 0x34, 0x90, 0x6b, 0xbb, 0xe5, - 0x4e, 0x63, 0xff, 0x9b, 0xd5, 0x90, 0xa1, 0x73, 0x9f, 0x11, 0x2b, 0xd1, 0x73, 0x00, 0xee, 0x5f, - 0x3e, 0x31, 0xf2, 0xc6, 0xa7, 0xaa, 0x61, 0x56, 0xce, 0x63, 0xea, 0x17, 0xe9, 0x0f, 0xca, 0x6f, - 0x02, 0xc0, 0xf2, 0x1a, 0xd4, 0x03, 0x70, 0x42, 0x5b, 0xa3, 0xa1, 0x67, 0xe1, 0x20, 0x71, 0xce, - 0x57, 0x39, 0x30, 0x9b, 0xb2, 0x6e, 0x36, 0x5c, 0xa7, 0xc4, 0xa1, 0x29, 0xd2, 0x09, 0xed, 0x09, - 0x57, 0xa1, 0x3e, 0xd4, 0xaf, 0x75, 0x42, 0x35, 0x4a, 0x6c, 0x9c, 0x98, 0xe6, 0xf1, 0x4a, 0x44, - 0x3f, 0xf4, 0x75, 0x76, 0x4c, 0x30, 0x1b, 0x4c, 0x39, 0x21, 0x36, 0x56, 0x3e, 0x94, 0xa1, 0xf9, - 0xb1, 0xa1, 0xd0, 0x01, 0xd4, 0x2c, 0x9d, 0x62, 0xc7, 0x98, 0x27, 0x99, 0xad, 0x8d, 0x4d, 0x75, - 0x9f, 0x27, 0x37, 0xf4, 0x1a, 0x1e, 0x99, 0x38, 0xc0, 0x3e, 0xd1, 0x2d, 0xf2, 0x0b, 0x0f, 0x89, - 0x81, 0xe5, 0x62, 0xc0, 0x2f, 0xee, 0x41, 0x38, 0xfb, 0x25, 0x6c, 0xc5, 0xdd, 0xd7, 0x7c, 0x6c, - 0x60, 0x32, 0xc3, 0x66, 0x62, 0xf7, 0xb5, 0x9e, 0xa1, 0x19, 0x6b, 0xd5, 0x44, 0x8a, 0x5e, 0x40, - 0xf3, 0x7c, 0x4e, 0xf3, 0xb0, 0xea, 0xfa, 0xb0, 0x4d, 0x2e, 0xcd, 0x58, 0x27, 0xf0, 0xd0, 0xc6, - 0x41, 0xa0, 0x5f, 0xe6, 0x71, 0xd2, 0xfa, 0xb8, 0x56, 0xaa, 0x4e, 0x89, 0xca, 0xdf, 0x42, 0xf6, - 0xc6, 0xc9, 0xb4, 0xa3, 0x3e, 0x34, 0x92, 0xf2, 0x83, 0xe5, 0xee, 0x5a, 0x0b, 0x0f, 0xb1, 0x6e, - 0xcc, 0x96, 0x56, 0x0f, 0x20, 0x2e, 0x9b, 0x43, 0xc4, 0x02, 0x36, 0xe6, 0x32, 0xce, 0x78, 0x0e, - 0x9b, 0x59, 0xb9, 0x1c, 0x53, 0x5e, 0x1f, 0xf3, 0x20, 0x55, 0x32, 0x92, 0xf2, 0x87, 0x08, 0xb5, - 0x64, 0x23, 0x2d, 0x33, 0xf3, 0xb1, 0x6e, 0x16, 0x1a, 0xb0, 0xe4, 0x31, 0x74, 0x33, 0xd7, 0x23, - 0x0e, 0x11, 0x0b, 0xf7, 0x88, 0x53, 0x5e, 0x40, 0x6d, 0x3a, 0x2b, 0xe8, 0xdb, 0x66, 0xb2, 0x4f, - 0xa5, 0xa3, 0x33, 0x66, 0x56, 0x55, 0x9a, 0xce, 0xb8, 0x69, 0x4f, 0x60, 0xcb, 0x70, 0x1d, 0x8a, - 0x9d, 0xe5, 0x2c, 0x54, 0x8a, 0xcd, 0x42, 0x73, 0xa9, 0xe7, 0xe3, 0xff, 0x8f, 0x00, 0xf5, 0x6c, - 0x09, 0xb3, 0xb1, 0x65, 0x0b, 0x38, 0x26, 0x17, 0x9c, 0xfd, 0x0d, 0xa6, 0xe4, 0x59, 0x8e, 0xe0, - 0xa1, 0xad, 0x47, 0x9a, 0x6e, 0x59, 0xae, 0xa1, 0x53, 0x6c, 0x6a, 0x36, 0xb6, 0x8b, 0x74, 0x6f, - 0xcb, 0xd6, 0xa3, 0x83, 0x54, 0x7c, 0x8c, 0x6d, 0xf4, 0x13, 0xa0, 0x8f, 0x81, 0x26, 0x09, 0xa6, - 0x45, 0x7c, 0xd2, 0xca, 0x13, 0xfb, 0x24, 0x98, 0x2a, 0xbf, 0x0b, 0xd0, 0xc8, 0x7d, 0x3b, 0xd8, - 0x5b, 0xb3, 0x85, 0x7c, 0xae, 0x53, 0xe3, 0x4d, 0xb1, 0x8d, 0xcc, 0x16, 0x79, 0x2f, 0x96, 0xdd, - 0x5b, 0xeb, 0xe2, 0xff, 0x59, 0xeb, 0xca, 0x04, 0xea, 0xd9, 0x77, 0x04, 0xfd, 0x08, 0x9b, 0xac, - 0x72, 0x1b, 0xdb, 0x5a, 0xc8, 0x8c, 0x5e, 0x24, 0xb1, 0x86, 0xad, 0x47, 0xc7, 0xd8, 0x3e, 0x65, - 0xba, 0xde, 0x77, 0x37, 0x1f, 0xda, 0xa5, 0x9b, 0x45, 0x5b, 0x78, 0xbb, 0x68, 0x0b, 0xef, 0x16, - 0x6d, 0xe1, 0xaf, 0x45, 0x5b, 0xf8, 0xf5, 0xae, 0x5d, 0x7a, 0x7b, 0xd7, 0x2e, 0xbd, 0xbb, 0x6b, - 0x97, 0x5e, 0x37, 0x72, 0xff, 0x2c, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xd1, 0x93, 0x16, 0x2d, - 0x3e, 0x0a, 0x00, 0x00, + proto.RegisterFile("sql/execinfrapb/component_stats.proto", fileDescriptor_component_stats_1e4b47cb6b511195) +} + +var fileDescriptor_component_stats_1e4b47cb6b511195 = []byte{ + // 970 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x4d, 0x6f, 0x1a, 0x47, + 0x18, 0xc7, 0xd9, 0x05, 0x16, 0xf3, 0x10, 0x13, 0x32, 0xcd, 0x01, 0x59, 0x2d, 0x76, 0x68, 0x23, + 0xa1, 0x4a, 0x05, 0x95, 0x4a, 0xbd, 0xe5, 0x60, 0x0c, 0x69, 0x88, 0x63, 0xe3, 0x2e, 0xd8, 0x95, + 0x72, 0x28, 0x5a, 0x76, 0xc7, 0x78, 0xb5, 0x2f, 0xb3, 0xde, 0x9d, 0xc5, 0xeb, 0x7e, 0x8a, 0x9e, + 0xda, 0xcf, 0xd1, 0x6b, 0x3e, 0x81, 0x8f, 0x39, 0x46, 0x3d, 0x58, 0x0d, 0xee, 0x57, 0xe8, 0xa5, + 0xa7, 0x6a, 0x66, 0x5f, 0xd8, 0xb8, 0x42, 0x61, 0xab, 0xde, 0x46, 0xcb, 0xf3, 0xff, 0xcd, 0xf3, + 0x3e, 0xc0, 0x53, 0xef, 0xd2, 0xec, 0xe0, 0x00, 0xab, 0xba, 0x7d, 0xee, 0x2a, 0xce, 0xac, 0xa3, + 0x12, 0xcb, 0x21, 0x36, 0xb6, 0xe9, 0xd4, 0xa3, 0x0a, 0xf5, 0xda, 0x8e, 0x4b, 0x28, 0x41, 0x75, + 0x95, 0xa8, 0x86, 0x4b, 0x14, 0xf5, 0xa2, 0xed, 0x5d, 0x9a, 0x6d, 0x4d, 0xf7, 0xa8, 0x77, 0x69, + 0xba, 0xbe, 0xbd, 0xf3, 0x78, 0x4e, 0xe6, 0x84, 0x1b, 0x75, 0xd8, 0x29, 0xb4, 0xdf, 0xf9, 0xd4, + 0xa7, 0xba, 0xd9, 0x21, 0x0e, 0xd5, 0x89, 0xad, 0xac, 0x0e, 0xe1, 0xaf, 0xcd, 0x37, 0x22, 0x54, + 0x0e, 0xe2, 0x7b, 0x86, 0x7d, 0xf4, 0x35, 0x94, 0xce, 0x4d, 0x72, 0x35, 0xd5, 0xb5, 0xba, 0xb0, + 0x27, 0xb4, 0x1e, 0xf4, 0xea, 0x37, 0xb7, 0xbb, 0xb9, 0xdf, 0x6f, 0x77, 0xa5, 0xe7, 0x26, 0xb9, + 0x1a, 0xf6, 0x97, 0xc9, 0x49, 0x96, 0x98, 0xe1, 0x50, 0x43, 0x7d, 0x28, 0xd0, 0x6b, 0x07, 0xd7, + 0xc5, 0x3d, 0xa1, 0x55, 0xed, 0x7e, 0xd9, 0x5e, 0xe7, 0x5f, 0x3b, 0x75, 0x4f, 0x7b, 0x72, 0xed, + 0xe0, 0x5e, 0x81, 0xb1, 0x65, 0xae, 0x46, 0x3b, 0x20, 0xea, 0x5a, 0x3d, 0xbf, 0x27, 0xb4, 0x8a, + 0x3d, 0x60, 0xdf, 0x97, 0xb7, 0xbb, 0xe2, 0xb0, 0x2f, 0x8b, 0xba, 0x86, 0x7e, 0x84, 0x92, 0x4d, + 0x34, 0xcc, 0x9c, 0x2a, 0x70, 0x83, 0x41, 0x64, 0x20, 0x1d, 0x13, 0x0d, 0x0f, 0xfb, 0x7f, 0xdf, + 0xee, 0x7e, 0x33, 0xd7, 0xe9, 0x85, 0x3f, 0x6b, 0xab, 0xc4, 0xea, 0x24, 0x0e, 0x68, 0xb3, 0xd5, + 0xb9, 0xe3, 0x18, 0xf3, 0x0e, 0x3f, 0x39, 0xb3, 0x76, 0x28, 0x93, 0x25, 0x46, 0x1d, 0x6a, 0xcd, + 0x6f, 0xa1, 0xc0, 0xfc, 0x41, 0x65, 0x28, 0x9e, 0x1e, 0x8f, 0x07, 0x93, 0x5a, 0x0e, 0x6d, 0x43, + 0xf9, 0x44, 0x1e, 0x1d, 0x0c, 0xc6, 0xe3, 0x91, 0x5c, 0x13, 0x10, 0x80, 0x34, 0x9e, 0xc8, 0x83, + 0xfd, 0xa3, 0x9a, 0x88, 0xb6, 0xa0, 0xf0, 0xfc, 0xd5, 0xe8, 0x87, 0x5a, 0xbe, 0xf9, 0xa6, 0x00, + 0xd5, 0x24, 0xa8, 0x31, 0xab, 0x11, 0x1a, 0x42, 0x39, 0x29, 0x1b, 0xcf, 0x60, 0xa5, 0xfb, 0x74, + 0xa3, 0x8c, 0x44, 0xc9, 0x58, 0xa9, 0xd1, 0x00, 0x24, 0x1b, 0xd3, 0xa9, 0x1b, 0xf0, 0xcc, 0x56, + 0xba, 0xad, 0xf5, 0x9c, 0x63, 0x4c, 0xaf, 0x88, 0x6b, 0xc8, 0x01, 0x77, 0x22, 0x42, 0x15, 0x6d, + 0x4c, 0xe5, 0x20, 0xc6, 0xd0, 0x80, 0x27, 0x77, 0x13, 0xcc, 0xe4, 0x5f, 0x98, 0x49, 0x80, 0x9e, + 0x81, 0x68, 0x2c, 0x78, 0xfa, 0x2b, 0xdd, 0x27, 0xeb, 0x11, 0x87, 0x67, 0xa1, 0x36, 0x29, 0xe1, + 0xe1, 0x99, 0x2c, 0x1a, 0x0b, 0xf4, 0x0c, 0x0a, 0xac, 0xb5, 0xeb, 0x45, 0x0e, 0xf8, 0x7c, 0x3d, + 0x60, 0x10, 0x60, 0x35, 0x7d, 0x3d, 0x97, 0xa1, 0x03, 0x90, 0x88, 0x4f, 0x1d, 0x9f, 0xd6, 0xa5, + 0x8f, 0xe5, 0x74, 0xc4, 0xed, 0xd2, 0x88, 0x48, 0x8a, 0x7a, 0x20, 0xe9, 0xb6, 0xe3, 0x53, 0xaf, + 0x5e, 0xda, 0xcb, 0xb7, 0x2a, 0xdd, 0x2f, 0xd6, 0x43, 0x86, 0xf6, 0x7d, 0x46, 0xa8, 0x44, 0x2f, + 0x00, 0xf8, 0x7c, 0xf0, 0x89, 0xac, 0x6f, 0x7d, 0x2c, 0x1a, 0x36, 0x2a, 0x69, 0x4c, 0xf9, 0x3c, + 0xfe, 0xd0, 0xfc, 0x45, 0x00, 0x58, 0x5d, 0x83, 0x7a, 0x00, 0xb6, 0x6f, 0x4d, 0xa9, 0xef, 0x98, + 0xd8, 0x8b, 0x3a, 0xe7, 0xb3, 0x14, 0x98, 0x4d, 0x71, 0x3b, 0x19, 0xde, 0x53, 0xdd, 0xa6, 0x31, + 0xd2, 0xf6, 0xad, 0x09, 0x57, 0xa1, 0x3e, 0x94, 0xaf, 0x14, 0x9d, 0x4e, 0xa9, 0x6e, 0xe1, 0xa8, + 0x69, 0x9e, 0xac, 0x45, 0xf4, 0x7d, 0x57, 0x61, 0xc7, 0x08, 0xb3, 0xc5, 0x94, 0x13, 0xdd, 0xc2, + 0xcd, 0xf7, 0x79, 0xa8, 0x7e, 0xd8, 0x50, 0x68, 0x1f, 0x4a, 0xa6, 0x42, 0xb1, 0xad, 0x5e, 0x47, + 0x9e, 0x6d, 0x8c, 0x8d, 0x75, 0xff, 0x8f, 0x6f, 0xe8, 0x35, 0x3c, 0xd6, 0xb0, 0x87, 0x5d, 0x5d, + 0x31, 0xf5, 0x9f, 0xb8, 0x49, 0x08, 0xcc, 0x67, 0x03, 0x7e, 0x72, 0x0f, 0xc2, 0xd9, 0xaf, 0xe0, + 0x61, 0x98, 0xfd, 0xa9, 0x8b, 0x55, 0xac, 0x2f, 0xb0, 0x16, 0xb5, 0xfb, 0x46, 0x65, 0xa8, 0x86, + 0x5a, 0x39, 0x92, 0xa2, 0x97, 0x50, 0x9d, 0x5d, 0xd3, 0x34, 0xac, 0xb8, 0x39, 0x6c, 0x9b, 0x4b, + 0x13, 0xd6, 0x09, 0x3c, 0xb2, 0xb0, 0xe7, 0x29, 0xf3, 0x34, 0x4e, 0xda, 0x1c, 0x57, 0x8b, 0xd5, + 0x31, 0xb1, 0xf9, 0xa7, 0x90, 0xd4, 0x38, 0x9a, 0x76, 0xd4, 0x87, 0x4a, 0x14, 0xbe, 0xb7, 0xda, + 0x5d, 0x1b, 0xe1, 0x21, 0xd4, 0x8d, 0xd9, 0xd2, 0xea, 0x01, 0x84, 0x61, 0x73, 0x88, 0x98, 0xa1, + 0x8d, 0xb9, 0x8c, 0x33, 0x5e, 0xc0, 0x76, 0x12, 0x2e, 0xc7, 0xe4, 0x37, 0xc7, 0x3c, 0x88, 0x95, + 0x8c, 0xd4, 0xfc, 0x4d, 0x84, 0x52, 0xb4, 0x91, 0x56, 0x9e, 0xb9, 0x58, 0xd1, 0x32, 0x0d, 0x58, + 0x54, 0x0c, 0x45, 0x4b, 0xe5, 0x88, 0x43, 0xc4, 0xcc, 0x39, 0xe2, 0x94, 0x97, 0x50, 0x32, 0x16, + 0x19, 0xfb, 0xb6, 0x1a, 0xbf, 0x78, 0x87, 0x67, 0xac, 0x59, 0x65, 0xc9, 0x58, 0xf0, 0xa6, 0x3d, + 0x81, 0x87, 0x2a, 0xb1, 0x29, 0xb6, 0x57, 0xb3, 0x50, 0xc8, 0x36, 0x0b, 0xd5, 0x95, 0x9e, 0x8f, + 0xff, 0x5f, 0x02, 0x94, 0x93, 0x25, 0xcc, 0xc6, 0x96, 0x2d, 0xe0, 0x90, 0x9c, 0x71, 0xf6, 0xb7, + 0x98, 0x92, 0x7b, 0x39, 0x82, 0x47, 0x96, 0x12, 0x4c, 0x15, 0xd3, 0x24, 0xaa, 0x42, 0xb1, 0x36, + 0xb5, 0xb0, 0x95, 0x25, 0x7b, 0x0f, 0x2d, 0x25, 0xd8, 0x8f, 0xc5, 0x47, 0xd8, 0x42, 0xdf, 0x03, + 0xfa, 0x10, 0xa8, 0xe9, 0x9e, 0x91, 0xa5, 0x4f, 0x6a, 0x69, 0x62, 0x5f, 0xf7, 0x8c, 0xe6, 0xaf, + 0x02, 0x54, 0x52, 0x6f, 0x07, 0xab, 0x35, 0x5b, 0xc8, 0x33, 0x85, 0xaa, 0x17, 0xd9, 0x36, 0x32, + 0x5b, 0xe4, 0xbd, 0x50, 0x76, 0x6f, 0xad, 0x8b, 0xff, 0x65, 0xad, 0x37, 0x27, 0x50, 0x4e, 0xde, + 0x11, 0xf4, 0x1d, 0x6c, 0xb3, 0xc8, 0x2d, 0x6c, 0x4d, 0x7d, 0xd6, 0xe8, 0x59, 0x1c, 0xab, 0x58, + 0x4a, 0x70, 0x84, 0xad, 0x53, 0xa6, 0xeb, 0x7d, 0x75, 0xf3, 0xbe, 0x91, 0xbb, 0x59, 0x36, 0x84, + 0xb7, 0xcb, 0x86, 0xf0, 0x6e, 0xd9, 0x10, 0xfe, 0x58, 0x36, 0x84, 0x9f, 0xef, 0x1a, 0xb9, 0xb7, + 0x77, 0x8d, 0xdc, 0xbb, 0xbb, 0x46, 0xee, 0x75, 0x25, 0xf5, 0x67, 0xf4, 0x9f, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xce, 0xc1, 0x00, 0xbc, 0x9e, 0x0a, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index 68031f925b7..de63f1f2d7d 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -47,6 +47,14 @@ message ComponentID { // type. optional int32 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID"]; + + // NodeID of the node this component ran on. + // TODO(asubiotto): This is only used when Type = FLOW to uniquely describe + // a flow (since flows on different nodes still have the same FlowID). Use + // this for processors/streams as well. + optional int32 node_id = 4 [(gogoproto.nullable) = false, + (gogoproto.customname) = "NodeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; } // ComponentStats contains statistics for an execution component. A component is diff --git a/pkg/sql/flowinfra/BUILD.bazel b/pkg/sql/flowinfra/BUILD.bazel index f0ac867af18..61031acd07c 100644 --- a/pkg/sql/flowinfra/BUILD.bazel +++ b/pkg/sql/flowinfra/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//pkg/util/cancelchecker", "//pkg/util/contextutil", "//pkg/util/log", + "//pkg/util/optional", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 584369a963b..954f000cdff 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -15,11 +15,13 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -426,6 +428,20 @@ func (f *FlowBase) Cleanup(ctx context.Context) { f.TypeResolverFactory.CleanupFunc(ctx) } + if f.Gateway { + // If this is the gateway node, output the maximum memory usage to the flow + // span. Note that non-gateway nodes use the last outbox to send this + // information over. + if sp := tracing.SpanFromContext(ctx); sp != nil { + sp.SetSpanStats(&execinfrapb.ComponentStats{ + Component: execinfrapb.FlowComponentID(roachpb.NodeID(f.NodeID.SQLInstanceID()), f.FlowCtx.ID), + FlowStats: execinfrapb.FlowStats{ + MaxMemUsage: optional.MakeUint(uint64(f.FlowCtx.EvalCtx.Mon.MaximumBytes())), + }, + }) + } + } + // This closes the monitor opened in ServerImpl.setupFlow. f.EvalCtx.Stop(ctx) for _, p := range f.processors { diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 130abf7af7a..ab198fb4aa7 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -68,8 +68,12 @@ type Outbox struct { // numOutboxes is an atomic that keeps track of how many outboxes are left. // When there is one outbox left, the flow-level stats are added to the last - // outbox's span stats. + // outbox's span stats unless isGatewayNode is true, in which case, the flow + // will do so in its Cleanup method. numOutboxes *int32 + + // isGatewayNode specifies whether this outbox is running on the gateway node. + isGatewayNode bool } var _ execinfra.RowReceiver = &Outbox{} @@ -81,11 +85,13 @@ func NewOutbox( nodeID roachpb.NodeID, streamID execinfrapb.StreamID, numOutboxes *int32, + isGatewayNode bool, ) *Outbox { m := &Outbox{flowCtx: flowCtx, nodeID: nodeID} m.encoder.SetHeaderFields(flowCtx.ID, streamID) m.streamID = streamID m.numOutboxes = numOutboxes + m.isGatewayNode = isGatewayNode m.stats.Component = flowCtx.StreamComponentID(streamID) return m } @@ -292,7 +298,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { if err != nil { return err } - if m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 { + if !m.isGatewayNode && m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 { // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams, // since it's a flow level stat. However, due to the row exec engine infrastructure, it is too // complicated to attach this to a flow level span. If the row exec engine gets removed, getting diff --git a/pkg/sql/flowinfra/outbox_test.go b/pkg/sql/flowinfra/outbox_test.go index 226744c4896..944678aefa4 100644 --- a/pkg/sql/flowinfra/outbox_test.go +++ b/pkg/sql/flowinfra/outbox_test.go @@ -73,7 +73,7 @@ func TestOutbox(t *testing.T) { }, } streamID := execinfrapb.StreamID(42) - outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.OneIntCol) var outboxWG sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) @@ -229,7 +229,7 @@ func TestOutboxInitializesStreamBeforeReceivingAnyRows(t *testing.T) { }, } streamID := execinfrapb.StreamID(42) - outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) var outboxWG sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) @@ -310,7 +310,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() if tc.outboxIsClient { - outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.OneIntCol) outbox.Start(ctx, &wg, cancel) @@ -451,7 +451,7 @@ func TestOutboxCancelsFlowOnError(t *testing.T) { ctxCanceled = true } - outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.OneIntCol) outbox.Start(ctx, &wg, mockCancel) @@ -497,7 +497,7 @@ func TestOutboxUnblocksProducers(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox = NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.OneIntCol) // Fill up the outbox. @@ -561,7 +561,7 @@ func BenchmarkOutbox(b *testing.B) { NodeDialer: nodedialer.New(clientRPC, staticAddressResolver(addr)), }, } - outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */) + outbox := NewOutbox(&flowCtx, execinfra.StaticNodeID, streamID, nil /* numOutboxes */, false /* isGatewayNode */) outbox.Init(rowenc.MakeIntCols(numCols)) var outboxWG sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/sql/rowflow/row_based_flow.go b/pkg/sql/rowflow/row_based_flow.go index 53901a84832..f6f5cc633b7 100644 --- a/pkg/sql/rowflow/row_based_flow.go +++ b/pkg/sql/rowflow/row_based_flow.go @@ -382,7 +382,7 @@ func (f *rowBasedFlow) setupOutboundStream( case execinfrapb.StreamEndpointSpec_REMOTE: atomic.AddInt32(&f.numOutboxes, 1) - outbox := flowinfra.NewOutbox(&f.FlowCtx, spec.TargetNodeID, sid, &f.numOutboxes) + outbox := flowinfra.NewOutbox(&f.FlowCtx, spec.TargetNodeID, sid, &f.numOutboxes, f.FlowCtx.Gateway) f.AddStartable(outbox) return outbox, nil