From 7604723fa3cff5163a43f8ff3c83d36811884ed3 Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 15 Nov 2018 15:16:20 +0800 Subject: [PATCH 1/5] planner: cleanup prepare cache when client send dealloc --- planner/core/cache.go | 13 +++++++++++++ session/session.go | 22 ++++++++++++++++++++-- util/kvcache/simple_lru.go | 8 ++++++++ util/kvcache/simple_lru_test.go | 26 ++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 2 deletions(-) diff --git a/planner/core/cache.go b/planner/core/cache.go index b16a99992a6cf..d1264ecb48b8a 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -87,6 +87,19 @@ func (key *pstmtPlanCacheKey) Hash() []byte { return key.hash } +// PstmtCacheKeyMutator is the interface that change Prepared statement's cache key. +type PstmtCacheKeyMutator interface { + SetPstmtIDSchemaVersion(pstmtID uint32, schemaVersion int64) +} + +// SetPstmtIDSchemaVersion implements PstmtCacheKeyMutator interface to change pstmtID and schemaVersion of cacheKey. +// so we can reuse Key instead of new every time. +func (key *pstmtPlanCacheKey) SetPstmtIDSchemaVersion(pstmtID uint32, schemaVersion int64) { + key.pstmtID = pstmtID + key.schemaVersion = schemaVersion + key.hash = nil +} + // NewPSTMTPlanCacheKey creates a new pstmtPlanCacheKey object. func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, schemaVersion int64) kvcache.Key { timezoneOffset := 0 diff --git a/session/session.go b/session/session.go index 07e95fe048a74..4e872a938f541 100644 --- a/session/session.go +++ b/session/session.go @@ -164,8 +164,26 @@ func (s *session) getMembufCap() int { func (s *session) cleanRetryInfo() { if !s.sessionVars.RetryInfo.Retrying { retryInfo := s.sessionVars.RetryInfo - for _, stmtID := range retryInfo.DroppedPreparedStmtIDs { - delete(s.sessionVars.PreparedStmts, stmtID) + if len(retryInfo.DroppedPreparedStmtIDs) > 0 { + planCacheEnabled := plannercore.PreparedPlanCacheEnabled() + var cacheKey kvcache.Key + if planCacheEnabled { + firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] + cacheKey = plannercore.NewPSTMTPlanCacheKey( + s.sessionVars, firstStmtID, s.sessionVars.PreparedStmts[firstStmtID].SchemaVersion, + ) + } + for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { + if planCacheEnabled { + if i > 0 { + cacheKey.(plannercore.PstmtCacheKeyMutator).SetPstmtIDSchemaVersion( + stmtID, s.sessionVars.PreparedStmts[stmtID].SchemaVersion, + ) + } + s.PreparedPlanCache().Delete(cacheKey) + } + delete(s.sessionVars.PreparedStmts, stmtID) + } } retryInfo.Clean() } diff --git a/util/kvcache/simple_lru.go b/util/kvcache/simple_lru.go index 4bf04808f7da7..2fc4465592f28 100644 --- a/util/kvcache/simple_lru.go +++ b/util/kvcache/simple_lru.go @@ -89,3 +89,11 @@ func (l *SimpleLRUCache) Put(key Key, value Value) { l.size-- } } + +// Delete delete the key and its from the LRU Cache. +func (l *SimpleLRUCache) Delete(key Key) { + k := string(key.Hash()) + l.cache.Remove(l.elements[k]) + delete(l.elements, k) + l.size-- +} diff --git a/util/kvcache/simple_lru_test.go b/util/kvcache/simple_lru_test.go index e3d73ed1631fd..bf4b99a8c1cf2 100644 --- a/util/kvcache/simple_lru_test.go +++ b/util/kvcache/simple_lru_test.go @@ -143,3 +143,29 @@ func (s *testLRUCacheSuite) TestGet(c *C) { c.Assert(value, Equals, vals[i]) } } + +func (s *testLRUCacheSuite) TestDelete(c *C) { + lru := NewSimpleLRUCache(3) + + keys := make([]*mockCacheKey, 3) + vals := make([]int64, 3) + + for i := 0; i < 3; i++ { + keys[i] = newMockHashKey(int64(i)) + vals[i] = int64(i) + lru.Put(keys[i], vals[i]) + } + c.Assert(int(lru.size), Equals, 3) + + lru.Delete(keys[1]) + value, exists := lru.Get(keys[1]) + c.Assert(exists, IsFalse) + c.Assert(value, IsNil) + c.Assert(int(lru.size), Equals, 2) + + _, exists = lru.Get(keys[0]) + c.Assert(exists, IsTrue) + + _, exists = lru.Get(keys[2]) + c.Assert(exists, IsTrue) +} From 2d974604360b1c102364b18e5e1c3d638d5eba71 Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 15 Nov 2018 16:54:04 +0800 Subject: [PATCH 2/5] planner: fix close connection panic --- util/kvcache/simple_lru.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/util/kvcache/simple_lru.go b/util/kvcache/simple_lru.go index 2fc4465592f28..3da4ff2c6bbaf 100644 --- a/util/kvcache/simple_lru.go +++ b/util/kvcache/simple_lru.go @@ -93,7 +93,11 @@ func (l *SimpleLRUCache) Put(key Key, value Value) { // Delete delete the key and its from the LRU Cache. func (l *SimpleLRUCache) Delete(key Key) { k := string(key.Hash()) - l.cache.Remove(l.elements[k]) + element := l.elements[k] + if element == nil { + return + } + l.cache.Remove(element) delete(l.elements, k) l.size-- } From d6884eaeae541512560c16f92ee024f2d740766d Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 15 Nov 2018 17:56:32 +0800 Subject: [PATCH 3/5] planner: add test case and make deallocExec works --- executor/prepared.go | 5 +++++ executor/prepared_test.go | 37 +++++++++++++++++++++++++++++++++++++ util/kvcache/simple_lru.go | 7 ++++++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/executor/prepared.go b/executor/prepared.go index f1c34c14032b3..e7c5d3d340180 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -237,6 +237,11 @@ func (e *DeallocateExec) Next(ctx context.Context, chk *chunk.Chunk) error { return errors.Trace(plannercore.ErrStmtNotFound) } delete(vars.PreparedStmtNameToID, e.Name) + if plannercore.PreparedPlanCacheEnabled() { + e.ctx.PreparedPlanCache().Delete(plannercore.NewPSTMTPlanCacheKey( + vars, id, vars.PreparedStmts[id].SchemaVersion, + )) + } delete(vars.PreparedStmts, id) return nil } diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 1f4771541361a..605745437a848 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -567,3 +567,40 @@ func (s *testSuite) TestPreparedDelete(c *C) { result.Check(nil) } } + +func (s *testSuite) TestPrepareDealloc(c *C) { + orgEnable := plannercore.PreparedPlanCacheEnabled() + orgCapacity := plannercore.PreparedPlanCacheCapacity + defer func() { + plannercore.SetPreparedPlanCache(orgEnable) + plannercore.PreparedPlanCacheCapacity = orgCapacity + }() + flags := []bool{true} + for _, flag := range flags { + plannercore.SetPreparedPlanCache(flag) + plannercore.PreparedPlanCacheCapacity = 3 + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists prepare_test") + tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)") + + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) + tk.MustExec(`prepare stmt1 from 'select * from prepare_test'`) + tk.MustExec("execute stmt1") + tk.MustExec(`prepare stmt2 from 'select * from prepare_test'`) + tk.MustExec("execute stmt2") + tk.MustExec(`prepare stmt3 from 'select * from prepare_test'`) + tk.MustExec("execute stmt3") + tk.MustExec(`prepare stmt4 from 'select * from prepare_test'`) + tk.MustExec("execute stmt4") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) + + tk.MustExec("deallocate prepare stmt1") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) + tk.MustExec("deallocate prepare stmt2") + tk.MustExec("deallocate prepare stmt3") + tk.MustExec("deallocate prepare stmt4") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) + } +} diff --git a/util/kvcache/simple_lru.go b/util/kvcache/simple_lru.go index 3da4ff2c6bbaf..5781795cd4e5e 100644 --- a/util/kvcache/simple_lru.go +++ b/util/kvcache/simple_lru.go @@ -90,7 +90,7 @@ func (l *SimpleLRUCache) Put(key Key, value Value) { } } -// Delete delete the key and its from the LRU Cache. +// Delete deletes the key and its from the LRU Cache. func (l *SimpleLRUCache) Delete(key Key) { k := string(key.Hash()) element := l.elements[k] @@ -101,3 +101,8 @@ func (l *SimpleLRUCache) Delete(key Key) { delete(l.elements, k) l.size-- } + +// Size gets the current cache size. +func (l *SimpleLRUCache) Size() int { + return int(l.size) +} From b8ec5df452ff8e22fef60b5ed5dc4d0535b46899 Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 19 Nov 2018 12:14:41 +0800 Subject: [PATCH 4/5] address comments --- planner/core/cache.go | 23 +++++++++++----------- session/session.go | 40 +++++++++++++++++++------------------- util/kvcache/simple_lru.go | 2 +- 3 files changed, 33 insertions(+), 32 deletions(-) diff --git a/planner/core/cache.go b/planner/core/cache.go index d1264ecb48b8a..6659d3dc6d97b 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -70,12 +70,14 @@ type pstmtPlanCacheKey struct { // Hash implements Key interface. func (key *pstmtPlanCacheKey) Hash() []byte { - if key.hash == nil { + if len(key.hash) == 0 { var ( dbBytes = hack.Slice(key.database) bufferSize = len(dbBytes) + 8*6 ) - key.hash = make([]byte, 0, bufferSize) + if key.hash == nil { + key.hash = make([]byte, 0, bufferSize) + } key.hash = append(key.hash, dbBytes...) key.hash = codec.EncodeInt(key.hash, int64(key.connID)) key.hash = codec.EncodeInt(key.hash, int64(key.pstmtID)) @@ -87,17 +89,16 @@ func (key *pstmtPlanCacheKey) Hash() []byte { return key.hash } -// PstmtCacheKeyMutator is the interface that change Prepared statement's cache key. -type PstmtCacheKeyMutator interface { - SetPstmtIDSchemaVersion(pstmtID uint32, schemaVersion int64) -} - // SetPstmtIDSchemaVersion implements PstmtCacheKeyMutator interface to change pstmtID and schemaVersion of cacheKey. // so we can reuse Key instead of new every time. -func (key *pstmtPlanCacheKey) SetPstmtIDSchemaVersion(pstmtID uint32, schemaVersion int64) { - key.pstmtID = pstmtID - key.schemaVersion = schemaVersion - key.hash = nil +func SetPstmtIDSchemaVersion(key kvcache.Key, pstmtID uint32, schemaVersion int64) { + psStmtKey, isPsStmtKey := key.(*pstmtPlanCacheKey) + if !isPsStmtKey { + return + } + psStmtKey.pstmtID = pstmtID + psStmtKey.schemaVersion = schemaVersion + psStmtKey.hash = psStmtKey.hash[:0] } // NewPSTMTPlanCacheKey creates a new pstmtPlanCacheKey object. diff --git a/session/session.go b/session/session.go index 4e872a938f541..20d09fbc0b7fe 100644 --- a/session/session.go +++ b/session/session.go @@ -162,30 +162,30 @@ func (s *session) getMembufCap() int { } func (s *session) cleanRetryInfo() { - if !s.sessionVars.RetryInfo.Retrying { - retryInfo := s.sessionVars.RetryInfo - if len(retryInfo.DroppedPreparedStmtIDs) > 0 { - planCacheEnabled := plannercore.PreparedPlanCacheEnabled() - var cacheKey kvcache.Key + if s.sessionVars.RetryInfo.Retrying { + return + } + + retryInfo := s.sessionVars.RetryInfo + defer retryInfo.Clean() + if len(retryInfo.DroppedPreparedStmtIDs) > 0 { + planCacheEnabled := plannercore.PreparedPlanCacheEnabled() + var cacheKey kvcache.Key + if planCacheEnabled { + firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] + cacheKey = plannercore.NewPSTMTPlanCacheKey( + s.sessionVars, firstStmtID, s.sessionVars.PreparedStmts[firstStmtID].SchemaVersion, + ) + } + for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { if planCacheEnabled { - firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] - cacheKey = plannercore.NewPSTMTPlanCacheKey( - s.sessionVars, firstStmtID, s.sessionVars.PreparedStmts[firstStmtID].SchemaVersion, - ) - } - for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { - if planCacheEnabled { - if i > 0 { - cacheKey.(plannercore.PstmtCacheKeyMutator).SetPstmtIDSchemaVersion( - stmtID, s.sessionVars.PreparedStmts[stmtID].SchemaVersion, - ) - } - s.PreparedPlanCache().Delete(cacheKey) + if i > 0 { + plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, s.sessionVars.PreparedStmts[stmtID].SchemaVersion) } - delete(s.sessionVars.PreparedStmts, stmtID) + s.PreparedPlanCache().Delete(cacheKey) } + delete(s.sessionVars.PreparedStmts, stmtID) } - retryInfo.Clean() } } diff --git a/util/kvcache/simple_lru.go b/util/kvcache/simple_lru.go index 5781795cd4e5e..a9304d378bc42 100644 --- a/util/kvcache/simple_lru.go +++ b/util/kvcache/simple_lru.go @@ -90,7 +90,7 @@ func (l *SimpleLRUCache) Put(key Key, value Value) { } } -// Delete deletes the key and its from the LRU Cache. +// Delete deletes the key-value pair from the LRU Cache. func (l *SimpleLRUCache) Delete(key Key) { k := string(key.Hash()) element := l.elements[k] From f86bd693a7a8f131e33a7a70449f535874b8fea5 Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 19 Nov 2018 13:41:33 +0800 Subject: [PATCH 5/5] address comments --- executor/prepared_test.go | 51 ++++++++++++++++++--------------------- session/session.go | 32 ++++++++++++------------ 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 605745437a848..37b475d153eef 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -575,32 +575,29 @@ func (s *testSuite) TestPrepareDealloc(c *C) { plannercore.SetPreparedPlanCache(orgEnable) plannercore.PreparedPlanCacheCapacity = orgCapacity }() - flags := []bool{true} - for _, flag := range flags { - plannercore.SetPreparedPlanCache(flag) - plannercore.PreparedPlanCacheCapacity = 3 - - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists prepare_test") - tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)") + plannercore.SetPreparedPlanCache(true) + plannercore.PreparedPlanCacheCapacity = 3 - c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) - tk.MustExec(`prepare stmt1 from 'select * from prepare_test'`) - tk.MustExec("execute stmt1") - tk.MustExec(`prepare stmt2 from 'select * from prepare_test'`) - tk.MustExec("execute stmt2") - tk.MustExec(`prepare stmt3 from 'select * from prepare_test'`) - tk.MustExec("execute stmt3") - tk.MustExec(`prepare stmt4 from 'select * from prepare_test'`) - tk.MustExec("execute stmt4") - c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) - - tk.MustExec("deallocate prepare stmt1") - c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) - tk.MustExec("deallocate prepare stmt2") - tk.MustExec("deallocate prepare stmt3") - tk.MustExec("deallocate prepare stmt4") - c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) - } + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists prepare_test") + tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)") + + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) + tk.MustExec(`prepare stmt1 from 'select * from prepare_test'`) + tk.MustExec("execute stmt1") + tk.MustExec(`prepare stmt2 from 'select * from prepare_test'`) + tk.MustExec("execute stmt2") + tk.MustExec(`prepare stmt3 from 'select * from prepare_test'`) + tk.MustExec("execute stmt3") + tk.MustExec(`prepare stmt4 from 'select * from prepare_test'`) + tk.MustExec("execute stmt4") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) + + tk.MustExec("deallocate prepare stmt1") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 3) + tk.MustExec("deallocate prepare stmt2") + tk.MustExec("deallocate prepare stmt3") + tk.MustExec("deallocate prepare stmt4") + c.Assert(tk.Se.PreparedPlanCache().Size(), Equals, 0) } diff --git a/session/session.go b/session/session.go index 20d09fbc0b7fe..edf3f80fa31b1 100644 --- a/session/session.go +++ b/session/session.go @@ -168,24 +168,26 @@ func (s *session) cleanRetryInfo() { retryInfo := s.sessionVars.RetryInfo defer retryInfo.Clean() - if len(retryInfo.DroppedPreparedStmtIDs) > 0 { - planCacheEnabled := plannercore.PreparedPlanCacheEnabled() - var cacheKey kvcache.Key + if len(retryInfo.DroppedPreparedStmtIDs) == 0 { + return + } + + planCacheEnabled := plannercore.PreparedPlanCacheEnabled() + var cacheKey kvcache.Key + if planCacheEnabled { + firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] + cacheKey = plannercore.NewPSTMTPlanCacheKey( + s.sessionVars, firstStmtID, s.sessionVars.PreparedStmts[firstStmtID].SchemaVersion, + ) + } + for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { if planCacheEnabled { - firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] - cacheKey = plannercore.NewPSTMTPlanCacheKey( - s.sessionVars, firstStmtID, s.sessionVars.PreparedStmts[firstStmtID].SchemaVersion, - ) - } - for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { - if planCacheEnabled { - if i > 0 { - plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, s.sessionVars.PreparedStmts[stmtID].SchemaVersion) - } - s.PreparedPlanCache().Delete(cacheKey) + if i > 0 { + plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, s.sessionVars.PreparedStmts[stmtID].SchemaVersion) } - delete(s.sessionVars.PreparedStmts, stmtID) + s.PreparedPlanCache().Delete(cacheKey) } + delete(s.sessionVars.PreparedStmts, stmtID) } }