From e4ffb1c4757663bd43053b9511bffde25d563485 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 15 Mar 2024 16:38:36 +0000 Subject: [PATCH] distsql: fix disk monitor leak on flow setup error Previously, it was possible for the flow disk monitor to be created in `newFlowContext` and never stopped. This monitor is connected to the long-lived temp storage monitor, which, as of recently, accumulates all of its children, so an unstopped flow disk monitor would become a memory leak. This is now fixed. There is no release note, given that the leak occurs only during error conditions that should be rare. Epic: None Release note: None --- pkg/sql/distsql/server.go | 20 ++++++++++---------- pkg/sql/flowinfra/flow.go | 3 +-- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index a60950ba7c39..e7a87185cd20 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -219,9 +219,9 @@ func (ds *ServerImpl) setupFlow( batchSyncFlowConsumer execinfra.BatchReceiver, localState LocalState, ) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) { - var sp *tracing.Span // will be Finish()ed by Flow.Cleanup() - var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup() - var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup() + var sp *tracing.Span // will be Finish()ed by Flow.Cleanup() + var monitor, diskMonitor *mon.BytesMonitor // will be closed in Flow.Cleanup() + var onFlowCleanupEnd func() // will be called at the very end of Flow.Cleanup() // Make sure that we clean up all resources (which in the happy case are // cleaned up in Flow.Cleanup()) if an error is encountered. 
defer func() { @@ -229,6 +229,9 @@ func (ds *ServerImpl) setupFlow( if monitor != nil { monitor.Stop(ctx) } + if diskMonitor != nil { + diskMonitor.Stop(ctx) + } if onFlowCleanupEnd != nil { onFlowCleanupEnd() } else { @@ -282,6 +285,7 @@ func (ds *ServerImpl) setupFlow( ds.Settings, ) monitor.Start(ctx, parentMonitor, reserved) + diskMonitor = execinfra.NewMonitor(ctx, ds.ParentDiskMonitor, "flow-disk-monitor") makeLeaf := func() (*kv.Txn, error) { tis := req.LeafTxnInputState @@ -384,7 +388,7 @@ func (ds *ServerImpl) setupFlow( // Create the FlowCtx for the flow. flowCtx := ds.newFlowContext( - ctx, req.Flow.FlowID, evalCtx, monitor, makeLeaf, req.TraceKV, + ctx, req.Flow.FlowID, evalCtx, monitor, diskMonitor, makeLeaf, req.TraceKV, req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(), ) @@ -473,7 +477,7 @@ func (ds *ServerImpl) newFlowContext( ctx context.Context, id execinfrapb.FlowID, evalCtx *eval.Context, - monitor *mon.BytesMonitor, + monitor, diskMonitor *mon.BytesMonitor, makeLeafTxn func() (*kv.Txn, error), traceKV bool, collectStats bool, @@ -494,11 +498,7 @@ func (ds *ServerImpl) newFlowContext( CollectStats: collectStats, Local: localState.IsLocal, Gateway: isGatewayNode, - // The flow disk monitor is a child of the server's and is closed on - // Cleanup. - DiskMonitor: execinfra.NewMonitor( - ctx, ds.ParentDiskMonitor, "flow-disk-monitor", - ), + DiskMonitor: diskMonitor, } if localState.IsLocal && localState.Collection != nil { diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index bd5e4512e76f..38cbfefc72ed 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -725,8 +725,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) { } } - // This closes the disk monitor opened in newFlowContext as well as the - // memory monitor opened in ServerImpl.setupFlow. + // This closes the monitors opened in ServerImpl.setupFlow. 
if r := recover(); r != nil { f.DiskMonitor.EmergencyStop(ctx) f.Mon.EmergencyStop(ctx)