Merge #64453
64453: colexecargs: fix recent memory leak r=yuzefovich a=yuzefovich

**colfetcher: set all unneeded vectors to all nulls**

The cFetcher creates a batch with the same schema as the table it is
reading from. In many cases not all columns of the table are needed for
the query, so as an important performance optimization the cFetcher
doesn't decode the data for those columns. As a result, such unneeded
columns are left "unset". This works fine in most cases; however, if we
attempt to materialize such a batch with unset vectors, the conversion
to datums might encounter errors (e.g. UUID values must either be NULL
or be 16 bytes long).

This commit improves the situation slightly by tracking the set of
unneeded columns and setting those vectors to all NULL values. This
will allow us to simplify the planning code a bit in the follow-up commit.
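
A minimal sketch of the idea (the batch calls mirror the `coldata` APIs
used in the diff below; the package and helper names are illustrative):

```go
package colfetcherdemo

import "github.com/cockroachdb/cockroach/pkg/col/coldata"

// setUnneededToNulls marks every vector that the fetcher never decoded as
// all-NULL so that a later materialization never converts garbage values.
func setUnneededToNulls(batch coldata.Batch, notNeededColOrdinals []int) {
	for _, colIdx := range notNeededColOrdinals {
		batch.ColVec(colIdx).Nulls().SetNulls()
	}
}
```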

Release note: None

**colbuilder: remove unnecessary complication when wrapping table reader**

Previously, we had some custom code for the case when we supported the
table reader core but not the post-processing spec: we attempted to
revert to the state before the core was planned and to plan the whole
table reader with render expressions on top.

Given the previous commit, I think this is no longer necessary, so this
commit removes that special code in favor of the general handling of
only the post-processing spec via a noop processor. The change was
prompted by complications that the old code created for the
follow-up commit.

Release note: None

**colexecargs: fix recent memory leak**

In c3b1617 we introduced a new utility
struct that keeps information about the meta objects in the operator
tree. Those meta objects are tracked by several slices which are
resliced to be of length 0 when the "outer" object is released back to
the corresponding pool. However, the slices still end up holding
references to the old meta objects, preventing them from being GCed.
This behavior results in a memory leak. This commit fixes the issue by
explicitly nilling out the slices' elements before reslicing them for reuse.
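
The underlying Go gotcha, as a self-contained sketch (the `result` and
`metaSource` types are illustrative stand-ins, not the actual structs):

```go
package main

import "fmt"

type metaSource struct{ data []byte }

type result struct{ metadataSources []*metaSource }

func (r *result) releaseLeaky() {
	// The backing array still references every old element, so none of them
	// can be garbage-collected until its slot is overwritten by a future
	// append.
	r.metadataSources = r.metadataSources[:0]
}

func (r *result) releaseFixed() {
	// Deep reset: nil out each slot first so the old objects become
	// unreachable, then reslice to zero length to reuse the allocation.
	for i := range r.metadataSources {
		r.metadataSources[i] = nil
	}
	r.metadataSources = r.metadataSources[:0]
}

func main() {
	r := &result{metadataSources: []*metaSource{{data: make([]byte, 1<<20)}}}
	r.releaseLeaky() // len == 0, but the 1 MiB buffer is still reachable
	fmt.Println(len(r.metadataSources), cap(r.metadataSources)) // 0 1

	r.metadataSources = append(r.metadataSources, &metaSource{data: make([]byte, 1<<20)})
	r.releaseFixed() // now nothing retains the buffer, so it can be collected
}
```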

Fixes: #62320.
Fixes: #64093.

Release note: None

**sql: audit implementations of the Releasable interface for slices' reuse**

This commit performs an audit of all slices kept by components
implementing the `execinfra.Releasable` interface to make sure that
slices which might reference large objects are deeply reset. (By "deep
reset" I mean that all slots are set to `nil` so that the possibly
large objects can be garbage-collected.) This was prompted by the
previous commit, which fixed a recent regression, but the audit seems
like a good idea on its own, and it might be worth backporting it too.
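
The resulting pattern, sketched with the illustrative `result` type from
the sketch above (add `import "sync"`; the pool name is made up):

```go
var resultPool = sync.Pool{New: func() interface{} { return &result{} }}

// Release deeply resets the slice and returns the object to its pool:
// the references are dropped so the old objects can be collected, while
// the zero-length slice keeps its allocation for reuse.
func (r *result) Release() {
	for i := range r.metadataSources {
		r.metadataSources[i] = nil
	}
	*r = result{metadataSources: r.metadataSources[:0]}
	resultPool.Put(r)
}
```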

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed May 5, 2021
2 parents 8ac4cf4 + 99153b4 commit ecf484d
Showing 11 changed files with 131 additions and 83 deletions.
11 changes: 10 additions & 1 deletion pkg/sql/colconv/vec_to_datum.eg.go

Some generated files are not rendered by default.

11 changes: 10 additions & 1 deletion pkg/sql/colconv/vec_to_datum_tmpl.go
@@ -118,8 +118,17 @@ func NewAllVecToDatumConverter(batchWidth int) *VecToDatumConverter {
 
 // Release is part of the execinfra.Releasable interface.
 func (c *VecToDatumConverter) Release() {
+    // Deeply reset the converted vectors so that we don't hold onto the old
+    // datums.
+    for _, vec := range c.convertedVecs {
+        for i := range vec {
+            //gcassert:bce
+            vec[i] = nil
+        }
+    }
     *c = VecToDatumConverter{
-        convertedVecs:    c.convertedVecs[:0],
+        convertedVecs: c.convertedVecs[:0],
+        // This slice is of integers, so there is no need to reset it deeply.
         vecIdxsToConvert: c.vecIdxsToConvert[:0],
     }
     vecToDatumConverterPool.Put(c)
81 changes: 10 additions & 71 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -72,44 +72,34 @@ func wrapRowSources(
     var toWrapInputs []execinfra.RowSource
     var releasables []execinfra.Releasable
     for i := range inputs {
-        inputInfo := &inputs[i]
         // Optimization: if the input is a Columnarizer, its input is
         // necessarily a execinfra.RowSource, so remove the unnecessary
         // conversion.
-        if c, ok := inputInfo.Root.(*colexec.Columnarizer); ok {
+        if c, ok := inputs[i].Root.(*colexec.Columnarizer); ok {
             // Since this Columnarizer has been previously added to Closers and
             // MetadataSources, this call ensures that all future calls are noops.
             // Modifying the slices at this stage is difficult.
             c.MarkAsRemovedFromFlow()
             toWrapInputs = append(toWrapInputs, c.Input())
         } else {
-            inputInfoCopy := *inputInfo
-            // We pass on the ownership over the meta components to the
-            // materializer.
-            // TODO(yuzefovich): possibly set the length to 0 in order to be
-            // able to pool the underlying slices.
-            inputInfo.StatsCollectors = nil
-            inputInfo.MetadataSources = nil
-            inputInfo.ToClose = nil
             // Note that this materializer is *not* added to the set of
             // releasables because in some cases it could be released before
             // being closed. Namely, this would occur if we have a subquery
             // with LocalPlanNode core and a materializer is added in order to
             // wrap that core - what will happen is that all releasables are put
             // back into their pools upon the subquery's flow cleanup, yet the
             // subquery planNode tree isn't closed yet since its closure is done
             // when the main planNode tree is being closed.
             toWrapInput, err := colexec.NewMaterializer(
                 flowCtx,
                 processorID,
-                inputInfoCopy,
+                inputs[i],
                 inputTypes[i],
                 nil, /* output */
                 nil, /* cancelFlow */
             )
             if err != nil {
                 return nil, releasables, err
             }
+            // We passed the ownership over the meta components to the
+            // materializer.
+            // TODO(yuzefovich): possibly set the length to 0 in order to be
+            // able to pool the underlying slices.
+            inputs[i].StatsCollectors = nil
+            inputs[i].MetadataSources = nil
+            inputs[i].ToClose = nil
             toWrapInputs = append(toWrapInputs, toWrapInput)
             if materializerSafeToRelease {
                 releasables = append(releasables, toWrapInput)
@@ -143,38 +133,6 @@ type opResult struct {
     *colexecargs.NewColOperatorResult
 }
 
-// resetToState resets r to the state specified in arg. arg may be a shallow
-// copy made at a given point in time.
-func (r *opResult) resetToState(ctx context.Context, arg colexecargs.NewColOperatorResult) {
-    // MetadataSources are left untouched since there is no need to do any
-    // cleaning there.
-
-    // Close BoundAccounts that are not present in arg.OpAccounts.
-    accs := make(map[*mon.BoundAccount]struct{})
-    for _, a := range arg.OpAccounts {
-        accs[a] = struct{}{}
-    }
-    for _, a := range r.OpAccounts {
-        if _, ok := accs[a]; !ok {
-            a.Close(ctx)
-        }
-    }
-    // Stop BytesMonitors that are not present in arg.OpMonitors.
-    mons := make(map[*mon.BytesMonitor]struct{})
-    for _, m := range arg.OpMonitors {
-        mons[m] = struct{}{}
-    }
-
-    for _, m := range r.OpMonitors {
-        if _, ok := mons[m]; !ok {
-            m.Stop(ctx)
-        }
-    }
-
-    // Shallow copy over the rest.
-    *r.NewColOperatorResult = arg
-}
-
 func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
     var groupCols, orderedCols util.FastIntSet
     for _, col := range aggSpec.OrderedGroupCols {
@@ -720,10 +678,6 @@ func NewColOperator(
     core := &spec.Core
     post := &spec.Post
 
-    // resultPreSpecPlanningStateShallowCopy is a shallow copy of the result
-    // before any specs are planned. Used if there is a need to backtrack.
-    resultPreSpecPlanningStateShallowCopy := *r
-
     if err = supportedNatively(spec); err != nil {
         if wrapErr := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); wrapErr != nil {
             // Return the original error for why we don't support this spec
@@ -1337,22 +1291,7 @@ func NewColOperator(
                 err, post,
             )
         }
-        if core.TableReader != nil {
-            // We cannot naively wrap a TableReader's post-processing spec since
-            // it might project out unneeded columns with unset values. These
-            // columns are still returned, and if we were to wrap an unsupported
-            // post-processing spec, a Materializer would naively decode these
-            // columns, which would return errors (e.g. UUIDs require 16 bytes).
-            inputTypes := make([][]*types.T, len(spec.Input))
-            for inputIdx, input := range spec.Input {
-                inputTypes[inputIdx] = make([]*types.T, len(input.ColumnTypes))
-                copy(inputTypes[inputIdx], input.ColumnTypes)
-            }
-            result.resetToState(ctx, resultPreSpecPlanningStateShallowCopy)
-            err = result.createAndWrapRowSource(ctx, flowCtx, args, inputs, inputTypes, spec, factory, err)
-        } else {
-            err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
-        }
+        err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
     } else {
         // The result can be updated with the post process result.
         result.updateWithPostProcessResult(ppr)
26 changes: 24 additions & 2 deletions pkg/sql/colexec/colexecargs/op_creation.go
@@ -147,11 +147,33 @@ func (r *NewColOperatorResult) Release() {
     for _, releasable := range r.Releasables {
         releasable.Release()
     }
+    // Explicitly unset each slot in the slices of objects of non-trivial size
+    // in order to lose references to the old objects. If we don't do it, we
+    // might have a memory leak in case the slices aren't appended to for a
+    // while (because we're slicing them up to 0 below, the references to the
+    // old objects would be kept "alive" until the spot in the slice is
+    // overwritten by a new object).
+    for i := range r.StatsCollectors {
+        r.StatsCollectors[i] = nil
+    }
+    for i := range r.MetadataSources {
+        r.MetadataSources[i] = nil
+    }
+    for i := range r.ToClose {
+        r.ToClose[i] = nil
+    }
+    for i := range r.Releasables {
+        r.Releasables[i] = nil
+    }
     *r = NewColOperatorResult{
         OpWithMetaInfo: OpWithMetaInfo{
-            MetadataSources: r.OpWithMetaInfo.MetadataSources[:0],
-            ToClose:         r.OpWithMetaInfo.ToClose[:0],
+            StatsCollectors: r.StatsCollectors[:0],
+            MetadataSources: r.MetadataSources[:0],
+            ToClose:         r.ToClose[:0],
         },
+        // There is no need to deeply reset the column types and the memory
+        // monitoring infra slices because these objects are very tiny in the
+        // grand scheme of things.
         ColumnTypes: r.ColumnTypes[:0],
         OpMonitors:  r.OpMonitors[:0],
         OpAccounts:  r.OpAccounts[:0],
34 changes: 33 additions & 1 deletion pkg/sql/colfetcher/cfetcher.go
@@ -64,6 +64,12 @@ type cTableInfo struct {
     // The set of required value-component column ordinals in the table.
     neededValueColsByIdx util.FastIntSet
 
+    // The set of ordinals of the columns that are **not** required. cFetcher
+    // creates an output batch that includes all columns in cols, yet only
+    // needed columns are actually populated. The vectors at positions in
+    // notNeededColOrdinals will be set to have all null values.
+    notNeededColOrdinals []int
+
     // Map used to get the index for columns in cols.
     // It's kept as a pointer so we don't have to re-allocate to sort it each
     // time.
@@ -135,13 +141,16 @@ func newCTableInfo() *cTableInfo {
 
 // Release implements the execinfra.Releasable interface.
 func (c *cTableInfo) Release() {
+    // Note that all slices are being reused, but there is no need to deeply
+    // reset them since all of the slices are of Go native types.
     c.colIdxMap.ords = c.colIdxMap.ords[:0]
     c.colIdxMap.vals = c.colIdxMap.vals[:0]
     *c = cTableInfo{
         colIdxMap:            c.colIdxMap,
         keyValTypes:          c.keyValTypes[:0],
         extraTypes:           c.extraTypes[:0],
         neededColsList:       c.neededColsList[:0],
+        notNeededColOrdinals: c.notNeededColOrdinals[:0],
         indexColOrdinals:     c.indexColOrdinals[:0],
         allIndexColOrdinals:  c.allIndexColOrdinals[:0],
         extraValColOrdinals:  c.extraValColOrdinals[:0],
@@ -453,6 +462,17 @@ func (rf *cFetcher) Init(
     }
     sort.Ints(table.neededColsList)
 
+    // Find the set of columns for which vectors will **not** be properly
+    // populated.
+    if numNeededCols := tableArgs.ValNeededForCol.Len(); cap(table.notNeededColOrdinals) < len(rf.typs)-numNeededCols {
+        table.notNeededColOrdinals = make([]int, 0, len(rf.typs)-numNeededCols)
+    }
+    for i := 0; i < len(rf.typs); i++ {
+        if !tableArgs.ValNeededForCol.Contains(i) {
+            table.notNeededColOrdinals = append(table.notNeededColOrdinals, i)
+        }
+    }
+
     table.knownPrefixLength = len(rowenc.MakeIndexKeyPrefix(codec, table.desc, table.index.GetID()))
 
     var indexColumnIDs []descpb.ColumnID
@@ -1093,7 +1113,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
 
     if emitBatch {
         rf.pushState(stateResetBatch)
-        rf.machine.batch.SetLength(rf.machine.rowIdx)
+        rf.finalizeBatch()
         rf.machine.rowIdx = 0
         return rf.machine.batch, nil
     }
@@ -1475,6 +1495,16 @@ func (rf *cFetcher) fillNulls() error {
     return nil
 }
 
+func (rf *cFetcher) finalizeBatch() {
+    // We need to set all values in "not needed" vectors to nulls because if the
+    // batch is materialized (i.e. values are converted to datums), the
+    // conversion of unset values might encounter an error.
+    for _, notNeededIdx := range rf.table.notNeededColOrdinals {
+        rf.machine.batch.ColVec(notNeededIdx).Nulls().SetNulls()
+    }
+    rf.machine.batch.SetLength(rf.machine.rowIdx)
+}
+
 // getCurrentColumnFamilyID returns the column family id of the key in
 // rf.machine.nextKV.Key.
 func (rf *cFetcher) getCurrentColumnFamilyID() (descpb.FamilyID, error) {
@@ -1536,6 +1566,8 @@ var cFetcherPool = sync.Pool{
 func (rf *cFetcher) Release() {
     rf.table.Release()
     *rf = cFetcher{
+        // The types are small objects, so we don't bother deeply resetting this
+        // slice.
         typs: rf.typs[:0],
     }
     cFetcherPool.Put(rf)
4 changes: 4 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -288,6 +288,10 @@ func initCRowFetcher(
 // Release implements the execinfra.Releasable interface.
 func (s *ColBatchScan) Release() {
     s.rf.Release()
+    // Deeply reset the spans so that we don't hold onto the keys of the spans.
+    for i := range s.spans {
+        s.spans[i] = roachpb.Span{}
+    }
     *s = ColBatchScan{
         spans: s.spans[:0],
     }
30 changes: 25 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
@@ -590,15 +590,28 @@ func (s *vectorizedFlowCreator) Release() {
     for _, r := range s.releasables {
         r.Release()
     }
+    // Deeply reset slices that might point to the objects of non-trivial size
+    // so that the old references don't interfere with the objects being
+    // garbage-collected.
+    for i := range s.opChains {
+        s.opChains[i] = nil
+    }
+    for i := range s.releasables {
+        s.releasables[i] = nil
+    }
     *s = vectorizedFlowCreator{
         streamIDToInputOp: s.streamIDToInputOp,
         streamIDToSpecIdx: s.streamIDToSpecIdx,
         exprHelper:        s.exprHelper,
-        procIdxQueue:      s.procIdxQueue[:0],
-        opChains:          s.opChains[:0],
-        monitors:          s.monitors[:0],
-        accounts:          s.accounts[:0],
-        releasables:       s.releasables[:0],
+        // procIdxQueue is a slice of ints, so it's ok to just slice up to 0 to
+        // prime it for reuse.
+        procIdxQueue: s.procIdxQueue[:0],
+        opChains:     s.opChains[:0],
+        // There is no need to deeply reset the memory monitoring infra slices
+        // because these objects are very tiny in the grand scheme of things.
+        monitors:    s.monitors[:0],
+        accounts:    s.accounts[:0],
+        releasables: s.releasables[:0],
     }
     vectorizedFlowCreatorPool.Put(s)
 }
@@ -1244,6 +1257,13 @@ func (r *vectorizedFlowCreatorHelper) getCancelFlowFn() context.CancelFunc {
 }
 
 func (r *vectorizedFlowCreatorHelper) Release() {
+    // Note that processors here can only be of 0 or 1 length, but always of
+    // 1 capacity (only the root materializer can be appended to this
+    // slice). Unset the slot so that we don't keep the reference to the old
+    // materializer.
+    if len(r.processors) == 1 {
+        r.processors[0] = nil
+    }
     *r = vectorizedFlowCreatorHelper{
         processors: r.processors[:0],
     }
7 changes: 7 additions & 0 deletions pkg/sql/execinfra/processorsbase.go
@@ -519,6 +519,13 @@ func (pb *ProcessorBase) MustBeStreaming() bool {
 // Reset resets this ProcessorBase, retaining allocated memory in slices.
 func (pb *ProcessorBase) Reset() {
     pb.Out.Reset()
+    // Deeply reset the slices so that we don't hold onto the old objects.
+    for i := range pb.trailingMeta {
+        pb.trailingMeta[i] = execinfrapb.ProducerMetadata{}
+    }
+    for i := range pb.inputsToDrain {
+        pb.inputsToDrain[i] = nil
+    }
     *pb = ProcessorBase{
         Out:          pb.Out,
         trailingMeta: pb.trailingMeta[:0],
@@ -165,7 +165,8 @@ EXPLAIN (VEC) SELECT k::REGCLASS FROM kv
 ----
 └ Node 1
-  └ *rowexec.tableReader
+  └ *rowexec.noopProcessor
+    └ *colfetcher.ColBatchScan
 
 statement ok
 SET disable_partially_distributed_plans = false
3 changes: 2 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/vectorize_overloads
@@ -709,7 +709,8 @@ EXPLAIN (VEC) SELECT _time + _date FROM many_types
 ----
 └ Node 1
-  └ *rowexec.tableReader
+  └ *rowexec.noopProcessor
+    └ *colfetcher.ColBatchScan
 
 # Regression #50261 (not handling constant datum-backed values on the left
 # correctly).
4 changes: 4 additions & 0 deletions pkg/sql/rowexec/tablereader.go
@@ -215,6 +215,10 @@ func (tr *tableReader) startScan(ctx context.Context) error {
 func (tr *tableReader) Release() {
     tr.ProcessorBase.Reset()
     tr.fetcher.Reset()
+    // Deeply reset the spans so that we don't hold onto the keys of the spans.
+    for i := range tr.spans {
+        tr.spans[i] = roachpb.Span{}
+    }
     *tr = tableReader{
         ProcessorBase: tr.ProcessorBase,
         fetcher:       tr.fetcher,
