Skip to content

Commit

Permalink
ttl: always enable all read engines for TTL sessions (#56604) (#56806)
Browse files Browse the repository at this point in the history
close #56402
  • Loading branch information
ti-chi-bot authored Nov 4, 2024
1 parent d1250f2 commit 8b658c2
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ go_test(
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 41,
shard_count = 42,
deps = [
"//domain",
"//infoschema",
Expand Down
32 changes: 32 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/ngaut/pools"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1049,3 +1050,34 @@ func TestFinishError(t *testing.T) {
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

func TestGetSession(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_retry_limit=1")
tk.MustExec("set @@tidb_enable_1pc=0")
tk.MustExec("set @@tidb_enable_async_commit=0")
tk.MustExec("set @@tidb_isolation_read_engines='tiflash,tidb'")
var getCnt atomic.Int32

pool := pools.NewResourcePool(func() (pools.Resource, error) {
if getCnt.CompareAndSwap(0, 1) {
return tk.Session(), nil
}
require.FailNow(t, "get session more than once")
return nil, nil
}, 1, 1, 0)
defer pool.Close()
se, err := ttlworker.GetSessionForTest(pool)
require.NoError(t, err)
defer se.Close()

// session variables should be set
tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("0 1 1 tikv,tiflash,tidb"))

// all session variables should be restored after close
se.Close()
tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("1 0 0 tiflash,tidb"))
}
40 changes: 40 additions & 0 deletions ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -54,6 +55,12 @@ var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
return s
}

var allIsolationReadEngines = map[kv.StoreType]struct{}{
kv.TiKV: {},
kv.TiFlash: {},
kv.TiDB: {},
}

type sessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
Expand Down Expand Up @@ -85,6 +92,8 @@ func getSession(pool sessionPool) (session.Session, error) {
originalRetryLimit := sctx.GetSessionVars().RetryLimit
originalEnable1PC := sctx.GetSessionVars().Enable1PC
originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
originalIsolationReadEngines, restoreIsolationReadEngines := "", false

se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
Expand All @@ -101,6 +110,11 @@ func getSession(pool sessionPool) (session.Session, error) {
terror.Log(err)
}

if restoreIsolationReadEngines {
_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines=%?", originalIsolationReadEngines)
terror.Log(err)
}

DetachStatsCollector(exec)

pool.Put(resource)
Expand Down Expand Up @@ -135,6 +149,32 @@ func getSession(pool sessionPool) (session.Session, error) {
return nil, err
}

// allow the session in TTL to use all read engines.
_, hasTiDBEngine := se.GetSessionVars().IsolationReadEngines[kv.TiDB]
_, hasTiKVEngine := se.GetSessionVars().IsolationReadEngines[kv.TiKV]
_, hasTiFlashEngine := se.GetSessionVars().IsolationReadEngines[kv.TiFlash]
if !hasTiDBEngine || !hasTiKVEngine || !hasTiFlashEngine {
rows, err := se.ExecuteSQL(context.Background(), "select @@tidb_isolation_read_engines")
if err != nil {
se.Close()
return nil, err
}

if len(rows) == 0 || rows[0].Len() == 0 {
se.Close()
return nil, errors.New("failed to get tidb_isolation_read_engines variable")
}
originalIsolationReadEngines = rows[0].GetString(0)

_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines='tikv,tiflash,tidb'")
if err != nil {
se.Close()
return nil, err
}

restoreIsolationReadEngines = true
}

return se, nil
}

Expand Down
3 changes: 3 additions & 0 deletions ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,6 @@ func TestValidateTTLWork(t *testing.T) {
err = validateTTLWork(ctx, s, tbl, expire)
require.EqualError(t, err, "physical id changed")
}

// GetSessionForTest is used for test
var GetSessionForTest = getSession

0 comments on commit 8b658c2

Please sign in to comment.