Skip to content

Commit

Permalink
Merge pull request #120561 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-120557

release-23.2: distsql: fix disk monitor leak on flow setup error (#120561)

Co-Authored-By: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
yuzefovich and yuzefovich committed Mar 15, 2024
2 parents 2242972 + e4ffb1c commit 51dee74
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
20 changes: 10 additions & 10 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,19 @@ func (ds *ServerImpl) setupFlow(
batchSyncFlowConsumer execinfra.BatchReceiver,
localState LocalState,
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
if retErr != nil {
if monitor != nil {
monitor.Stop(ctx)
}
if diskMonitor != nil {
diskMonitor.Stop(ctx)
}
if onFlowCleanupEnd != nil {
onFlowCleanupEnd()
} else {
Expand Down Expand Up @@ -282,6 +285,7 @@ func (ds *ServerImpl) setupFlow(
ds.Settings,
)
monitor.Start(ctx, parentMonitor, reserved)
diskMonitor = execinfra.NewMonitor(ctx, ds.ParentDiskMonitor, "flow-disk-monitor")

makeLeaf := func() (*kv.Txn, error) {
tis := req.LeafTxnInputState
Expand Down Expand Up @@ -384,7 +388,7 @@ func (ds *ServerImpl) setupFlow(

// Create the FlowCtx for the flow.
flowCtx := ds.newFlowContext(
ctx, req.Flow.FlowID, evalCtx, monitor, makeLeaf, req.TraceKV,
ctx, req.Flow.FlowID, evalCtx, monitor, diskMonitor, makeLeaf, req.TraceKV,
req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
)

Expand Down Expand Up @@ -473,7 +477,7 @@ func (ds *ServerImpl) newFlowContext(
ctx context.Context,
id execinfrapb.FlowID,
evalCtx *eval.Context,
monitor *mon.BytesMonitor,
monitor, diskMonitor *mon.BytesMonitor,
makeLeafTxn func() (*kv.Txn, error),
traceKV bool,
collectStats bool,
Expand All @@ -494,11 +498,7 @@ func (ds *ServerImpl) newFlowContext(
CollectStats: collectStats,
Local: localState.IsLocal,
Gateway: isGatewayNode,
// The flow disk monitor is a child of the server's and is closed on
// Cleanup.
DiskMonitor: execinfra.NewMonitor(
ctx, ds.ParentDiskMonitor, "flow-disk-monitor",
),
DiskMonitor: diskMonitor,
}

if localState.IsLocal && localState.Collection != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
}
}

// This closes the disk monitor opened in newFlowContext as well as the
// memory monitor opened in ServerImpl.setupFlow.
// This closes the monitors opened in ServerImpl.setupFlow.
if r := recover(); r != nil {
f.DiskMonitor.EmergencyStop(ctx)
f.Mon.EmergencyStop(ctx)
Expand Down

0 comments on commit 51dee74

Please sign in to comment.