From 7b5676046bb524b8b3aa9def14a2bba113b0239e Mon Sep 17 00:00:00 2001 From: Han Fei Date: Mon, 13 Sep 2021 23:43:17 +0800 Subject: [PATCH] sessionctx: fix bug where meeting race when alloc task id --- executor/tiflash_test.go | 29 +++++++++++++++++++++++++++++ sessionctx/variable/session.go | 3 +++ 2 files changed, 32 insertions(+) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index ee48820dde220..b03f6f82ef3a2 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -149,6 +149,35 @@ func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { tk.MustQuery("select count(*) from t1 , t where t1.a = t.a and ((t1.a < 9223372036854775800 and t1.a > 2) or (t1.a <= 1 and t1.a > -1))").Check(testkit.Rows("3")) } +// to fix https://github.com/pingcap/tidb/issues/27952 +func (s *tiflashTestSuite) TestJoinRace(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int not null, b int not null)") + tk.MustExec("alter table t set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1,1)") + tk.MustExec("insert into t values(2,1)") + tk.MustExec("insert into t values(3,1)") + tk.MustExec("insert into t values(1,2)") + tk.MustExec("insert into t values(2,2)") + tk.MustExec("insert into t values(3,2)") + tk.MustExec("insert into t values(1,2)") + tk.MustExec("insert into t values(2,2)") + tk.MustExec("insert into t values(3,2)") + tk.MustExec("insert into t values(1,3)") + tk.MustExec("insert into t values(2,3)") + tk.MustExec("insert into t values(3,4)") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("set @@session.tidb_enforce_mpp=ON") + tk.MustExec("set @@tidb_opt_broadcast_cartesian_join=0") + tk.MustQuery("select count(*) from (select count(a) x from t group by b) t1 join (select count(a) x from t group by b) t2 on t1.x > t2.x").Check(testkit.Rows("6")) + +} + func (s *tiflashTestSuite) TestMppExecution(c *C) { if israce.RaceEnabled { c.Skip("skip race test because of long running") diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c6bd9c718851..decd6216c7ff0 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -481,6 +481,7 @@ type SessionVars struct { // mppTaskIDAllocator is used to allocate mpp task id for a session. mppTaskIDAllocator struct { + mu sync.Mutex lastTS uint64 taskID int64 } @@ -964,6 +965,8 @@ func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's // startTs is different. func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 { + s.mppTaskIDAllocator.mu.Lock() + defer s.mppTaskIDAllocator.mu.Unlock() if s.mppTaskIDAllocator.lastTS == startTS { s.mppTaskIDAllocator.taskID++ return s.mppTaskIDAllocator.taskID