Skip to content

Commit

Permalink
br: create database by db pool (#50771) (#51027)
Browse files Browse the repository at this point in the history
close #50767
  • Loading branch information
ti-chi-bot authored Feb 27, 2024
1 parent e7782b3 commit 6fcb7fd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
35 changes: 30 additions & 5 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,13 +775,38 @@ func (rc *Client) GetDBSchema(dom *domain.Domain, dbName model.CIStr) (*model.DB
return info.SchemaByName(dbName)
}

// CreateDatabase creates a database.
func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error {
// CreateDatabases creates databases. If the client has the db pool, it would create it.
func (rc *Client) CreateDatabases(ctx context.Context, dbs []*utils.Database) error {
if rc.IsSkipCreateSQL() {
log.Info("skip create database", zap.Stringer("name", db.Name))
log.Info("skip create database")
return nil
}

if len(rc.dbPool) == 0 {
log.Info("create databases sequentially")
for _, db := range dbs {
err := rc.createDatabaseWithDBConn(ctx, db.Info, rc.db)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

log.Info("create databases in db pool", zap.Int("pool size", len(rc.dbPool)))
eg, ectx := errgroup.WithContext(ctx)
workers := utils.NewWorkerPool(uint(len(rc.dbPool)), "DB DDL workers")
for _, db_ := range dbs {
db := db_
workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
conn := rc.dbPool[id%uint64(len(rc.dbPool))]
return rc.createDatabaseWithDBConn(ectx, db.Info, conn)
})
}
return eg.Wait()
}

func (rc *Client) createDatabaseWithDBConn(ctx context.Context, db *model.DBInfo, conn *DB) error {
log.Info("create database", zap.Stringer("name", db.Name))

if !rc.supportPolicy {
Expand All @@ -791,12 +816,12 @@ func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error {
}

if db.PlacementPolicyRef != nil {
if err := rc.db.ensurePlacementPolicy(ctx, db.PlacementPolicyRef.Name, rc.policyMap); err != nil {
if err := conn.ensurePlacementPolicy(ctx, db.PlacementPolicyRef.Name, rc.policyMap); err != nil {
return errors.Trace(err)
}
}

return rc.db.CreateDatabase(ctx, db)
return conn.CreateDatabase(ctx, db)
}

// CreateTables creates multiple tables, and returns their rewrite rules.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCheckTargetClusterFresh(t *testing.T) {
ctx := context.Background()
require.NoError(t, client.CheckTargetClusterFresh(ctx))

require.NoError(t, client.CreateDatabase(ctx, &model.DBInfo{Name: model.NewCIStr("user_db")}))
require.NoError(t, client.CreateDatabases(ctx, []*utils.Database{{Info: &model.DBInfo{Name: model.NewCIStr("user_db")}}}))
require.True(t, berrors.ErrRestoreNotFreshCluster.Equal(client.CheckTargetClusterFresh(ctx)))
}

Expand Down
7 changes: 2 additions & 5 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,11 +878,8 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

for _, db := range dbs {
err = client.CreateDatabase(ctx, db.Info)
if err != nil {
return errors.Trace(err)
}
if err = client.CreateDatabases(ctx, dbs); err != nil {
return errors.Trace(err)
}

// We make bigger errCh so we won't block on multi-part failed.
Expand Down

0 comments on commit 6fcb7fd

Please sign in to comment.