planner: prepared plan cache support cached plan with placeholder in limit clause #40196

Merged: 48 commits, Jan 17, 2023
Changes from 47 commits

Commits (48 total)
eba19fe
commit
fzzf678 Dec 27, 2022
47c4f51
pass stmtNode
fzzf678 Dec 27, 2022
544fb69
fix ut
fzzf678 Dec 28, 2022
9ce7ca2
fix ut
fzzf678 Dec 28, 2022
7d90645
Merge branch 'master' into planCache_limit
fzzf678 Dec 28, 2022
505e9c2
Merge remote-tracking branch 'upstream/master' into planCache_limit
fzzf678 Dec 28, 2022
2c8f7b6
Merge branch 'planCache_limit' of https://github.com/fzzf678/tidb int…
fzzf678 Dec 28, 2022
9a759ee
commit
fzzf678 Dec 28, 2022
83f7a9d
Update plan_cache_utils.go
fzzf678 Dec 28, 2022
b5f17a4
safe value
fzzf678 Dec 28, 2022
07f327f
Merge branch 'master' into planCache_limit
fzzf678 Dec 28, 2022
a7a5844
fix
fzzf678 Dec 28, 2022
d6482e4
Merge branch 'planCache_limit' of https://github.com/fzzf678/tidb int…
fzzf678 Dec 28, 2022
6188f82
Update plan_cache_test.go
fzzf678 Dec 28, 2022
b5b90ed
Merge branch 'master' into planCache_limit
fzzf678 Jan 3, 2023
fbe23e5
unify error message
fzzf678 Jan 3, 2023
190bc29
check limit argument
fzzf678 Jan 3, 2023
f1f73db
unify warning message
fzzf678 Jan 3, 2023
b75e63e
Merge remote-tracking branch 'upstream/master' into planCache_limit
fzzf678 Jan 3, 2023
ed829d1
fix ut
fzzf678 Jan 3, 2023
8244f14
revert
fzzf678 Jan 4, 2023
03cee83
Merge remote-tracking branch 'upstream/master' into planCache_limit
fzzf678 Jan 4, 2023
8258da0
only_int
fzzf678 Jan 4, 2023
51d06a4
Update plan_cache_utils.go
fzzf678 Jan 4, 2023
5d83cea
only int in limit stmt
fzzf678 Jan 5, 2023
aae94e1
fix
fzzf678 Jan 6, 2023
0d6335a
replace int by uint in cache key
fzzf678 Jan 9, 2023
ab99bb7
Merge branch 'master' into planCache_limit
fzzf678 Jan 9, 2023
4305ba1
Update plan_cache_utils_test.go
fzzf678 Jan 9, 2023
cc040cb
Merge branch 'planCache_limit' of https://github.com/fzzf678/tidb int…
fzzf678 Jan 9, 2023
61b1547
Update prepared_test.go
fzzf678 Jan 9, 2023
cd6ca23
Update prepared_test.go
fzzf678 Jan 9, 2023
e921a81
Update plan_cache_test.go
fzzf678 Jan 9, 2023
9c4df35
Update plan_cache_utils.go
fzzf678 Jan 9, 2023
d9eadbd
Merge remote-tracking branch 'upstream/master' into planCache_limit
fzzf678 Jan 11, 2023
9dc74fb
use exist function
fzzf678 Jan 11, 2023
de1d441
Update plan_cache_test.go
fzzf678 Jan 12, 2023
4c05386
move limit params
fzzf678 Jan 13, 2023
67f83dc
Update plan_cache_lru_test.go
fzzf678 Jan 13, 2023
18c5b47
Merge branch 'master' into planCache_limit
fzzf678 Jan 13, 2023
a9ba0e6
fix ut
fzzf678 Jan 13, 2023
e1cf8a0
Merge branch 'planCache_limit' of https://github.com/fzzf678/tidb int…
fzzf678 Jan 13, 2023
e1b33a4
Update plan_cache_lru.go
fzzf678 Jan 13, 2023
8bf7f8b
Merge branch 'master' into planCache_limit
fzzf678 Jan 13, 2023
5a985d7
Merge branch 'master' into planCache_limit
fzzf678 Jan 13, 2023
64c82da
Merge branch 'master' into planCache_limit
fzzf678 Jan 13, 2023
b877d21
fix
fzzf678 Jan 16, 2023
54edf40
Merge branch 'master' into planCache_limit
ti-chi-bot Jan 17, 2023
12 changes: 6 additions & 6 deletions planner/core/logical_plan_builder.go
@@ -2017,7 +2017,7 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, true
}
if mustInt64orUint64 {
if expected := checkParamTypeInt64orUint64(v); !expected {
if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
return 0, false, false
}
}
@@ -2054,19 +2054,19 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, false
}

// check param type for plan cache limit, only allow int64 and uint64 now
// CheckParamTypeInt64orUint64 check param type for plan cache limit, only allow int64 and uint64 now
// eg: set @a = 1;
func checkParamTypeInt64orUint64(param *driver.ParamMarkerExpr) bool {
func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
val := param.GetValue()
switch v := val.(type) {
case int64:
if v >= 0 {
return true
return true, uint64(v)
}
case uint64:
return true
return true, v
}
return false
return false, 0
}

func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
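The hunk above widens checkParamTypeInt64orUint64 into the exported CheckParamTypeInt64orUint64, which besides the accept/reject decision also returns the parameter value normalized to uint64 so it can travel with the plan-cache entry. Below is a minimal standalone sketch of that check; the name checkLimitParamValue and the plain interface{} input are illustrative stand-ins, not part of the PR:

```go
package main

import "fmt"

// checkLimitParamValue mirrors the accept/normalize logic in the diff above:
// only non-negative int64 values and uint64 values are usable as LIMIT/OFFSET
// parameters, and an accepted value is returned as uint64.
func checkLimitParamValue(val interface{}) (ok bool, normalized uint64) {
	switch v := val.(type) {
	case int64:
		if v >= 0 {
			return true, uint64(v)
		}
	case uint64:
		return true, v
	}
	return false, 0 // negative ints, strings, floats, etc. are rejected
}

func main() {
	fmt.Println(checkLimitParamValue(int64(10))) // true 10
	fmt.Println(checkLimitParamValue(int64(-1))) // false 0
	fmt.Println(checkLimitParamValue(uint64(5))) // true 5
	fmt.Println(checkLimitParamValue("10"))      // false 0
}
```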
19 changes: 11 additions & 8 deletions planner/core/plan_cache.go
@@ -165,15 +165,18 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
return plan, names, err
}
}

limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
if paramErr != nil {
return nil, nil, paramErr
}
if stmtCtx.UseCache { // for non-point plans
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
paramTypes); err != nil || ok {
paramTypes, limitCountAndOffset); err != nil || ok {
return plan, names, err
}
}

return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL)
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL, limitCountAndOffset)
}

// parseParamTypes get parameters' types in PREPARE statement
@@ -221,12 +224,12 @@ func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmt
}

func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes)
candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
if !exist {
return nil, nil, false, nil
}
@@ -265,7 +268,7 @@ func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache
// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
paramTypes []*types.FieldType, bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
@@ -296,11 +299,11 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
}
sessVars.FoundInPlanCache = false
return p, names, err
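In GetPlanFromSessionPlanCache, the LIMIT/OFFSET parameter values are now extracted once per execution via ExtractLimitFromAst and threaded through both the lookup (getCachedPlan and Get) and the fill (NewPlanCacheValue and Put), so a plan optimized for one LIMIT value is never returned for another. The following is a simplified sketch of that lookup/fill order, assuming a hypothetical planCache interface with string keys in place of the real kvcache and types packages:

```go
package plancache

// Hypothetical, simplified shapes: the real code passes kvcache.Key,
// []*types.FieldType, and *PlanCacheValue rather than string and interface{}.
type planCache interface {
	Get(key string, paramTypes []string, limitParams []uint64) (plan interface{}, ok bool)
	Put(key string, plan interface{}, paramTypes []string, limitParams []uint64)
}

// getOrBuildPlan sketches the order used by GetPlanFromSessionPlanCache: the
// LIMIT/OFFSET values extracted from the statement are passed to Get, and the
// same values are recorded by Put when a freshly optimized plan is cached.
func getOrBuildPlan(c planCache, key string, paramTypes []string,
	limitParams []uint64, build func() interface{}) interface{} {
	if plan, ok := c.Get(key, paramTypes, limitParams); ok {
		return plan // hit: key, parameter types, and limit values all match
	}
	plan := build()
	c.Put(key, plan, paramTypes, limitParams)
	return plan
}
```

In the real code the same pair of values is also stored in the PlanCacheValue (limitOffsetAndCount), which is what PickPlanFromBucket compares against on the next lookup.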
36 changes: 28 additions & 8 deletions planner/core/plan_cache_lru.go
@@ -53,7 +53,7 @@ type LRUPlanCache struct {
lock sync.Mutex

// pickFromBucket get one element from bucket. The LRUPlanCache can not work if it is nil
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool)
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool)
// onEvict will be called if any eviction happened, only for test use now
onEvict func(kvcache.Key, kvcache.Value)

@@ -68,7 +68,7 @@ type LRUPlanCache struct {
// NewLRUPlanCache creates a PCLRUCache object, whose capacity is "capacity".
// NOTE: "capacity" should be a positive value.
func NewLRUPlanCache(capacity uint, guard float64, quota uint64,
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
if capacity < 1 {
capacity = 100
logutil.BgLogger().Info("capacity of LRU cache is less than 1, will use default value(100) init cache")
@@ -94,13 +94,13 @@ func strHashKey(key kvcache.Key, deepCopy bool) string {
}

// Get tries to find the corresponding value according to the given key.
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool) {
l.lock.Lock()
defer l.lock.Unlock()

bucket, bucketExist := l.buckets[strHashKey(key, false)]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.lruList.MoveToFront(element)
return element.Value.(*planCacheEntry).PlanValue, true
}
@@ -109,14 +109,14 @@ func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (valu
}

// Put puts the (key, value) pair into the LRU Cache.
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType) {
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64) {
l.lock.Lock()
defer l.lock.Unlock()

hash := strHashKey(key, true)
bucket, bucketExist := l.buckets[hash]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.updateInstanceMetric(&planCacheEntry{PlanKey: key, PlanValue: value}, element.Value.(*planCacheEntry))
element.Value.(*planCacheEntry).PlanValue = value
l.lruList.MoveToFront(element)
@@ -252,16 +252,36 @@ func (l *LRUPlanCache) memoryControl() {
}

// PickPlanFromBucket pick one plan from bucket
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType) (*list.Element, bool) {
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType, limitParams []uint64) (*list.Element, bool) {
for k := range bucket {
plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
if plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes) {
ok1 := plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes)
if !ok1 {
continue
}
ok2 := checkUint64SliceIfEqual(plan.limitOffsetAndCount, limitParams)
if ok2 {
return k, true
}
}
return nil, false
}

func checkUint64SliceIfEqual(a, b []uint64) bool {
if (a == nil && b != nil) || (a != nil && b == nil) {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

// updateInstanceMetric update the memory usage and plan num for show in grafana
func (l *LRUPlanCache) updateInstanceMetric(in, out *planCacheEntry) {
updateInstancePlanNum(in, out)
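PickPlanFromBucket now takes the current statement's limit parameters and only reuses a bucket entry whose recorded limitOffsetAndCount matches, as decided by checkUint64SliceIfEqual. A self-contained sketch of that comparison, with equalUint64Slices as an illustrative name for the same logic:

```go
package main

import "fmt"

// equalUint64Slices is an illustrative copy of checkUint64SliceIfEqual above:
// two limit-parameter slices match only when their nil-ness, length, and
// element values all agree.
func equalUint64Slices(a, b []uint64) bool {
	if (a == nil) != (b == nil) || len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(equalUint64Slices([]uint64{10, 0}, []uint64{10, 0}))    // true: same LIMIT and OFFSET
	fmt.Println(equalUint64Slices([]uint64{10, 0}, []uint64{10000, 0})) // false: different LIMIT value
	fmt.Println(equalUint64Slices(nil, []uint64{}))                     // false: nil vs. non-nil, as in the diff
}
```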
51 changes: 32 additions & 19 deletions planner/core/plan_cache_lru_test.go
@@ -65,14 +65,18 @@ func TestLRUPCPut(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}

// one key corresponding to multi values
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
@@ -103,7 +107,7 @@

bucket, exist := lru.buckets[string(hack.String(keys[i].Hash()))]
require.True(t, exist)
element, exist := lru.pickFromBucket(bucket, pTypes[i])
element, exist := lru.pickFromBucket(bucket, pTypes[i], limitParams[i])
require.NotNil(t, element)
require.True(t, exist)
require.Equal(t, root, element)
@@ -131,22 +135,25 @@ func TestLRUPCGet(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}
// 5 bucket
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i%4), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i], limitOffsetAndCount: limitParams[i]}
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}

// test for non-existent elements
for i := 0; i < 2; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.False(t, exists)
require.Nil(t, value)
}

for i := 2; i < 5; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.True(t, exists)
require.NotNil(t, value)
require.Equal(t, vals[i], value)
@@ -175,23 +182,29 @@ func TestLRUPCDelete(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeEnum)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeDate)},
}
limitParams := [][]uint64{
{1}, {2}, {3},
}
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.Delete(keys[1])
value, exists := lru.Get(keys[1], pTypes[1])
value, exists := lru.Get(keys[1], pTypes[1], limitParams[1])
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 2, int(lru.size))

_, exists = lru.Get(keys[0], pTypes[0])
_, exists = lru.Get(keys[0], pTypes[0], limitParams[0])
require.True(t, exists)

_, exists = lru.Get(keys[2], pTypes[2])
_, exists = lru.Get(keys[2], pTypes[2], limitParams[2])
require.True(t, exists)
}

@@ -207,14 +220,14 @@ func TestLRUPCDeleteAll(t *testing.T) {
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.DeleteAll()

for i := 0; i < 3; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], []uint64{})
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 0, int(lru.size))
@@ -242,7 +255,7 @@ func TestLRUPCSetCapacity(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(5), lru.size)
@@ -292,7 +305,7 @@ func TestIssue37914(t *testing.T) {
val := &PlanCacheValue{ParamTypes: pTypes}

require.NotPanics(t, func() {
lru.Put(key, val, pTypes)
lru.Put(key, val, pTypes, []uint64{})
})
}

@@ -313,7 +326,7 @@ func TestIssue38244(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
@@ -334,15 +347,15 @@ func TestLRUPlanCacheMemoryUsage(t *testing.T) {
for i := 0; i < 3; i++ {
k := randomPlanCacheKey()
v := randomPlanCacheValue(pTypes)
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
require.Equal(t, lru.MemoryUsage(), res)
}
// evict
p := &PhysicalTableScan{}
k := &planCacheKey{database: "3"}
v := &PlanCacheValue{Plan: p}
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
for kk, vv := range evict {
res -= kk.(*planCacheKey).MemoryUsage() + vv.(*PlanCacheValue).MemoryUsage()
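Taken together with the test updates above, the user-visible effect is that a prepared statement with a placeholder in its LIMIT (or OFFSET) clause can now be served from the prepared plan cache, with the bound values taking part in cache matching and only non-negative integer parameters accepted. A rough client-side sketch, assuming a reachable local TiDB instance, an existing table test.t, and the go-sql-driver/mysql driver; none of these are part of this PR:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL-protocol driver
)

func main() {
	// Assumed DSN: a local TiDB instance with the prepared plan cache enabled
	// and a table test.t already created.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A placeholder in the LIMIT clause; before this PR such statements could
	// not reuse cached plans.
	stmt, err := db.Prepare("SELECT * FROM t ORDER BY a LIMIT ?")
	if err != nil {
		log.Fatal(err)
	}
	defer stmt.Close()

	// The bound value takes part in cache matching, so the two executions with
	// LIMIT 10 can share one cached plan while LIMIT 10000 gets its own entry.
	for _, n := range []int64{10, 10, 10000} {
		rows, err := stmt.Query(n)
		if err != nil {
			log.Fatal(err)
		}
		rows.Close()
		fmt.Println("executed with LIMIT", n)
	}
}
```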