Skip to content

Commit

Permalink
infoschema: fix a snapshot infoschema cache bug after v1 v2 switch (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and hawkingrei committed Aug 1, 2024
1 parent f676192 commit 843c4bb
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
30 changes: 29 additions & 1 deletion pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type InfoCache struct {

r autoid.Requirement
Data *Data

// first known schema version records the first known schema version, all schemas between [firstKnownSchemaVersion, latest)
// are known as long as we keep the DDL history correctly.
firstKnownSchemaVersion int64
}

type schemaAndTimestamp struct {
Expand Down Expand Up @@ -97,6 +101,7 @@ func (h *InfoCache) Upsert(is InfoSchema, schemaTS uint64) func() {
infoschema: is,
timestamp: int64(schemaTS),
})
h.firstKnownSchemaVersion = is.SchemaMetaVersion()

return func() {
// TODO: It's a bit tricky here, somewhere is holding the reference of the old infoschema.
Expand Down Expand Up @@ -223,8 +228,28 @@ func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
// return h.cache[i]
// }
// ```
// upsert is a full reset of InfoCache, after upsert, the DDL history might lost and the assumption does not hold anymore.
// For example:
// Before
// infoschema 51
// infoschema 52
// infoschema 53
// infoschema 54
// infoschema 55
// infoschema 56
// After Upsert()
// infoschema 56
// Then load historial snapshot version 51
// infoschema 51
// infoschema 56
// Now, request for schema version 55, return infoschem 51 would be wrong!
//
if i == len(h.cache) {
return nil
}

if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
if h.cache[i].infoschema.SchemaMetaVersion() == version ||
(i != 0 && h.cache[i].infoschema.SchemaMetaVersion() >= h.firstKnownSchemaVersion) {
infoschema_metrics.HitVersionCounter.Inc()
return h.cache[i].infoschema
}
Expand Down Expand Up @@ -289,6 +314,9 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
infoschema: is,
timestamp: int64(schemaTS),
}
if len(h.cache) == 1 {
h.firstKnownSchemaVersion = is.SchemaMetaVersion()
}
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
Expand Down
3 changes: 2 additions & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand All @@ -21,6 +21,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
],
)
43 changes: 43 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestSpecialSchemas(t *testing.T) {
Expand Down Expand Up @@ -385,3 +386,45 @@ func TestFullLoadAndSnapshot(t *testing.T) {
tk.MustExec("use db1")
tk.MustQuery("show tables").Check(testkit.Rows("t"))
}

func TestIssue54926(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_schema_cache_size = 0")
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

time1 := time.Now()
time1TS := oracle.GoTimeToTS(time1)
schemaVer1 := tk.Session().GetInfoSchema().SchemaMetaVersion()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")
tk.MustExec(`drop table if exists t`)
time.Sleep(50 * time.Millisecond)
time2 := time.Now()
time2TS := oracle.GoTimeToTS(time2)
schemaVer2 := tk.Session().GetInfoSchema().SchemaMetaVersion()

tk2.MustExec("create table test.t (id int primary key)")
tk.MustExec("set @@global.tidb_schema_cache_size = 1073741824")
dom.Reload()

// test set txn as of will flush/mutex tidb_snapshot
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000")))
require.Equal(t, time1TS, tk.Session().GetSessionVars().SnapshotTS)
require.NotNil(t, tk.Session().GetSessionVars().SnapshotInfoschema)
require.Equal(t, schemaVer1, tk.Session().GetInfoSchema().SchemaMetaVersion())
tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000")))
require.Equal(t, uint64(0), tk.Session().GetSessionVars().SnapshotTS)
require.NotNil(t, tk.Session().GetSessionVars().SnapshotInfoschema)
require.Equal(t, time2TS, tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion())
}

0 comments on commit 843c4bb

Please sign in to comment.