Skip to content

Commit

Permalink
*: support batch create/drop bindings from plan/sql digest (#55315)
Browse files Browse the repository at this point in the history
close #55343
  • Loading branch information
time-and-fate authored Aug 12, 2024
1 parent 6a34088 commit 1f40cb0
Show file tree
Hide file tree
Showing 16 changed files with 6,917 additions and 6,477 deletions.
2 changes: 1 addition & 1 deletion pkg/bindinfo/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (h *globalBindingHandle) CaptureBaselines() {
SQLDigest: digest.String(),
}
// We don't need to pass the `sctx` because the BindSQL has been validated already.
err = h.CreateGlobalBinding(nil, binding)
err = h.CreateGlobalBinding(nil, []*Binding{&binding})
if err != nil {
logutil.BindLogger().Debug("create bind record failed in baseline capture", zap.String("SQL", bindableStmt.Query), zap.Error(err))
}
Expand Down
119 changes: 74 additions & 45 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ type GlobalBindingHandle interface {

// CreateGlobalBinding creates a Bindings to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
CreateGlobalBinding(sctx sessionctx.Context, binding Binding) (err error)
CreateGlobalBinding(sctx sessionctx.Context, bindings []*Binding) (err error)

// DropGlobalBinding drop Bindings to the storage and Bindings int the cache.
DropGlobalBinding(sqlDigest string) (deletedRows uint64, err error)
DropGlobalBinding(sqlDigests []string) (deletedRows uint64, err error)

// SetGlobalBindingStatus set a Bindings's status to the storage and bind cache.
SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error)
Expand Down Expand Up @@ -256,9 +256,11 @@ func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error)

// CreateGlobalBinding creates a Bindings to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, binding Binding) (err error) {
if err := prepareHints(sctx, &binding); err != nil {
return err
func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, bindings []*Binding) (err error) {
for _, binding := range bindings {
if err := prepareHints(sctx, binding); err != nil {
return err
}
}
defer func() {
if err == nil {
Expand All @@ -272,71 +274,98 @@ func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, bindi
return err
}

now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

updateTs := now.String()
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, binding.OriginalSQL, updateTs)
if err != nil {
return err
}
for i, binding := range bindings {
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

updateTs := now.String()
_, err = exec(
sctx,
`UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted,
updateTs,
binding.OriginalSQL,
updateTs,
)
if err != nil {
return err
}

binding.CreateTime = now
binding.UpdateTime = now

// Insert the Bindings to the storage.
_, err = exec(sctx, `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`,
binding.OriginalSQL,
binding.BindSQL,
strings.ToLower(binding.Db),
binding.Status,
binding.CreateTime.String(),
binding.UpdateTime.String(),
binding.Charset,
binding.Collation,
binding.Source,
binding.SQLDigest,
binding.PlanDigest,
)
if err != nil {
return err
binding.CreateTime = now
binding.UpdateTime = now

// Insert the Bindings to the storage.
_, err = exec(
sctx,
`INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`,
binding.OriginalSQL,
binding.BindSQL,
strings.ToLower(binding.Db),
binding.Status,
binding.CreateTime.String(),
binding.UpdateTime.String(),
binding.Charset,
binding.Collation,
binding.Source,
binding.SQLDigest,
binding.PlanDigest,
)
failpoint.Inject("CreateGlobalBindingNthFail", func(val failpoint.Value) {
n := val.(int)
if n == i {
err = errors.NewNoStackErrorf("An injected error")
}
})
if err != nil {
return err
}
}
return nil
})
}

// dropGlobalBinding drops a Bindings to the storage and Bindings int the cache.
func (h *globalBindingHandle) dropGlobalBinding(sqlDigest string) (deletedRows uint64, err error) {
err = h.callWithSCtx(false, func(sctx sessionctx.Context) error {
func (h *globalBindingHandle) dropGlobalBinding(sqlDigests []string) (deletedRows uint64, err error) {
err = h.callWithSCtx(true, func(sctx sessionctx.Context) error {
// Lock mysql.bind_info to synchronize with CreateBinding / AddBinding / DropBinding on other tidb instances.
if err = lockBindInfoTable(sctx); err != nil {
return err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status != %?`,
deleted, updateTs, sqlDigest, updateTs, deleted)
if err != nil {
return err
for _, sqlDigest := range sqlDigests {
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()
_, err = exec(
sctx,
`UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status != %?`,
deleted,
updateTs,
sqlDigest,
updateTs,
deleted,
)
if err != nil {
return err
}
deletedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
}
deletedRows = sctx.GetSessionVars().StmtCtx.AffectedRows()
return nil
})
if err != nil {
deletedRows = 0
}
return
}

