Skip to content

Commit

Permalink
br: remove duplicate bind info after snapshot restore (#48946)
Browse files Browse the repository at this point in the history
close #46527
  • Loading branch information
Leavrth authored Dec 10, 2023
1 parent 899dfe8 commit e62699c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//br/pkg/utils/iter",
"//br/pkg/utils/storewatch",
"//br/pkg/version",
"//pkg/bindinfo",
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util",
Expand Down
29 changes: 19 additions & 10 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
Expand Down Expand Up @@ -148,26 +149,26 @@ func (rc *Client) ClearSystemUsers(ctx context.Context, resetUsers []string) err

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) error {
sysDB := mysql.SystemDB

temporaryDB := utils.TemporaryDBName(sysDB)
defer rc.cleanTemporaryDatabase(ctx, sysDB)

if !f.MatchSchema(sysDB) || !rc.withSysTable {
log.Debug("system database filtered out", zap.String("database", sysDB))
return
return nil
}
originDatabase, ok := rc.databases[temporaryDB.O]
if !ok {
log.Info("system database not backed up, skipping", zap.String("database", sysDB))
return
return nil
}
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
// Or should we create the database here?
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return
return nil
}

tablesRestored := make([]string, 0, len(originDatabase.Tables))
Expand All @@ -179,15 +180,15 @@ func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
logutil.ShortError(err),
zap.Stringer("table", tableName),
)
return errors.Annotatef(err, "error during merging temporary tables into system tables, table: %s", tableName)
}
tablesRestored = append(tablesRestored, tableName.L)
}
}
if err := rc.afterSystemTablesReplaced(tablesRestored); err != nil {
for _, e := range multierr.Errors(err) {
log.Warn("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e))
}
if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil {
return errors.Annotate(err, "error during extra works after system tables replaced")
}
return nil
}

// database is a record of a database.
Expand Down Expand Up @@ -218,20 +219,28 @@ func (rc *Client) getDatabaseByName(name string) (*database, bool) {

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(tables []string) error {
func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error {
var err error
for _, table := range tables {
if table == "user" {
if rc.fullClusterRestore {
log.Info("privilege system table restored, please reconnect to make it effective")
err = rc.dom.NotifyUpdatePrivilege()
err = multierr.Append(err, rc.dom.NotifyUpdatePrivilege())
} else {
// to make it compatible with older version
// todo: should we allow restore system table in non-fresh cluster in later br version?
// if we don't, we can check it at first place.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
}
} else if table == "bind_info" {
if serr := rc.db.se.Execute(ctx, bindinfo.StmtRemoveDuplicatedPseudoBinding); serr != nil {
log.Warn("failed to delete duplicated pseudo binding", zap.Error(serr))
err = multierr.Append(err,
berrors.ErrUnknown.Wrap(serr).GenWithStack("failed to delete duplicated pseudo binding %s", bindinfo.StmtRemoveDuplicatedPseudoBinding))
} else {
log.Info("success to remove duplicated pseudo binding")
}
}
}
return err
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,10 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

// The cost of rename user table / replace into system table wouldn't be so high.
// So leave it out of the pipeline for easier implementation.
client.RestoreSystemSchemas(ctx, cfg.TableFilter)
err = client.RestoreSystemSchemas(ctx, cfg.TableFilter)
if err != nil {
return errors.Trace(err)
}

schedulersRemovable = true

Expand Down

0 comments on commit e62699c

Please sign in to comment.