Skip to content

Commit

Permalink
schema cache: cache schema version by timestamp (#40768)
Browse files Browse the repository at this point in the history
close #40740
  • Loading branch information
hihihuhu authored Feb 6, 2023
1 parent ba41d92 commit 7b80380
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 26 deletions.
79 changes: 56 additions & 23 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package infoschema

import (
"fmt"
"sort"
"sync"

Expand All @@ -36,22 +37,27 @@ var (
// It only promised to cache the infoschema, if it is newer than all the cached.
type InfoCache struct {
mu sync.RWMutex
// cache is sorted by SchemaVersion in descending order
cache []InfoSchema
// record SnapshotTS of the latest schema Insert.
maxUpdatedSnapshotTS uint64
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
cache []schemaAndTimestamp
}

type schemaAndTimestamp struct {
infoschema InfoSchema
timestamp int64
}

// NewCache creates a new InfoCache.
func NewCache(capacity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
return &InfoCache{
cache: make([]schemaAndTimestamp, 0, capacity),
}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]InfoSchema, 0, capacity)
h.cache = make([]schemaAndTimestamp, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand All @@ -61,18 +67,40 @@ func (h *InfoCache) GetLatest() InfoSchema {
getLatestCounter.Inc()
if len(h.cache) > 0 {
hitLatestCounter.Inc()
return h.cache[0]
return h.cache[0].infoschema
}
return nil
}

// GetSchemaByTimestamp returns the schema used at the specific timestamp
func (h *InfoCache) GetSchemaByTimestamp(ts uint64) (InfoSchema, error) {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getSchemaByTimestampNoLock(ts)
}

func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, error) {
i := sort.Search(len(h.cache), func(i int) bool {
return uint64(h.cache[i].timestamp) <= ts
})
if i < len(h.cache) {
return h.cache[i].infoschema, nil
}

return nil, fmt.Errorf("no schema cached for timestamp %d", ts)
}

// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded.
func (h *InfoCache) GetByVersion(version int64) InfoSchema {
h.mu.RLock()
defer h.mu.RUnlock()
return h.getByVersionNoLock(version)
}

func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema {
getVersionCounter.Inc()
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

// `GetByVersion` is allowed to load the latest schema that is less than argument `version`.
Expand All @@ -93,9 +121,9 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema {
// }
// ```

if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) {
if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) {
hitVersionCounter.Inc()
return h.cache[i]
return h.cache[i].infoschema
}
return nil
}
Expand All @@ -108,11 +136,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
defer h.mu.RUnlock()

getTSCounter.Inc()
if snapshotTS >= h.maxUpdatedSnapshotTS {
if len(h.cache) > 0 {
hitTSCounter.Inc()
return h.cache[0]
}
if schema, err := h.getSchemaByTimestampNoLock(snapshotTS); err == nil {
hitTSCounter.Inc()
return schema
}
return nil
}
Expand All @@ -125,29 +151,36 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool {
defer h.mu.Unlock()

version := is.SchemaMetaVersion()

// assume this is the timestamp order as well
i := sort.Search(len(h.cache), func(i int) bool {
return h.cache[i].SchemaMetaVersion() <= version
return h.cache[i].infoschema.SchemaMetaVersion() <= version
})

if h.maxUpdatedSnapshotTS < snapshotTS {
h.maxUpdatedSnapshotTS = snapshotTS
}

// cached entry
if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version {
if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version {
if h.cache[i].timestamp > int64(snapshotTS) {
h.cache[i].timestamp = int64(snapshotTS)
}
return true
}

if len(h.cache) < cap(h.cache) {
// has free space, grown the slice
h.cache = h.cache[:len(h.cache)+1]
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(snapshotTS),
}
return true
} else if i < len(h.cache) {
// drop older schema
copy(h.cache[i+1:], h.cache[i:])
h.cache[i] = is
h.cache[i] = schemaAndTimestamp{
infoschema: is,
timestamp: int64(snapshotTS),
}
return true
}
// older than all cached schemas, refuse to cache it
Expand Down
64 changes: 61 additions & 3 deletions infoschema/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestInsert(t *testing.T) {
ic.Insert(is5, 5)
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Equal(t, is5, ic.GetBySnapshotTS(10))

// older
Expand All @@ -59,7 +59,7 @@ func TestInsert(t *testing.T) {
require.Equal(t, is5, ic.GetByVersion(5))
require.Equal(t, is2, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Equal(t, is2, ic.GetBySnapshotTS(2))
require.Equal(t, is6, ic.GetBySnapshotTS(10))

// replace 2, drop 2
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestInsert(t *testing.T) {
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(0))
require.Nil(t, ic.GetBySnapshotTS(2))
require.Nil(t, ic.GetBySnapshotTS(5))
require.Equal(t, is5, ic.GetBySnapshotTS(5))
require.Equal(t, is6, ic.GetBySnapshotTS(10))
}

Expand Down Expand Up @@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) {
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
}

func TestGetByTimestamp(t *testing.T) {
ic := infoschema.NewCache(16)
require.NotNil(t, ic)
require.Nil(t, ic.GetLatest())

is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
require.Equal(t, is1, ic.GetLatest())
_, err := ic.GetSchemaByTimestamp(0)
require.NotNil(t, err)
schema, err := ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(2))

is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)
require.Equal(t, is2, ic.GetLatest())
_, err = ic.GetSchemaByTimestamp(0)
require.NotNil(t, err)
schema, err = ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(2))
schema, err = ic.GetSchemaByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(3))

is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0)
ic.Insert(is0, 0)
require.Equal(t, is2, ic.GetLatest())
schema, err = ic.GetSchemaByTimestamp(0)
require.Nil(t, err)
require.Equal(t, int64(0), schema.SchemaMetaVersion())
require.Equal(t, is0, ic.GetBySnapshotTS(0))
schema, err = ic.GetSchemaByTimestamp(1)
require.Nil(t, err)
require.Equal(t, int64(1), schema.SchemaMetaVersion())
require.Equal(t, is1, ic.GetBySnapshotTS(1))
schema, err = ic.GetSchemaByTimestamp(2)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(2))
schema, err = ic.GetSchemaByTimestamp(3)
require.Nil(t, err)
require.Equal(t, int64(2), schema.SchemaMetaVersion())
require.Equal(t, is2, ic.GetBySnapshotTS(3))
}

0 comments on commit 7b80380

Please sign in to comment.