Skip to content

Commit

Permalink
ddl: use a global owner manager instance for DDL, to avoid owner chan…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Nov 11, 2024
1 parent 042846e commit 56e7093
Show file tree
Hide file tree
Showing 39 changed files with 636 additions and 434 deletions.
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"@com_github_docker_go_units//:go-units",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -194,6 +195,9 @@ func NewMgr(
return nil, errors.Trace(err)
}

if config.GetGlobalConfig().Store != "tikv" {
config.GetGlobalConfig().Store = "tikv"
}
// Disable GC because TiDB enables GC already.
path := fmt.Sprintf(
"tikv://%s?disableGC=true&keyspaceName=%s",
Expand Down Expand Up @@ -292,6 +296,7 @@ func (mgr *Mgr) Close() {
if mgr.dom != nil {
mgr.dom.Close()
}
ddl.CloseOwnerManager()
tikv.StoreShuttingDown(1)
_ = mgr.storage.Close()
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error {
if existDom != nil {
return nil
}
if err := ddl.StartOwnerManager(context.Background(), store); err != nil {
return errors.Trace(err)
}
dom, err := session.GetDomain(store)
if err != nil {
return err
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,11 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
advancer := streamhelper.NewCheckpointAdvancer(env)
advancer.UpdateConfig(cfg.AdvancerCfg)
advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration)
ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI)
defer func() {
ownerMgr.Close()
}()
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
loop, err := advancerd.Begin(ctx)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions cmd/benchdb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
importpath = "github.com/pingcap/tidb/cmd/benchdb",
visibility = ["//visibility:private"],
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/parser/terror",
"//pkg/session",
"//pkg/session/types",
Expand Down
9 changes: 8 additions & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/session"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
Expand Down Expand Up @@ -96,11 +98,16 @@ func newBenchDB() *benchDB {
// Create TiKV store and disable GC as we will trigger GC manually.
store, err := store.New("tikv://" + *addr + "?disableGC=true")
terror.MustNil(err)
// maybe close below components, but it's for test anyway.
ctx := context.Background()
config.GetGlobalConfig().Store = "tikv"
err = ddl.StartOwnerManager(ctx, store)
terror.MustNil(err)
_, err = session.BootstrapSession(store)
terror.MustNil(err)
se, err := session.CreateSession(store)
terror.MustNil(err)
_, err = se.ExecuteInternal(context.Background(), "use test")
_, err = se.ExecuteInternal(ctx, "use test")
terror.MustNil(err)

return &benchDB{
Expand Down
2 changes: 2 additions & 0 deletions cmd/ddltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ go_test(
race = "on",
shard_count = 6,
deps = [
"//dumpling/context",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"//pkg/parser/model",
Expand Down
6 changes: 5 additions & 1 deletion cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -95,12 +97,13 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {

s.quit = make(chan struct{})

config.GetGlobalConfig().Store = "tikv"
s.store, err = store.New(fmt.Sprintf("tikv://%s%s", *etcd, *tikvPath))
require.NoError(t, err)

// Make sure the schema lease of this session is equal to other TiDB servers'.
session.SetSchemaLease(time.Duration(*lease) * time.Second)

require.NoError(t, ddl.StartOwnerManager(context.Background(), s.store))
s.dom, err = session.BootstrapSession(s.store)
require.NoError(t, err)

Expand All @@ -118,6 +121,7 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {
err = domain.GetDomain(s.ctx).DDL().Stop()
require.NoError(t, err)
config.GetGlobalConfig().Instance.TiDBEnableDDL.Store(false)
ddl.CloseOwnerManager()
session.ResetStoreForWithTiKVTest(s.store)
s.dom.Close()
require.NoError(t, s.store.Close())
Expand Down
13 changes: 8 additions & 5 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func main() {
keyspaceName := keyspace.GetKeyspaceNameBySettings()
executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
svr := createServer(storage, dom)

exited := make(chan struct{})
Expand Down Expand Up @@ -397,7 +397,7 @@ func registerStores() {
terror.MustNil(err)
}

func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
cfg := config.GetGlobalConfig()
var fullPath string
if keyspaceName == "" {
Expand All @@ -411,6 +411,8 @@ func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
copr.GlobalMPPFailedStoreProber.Run()
mppcoordmanager.InstanceMPPCoordinatorManager.Run()
// Bootstrap a session to load information schema.
err = ddl.StartOwnerManager(context.Background(), storage)
terror.MustNil(err)
dom, err := session.BootstrapSession(storage)
terror.MustNil(err)
return storage, dom
Expand Down Expand Up @@ -859,7 +861,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server {
svr, err := server.NewServer(cfg, driver)
// Both domain and storage have started, so we have to clean them before exiting.
if err != nil {
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
log.Fatal("failed to create the server", zap.Error(err), zap.Stack("stack"))
}
svr.SetDomain(dom)
Expand Down Expand Up @@ -893,9 +895,10 @@ func setupTracing() {
opentracing.SetGlobalTracer(tracer)
}

func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
tikv.StoreShuttingDown(1)
dom.Close()
ddl.CloseOwnerManager()
copr.GlobalMPPFailedStoreProber.Stop()
mppcoordmanager.InstanceMPPCoordinatorManager.Stop()
err := storage.Close()
Expand All @@ -918,7 +921,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
// See https://github.com/pingcap/tidb/issues/40038 for details.
svr.KillSysProcesses()
plugin.Shutdown(context.Background())
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
disk.CleanUp()
closeStmtSummary()
topsql.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {

// Close closes the Service and clean up resource.
func (s *Service) Close() {
if s.leaderShip != nil && s.leaderShip.IsOwner() {
s.leaderShip.Cancel()
if s.leaderShip != nil {
s.leaderShip.Close()
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"modify_column.go",
"multi_schema_change.go",
"options.go",
"owner_mgr.go",
"partition.go",
"placement_policy.go",
"reorg.go",
Expand Down Expand Up @@ -133,6 +134,7 @@ go_library(
"//pkg/sessiontxn",
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/store",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
Expand Down Expand Up @@ -251,6 +253,7 @@ go_test(
"multi_schema_change_test.go",
"mv_index_test.go",
"options_test.go",
"owner_mgr_test.go",
"partition_test.go",
"placement_policy_ddl_test.go",
"placement_policy_test.go",
Expand Down Expand Up @@ -356,6 +359,7 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ const (
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
// addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd.
addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_"
ddlPrompt = "ddl"
// Prompt is the prompt for ddl owner manager.
Prompt = "ddl"

batchAddingJobs = 100

Expand Down Expand Up @@ -638,19 +639,21 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
o(opt)
}

id := uuid.New().String()
var id string
var manager owner.Manager
var schemaVerSyncer schemaver.Syncer
var serverStateSyncer serverstate.Syncer
var deadLockCkr util.DeadTableLockChecker
if etcdCli := opt.EtcdCli; etcdCli == nil {
id = uuid.New().String()
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and memSyncer.
manager = owner.NewMockManager(ctx, id, opt.Store, DDLOwnerKey)
schemaVerSyncer = schemaver.NewMemSyncer()
serverStateSyncer = serverstate.NewMemSyncer()
} else {
manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
id = globalOwnerManager.ID()
manager = globalOwnerManager.OwnerManager()
schemaVerSyncer = schemaver.NewEtcdSyncer(etcdCli, id)
serverStateSyncer = serverstate.NewEtcdSyncer(etcdCli, util.ServerGlobalState)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
Expand Down Expand Up @@ -1003,7 +1006,10 @@ func (d *ddl) close() {
startTime := time.Now()
d.cancel()
d.wg.Wait()
d.ownerManager.Cancel()
// when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager,
// when run with uni-store BreakCampaignLoop is same as Close.
// hope we can unify it after refactor to let some components only start once.
d.ownerManager.BreakCampaignLoop()
d.schemaVerSyncer.Close()

// d.delRangeMgr using sessions from d.sessPool.
Expand Down
97 changes: 97 additions & 0 deletions pkg/ddl/owner_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
storepkg "github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

var globalOwnerManager = &ownerManager{}

// StartOwnerManager starts a global DDL owner manager.
func StartOwnerManager(ctx context.Context, store kv.Storage) error {
return globalOwnerManager.Start(ctx, store)
}

// CloseOwnerManager closes the global DDL owner manager.
func CloseOwnerManager() {
globalOwnerManager.Close()
}

// ownerManager is used to manage lifecycle of a global DDL owner manager which
// we only want it to init session once, to avoid DDL owner change after upgrade.
type ownerManager struct {
etcdCli *clientv3.Client
id string
ownerMgr owner.Manager
started bool
}

// Start starts the TiDBInstance.
func (om *ownerManager) Start(ctx context.Context, store kv.Storage) error {
// BR might start domain multiple times, we need to avoid it. when BR have refactored
// this part, we can remove this.
if om.started {
return nil
}
if config.GetGlobalConfig().Store != "tikv" {
return nil
}
cli, err := storepkg.NewEtcdCli(store)
if err != nil {
return errors.Trace(err)
}
failpoint.InjectCall("injectEtcdClient", &cli)
if cli == nil {
return errors.New("etcd client is nil, maybe the server is not started with PD")
}
om.id = uuid.New().String()
om.etcdCli = cli
om.ownerMgr = owner.NewOwnerManager(ctx, om.etcdCli, Prompt, om.id, DDLOwnerKey)
om.started = true
return nil
}

// Close closes the TiDBInstance.
func (om *ownerManager) Close() {
if om.ownerMgr != nil {
om.ownerMgr.Close()
}
if om.etcdCli != nil {
if err := om.etcdCli.Close(); err != nil {
logutil.BgLogger().Error("close etcd client failed", zap.Error(err))
}
}
om.started = false
}

func (om *ownerManager) ID() string {
return om.id
}

func (om *ownerManager) OwnerManager() owner.Manager {
return om.ownerMgr
}
Loading

0 comments on commit 56e7093

Please sign in to comment.