Skip to content

Commit

Permalink
executor: let plan replayer record table tiflash replica (#37336)
Browse files Browse the repository at this point in the history
close #37255
  • Loading branch information
Yisaer authored Aug 24, 2022
1 parent 25dda97 commit eb8fc86
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 11 deletions.
8 changes: 8 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -79,6 +80,7 @@ func checkFileName(s string) bool {
"meta.txt",
"stats/test.t_dump_single.json",
"schema/test.t_dump_single.schema.txt",
"table_tiflash_replica.txt",
"variables.toml",
"sqls.sql",
"session_bindings.sql",
Expand Down Expand Up @@ -140,12 +142,18 @@ func TestLoadStats(t *testing.T) {
}

func TestPlanReplayer(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount"))
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a))")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("plan replayer dump explain select * from t where a=10")
tk.MustExec("plan replayer dump explain select /*+ read_from_storage(tiflash[t]) */ * from t")

tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
Expand Down
118 changes: 107 additions & 11 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand All @@ -48,6 +49,17 @@ import (
var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

const (
configFile = "config.toml"
metaFile = "meta.txt"
variablesFile = "variables.toml"
sqlFile = "sqls.sql"
tiFlashReplicasFile = "table_tiflash_replica.txt"
sessionBindingFile = "session_bindings.sql"
globalBindingFile = "global_bindings.sql"
explainFile = "explain.txt"
)

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
Expand Down Expand Up @@ -165,6 +177,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
| |-stats2.json
| |-....
|-config.toml
|-table_tiflash_replica.txt
|-variables.toml
|-bindings.sql
|-sqls.sql
Expand Down Expand Up @@ -232,6 +245,11 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
return "", err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
return "", err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return "", err
Expand All @@ -243,7 +261,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

// Dump sql
sql, err := zw.Create("sqls.sql")
sql, err := zw.Create(sqlFile)
if err != nil {
return "", nil
}
Expand Down Expand Up @@ -271,7 +289,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create("config.toml")
cf, err := zw.Create(configFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -282,7 +300,7 @@ func dumpConfig(zw *zip.Writer) error {
}

func dumpMeta(zw *zip.Writer) error {
mt, err := zw.Create("meta.txt")
mt, err := zw.Create(metaFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -293,6 +311,31 @@ func dumpMeta(zw *zip.Writer) error {
return nil
}

func dumpTiFlashReplica(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
bf, err := zw.Create(tiFlashReplicasFile)
if err != nil {
return errors.AddStack(err)
}
is := domain.GetDomain(ctx).InfoSchema()
for pair := range pairs {
dbName := model.NewCIStr(pair.DBName)
tableName := model.NewCIStr(pair.TableName)
t, err := is.TableByName(dbName, tableName)
if err != nil {
logutil.BgLogger().Warn("failed to find table info", zap.Error(err),
zap.String("dbName", dbName.L), zap.String("tableName", tableName.L))
continue
}
if t.Meta().TiFlashReplica != nil && t.Meta().TiFlashReplica.Count > 0 {
row := []string{
pair.DBName, pair.TableName, strconv.FormatUint(t.Meta().TiFlashReplica.Count, 10),
}
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
}
return nil
}

func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
for pair := range pairs {
err := getShowCreateTable(pair, zw, ctx)
Expand Down Expand Up @@ -338,7 +381,7 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
vf, err := zw.Create("variables.toml")
vf, err := zw.Create(variablesFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -365,7 +408,7 @@ func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
bf, err := zw.Create("session_bindings.sql")
bf, err := zw.Create(sessionBindingFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -389,7 +432,7 @@ func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
if err != nil {
return err
}
bf, err := zw.Create("global_bindings.sql")
bf, err := zw.Create(globalBindingFile)
if err != nil {
return errors.AddStack(err)
}
Expand Down Expand Up @@ -424,7 +467,7 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze b
if err != nil {
return err
}
fw, err := zw.Create("explain.txt")
fw, err := zw.Create(explainFile)
if err != nil {
return errors.AddStack(err)
}
Expand Down Expand Up @@ -604,9 +647,48 @@ func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error
return nil
}

func loadSetTiFlashReplica(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, tiFlashReplicasFile) == 0 {
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
if len(row) < 1 {
continue
}
r := strings.Split(row, "\t")
if len(r) < 3 {
logutil.BgLogger().Debug("plan replayer: skip error",
zap.Error(errors.New("setting tiflash replicas failed")))
continue
}
dbName := r[0]
tableName := r[1]
c := context.Background()
// Though we record tiflash replica in txt, we only set 1 tiflash replica as it's enough for reproduce the plan
sql := fmt.Sprintf("alter table %s.%s set tiflash replica 1", dbName, tableName)
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sql)
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
}
}
}
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
unLoadVars := make([]string, 0)
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
if strings.Compare(zipFile.Name, variablesFile) == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
Expand All @@ -622,20 +704,28 @@ func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for name, value := range varMap {
sysVar := variable.GetSysVar(name)
if sysVar == nil {
return variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip set variable %s:%s", name, value), zap.Error(err))
continue
}
sVal, err := sysVar.Validate(vars, value, variable.ScopeSession)
if err != nil {
logutil.BgLogger().Debug(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip variable %s:%s", name, value), zap.Error(err))
continue
}
err = vars.SetSystemVar(name, sVal)
if err != nil {
return err
unLoadVars = append(unLoadVars, name)
logutil.BgLogger().Warn(fmt.Sprintf("skip set variable %s:%s", name, value), zap.Error(err))
continue
}
}
}
}
if len(unLoadVars) > 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("variables set failed:%s", strings.Join(unLoadVars, ",")))
}
return nil
}

Expand Down Expand Up @@ -722,6 +812,12 @@ func (e *PlanReplayerLoadInfo) Update(data []byte) error {
}
}

// set tiflash replica if exists
err = loadSetTiFlashReplica(e.Ctx, z)
if err != nil {
return err
}

// build view next
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
Expand Down

0 comments on commit eb8fc86

Please sign in to comment.