diff --git a/executor/builder.go b/executor/builder.go
index bad42e3ff99f4..ec79aad7f832d 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -108,6 +108,7 @@ type executorBuilder struct {
 type CTEStorages struct {
 	ResTbl    cteutil.Storage
 	IterInTbl cteutil.Storage
+	Producer  *CTEProducer
 }
 
 func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
@@ -5317,33 +5318,61 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
 }
 
 func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
-	// 1. Build seedPlan.
 	if b.Ti != nil {
 		b.Ti.UseNonRecursive = true
 	}
-	seedExec := b.build(v.SeedPlan)
-	if b.err != nil {
-		return nil
+	if v.RecurPlan != nil && b.Ti != nil {
+		b.Ti.UseRecursive = true
 	}
-	// 2. Build tables to store intermediate results.
-	chkSize := b.ctx.GetSessionVars().MaxChunkSize
-	tps := seedExec.base().retFieldTypes
-	// iterOutTbl will be constructed in CTEExec.Open().
-	var resTbl cteutil.Storage
-	var iterInTbl cteutil.Storage
-
 	storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
 	if !ok {
 		b.err = errors.New("type assertion for CTEStorageMap failed")
 		return nil
 	}
+
+	chkSize := b.ctx.GetSessionVars().MaxChunkSize
+	// iterOutTbl will be constructed in CTEExec.Open().
+	var resTbl cteutil.Storage
+	var iterInTbl cteutil.Storage
+	var cteProducer *CTEProducer
 	storages, ok := storageMap[v.CTE.IDForStorage]
 	if ok {
 		// Storage already setup.
 		resTbl = storages.ResTbl
 		iterInTbl = storages.IterInTbl
+		cteProducer = storages.Producer
 	} else {
+		seedExec := b.build(v.SeedPlan)
+		if b.err != nil {
+			return nil
+		}
+		recursiveExec := b.build(v.RecurPlan)
+		if b.err != nil {
+			return nil
+		}
+		var sel []int
+		if v.CTE.IsDistinct {
+			sel = make([]int, chkSize)
+			for i := 0; i < chkSize; i++ {
+				sel[i] = i
+			}
+		}
+		tps := seedExec.base().retFieldTypes
 		resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
 		if err := resTbl.OpenAndRef(); err != nil {
 			b.err = err
 			return nil
 		}
@@ -5354,39 +5383,12 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
 			b.err = err
 			return nil
 		}
-		storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}
-	}
-
-	// 3. Build recursive part.
-	if v.RecurPlan != nil && b.Ti != nil {
-		b.Ti.UseRecursive = true
-	}
-	recursiveExec := b.build(v.RecurPlan)
-	if b.err != nil {
-		return nil
-	}
-
-	var sel []int
-	if v.CTE.IsDistinct {
-		sel = make([]int, chkSize)
-		for i := 0; i < chkSize; i++ {
-			sel[i] = i
-		}
+		cteProducer = &CTEProducer{
+			ctx:           b.ctx,
+			seedExec:      seedExec,
+			recursiveExec: recursiveExec,
+			resTbl:        resTbl,
+			iterInTbl:     iterInTbl,
+			isDistinct:    v.CTE.IsDistinct,
+			sel:           sel,
+			hasLimit:      v.CTE.HasLimit,
+			limitBeg:      v.CTE.LimitBeg,
+			limitEnd:      v.CTE.LimitEnd,
+			isInApply:     v.CTE.IsInApply,
+		}
+		storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl, Producer: cteProducer}
 	}
 
 	return &CTEExec{
-		baseExecutor:  newBaseExecutor(b.ctx, v.Schema(), v.ID()),
-		seedExec:      seedExec,
-		recursiveExec: recursiveExec,
-		resTbl:        resTbl,
-		iterInTbl:     iterInTbl,
-		chkIdx:        0,
-		isDistinct:    v.CTE.IsDistinct,
-		sel:           sel,
-		hasLimit:      v.CTE.HasLimit,
-		limitBeg:      v.CTE.LimitBeg,
-		limitEnd:      v.CTE.LimitEnd,
-		isInApply:     v.CTE.IsInApply,
+		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
+		producer:     cteProducer,
 	}
 }
diff --git a/executor/cte.go b/executor/cte.go
index 569d59298d9c6..f6c1d67dd6481 100644
--- a/executor/cte.go
+++ b/executor/cte.go
@@ -63,6 +63,18 @@ var _ Executor = &CTEExec{}
 type CTEExec struct {
 	baseExecutor
 
+	chkIdx   int
+	producer *CTEProducer
+}
+
+type CTEProducer struct {
+	opened   bool
+	produced bool
+	closed   bool
+
+	// ctx is the session context, shared by all CTEExec instances that read this CTE.
+	ctx sessionctx.Context
+
 	seedExec      Executor
 	recursiveExec Executor
 
@@ -101,97 +113,137 @@ type CTEExec struct {
 // Open implements the Executor interface.
 func (e *CTEExec) Open(ctx context.Context) (err error) {
-	e.reset()
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return err
 	}
-	if e.seedExec == nil {
+	e.producer.resTbl.Lock()
+	defer e.producer.resTbl.Unlock()
+
+	if !e.producer.opened {
+		if err = e.producer.openProducer(ctx, e); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Next implements the Executor interface.
+func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
+	e.producer.resTbl.Lock()
+	defer e.producer.resTbl.Unlock()
+	if !e.producer.resTbl.Done() {
+		if err = e.producer.produce(ctx, e); err != nil {
+			return err
+		}
+	}
+
+	return e.producer.getChunk(ctx, e, req)
+}
+
+// Close implements the Executor interface.
+func (e *CTEExec) Close() (err error) {
+	e.producer.resTbl.Lock()
+	if !e.producer.closed {
+		err = e.producer.closeProducer()
+	}
+	e.producer.resTbl.Unlock()
+	if err != nil {
+		return err
+	}
+	return e.baseExecutor.Close()
+}
+
+func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker,
+	parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) {
+	memTracker := tbl.GetMemTracker()
+	memTracker.SetLabel(memory.LabelForCTEStorage)
+	memTracker.AttachTo(parentMemTracker)
+
+	diskTracker := tbl.GetDiskTracker()
+	diskTracker.SetLabel(memory.LabelForCTEStorage)
+	diskTracker.AttachTo(parentDiskTracker)
+
+	if variable.EnableTmpStorageOnOOM.Load() {
+		actionSpill = tbl.ActionSpill()
+		failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
+			if val.(bool) {
+				actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest()
+			}
+		})
+		ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill)
+	}
+	return actionSpill
+}
+
+func (p *CTEProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
+	if p.seedExec == nil {
 		return errors.New("seedExec for CTEExec is nil")
 	}
-	if err = e.seedExec.Open(ctx); err != nil {
+	if err = p.seedExec.Open(ctx); err != nil {
 		return err
 	}
-	if e.memTracker != nil {
-		e.memTracker.Reset()
+	if p.memTracker != nil {
+		p.memTracker.Reset()
 	} else {
-		e.memTracker = memory.NewTracker(e.id, -1)
+		p.memTracker = memory.NewTracker(cteExec.id, -1)
 	}
-	e.diskTracker = disk.NewTracker(e.id, -1)
-	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
-	e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
+	p.diskTracker = disk.NewTracker(cteExec.id, -1)
+	p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
+	p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)
 
-	if e.recursiveExec != nil {
-		if err = e.recursiveExec.Open(ctx); err != nil {
+	if p.recursiveExec != nil {
+		if err = p.recursiveExec.Open(ctx); err != nil {
 			return err
 		}
 		// For non-recursive CTE, the result will be put into resTbl directly.
 		// So no need to build iterOutTbl.
 		// Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close().
-		recursiveTypes := e.recursiveExec.base().retFieldTypes
-		e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize)
-		if err = e.iterOutTbl.OpenAndRef(); err != nil {
+		recursiveTypes := p.recursiveExec.base().retFieldTypes
+		p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.maxChunkSize)
+		if err = p.iterOutTbl.OpenAndRef(); err != nil {
 			return err
 		}
 	}
 
-	if e.isDistinct {
-		e.hashTbl = newConcurrentMapHashTable()
-		e.hCtx = &hashContext{
-			allTypes: e.base().retFieldTypes,
+	if p.isDistinct {
+		p.hashTbl = newConcurrentMapHashTable()
+		p.hCtx = &hashContext{
+			allTypes: cteExec.base().retFieldTypes,
 		}
 		// We use all columns to compute hash.
-		e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes))
-		for i := range e.hCtx.keyColIdx {
-			e.hCtx.keyColIdx[i] = i
+		p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes))
+		for i := range p.hCtx.keyColIdx {
+			p.hCtx.keyColIdx[i] = i
 		}
 	}
+	p.opened = true
 	return nil
 }
 
-// Next implements the Executor interface.
-func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
-	req.Reset()
-	e.resTbl.Lock()
-	defer e.resTbl.Unlock()
-	if !e.resTbl.Done() {
-		if e.resTbl.Error() != nil {
-			return e.resTbl.Error()
-		}
-		resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
-		iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)
-		var iterOutAction *chunk.SpillDiskAction
-		if e.iterOutTbl != nil {
-			iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
-		}
-
-		failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
-			if val.(bool) && variable.EnableTmpStorageOnOOM.Load() {
-				defer resAction.WaitForTest()
-				defer iterInAction.WaitForTest()
-				if iterOutAction != nil {
-					defer iterOutAction.WaitForTest()
-				}
-			}
-		})
-
-		if err = e.computeSeedPart(ctx); err != nil {
-			e.resTbl.SetError(err)
+func (p *CTEProducer) closeProducer() (err error) {
+	if err = p.seedExec.Close(); err != nil {
+		return err
+	}
+	if p.recursiveExec != nil {
+		if err = p.recursiveExec.Close(); err != nil {
 			return err
 		}
-		if err = e.computeRecursivePart(ctx); err != nil {
-			e.resTbl.SetError(err)
-			return err
+		// `iterInTbl` and `resTbl` are shared by multiple operators,
+		// so will be closed when the SQL finishes.
+		if p.iterOutTbl != nil {
+			if err = p.iterOutTbl.DerefAndClose(); err != nil {
+				return err
+			}
		}
-		e.resTbl.SetDone()
 	}
+	p.closed = true
+	return nil
+}
 
-	if e.hasLimit {
-		return e.nextChunkLimit(req)
-	}
-	if e.chkIdx < e.resTbl.NumChunks() {
-		res, err := e.resTbl.GetChunk(e.chkIdx)
+func (p *CTEProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
+	req.Reset()
+	if cteExec.chkIdx < p.resTbl.NumChunks() {
+		res, err := p.resTbl.GetChunk(cteExec.chkIdx)
 		if err != nil {
 			return err
 		}
@@ -199,59 +251,66 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 		// Also we ignore copying rows not selected, because some operators like Projection
 		// doesn't support swap column if chunk.sel is no nil.
 		req.SwapColumns(res.CopyConstructSel())
-		e.chkIdx++
+		cteExec.chkIdx++
 	}
 	return nil
 }
 
-// Close implements the Executor interface.
-func (e *CTEExec) Close() (err error) {
-	e.reset()
-	if err = e.seedExec.Close(); err != nil {
-		return err
+func (p *CTEProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) {
+	if p.resTbl.Error() != nil {
+		return p.resTbl.Error()
 	}
-	if e.recursiveExec != nil {
-		if err = e.recursiveExec.Close(); err != nil {
-			return err
-		}
-		// `iterInTbl` and `resTbl` are shared by multiple operators,
-		// so will be closed when the SQL finishes.
-		if e.iterOutTbl != nil {
-			if err = e.iterOutTbl.DerefAndClose(); err != nil {
-				return err
+	resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker)
+	iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker)
+	var iterOutAction *chunk.SpillDiskAction
+	if p.iterOutTbl != nil {
+		iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker)
+	}
+
+	failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
+		if val.(bool) && variable.EnableTmpStorageOnOOM.Load() {
+			defer resAction.WaitForTest()
+			defer iterInAction.WaitForTest()
+			if iterOutAction != nil {
+				defer iterOutAction.WaitForTest()
 			}
 		}
+	})
+
+	if err = p.computeSeedPart(ctx); err != nil {
+		p.resTbl.SetError(err)
+		return err
 	}
-	if e.isInApply {
-		if err = e.reopenTbls(); err != nil {
-			return err
-		}
+	if err = p.computeRecursivePart(ctx); err != nil {
+		p.resTbl.SetError(err)
+		return err
 	}
-	return e.baseExecutor.Close()
+	p.resTbl.SetDone()
+	return nil
 }
 
-func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
+func (p *CTEProducer) computeSeedPart(ctx context.Context) (err error) {
 	defer func() {
 		if r := recover(); r != nil && err == nil {
 			err = errors.Errorf("%v", r)
 		}
 	}()
 	failpoint.Inject("testCTESeedPanic", nil)
-	e.curIter = 0
-	e.iterInTbl.SetIter(e.curIter)
+	p.curIter = 0
+	p.iterInTbl.SetIter(p.curIter)
 	chks := make([]*chunk.Chunk, 0, 10)
 	for {
-		if e.limitDone(e.iterInTbl) {
+		if p.limitDone(p.iterInTbl) {
 			break
 		}
-		chk := tryNewCacheChunk(e.seedExec)
-		if err = Next(ctx, e.seedExec, chk); err != nil {
+		chk := tryNewCacheChunk(p.seedExec)
+		if err = Next(ctx, p.seedExec, chk); err != nil {
 			return
 		}
 		if chk.NumRows() == 0 {
 			break
 		}
-		if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil {
+		if chk, err = p.tryDedupAndAdd(chk, p.iterInTbl, p.hashTbl); err != nil {
 			return
 		}
 		chks = append(chks, chk)
@@ -259,66 +318,66 @@ func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
 	// Initial resTbl is empty, so no need to deduplicate chk using resTbl.
 	// Just adding is ok.
 	for _, chk := range chks {
-		if err = e.resTbl.Add(chk); err != nil {
+		if err = p.resTbl.Add(chk); err != nil {
 			return
 		}
 	}
-	e.curIter++
-	e.iterInTbl.SetIter(e.curIter)
+	p.curIter++
+	p.iterInTbl.SetIter(p.curIter)
 	return
 }
 
-func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
+func (p *CTEProducer) computeRecursivePart(ctx context.Context) (err error) {
 	defer func() {
 		if r := recover(); r != nil && err == nil {
 			err = errors.Errorf("%v", r)
 		}
 	}()
 	failpoint.Inject("testCTERecursivePanic", nil)
-	if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 {
+	if p.recursiveExec == nil || p.iterInTbl.NumChunks() == 0 {
 		return
 	}
-	if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth {
-		return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter)
+	if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth {
+		return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter)
 	}
-	if e.limitDone(e.resTbl) {
+	if p.limitDone(p.resTbl) {
 		return
 	}
 	for {
-		chk := tryNewCacheChunk(e.recursiveExec)
-		if err = Next(ctx, e.recursiveExec, chk); err != nil {
+		chk := tryNewCacheChunk(p.recursiveExec)
+		if err = Next(ctx, p.recursiveExec, chk); err != nil {
 			return
 		}
 		if chk.NumRows() == 0 {
-			if err = e.setupTblsForNewIteration(); err != nil {
+			if err = p.setupTblsForNewIteration(); err != nil {
 				return
 			}
-			if e.limitDone(e.resTbl) {
+			if p.limitDone(p.resTbl) {
 				break
 			}
-			if e.iterInTbl.NumChunks() == 0 {
+			if p.iterInTbl.NumChunks() == 0 {
 				break
 			}
 			// Next iteration begins. Need use iterOutTbl as input of next iteration.
-			e.curIter++
-			e.iterInTbl.SetIter(e.curIter)
-			if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth {
-				return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter)
+			p.curIter++
+			p.iterInTbl.SetIter(p.curIter)
+			if p.curIter > p.ctx.GetSessionVars().CTEMaxRecursionDepth {
+				return exeerrors.ErrCTEMaxRecursionDepth.GenWithStackByArgs(p.curIter)
 			}
 			// Make sure iterInTbl is setup before Close/Open,
 			// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
-			if err = e.recursiveExec.Close(); err != nil {
+			if err = p.recursiveExec.Close(); err != nil {
 				return
 			}
-			if err = e.recursiveExec.Open(ctx); err != nil {
+			if err = p.recursiveExec.Open(ctx); err != nil {
 				return
 			}
 		} else {
-			if err = e.iterOutTbl.Add(chk); err != nil {
+			if err = p.iterOutTbl.Add(chk); err != nil {
 				return
 			}
 		}
@@ -326,67 +385,22 @@ func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
 	return
 }
 
-// Get next chunk from resTbl for limit.
-func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error {
-	if !e.meetFirstBatch {
-		for e.chkIdx < e.resTbl.NumChunks() {
-			res, err := e.resTbl.GetChunk(e.chkIdx)
-			if err != nil {
-				return err
-			}
-			e.chkIdx++
-			numRows := uint64(res.NumRows())
-			if newCursor := e.cursor + numRows; newCursor >= e.limitBeg {
-				e.meetFirstBatch = true
-				begInChk, endInChk := e.limitBeg-e.cursor, numRows
-				if newCursor > e.limitEnd {
-					endInChk = e.limitEnd - e.cursor
-				}
-				e.cursor += endInChk
-				if begInChk == endInChk {
-					break
-				}
-				tmpChk := res.CopyConstructSel()
-				req.Append(tmpChk, int(begInChk), int(endInChk))
-				return nil
-			}
-			e.cursor += numRows
-		}
-	}
-	if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd {
-		res, err := e.resTbl.GetChunk(e.chkIdx)
-		if err != nil {
-			return err
-		}
-		e.chkIdx++
-		numRows := uint64(res.NumRows())
-		if e.cursor+numRows > e.limitEnd {
-			numRows = e.limitEnd - e.cursor
-			req.Append(res.CopyConstructSel(), 0, int(numRows))
-		} else {
-			req.SwapColumns(res.CopyConstructSel())
-		}
-		e.cursor += numRows
-	}
-	return nil
-}
-
-func (e *CTEExec) setupTblsForNewIteration() (err error) {
-	num := e.iterOutTbl.NumChunks()
+func (p *CTEProducer) setupTblsForNewIteration() (err error) {
+	num := p.iterOutTbl.NumChunks()
 	chks := make([]*chunk.Chunk, 0, num)
 	// Setup resTbl's data.
 	for i := 0; i < num; i++ {
-		chk, err := e.iterOutTbl.GetChunk(i)
+		chk, err := p.iterOutTbl.GetChunk(i)
 		if err != nil {
 			return err
 		}
 		// Data should be copied in UNION DISTINCT.
 		// Because deduplicate() will change data in iterOutTbl,
 		// which will cause panic when spilling data into disk concurrently.
-		if e.isDistinct {
+		if p.isDistinct {
 			chk = chk.CopyConstruct()
 		}
-		chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl)
+		chk, err = p.tryDedupAndAdd(chk, p.resTbl, p.hashTbl)
 		if err != nil {
 			return err
 		}
@@ -394,76 +408,54 @@ func (e *CTEExec) setupTblsForNewIteration() (err error) {
 	}
 
 	// Setup new iteration data in iterInTbl.
-	if err = e.iterInTbl.Reopen(); err != nil {
+	if err = p.iterInTbl.Reopen(); err != nil {
 		return err
 	}
-	if e.isDistinct {
+	if p.isDistinct {
 		// Already deduplicated by resTbl, adding directly is ok.
 		for _, chk := range chks {
-			if err = e.iterInTbl.Add(chk); err != nil {
+			if err = p.iterInTbl.Add(chk); err != nil {
 				return err
 			}
 		}
 	} else {
-		if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil {
+		if err = p.iterInTbl.SwapData(p.iterOutTbl); err != nil {
 			return err
 		}
 	}
 
 	// Clear data in iterOutTbl.
-	return e.iterOutTbl.Reopen()
+	return p.iterOutTbl.Reopen()
 }
 
-func (e *CTEExec) reset() {
-	e.curIter = 0
-	e.chkIdx = 0
-	e.hashTbl = nil
-	e.cursor = 0
-	e.meetFirstBatch = false
+func (p *CTEProducer) reset() {
+	p.curIter = 0
+	p.chkIdx = 0
+	p.hashTbl = nil
+	p.cursor = 0
+	p.meetFirstBatch = false
 }
 
-func (e *CTEExec) reopenTbls() (err error) {
-	if e.isDistinct {
-		e.hashTbl = newConcurrentMapHashTable()
+func (p *CTEProducer) reopenTbls() (err error) {
+	if p.isDistinct {
+		p.hashTbl = newConcurrentMapHashTable()
 	}
-	if err := e.resTbl.Reopen(); err != nil {
+	if err := p.resTbl.Reopen(); err != nil {
 		return err
 	}
-	return e.iterInTbl.Reopen()
+	return p.iterInTbl.Reopen()
 }
 
 // Check if tbl meets the requirement of limit.
-func (e *CTEExec) limitDone(tbl cteutil.Storage) bool {
-	return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd
-}
-
-func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker,
-	parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) {
-	memTracker := tbl.GetMemTracker()
-	memTracker.SetLabel(memory.LabelForCTEStorage)
-	memTracker.AttachTo(parentMemTracker)
-
-	diskTracker := tbl.GetDiskTracker()
-	diskTracker.SetLabel(memory.LabelForCTEStorage)
-	diskTracker.AttachTo(parentDiskTracker)
-
-	if variable.EnableTmpStorageOnOOM.Load() {
-		actionSpill = tbl.ActionSpill()
-		failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
-			if val.(bool) {
-				actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest()
-			}
-		})
-		ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill)
-	}
-	return actionSpill
+func (p *CTEProducer) limitDone(tbl cteutil.Storage) bool {
+	return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd
 }
 
-func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk,
+func (p *CTEProducer) tryDedupAndAdd(chk *chunk.Chunk,
 	storage cteutil.Storage,
 	hashTbl baseHashTable) (res *chunk.Chunk, err error) {
-	if e.isDistinct {
-		if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil {
+	if p.isDistinct {
+		if chk, err = p.deduplicate(chk, storage, hashTbl); err != nil {
 			return nil, err
 		}
 	}
@@ -472,12 +464,12 @@ func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk,
 
 // Compute hash values in chk and put it in hCtx.hashVals.
 // Use the returned sel to choose the computed hash values.
-func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) {
+func (p *CTEProducer) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) {
 	numRows := chk.NumRows()
-	e.hCtx.initHash(numRows)
+	p.hCtx.initHash(numRows)
 	// Continue to reset to make sure all hasher is new.
-	for i := numRows; i < len(e.hCtx.hashVals); i++ {
-		e.hCtx.hashVals[i].Reset()
+	for i := numRows; i < len(p.hCtx.hashVals); i++ {
+		p.hCtx.hashVals[i].Reset()
 	}
 	sel = chk.Sel()
 	var hashBitMap []bool
@@ -489,12 +481,12 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) {
 	} else {
 		// All rows is selected, sel will be [0....numRows).
 		// e.sel is setup when building executor.
-		sel = e.sel
+		sel = p.sel
 	}
 
 	for i := 0; i < chk.NumCols(); i++ {
-		if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals,
-			chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull,
+		if err = codec.HashChunkSelected(p.ctx.GetSessionVars().StmtCtx, p.hCtx.hashVals,
+			chk, p.hCtx.allTypes[i], i, p.hCtx.buf, p.hCtx.hasNull,
 			hashBitMap, false); err != nil {
 			return nil, err
 		}
@@ -504,7 +496,7 @@ func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) {
 
 // Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl.
 // Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted.
-func (e *CTEExec) deduplicate(chk *chunk.Chunk,
+func (p *CTEProducer) deduplicate(chk *chunk.Chunk,
 	storage cteutil.Storage,
 	hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) {
 	numRows := chk.NumRows()
@@ -514,7 +506,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk,
 
 	// 1. Compute hash values for chunk.
 	chkHashTbl := newConcurrentMapHashTable()
-	selOri, err := e.computeChunkHash(chk)
+	selOri, err := p.computeChunkHash(chk)
 	if err != nil {
 		return nil, err
 	}
@@ -523,10 +515,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk,
 	// This sel is for filtering rows duplicated in cur chk.
 	selChk := make([]int, 0, numRows)
 	for i := 0; i < numRows; i++ {
-		key := e.hCtx.hashVals[selOri[i]].Sum64()
+		key := p.hCtx.hashVals[selOri[i]].Sum64()
 		row := chk.GetRow(i)
 
-		hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl)
+		hasDup, err := p.checkHasDup(key, row, chk, storage, chkHashTbl)
 		if err != nil {
 			return nil, err
 		}
@@ -546,10 +538,10 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk,
 	// This sel is for filtering rows duplicated in cteutil.Storage.
 	selStorage := make([]int, 0, len(selChk))
 	for i := 0; i < len(selChk); i++ {
-		key := e.hCtx.hashVals[selChk[i]].Sum64()
+		key := p.hCtx.hashVals[selChk[i]].Sum64()
 		row := chk.GetRow(i)
 
-		hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl)
+		hasDup, err := p.checkHasDup(key, row, nil, storage, hashTbl)
 		if err != nil {
 			return nil, err
 		}
@@ -570,7 +562,7 @@ func (e *CTEExec) deduplicate(chk *chunk.Chunk,
 
 // Use the row's probe key to check if it already exists in chk or storage.
 // We also need to compare the row's real encoding value to avoid hash collision.
-func (e *CTEExec) checkHasDup(probeKey uint64,
+func (p *CTEProducer) checkHasDup(probeKey uint64,
 	row chunk.Row,
 	curChk *chunk.Chunk,
 	storage cteutil.Storage,
@@ -591,9 +583,9 @@ func (e *CTEExec) checkHasDup(probeKey uint64,
 	if err != nil {
 		return false, err
 	}
-	isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx,
-		row, e.hCtx.allTypes, e.hCtx.keyColIdx,
-		matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx)
+	isEqual, err := codec.EqualChunkRow(p.ctx.GetSessionVars().StmtCtx,
+		row, p.hCtx.allTypes, p.hCtx.keyColIdx,
+		matchedRow, p.hCtx.allTypes, p.hCtx.keyColIdx)
 	if err != nil {
 		return false, err
 	}