diff --git a/executor/builder.go b/executor/builder.go index 3c68b6124db1a..e85d1682d8718 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -108,6 +108,7 @@ type executorBuilder struct { type CTEStorages struct { ResTbl cteutil.Storage IterInTbl cteutil.Storage + Producer *cteProducer } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder { @@ -5297,33 +5298,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) * } func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { - // 1. Build seedPlan. if b.Ti != nil { b.Ti.UseNonRecursive = true } - seedExec := b.build(v.SeedPlan) - if b.err != nil { - return nil + if v.RecurPlan != nil && b.Ti != nil { + b.Ti.UseRecursive = true } - // 2. Build tables to store intermediate results. - chkSize := b.ctx.GetSessionVars().MaxChunkSize - tps := seedExec.base().retFieldTypes - // iterOutTbl will be constructed in CTEExec.Open(). - var resTbl cteutil.Storage - var iterInTbl cteutil.Storage - storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages) if !ok { b.err = errors.New("type assertion for CTEStorageMap failed") return nil } + + chkSize := b.ctx.GetSessionVars().MaxChunkSize + // iterOutTbl will be constructed in CTEExec.Open(). + var resTbl cteutil.Storage + var iterInTbl cteutil.Storage + var producer *cteProducer storages, ok := storageMap[v.CTE.IDForStorage] if ok { // Storage already setup. resTbl = storages.ResTbl iterInTbl = storages.IterInTbl + producer = storages.Producer } else { + // Build seed part. + seedExec := b.build(v.SeedPlan) + if b.err != nil { + return nil + } + + // Setup storages. + tps := seedExec.base().retFieldTypes resTbl = cteutil.NewStorageRowContainer(tps, chkSize) if err := resTbl.OpenAndRef(); err != nil { b.err = err @@ -5335,38 +5342,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor { return nil } storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl} - } - // 3. Build recursive part. - if v.RecurPlan != nil && b.Ti != nil { - b.Ti.UseRecursive = true - } - recursiveExec := b.build(v.RecurPlan) - if b.err != nil { - return nil - } + // Build recursive part. + recursiveExec := b.build(v.RecurPlan) + if b.err != nil { + return nil + } + var sel []int + if v.CTE.IsDistinct { + sel = make([]int, chkSize) + for i := 0; i < chkSize; i++ { + sel[i] = i + } + } - var sel []int - if v.CTE.IsDistinct { - sel = make([]int, chkSize) - for i := 0; i < chkSize; i++ { - sel[i] = i + producer = &cteProducer{ + ctx: b.ctx, + seedExec: seedExec, + recursiveExec: recursiveExec, + resTbl: resTbl, + iterInTbl: iterInTbl, + isDistinct: v.CTE.IsDistinct, + sel: sel, + hasLimit: v.CTE.HasLimit, + limitBeg: v.CTE.LimitBeg, + limitEnd: v.CTE.LimitEnd, + isInApply: v.CTE.IsInApply, } + storageMap[v.CTE.IDForStorage].Producer = producer } return &CTEExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - seedExec: seedExec, - recursiveExec: recursiveExec, - resTbl: resTbl, - iterInTbl: iterInTbl, - chkIdx: 0, - isDistinct: v.CTE.IsDistinct, - sel: sel, - hasLimit: v.CTE.HasLimit, - limitBeg: v.CTE.LimitBeg, - limitEnd: v.CTE.LimitEnd, - isInApply: v.CTE.IsInApply, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + producer: producer, } } diff --git a/executor/cte.go b/executor/cte.go index 569d59298d9c6..101ed42dd610c 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -63,6 +63,73 @@ var _ Executor = &CTEExec{} type CTEExec struct { baseExecutor + chkIdx int + producer *cteProducer + + // limit in recursive CTE. + cursor uint64 + meetFirstBatch bool +} + +// Open implements the Executor interface. +func (e *CTEExec) Open(ctx context.Context) (err error) { + e.reset() + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + + if e.producer.isInApply { + e.producer.reset() + } + if !e.producer.opened { + if err = e.producer.openProducer(ctx, e); err != nil { + return err + } + } + return nil +} + +// Next implements the Executor interface. +func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { + e.producer.resTbl.Lock() + defer e.producer.resTbl.Unlock() + if !e.producer.resTbl.Done() { + if err = e.producer.produce(ctx, e); err != nil { + return err + } + } + return e.producer.getChunk(ctx, e, req) +} + +// Close implements the Executor interface. +func (e *CTEExec) Close() (err error) { + e.producer.resTbl.Lock() + if !e.producer.closed { + err = e.producer.closeProducer() + } + e.producer.resTbl.Unlock() + if err != nil { + return err + } + return e.baseExecutor.Close() +} + +func (e *CTEExec) reset() { + e.chkIdx = 0 + e.cursor = 0 + e.meetFirstBatch = false +} + +type cteProducer struct { + opened bool + produced bool + closed bool + + ctx sessionctx.Context + seedExec Executor recursiveExec Executor @@ -74,9 +141,6 @@ type CTEExec struct { hashTbl baseHashTable - // Index of chunk to read from `resTbl`. - chkIdx int - // UNION ALL or UNION DISTINCT. isDistinct bool curIter int @@ -84,11 +148,9 @@ type CTEExec struct { sel []int // Limit related info. - hasLimit bool - limitBeg uint64 - limitEnd uint64 - cursor uint64 - meetFirstBatch bool + hasLimit bool + limitBeg uint64 + limitEnd uint64 memTracker *memory.Tracker diskTracker *disk.Tracker @@ -99,99 +161,84 @@ type CTEExec struct { isInApply bool } -// Open implements the Executor interface. -func (e *CTEExec) Open(ctx context.Context) (err error) { - e.reset() - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } - - if e.seedExec == nil { +func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) { + if p.seedExec == nil { return errors.New("seedExec for CTEExec is nil") } - if err = e.seedExec.Open(ctx); err != nil { + if err = p.seedExec.Open(ctx); err != nil { return err } - if e.memTracker != nil { - e.memTracker.Reset() + if p.memTracker != nil { + p.memTracker.Reset() } else { - e.memTracker = memory.NewTracker(e.id, -1) + p.memTracker = memory.NewTracker(cteExec.id, -1) } - e.diskTracker = disk.NewTracker(e.id, -1) - e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + p.diskTracker = disk.NewTracker(cteExec.id, -1) + p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker) + p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker) - if e.recursiveExec != nil { - if err = e.recursiveExec.Open(ctx); err != nil { + if p.recursiveExec != nil { + if err = p.recursiveExec.Open(ctx); err != nil { return err } // For non-recursive CTE, the result will be put into resTbl directly. // So no need to build iterOutTbl. // Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close(). - recursiveTypes := e.recursiveExec.base().retFieldTypes - e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize) - if err = e.iterOutTbl.OpenAndRef(); err != nil { + recursiveTypes := p.recursiveExec.base().retFieldTypes + p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.maxChunkSize) + if err = p.iterOutTbl.OpenAndRef(); err != nil { return err } } - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() - e.hCtx = &hashContext{ - allTypes: e.base().retFieldTypes, + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() + p.hCtx = &hashContext{ + allTypes: cteExec.base().retFieldTypes, } // We use all columns to compute hash. - e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes)) - for i := range e.hCtx.keyColIdx { - e.hCtx.keyColIdx[i] = i + p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes)) + for i := range p.hCtx.keyColIdx { + p.hCtx.keyColIdx[i] = i } } + p.opened = true return nil } -// Next implements the Executor interface. -func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { - req.Reset() - e.resTbl.Lock() - defer e.resTbl.Unlock() - if !e.resTbl.Done() { - if e.resTbl.Error() != nil { - return e.resTbl.Error() - } - resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) - iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) - var iterOutAction *chunk.SpillDiskAction - if e.iterOutTbl != nil { - iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) +func (p *cteProducer) closeProducer() (err error) { + if err = p.seedExec.Close(); err != nil { + return err + } + if p.recursiveExec != nil { + if err = p.recursiveExec.Close(); err != nil { + return err } - - failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { - if val.(bool) && variable.EnableTmpStorageOnOOM.Load() { - defer resAction.WaitForTest() - defer iterInAction.WaitForTest() - if iterOutAction != nil { - defer iterOutAction.WaitForTest() - } + // `iterInTbl` and `resTbl` are shared by multiple operators, + // so will be closed when the SQL finishes. + if p.iterOutTbl != nil { + if err = p.iterOutTbl.DerefAndClose(); err != nil { + return err } - }) - - if err = e.computeSeedPart(ctx); err != nil { - e.resTbl.SetError(err) - return err } - if err = e.computeRecursivePart(ctx); err != nil { - e.resTbl.SetError(err) + } + p.closed = true + if p.isInApply { + if err = p.reopenTbls(); err != nil { return err } - e.resTbl.SetDone() } + return nil +} - if e.hasLimit { - return e.nextChunkLimit(req) +func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) { + req.Reset() + if p.hasLimit { + return p.nextChunkLimit(cteExec, req) } - if e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) + if cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) if err != nil { return err } @@ -199,59 +246,110 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { // Also we ignore copying rows not selected, because some operators like Projection // doesn't support swap column if chunk.sel is no nil. req.SwapColumns(res.CopyConstructSel()) - e.chkIdx++ + cteExec.chkIdx++ } return nil } -// Close implements the Executor interface. -func (e *CTEExec) Close() (err error) { - e.reset() - if err = e.seedExec.Close(); err != nil { - return err - } - if e.recursiveExec != nil { - if err = e.recursiveExec.Close(); err != nil { - return err - } - // `iterInTbl` and `resTbl` are shared by multiple operators, - // so will be closed when the SQL finishes. - if e.iterOutTbl != nil { - if err = e.iterOutTbl.DerefAndClose(); err != nil { +func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error { + if !cteExec.meetFirstBatch { + for cteExec.chkIdx < p.resTbl.NumChunks() { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) + if err != nil { return err } + cteExec.chkIdx++ + numRows := uint64(res.NumRows()) + if newCursor := cteExec.cursor + numRows; newCursor >= p.limitBeg { + cteExec.meetFirstBatch = true + begInChk, endInChk := p.limitBeg-cteExec.cursor, numRows + if newCursor > p.limitEnd { + endInChk = p.limitEnd - cteExec.cursor + } + cteExec.cursor += endInChk + if begInChk == endInChk { + break + } + tmpChk := res.CopyConstructSel() + req.Append(tmpChk, int(begInChk), int(endInChk)) + return nil + } + cteExec.cursor += numRows } } - if e.isInApply { - if err = e.reopenTbls(); err != nil { + if cteExec.chkIdx < p.resTbl.NumChunks() && cteExec.cursor < p.limitEnd { + res, err := p.resTbl.GetChunk(cteExec.chkIdx) + if err != nil { return err } + cteExec.chkIdx++ + numRows := uint64(res.NumRows()) + if cteExec.cursor+numRows > p.limitEnd { + numRows = p.limitEnd - cteExec.cursor + req.Append(res.CopyConstructSel(), 0, int(numRows)) + } else { + req.SwapColumns(res.CopyConstructSel()) + } + cteExec.cursor += numRows } - return e.baseExecutor.Close() + return nil +} + +func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) { + if p.resTbl.Error() != nil { + return p.resTbl.Error() + } + resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker) + iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker) + var iterOutAction *chunk.SpillDiskAction + if p.iterOutTbl != nil { + iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker) + } + + failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { + if val.(bool) && variable.EnableTmpStorageOnOOM.Load() { + defer resAction.WaitForTest() + defer iterInAction.WaitForTest() + if iterOutAction != nil { + defer iterOutAction.WaitForTest() + } + } + }) + + if err = p.computeSeedPart(ctx); err != nil { + p.resTbl.SetError(err) + return err + } + if err = p.computeRecursivePart(ctx); err != nil { + p.resTbl.SetError(err) + return err + } + p.resTbl.SetDone() + return nil } -func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { +func (p *cteProducer) computeSeedPart(ctx context.Context) (err error) { defer func() { if r := recover(); r != nil && err == nil { err = errors.Errorf("%v", r) } }() failpoint.Inject("testCTESeedPanic", nil) - e.curIter = 0 - e.iterInTbl.SetIter(e.curIter) + p.curIter = 0 + p.iterInTbl.SetIter(p.curIter) chks := make([]*chunk.Chunk, 0, 10) for { - if e.limitDone(e.iterInTbl) { + if p.limitDone(p.iterInTbl) { break } - chk := tryNewCacheChunk(e.seedExec) - if err = Next(ctx, e.seedExec, chk); err != nil { + chk := tryNewCacheChunk(p.seedExec) + if err = Next(ctx, p.seedExec, chk); err != nil { return } if chk.NumRows() == 0 { break } - if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { + if chk, err = p.tryDedupAndAdd(chk, p.iterInTbl, p.hashTbl); err != nil { return } chks = append(chks, chk) @@ -259,66 +357,66 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { // Initial resTbl is empty, so no need to deduplicate chk using resTbl. // Just adding is ok. for _, chk := range chks { - if err = e.resTbl.Add(chk); err != nil { + if err = p.resTbl.Add(chk); err != nil { return } } - e.curIter++ - e.iterInTbl.SetIter(e.curIter) + p.curIter++ + p.iterInTbl.SetIter(p.curIter) return } -func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { +func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) { defer func() { if r := recover(); r != nil && err == nil { err = errors.Errorf("%v", r) } }() failpoint.Inject("testCTERecursivePanic", nil) - if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { + if p.recursiveExec == nil || p.iterInTbl.NumChunks() == 0 { return } - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) } - if e.limitDone(e.resTbl) { + if p.limitDone(p.resTbl) { return } for { - chk := tryNewCacheChunk(e.recursiveExec) - if err = Next(ctx, e.recursiveExec, chk); err != nil { + chk := tryNewCacheChunk(p.recursiveExec) + if err = Next(ctx, p.recursiveExec, chk); err != nil { return } if chk.NumRows() == 0 { - if err = e.setupTblsForNewIteration(); err != nil { + if err = p.setupTblsForNewIteration(); err != nil { return } - if e.limitDone(e.resTbl) { + if p.limitDone(p.resTbl) { break } - if e.iterInTbl.NumChunks() == 0 { + if p.iterInTbl.NumChunks() == 0 { break } // Next iteration begins. Need use iterOutTbl as input of next iteration. - e.curIter++ - e.iterInTbl.SetIter(e.curIter) - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + p.curIter++ + p.iterInTbl.SetIter(p.curIter) + if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth { + return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter) } // Make sure iterInTbl is setup before Close/Open, // because some executors will read iterInTbl in Open() (like IndexLookupJoin). - if err = e.recursiveExec.Close(); err != nil { + if err = p.recursiveExec.Close(); err != nil { return } - if err = e.recursiveExec.Open(ctx); err != nil { + if err = p.recursiveExec.Open(ctx); err != nil { return } } else { - if err = e.iterOutTbl.Add(chk); err != nil { + if err = p.iterOutTbl.Add(chk); err != nil { return } } @@ -326,67 +424,22 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { return } -// Get next chunk from resTbl for limit. -func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { - if !e.meetFirstBatch { - for e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if newCursor := e.cursor + numRows; newCursor >= e.limitBeg { - e.meetFirstBatch = true - begInChk, endInChk := e.limitBeg-e.cursor, numRows - if newCursor > e.limitEnd { - endInChk = e.limitEnd - e.cursor - } - e.cursor += endInChk - if begInChk == endInChk { - break - } - tmpChk := res.CopyConstructSel() - req.Append(tmpChk, int(begInChk), int(endInChk)) - return nil - } - e.cursor += numRows - } - } - if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if e.cursor+numRows > e.limitEnd { - numRows = e.limitEnd - e.cursor - req.Append(res.CopyConstructSel(), 0, int(numRows)) - } else { - req.SwapColumns(res.CopyConstructSel()) - } - e.cursor += numRows - } - return nil -} - -func (e *CTEExec) setupTblsForNewIteration() (err error) { - num := e.iterOutTbl.NumChunks() +func (p *cteProducer) setupTblsForNewIteration() (err error) { + num := p.iterOutTbl.NumChunks() chks := make([]*chunk.Chunk, 0, num) // Setup resTbl's data. for i := 0; i < num; i++ { - chk, err := e.iterOutTbl.GetChunk(i) + chk, err := p.iterOutTbl.GetChunk(i) if err != nil { return err } // Data should be copied in UNION DISTINCT. // Because deduplicate() will change data in iterOutTbl, // which will cause panic when spilling data into disk concurrently. - if e.isDistinct { + if p.isDistinct { chk = chk.CopyConstruct() } - chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl) + chk, err = p.tryDedupAndAdd(chk, p.resTbl, p.hashTbl) if err != nil { return err } @@ -394,47 +447,48 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) { } // Setup new iteration data in iterInTbl. - if err = e.iterInTbl.Reopen(); err != nil { + if err = p.iterInTbl.Reopen(); err != nil { return err } - if e.isDistinct { + if p.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { - if err = e.iterInTbl.Add(chk); err != nil { + if err = p.iterInTbl.Add(chk); err != nil { return err } } } else { - if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil { + if err = p.iterInTbl.SwapData(p.iterOutTbl); err != nil { return err } } // Clear data in iterOutTbl. - return e.iterOutTbl.Reopen() + return p.iterOutTbl.Reopen() } -func (e *CTEExec) reset() { - e.curIter = 0 - e.chkIdx = 0 - e.hashTbl = nil - e.cursor = 0 - e.meetFirstBatch = false +func (p *cteProducer) reset() { + p.curIter = 0 + p.hashTbl = nil + + p.opened = false + p.produced = false + p.closed = false } -func (e *CTEExec) reopenTbls() (err error) { - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() +func (p *cteProducer) reopenTbls() (err error) { + if p.isDistinct { + p.hashTbl = newConcurrentMapHashTable() } - if err := e.resTbl.Reopen(); err != nil { + if err := p.resTbl.Reopen(); err != nil { return err } - return e.iterInTbl.Reopen() + return p.iterInTbl.Reopen() } // Check if tbl meets the requirement of limit. -func (e *CTEExec) limitDone(tbl cteutil.Storage) bool { - return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd +func (p *cteProducer) limitDone(tbl cteutil.Storage) bool { + return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd } func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, @@ -459,11 +513,11 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM return actionSpill } -func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, +func (p *cteProducer) tryDedupAndAdd(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (res *chunk.Chunk, err error) { - if e.isDistinct { - if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil { + if p.isDistinct { + if chk, err = p.deduplicate(chk, storage, hashTbl); err != nil { return nil, err } } @@ -472,12 +526,12 @@ func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, // Compute hash values in chk and put it in hCtx.hashVals. // Use the returned sel to choose the computed hash values. -func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { +func (p *cteProducer) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { numRows := chk.NumRows() - e.hCtx.initHash(numRows) + p.hCtx.initHash(numRows) // Continue to reset to make sure all hasher is new. - for i := numRows; i < len(e.hCtx.hashVals); i++ { - e.hCtx.hashVals[i].Reset() + for i := numRows; i < len(p.hCtx.hashVals); i++ { + p.hCtx.hashVals[i].Reset() } sel = chk.Sel() var hashBitMap []bool @@ -489,12 +543,12 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { } else { // All rows is selected, sel will be [0....numRows). // e.sel is setup when building executor. - sel = e.sel + sel = p.sel } for i := 0; i < chk.NumCols(); i++ { - if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals, - chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull, + if err = codec.HashChunkSelected(p.ctx.GetSessionVars().StmtCtx, p.hCtx.hashVals, + chk, p.hCtx.allTypes[i], i, p.hCtx.buf, p.hCtx.hasNull, hashBitMap, false); err != nil { return nil, err } @@ -504,7 +558,7 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { // Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl. // Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted. -func (e *CTEExec) deduplicate(chk *chunk.Chunk, +func (p *cteProducer) deduplicate(chk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) { numRows := chk.NumRows() @@ -514,7 +568,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // 1. Compute hash values for chunk. chkHashTbl := newConcurrentMapHashTable() - selOri, err := e.computeChunkHash(chk) + selOri, err := p.computeChunkHash(chk) if err != nil { return nil, err } @@ -523,10 +577,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cur chk. selChk := make([]int, 0, numRows) for i := 0; i < numRows; i++ { - key := e.hCtx.hashVals[selOri[i]].Sum64() + key := p.hCtx.hashVals[selOri[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl) + hasDup, err := p.checkHasDup(key, row, chk, storage, chkHashTbl) if err != nil { return nil, err } @@ -546,10 +600,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // This sel is for filtering rows duplicated in cteutil.Storage. selStorage := make([]int, 0, len(selChk)) for i := 0; i < len(selChk); i++ { - key := e.hCtx.hashVals[selChk[i]].Sum64() + key := p.hCtx.hashVals[selChk[i]].Sum64() row := chk.GetRow(i) - hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl) + hasDup, err := p.checkHasDup(key, row, nil, storage, hashTbl) if err != nil { return nil, err } @@ -570,7 +624,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk, // Use the row's probe key to check if it already exists in chk or storage. // We also need to compare the row's real encoding value to avoid hash collision. -func (e *CTEExec) checkHasDup(probeKey uint64, +func (p *cteProducer) checkHasDup(probeKey uint64, row chunk.Row, curChk *chunk.Chunk, storage cteutil.Storage, @@ -591,9 +645,9 @@ func (e *CTEExec) checkHasDup(probeKey uint64, if err != nil { return false, err } - isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx, - row, e.hCtx.allTypes, e.hCtx.keyColIdx, - matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx) + isEqual, err := codec.EqualChunkRow(p.ctx.GetSessionVars().StmtCtx, + row, p.hCtx.allTypes, p.hCtx.keyColIdx, + matchedRow, p.hCtx.allTypes, p.hCtx.keyColIdx) if err != nil { return false, err } diff --git a/executor/cte_test.go b/executor/cte_test.go index da69ca170aa3d..0d34fcd5f177a 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -485,3 +485,18 @@ func TestCTEDelSpillFile(t *testing.T) { tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;") require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap) } + +func TestCTEShareCorColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 varchar(100));") + tk.MustExec("insert into t1 values(1, '2020-10-10');") + tk.MustExec("create table t2(c1 int, c2 date);") + tk.MustExec("insert into t2 values(1, '2020-10-10');") + for i := 0; i < 100; i++ { + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + } +}