Skip to content

Commit

Permalink
executor: fix random cte error under apply (#57294) (#57395)
Browse files Browse the repository at this point in the history
close #55881
  • Loading branch information
ti-chi-bot authored Nov 15, 2024
1 parent 9112d43 commit e3cb8c7
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 42 deletions.
87 changes: 46 additions & 41 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
defer e.producer.resTbl.Unlock()

if e.producer.checkAndUpdateCorColHashCode() {
e.producer.reset()
if err = e.producer.reopenTbls(); err != nil {
err = e.producer.reset()
if err != nil {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
if !e.producer.hasCTEResult() && !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}
}
Expand All @@ -110,8 +110,14 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.resTbl.Done() {
if err = e.producer.produce(ctx); err != nil {
if !e.producer.hasCTEResult() {
// in case that another CTEExec call close without generate CTE result.
if !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}
}
if err = e.producer.genCTEResult(ctx); err != nil {
return err
}
}
Expand All @@ -133,20 +139,25 @@ func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
if e.producer.executorOpened {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(exeerrors.ErrMemoryExceedForQuery)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// closeProducerExecutor() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducerExecutor().
// You can even call all three functions(openProducerExecutor/genCTEResult/closeProducerExecutor) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
err := e.producer.closeProducerExecutor()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
if !e.producer.hasCTEResult() {
// CTE result is not generated, in this case, we reset it
err = e.producer.reset()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}
}()
err := e.BaseExecutor.Close()
Expand All @@ -161,10 +172,10 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool
// executorOpened is used to indicate whether the executor(seedExec/recursiveExec) is opened.
// when executorOpened is true, the executor is opened, otherwise it means the executor is
// not opened or is already closed.
executorOpened bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
Expand Down Expand Up @@ -203,14 +214,10 @@ type cteProducer struct {
corColHashCodes [][]byte
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
func (p *cteProducer) openProducerExecutor(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
p.executorOpened = true
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
Expand Down Expand Up @@ -253,7 +260,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
return nil
}

func (p *cteProducer) closeProducer() (firstErr error) {
func (p *cteProducer) closeProducerExecutor() (firstErr error) {
err := exec.Close(p.seedExec)
firstErr = setFirstErr(firstErr, err, "close seedExec err")

Expand All @@ -272,7 +279,7 @@ func (p *cteProducer) closeProducer() (firstErr error) {
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
p.executorOpened = false
return
}

Expand Down Expand Up @@ -339,7 +346,13 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error {
return nil
}

func (p *cteProducer) produce(ctx context.Context) (err error) {
func (p *cteProducer) hasCTEResult() bool {
return p.resTbl.Done()
}

// genCTEResult generates the result of CTE, and stores the result in resTbl.
// This is a synchronous function, which means it will block until the result is generated.
func (p *cteProducer) genCTEResult(ctx context.Context) (err error) {
if p.resTbl.Error() != nil {
return p.resTbl.Error()
}
Expand Down Expand Up @@ -532,14 +545,18 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
return nil
}

func (p *cteProducer) reset() {
func (p *cteProducer) reset() error {
p.curIter = 0
p.hashTbl = nil

p.opened = false
p.executorOpened = false
p.openErr = nil
p.produced = false
p.closed = false

// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call genCTEResult() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}
return p.iterInTbl.Reopen()
}

func (p *cteProducer) resetTracker() {
Expand All @@ -553,18 +570,6 @@ func (p *cteProducer) resetTracker() {
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = join.NewConcurrentMapHashTable()
}
// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call produce() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}
return p.iterInTbl.Reopen()
}

// Check if tbl meets the requirement of limit.
func (p *cteProducer) limitDone(tbl cteutil.Storage) bool {
return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 23,
shard_count = 24,
deps = [
"//pkg/autoid_service",
"//pkg/config",
Expand Down
19 changes: 19 additions & 0 deletions pkg/executor/test/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,3 +745,22 @@ func TestCalculateBatchSize(t *testing.T) {
require.Equal(t, 258, executor.CalculateBatchSize(10, 1024, 258))
require.Equal(t, 1024, executor.CalculateBatchSize(0, 1024, 20000))
}

func TestIssue55881(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists aaa;")
tk.MustExec("drop table if exists bbb;")
tk.MustExec("create table aaa(id int, value int);")
tk.MustExec("create table bbb(id int, value int);")
tk.MustExec("insert into aaa values(1,2),(2,3)")
tk.MustExec("insert into bbb values(1,2),(2,3),(3,4)")
// set tidb_executor_concurrency to 1 to let the issue happens with high probability.
tk.MustExec("set tidb_executor_concurrency=1;")
// this is a random issue, so run it 100 times to increase the probability of the issue.
for i := 0; i < 100; i++ {
tk.MustQuery("with cte as (select * from aaa) select id, (select id from (select * from aaa where aaa.id != bbb.id union all select * from cte union all select * from cte) d limit 1)," +
"(select max(value) from (select * from cte union all select * from cte union all select * from aaa where aaa.id > bbb.id)) from bbb;")
}
}

0 comments on commit e3cb8c7

Please sign in to comment.