Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: use a global owner manager instance for DDL, to avoid owner change #57179

Merged
merged 16 commits into from
Nov 11, 2024
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"@com_github_docker_go_units//:go-units",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -209,6 +210,9 @@ func NewMgr(
return nil, berrors.ErrKVNotTiKV
}

if err = ddl.StartOwnerManager(ctx, storage); err != nil {
return nil, errors.Trace(err)
}
var dom *domain.Domain
if needDomain {
dom, err = g.GetDomain(storage)
Expand Down Expand Up @@ -292,6 +296,7 @@ func (mgr *Mgr) Close() {
if mgr.dom != nil {
mgr.dom.Close()
}
ddl.CloseOwnerManager()
tikv.StoreShuttingDown(1)
_ = mgr.storage.Close()
}
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,11 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
advancer := streamhelper.NewCheckpointAdvancer(env)
advancer.UpdateConfig(cfg.AdvancerCfg)
advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration)
ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI)
defer func() {
ownerMgr.Close()
}()
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
loop, err := advancerd.Begin(ctx)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/benchdb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/cmd/benchdb",
visibility = ["//visibility:private"],
deps = [
"//pkg/ddl",
"//pkg/parser/terror",
"//pkg/session",
"//pkg/session/types",
Expand Down
7 changes: 6 additions & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/session"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
Expand Down Expand Up @@ -96,11 +97,15 @@ func newBenchDB() *benchDB {
// Create TiKV store and disable GC as we will trigger GC manually.
store, err := store.New("tikv://" + *addr + "?disableGC=true")
terror.MustNil(err)
// maybe close below components, but it's for test anyway.
ctx := context.Background()
err = ddl.StartOwnerManager(ctx, store)
terror.MustNil(err)
_, err = session.BootstrapSession(store)
terror.MustNil(err)
se, err := session.CreateSession(store)
terror.MustNil(err)
_, err = se.ExecuteInternal(context.Background(), "use test")
_, err = se.ExecuteInternal(ctx, "use test")
terror.MustNil(err)

return &benchDB{
Expand Down
2 changes: 2 additions & 0 deletions cmd/ddltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ go_test(
race = "on",
shard_count = 6,
deps = [
"//dumpling/context",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"//pkg/parser/model",
Expand Down
5 changes: 4 additions & 1 deletion cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -100,7 +102,7 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {

// Make sure the schema lease of this session is equal to other TiDB servers'.
session.SetSchemaLease(time.Duration(*lease) * time.Second)

require.NoError(t, ddl.StartOwnerManager(context.Background(), s.store))
s.dom, err = session.BootstrapSession(s.store)
require.NoError(t, err)

Expand Down Expand Up @@ -185,6 +187,7 @@ func (s *ddlSuite) teardown(t *testing.T) {
case <-quitCh:
}
}()
ddl.CloseOwnerManager()
err := s.store.Close()
require.NoError(t, err)
close(quitCh)
Expand Down
13 changes: 8 additions & 5 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func main() {
keyspaceName := keyspace.GetKeyspaceNameBySettings()
executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
svr := createServer(storage, dom)

exited := make(chan struct{})
Expand Down Expand Up @@ -397,7 +397,7 @@ func registerStores() {
terror.MustNil(err)
}

func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
cfg := config.GetGlobalConfig()
var fullPath string
if keyspaceName == "" {
Expand All @@ -411,6 +411,8 @@ func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
copr.GlobalMPPFailedStoreProber.Run()
mppcoordmanager.InstanceMPPCoordinatorManager.Run()
// Bootstrap a session to load information schema.
err = ddl.StartOwnerManager(context.Background(), storage)
terror.MustNil(err)
dom, err := session.BootstrapSession(storage)
terror.MustNil(err)
return storage, dom
Expand Down Expand Up @@ -859,7 +861,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server {
svr, err := server.NewServer(cfg, driver)
// Both domain and storage have started, so we have to clean them before exiting.
if err != nil {
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
log.Fatal("failed to create the server", zap.Error(err), zap.Stack("stack"))
}
svr.SetDomain(dom)
Expand Down Expand Up @@ -893,9 +895,10 @@ func setupTracing() {
opentracing.SetGlobalTracer(tracer)
}

func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
tikv.StoreShuttingDown(1)
dom.Close()
ddl.CloseOwnerManager()
copr.GlobalMPPFailedStoreProber.Stop()
mppcoordmanager.InstanceMPPCoordinatorManager.Stop()
err := storage.Close()
Expand All @@ -918,7 +921,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
// See https://github.com/pingcap/tidb/issues/40038 for details.
svr.KillSysProcesses()
plugin.Shutdown(context.Background())
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
disk.CleanUp()
closeStmtSummary()
topsql.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {

// Close closes the Service and clean up resource.
func (s *Service) Close() {
if s.leaderShip != nil && s.leaderShip.IsOwner() {
s.leaderShip.Cancel()
if s.leaderShip != nil {
s.leaderShip.Close()
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"modify_column.go",
"multi_schema_change.go",
"options.go",
"owner_mgr.go",
"partition.go",
"placement_policy.go",
"reorg.go",
Expand Down Expand Up @@ -133,6 +134,7 @@ go_library(
"//pkg/sessiontxn",
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/store",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ const (
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
// addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd.
addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_"
ddlPrompt = "ddl"
// Prompt is the prompt for ddl owner manager.
Prompt = "ddl"

batchAddingJobs = 100

Expand Down Expand Up @@ -638,19 +639,21 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
o(opt)
}

id := uuid.New().String()
var id string
var manager owner.Manager
var schemaVerSyncer schemaver.Syncer
var serverStateSyncer serverstate.Syncer
var deadLockCkr util.DeadTableLockChecker
if etcdCli := opt.EtcdCli; etcdCli == nil {
id = uuid.New().String()
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and memSyncer.
manager = owner.NewMockManager(ctx, id, opt.Store, DDLOwnerKey)
schemaVerSyncer = schemaver.NewMemSyncer()
serverStateSyncer = serverstate.NewMemSyncer()
} else {
manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
id = globalOwnerManager.ID()
manager = globalOwnerManager.OwnerManager()
schemaVerSyncer = schemaver.NewEtcdSyncer(etcdCli, id)
serverStateSyncer = serverstate.NewEtcdSyncer(etcdCli, util.ServerGlobalState)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
Expand Down Expand Up @@ -1003,7 +1006,10 @@ func (d *ddl) close() {
startTime := time.Now()
d.cancel()
d.wg.Wait()
d.ownerManager.Cancel()
// when run with real-tikv, the lifecycle of ownerManager is managed by tidbInstance,
// when run with uni-store BreakCampaignLoop is same as Close.
// hope we can unify it after refactor to let some components only start once.
d.ownerManager.BreakCampaignLoop()
d.schemaVerSyncer.Close()

// d.delRangeMgr using sessions from d.sessPool.
Expand Down
83 changes: 83 additions & 0 deletions pkg/ddl/owner_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
storepkg "github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

var globalOwnerManager = &ownerManager{}

// StartOwnerManager starts a global DDL owner manager.
func StartOwnerManager(ctx context.Context, store kv.Storage) error {
return globalOwnerManager.Start(ctx, store)
}

// CloseOwnerManager closes the global DDL owner manager.
func CloseOwnerManager() {
globalOwnerManager.Close()
}

// ownerManager is used to manage lifecycle of a global DDL owner manager which
// we only want it to init session once, to avoid DDL owner change after upgrade.
type ownerManager struct {
etcdCli *clientv3.Client
id string
ownerMgr owner.Manager
}

// Start starts the TiDBInstance.
func (om *ownerManager) Start(ctx context.Context, store kv.Storage) error {
om.id = uuid.New().String()
cli, err := storepkg.NewEtcdCli(store)
if err != nil {
return errors.Trace(err)
}
if cli == nil {
return errors.New("etcd client is nil, maybe the server is not started with PD")
}
om.etcdCli = cli
om.ownerMgr = owner.NewOwnerManager(ctx, om.etcdCli, Prompt, om.id, DDLOwnerKey)
return nil
}

// Close closes the TiDBInstance.
func (om *ownerManager) Close() {
if om.ownerMgr != nil {
om.ownerMgr.Close()
}
if om.etcdCli != nil {
if err := om.etcdCli.Close(); err != nil {
logutil.BgLogger().Error("close etcd client failed", zap.Error(err))
}
}
}

func (om *ownerManager) ID() string {
return om.id
}

func (om *ownerManager) OwnerManager() owner.Manager {
return om.ownerMgr
}
4 changes: 2 additions & 2 deletions pkg/ddl/schemaver/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestSyncerSimple(t *testing.T) {
go func() {
require.NoError(t, d.OwnerManager().CampaignOwner())
}()
defer d.OwnerManager().Cancel()
defer d.OwnerManager().Close()

// for init function
require.NoError(t, d.SchemaSyncer().Init(ctx))
Expand All @@ -107,7 +107,7 @@ func TestSyncerSimple(t *testing.T) {
go func() {
require.NoError(t, d1.OwnerManager().CampaignOwner())
}()
defer d1.OwnerManager().Cancel()
defer d1.OwnerManager().Close()
require.NoError(t, d1.SchemaSyncer().Init(ctx))
defer d.SchemaSyncer().Close()

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/serverstate/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestStateSyncerSimple(t *testing.T) {
wg.Run(func() {
require.NoError(t, d.OwnerManager().CampaignOwner())
})
defer d.OwnerManager().Cancel()
defer d.OwnerManager().Close()
// TODO: We can remove it when we call it in newDDL.
require.NoError(t, d.StateSyncer().Init(ctx))

Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/integrationtests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_test(
race = "off",
shard_count = 23,
deps = [
"//pkg/ddl",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/integrationtests/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
Expand Down Expand Up @@ -113,11 +114,12 @@ func prepareForBenchTest(b *testing.B) {
var err error
store, err := d.Open("tikv://" + *testkit.WithTiKV)
require.NoError(b, err)

require.NoError(b, ddl.StartOwnerManager(context.Background(), store))
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
defer func() {
dom.Close()
ddl.CloseOwnerManager()
err := store.Close()
require.NoError(b, err)
view.Stop()
Expand Down
Loading