Skip to content

Commit

Permalink
ttl: always enable all read engines for TTL sessions (#56604)
Browse files Browse the repository at this point in the history
close #56402
  • Loading branch information
YangKeao authored Oct 22, 2024
1 parent eebdcfe commit 670e970
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestGetSession(t *testing.T) {
tk.MustExec("set @@tidb_retry_limit=1")
tk.MustExec("set @@tidb_enable_1pc=0")
tk.MustExec("set @@tidb_enable_async_commit=0")
tk.MustExec("set @@tidb_isolation_read_engines='tiflash,tidb'")
var getCnt atomic.Int32

pool := pools.NewResourcePool(func() (pools.Resource, error) {
Expand All @@ -97,13 +98,13 @@ func TestGetSession(t *testing.T) {
require.Equal(t, "Europe/Berlin", tz.String())

// session variables should be set
tk.MustQuery("select @@time_zone, @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit").
Check(testkit.Rows("UTC 0 1 1"))
tk.MustQuery("select @@time_zone, @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("UTC 0 1 1 tikv,tiflash,tidb"))

// all session variables should be restored after close
se.Close()
tk.MustQuery("select @@time_zone, @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit").
Check(testkit.Rows("Asia/Shanghai 1 0 0"))
tk.MustQuery("select @@time_zone, @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("Asia/Shanghai 1 0 0 tiflash,tidb"))
}

func TestParallelLockNewJob(t *testing.T) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -55,6 +56,12 @@ var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
return s
}

var allIsolationReadEngines = map[kv.StoreType]struct{}{
kv.TiKV: {},
kv.TiFlash: {},
kv.TiDB: {},
}

func getSession(pool util.SessionPool) (session.Session, error) {
resource, err := pool.Get()
if err != nil {
Expand All @@ -77,6 +84,7 @@ func getSession(pool util.SessionPool) (session.Session, error) {
originalEnable1PC := sctx.GetSessionVars().Enable1PC
originalEnableAsyncCommit := sctx.GetSessionVars().EnableAsyncCommit
originalTimeZone, restoreTimeZone := "", false
originalIsolationReadEngines, restoreIsolationReadEngines := "", false

se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
Expand All @@ -103,6 +111,12 @@ func getSession(pool util.SessionPool) (session.Session, error) {
terror.Log(err)
}

if restoreIsolationReadEngines {
_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines=%?", originalIsolationReadEngines)
intest.AssertNoError(err)
terror.Log(err)
}

DetachStatsCollector(exec)

pool.Put(resource)
Expand Down Expand Up @@ -157,6 +171,32 @@ func getSession(pool util.SessionPool) (session.Session, error) {
}
restoreTimeZone = true

// allow the session in TTL to use all read engines.
_, hasTiDBEngine := se.GetSessionVars().IsolationReadEngines[kv.TiDB]
_, hasTiKVEngine := se.GetSessionVars().IsolationReadEngines[kv.TiKV]
_, hasTiFlashEngine := se.GetSessionVars().IsolationReadEngines[kv.TiFlash]
if !hasTiDBEngine || !hasTiKVEngine || !hasTiFlashEngine {
rows, err := se.ExecuteSQL(context.Background(), "select @@tidb_isolation_read_engines")
if err != nil {
se.Close()
return nil, err
}

if len(rows) == 0 || rows[0].Len() == 0 {
se.Close()
return nil, errors.New("failed to get tidb_isolation_read_engines variable")
}
originalIsolationReadEngines = rows[0].GetString(0)

_, err = se.ExecuteSQL(context.Background(), "set tidb_isolation_read_engines='tikv,tiflash,tidb'")
if err != nil {
se.Close()
return nil, err
}

restoreIsolationReadEngines = true
}

return se, nil
}

Expand Down

0 comments on commit 670e970

Please sign in to comment.