// DropGlobalBinding drop Bindings to the storage and Bindings int the cache.
func (h *globalBindingHandle) DropGlobalBinding(sqlDigest string) (deletedRows uint64, err error) {
if sqlDigest == "" {
func (h *globalBindingHandle) DropGlobalBinding(sqlDigests []string) (deletedRows uint64, err error) {
if len(sqlDigests) == 0 {
return 0, errors.New("sql digest is empty")
}
defer func() {
if err == nil {
err = h.LoadFromStorageToCache(false)
}
}()
return h.dropGlobalBinding(sqlDigest)
return h.dropGlobalBinding(sqlDigests)
}

// SetGlobalBindingStatus set a Bindings's status to the storage and bind cache.
Expand Down Expand Up @@ -452,7 +481,7 @@ func (h *globalBindingHandle) DropInvalidGlobalBinding() {
invalidBindings := h.invalidBindings.getAll()
h.invalidBindings.reset()
for _, invalidBinding := range invalidBindings {
if _, err := h.dropGlobalBinding(invalidBinding.SQLDigest); err != nil {
if _, err := h.dropGlobalBinding([]string{invalidBinding.SQLDigest}); err != nil {
logutil.BindLogger().Debug("flush bind record failed", zap.Error(err))
}
}
Expand Down
41 changes: 26 additions & 15 deletions pkg/bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bindinfo
import (
"context"
"encoding/json"
"slices"
"strings"
"time"

Expand All @@ -35,10 +36,10 @@ import (
// SessionBindingHandle is used to handle all session sql bind operations.
type SessionBindingHandle interface {
// CreateSessionBinding creates a binding to the cache.
CreateSessionBinding(sctx sessionctx.Context, binding Binding) (err error)
CreateSessionBinding(sctx sessionctx.Context, bindings []*Binding) (err error)

// DropSessionBinding drops a binding by the sql digest.
DropSessionBinding(sqlDigest string) error
DropSessionBinding(sqlDigests []string) error

// MatchSessionBinding returns the matched binding for this statement.
MatchSessionBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool)
Expand Down Expand Up @@ -75,26 +76,36 @@ func (h *sessionBindingHandle) appendSessionBinding(sqlDigest string, meta Bindi

// CreateSessionBinding creates a Bindings to the cache.
// It replaces all the exists bindings for the same normalized SQL.
func (h *sessionBindingHandle) CreateSessionBinding(sctx sessionctx.Context, binding Binding) (err error) {
if err := prepareHints(sctx, &binding); err != nil {
return err
func (h *sessionBindingHandle) CreateSessionBinding(sctx sessionctx.Context, bindings []*Binding) (err error) {
for _, binding := range bindings {
if err := prepareHints(sctx, binding); err != nil {
return err
}
}
for _, binding := range bindings {
binding.Db = strings.ToLower(binding.Db)
now := types.NewTime(
types.FromGoTime(time.Now().In(sctx.GetSessionVars().StmtCtx.TimeZone())),
mysql.TypeTimestamp,
3,
)
binding.CreateTime = now
binding.UpdateTime = now

// update the BindMeta to the cache.
h.appendSessionBinding(parser.DigestNormalized(binding.OriginalSQL).String(), []Binding{*binding})
}
binding.Db = strings.ToLower(binding.Db)
now := types.NewTime(types.FromGoTime(time.Now().In(sctx.GetSessionVars().StmtCtx.TimeZone())), mysql.TypeTimestamp, 3)
binding.CreateTime = now
binding.UpdateTime = now

// update the BindMeta to the cache.
h.appendSessionBinding(parser.DigestNormalized(binding.OriginalSQL).String(), []Binding{binding})
return nil
}

// DropSessionBinding drop Bindings in the cache.
func (h *sessionBindingHandle) DropSessionBinding(sqlDigest string) error {
if sqlDigest == "" {
func (h *sessionBindingHandle) DropSessionBinding(sqlDigests []string) error {
if slices.Contains(sqlDigests, "") {
return errors.New("sql digest is empty")
}
h.ch.RemoveBinding(sqlDigest)
for _, sqlDigest := range sqlDigests {
h.ch.RemoveBinding(sqlDigest)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bindinfo/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 19,
deps = [
"//pkg/bindinfo",
"//pkg/bindinfo/internal",
Expand Down
62 changes: 61 additions & 1 deletion pkg/bindinfo/tests/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package tests
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/bindinfo"
Expand Down Expand Up @@ -893,12 +895,42 @@ func removeAllBindings(tk *testkit.TestKit, global bool) {
scope = "global"
}
res := showBinding(tk, fmt.Sprintf("show %v bindings", scope))
digests := make([]string, 0, len(res))
for _, r := range res {
if r[4] == "builtin" {
continue
}
tk.MustExec(fmt.Sprintf("drop %v binding for sql digest '%v'", scope, r[5]))
digests = append(digests, r[5].(string))
}
if len(digests) == 0 {
return
}
// test DROP BINDING FOR SQL DIGEST can handle empty strings correctly
digests = append(digests, "", "", "")
// randomly split digests into 4 groups using random number
// shuffle the slice
rand.Shuffle(len(digests), func(i, j int) {
digests[i], digests[j] = digests[j], digests[i]
})
split := make([][]string, 4)
for i, d := range digests {
split[i%4] = append(split[i%4], d)
}
// group 0: wrap with ' then connect by ,
var g0 string
for _, d := range split[0] {
g0 += "'" + d + "',"
}
// group 1: connect by , and set into a user variable
tk.MustExec(fmt.Sprintf("set @a = '%v'", strings.Join(split[1], ",")))
g1 := "@a,"
var g2 string
for _, d := range split[2] {
g2 += "'" + d + "',"
}
// group 2: connect by , and put into a normal string
g3 := "'" + strings.Join(split[3], ",") + "'"
tk.MustExec(fmt.Sprintf("drop %v binding for sql digest %s %s %s %s", scope, g0, g1, g2, g3))
tk.MustQuery(fmt.Sprintf("show %v bindings", scope)).Check(testkit.Rows()) // empty
}

Expand Down Expand Up @@ -1107,3 +1139,31 @@ func TestFuzzyBindingHintsWithSourceReturning(t *testing.T) {
}
}
}

func TestBatchDropBindings(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int, b int, c int, d int, key(a), key(b), key(c), key(d))`)
tk.MustExec(`create table t2 (a int, b int, c int, d int, key(a), key(b), key(c), key(d))`)
tk.MustExec(`create table t3 (a int, b int, c int, d int, key(a), key(b), key(c), key(d))`)
tk.MustExec(`create global binding for select * from t1 using select /*+ use_index(t1, a) */ * from t1`)
tk.MustExec(`create global binding for select * from t1 where b < 1 using select /*+ use_index(t1,b) */ * from t1 where b < 1`)
tk.MustExec(`create global binding for select * from t1 where c < 1 using select /*+ use_index(t1,c) */ * from t1 where c < 1`)
tk.MustExec(`create global binding for select * from t1 join t2 on t1.a = t2.a using select /*+ hash_join(t1) */ * from t1 join t2 on t1.a = t2.a`)
tk.MustExec(`create global binding for select * from t1 join t2 on t1.a = t2.a join t3 on t2.b = t3.b where t1.a = 1 using select /*+ leading(t3,t2,t1) */ * from t1 join t2 on t1.a = t2.a join t3 on t2.b = t3.b where t1.a = 1`)
tk.MustExec(`create global binding for select * from t1 where a in (select sum(b) from t2) using select /*+ agg_to_cop(@sel_2) */ * from t1 where a in (select sum(b) from t2)`)
tk.MustExec(`create global binding for select * from t2 where a = 1 and b = 2 and c = 3 using select * from t2 ignore index (b) where a = 1 and b = 2 and c = 3`)
tk.MustExec(`create global binding for select * from t2 where a = 1 and b = 2 and c = 3 using select * from t2 use index (b) where a = 1 and b = 2 and c = 3`)

tk.MustExec(`create session binding for select * from t1 using select /*+ use_index(t1, a) */ * from t1`)
tk.MustExec(`create session binding for select * from t1 where b < 1 using select /*+ use_index(t1, b) */ * from t1 where b < 1`)
tk.MustExec(`create session binding for select * from t1 where c < 1 using select /*+ use_index(t1, c) */ * from t1 where c < 1`)
tk.MustExec(`create session binding for select * from t1 join t2 on t1.a = t2.a using select /*+ hash_join( t1) */ * from t1 join t2 on t1.a = t2.a`)
tk.MustExec(`create session binding for select * from t1 join t2 on t1.a = t2.a join t3 on t2.b = t3.b where t1. a = 1 using select /*+ leading(t3,t2,t1) */ * from t1 join t2 on t1.a = t2.a join t3 on t2.b = t3.b where t1.a = 1`)
tk.MustExec(`create session binding for select * from t1 where a in (select sum( b) from t2) using select /*+ agg_to_cop(@sel_2) */ * from t1 where a in (select sum(b) from t2)`)
tk.MustExec(`create session binding for select * from t2 where a = 1 and b = 2 and c = 3 using select * from t2 ignore index (b) where a = 1 and b = 2 and c = 3`)
tk.MustExec(`create session binding for select * from t2 where a = 1 and b = 2 and c = 3 using select * from t2 use index (b) where a = 1 and b = 2 and c = 3`)
removeAllBindings(tk, true)
removeAllBindings(tk, false)
}
Loading

0 comments on commit 1f40cb0

Please sign in to comment.