Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: fix a snapshot infoschema cache bug after v1 v2 switch #54966

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type InfoCache struct {

r autoid.Requirement
Data *Data

// first known schema version records the first known schema version, all schemas between [firstKnownSchemaVersion, latest)
// are known as long as we keep the DDL history correctly.
firstKnownSchemaVersion int64
}

type schemaAndTimestamp struct {
Expand Down Expand Up @@ -97,6 +101,7 @@ func (h *InfoCache) Upsert(is InfoSchema, schemaTS uint64) func() {
infoschema: is,
timestamp: int64(schemaTS),
})
h.firstKnownSchemaVersion = is.SchemaMetaVersion()

return func() {
// TODO: It's a bit tricky here, somewhere is holding the reference of the old infoschema.
Expand Down Expand Up @@ -223,8 +228,28 @@ func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
// return h.cache[i]
// }
// ```
// upsert is a full reset of InfoCache, after upsert, the DDL history might lost and the assumption does not hold anymore.
// For example:
// Before
// infoschema 51
// infoschema 52
// infoschema 53
// infoschema 54
// infoschema 55
// infoschema 56
// After Upsert()
// infoschema 56
// Then load historial snapshot version 51
// infoschema 51
// infoschema 56
// Now, request for schema version 55, return infoschem 51 would be wrong!
//
if i == len(h.cache) {
return nil
}

if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
if h.cache[i].infoschema.SchemaMetaVersion() == version ||
(i != 0 && h.cache[i].infoschema.SchemaMetaVersion() >= h.firstKnownSchemaVersion) {
infoschema_metrics.HitVersionCounter.Inc()
return h.cache[i].infoschema
}
Expand Down Expand Up @@ -289,6 +314,9 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
infoschema: is,
timestamp: int64(schemaTS),
}
if len(h.cache) == 1 {
h.firstKnownSchemaVersion = is.SchemaMetaVersion()
}
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
Expand Down
3 changes: 2 additions & 1 deletion pkg/infoschema/test/infoschemav2test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"v2_test.go",
],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
Expand All @@ -21,6 +21,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
],
)
43 changes: 43 additions & 0 deletions pkg/infoschema/test/infoschemav2test/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestSpecialSchemas(t *testing.T) {
Expand Down Expand Up @@ -375,3 +376,45 @@ func TestFullLoadAndSnapshot(t *testing.T) {
tk.MustExec("use db1")
tk.MustQuery("show tables").Check(testkit.Rows("t"))
}

func TestIssue54926(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_schema_cache_size = 0")
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

time1 := time.Now()
time1TS := oracle.GoTimeToTS(time1)
schemaVer1 := tk.Session().GetInfoSchema().SchemaMetaVersion()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")
tk.MustExec(`drop table if exists t`)
time.Sleep(50 * time.Millisecond)
time2 := time.Now()
time2TS := oracle.GoTimeToTS(time2)
schemaVer2 := tk.Session().GetInfoSchema().SchemaMetaVersion()

tk2.MustExec("create table test.t (id int primary key)")
tk.MustExec("set @@global.tidb_schema_cache_size = 1073741824")
dom.Reload()

// test set txn as of will flush/mutex tidb_snapshot
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000")))
require.Equal(t, time1TS, tk.Session().GetSessionVars().SnapshotTS)
require.NotNil(t, tk.Session().GetSessionVars().SnapshotInfoschema)
require.Equal(t, schemaVer1, tk.Session().GetInfoSchema().SchemaMetaVersion())
tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000")))
require.Equal(t, uint64(0), tk.Session().GetSessionVars().SnapshotTS)
require.NotNil(t, tk.Session().GetSessionVars().SnapshotInfoschema)
require.Equal(t, time2TS, tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS())
require.Equal(t, schemaVer2, tk.Session().GetInfoSchema().SchemaMetaVersion())
}