Skip to content

Commit

Permalink
distsql: fix disk monitor leak on flow setup error
Browse files Browse the repository at this point in the history
Previously, it was possible for the flow disk monitor to be created in
`newFlowContext` and never stopped. This monitor is connected to the
long-living temp storage monitor, which accumulates all of its children
as of recently, so this would become a memory leak that is now fixed.

There is no release note given that the leak occurs only during error
conditions that should be rare.

Epic: None

Release note: None
  • Loading branch information
yuzefovich committed Mar 15, 2024
1 parent 2242972 commit e4ffb1c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
20 changes: 10 additions & 10 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,19 @@ func (ds *ServerImpl) setupFlow(
batchSyncFlowConsumer execinfra.BatchReceiver,
localState LocalState,
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup()
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
if retErr != nil {
if monitor != nil {
monitor.Stop(ctx)
}
if diskMonitor != nil {
diskMonitor.Stop(ctx)
}
if onFlowCleanupEnd != nil {
onFlowCleanupEnd()
} else {
Expand Down Expand Up @@ -282,6 +285,7 @@ func (ds *ServerImpl) setupFlow(
ds.Settings,
)
monitor.Start(ctx, parentMonitor, reserved)
diskMonitor = execinfra.NewMonitor(ctx, ds.ParentDiskMonitor, "flow-disk-monitor")

makeLeaf := func() (*kv.Txn, error) {
tis := req.LeafTxnInputState
Expand Down Expand Up @@ -384,7 +388,7 @@ func (ds *ServerImpl) setupFlow(

// Create the FlowCtx for the flow.
flowCtx := ds.newFlowContext(
ctx, req.Flow.FlowID, evalCtx, monitor, makeLeaf, req.TraceKV,
ctx, req.Flow.FlowID, evalCtx, monitor, diskMonitor, makeLeaf, req.TraceKV,
req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(),
)

Expand Down Expand Up @@ -473,7 +477,7 @@ func (ds *ServerImpl) newFlowContext(
ctx context.Context,
id execinfrapb.FlowID,
evalCtx *eval.Context,
monitor *mon.BytesMonitor,
monitor, diskMonitor *mon.BytesMonitor,
makeLeafTxn func() (*kv.Txn, error),
traceKV bool,
collectStats bool,
Expand All @@ -494,11 +498,7 @@ func (ds *ServerImpl) newFlowContext(
CollectStats: collectStats,
Local: localState.IsLocal,
Gateway: isGatewayNode,
// The flow disk monitor is a child of the server's and is closed on
// Cleanup.
DiskMonitor: execinfra.NewMonitor(
ctx, ds.ParentDiskMonitor, "flow-disk-monitor",
),
DiskMonitor: diskMonitor,
}

if localState.IsLocal && localState.Collection != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) {
}
}

// This closes the disk monitor opened in newFlowContext as well as the
// memory monitor opened in ServerImpl.setupFlow.
// This closes the monitors opened in ServerImpl.setupFlow.
if r := recover(); r != nil {
f.DiskMonitor.EmergencyStop(ctx)
f.Mon.EmergencyStop(ctx)
Expand Down

0 comments on commit e4ffb1c

Please sign in to comment.