From 56e7093a9d214401bebc39c228e37d334df90b10 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 11 Nov 2024 13:12:48 +0800 Subject: [PATCH] ddl: use a global owner manager instance for DDL, to avoid owner change (#57179) ref pingcap/tidb#57185 --- br/pkg/conn/BUILD.bazel | 1 + br/pkg/conn/conn.go | 5 + br/pkg/gluetidb/glue.go | 3 + br/pkg/task/stream.go | 6 +- cmd/benchdb/BUILD.bazel | 2 + cmd/benchdb/main.go | 9 +- cmd/ddltest/BUILD.bazel | 2 + cmd/ddltest/ddl_test.go | 6 +- cmd/tidb-server/main.go | 13 +- pkg/autoid_service/autoid.go | 4 +- pkg/ddl/BUILD.bazel | 4 + pkg/ddl/ddl.go | 14 +- pkg/ddl/owner_mgr.go | 97 ++++++++ pkg/ddl/owner_mgr_test.go | 57 +++++ pkg/ddl/schemaver/BUILD.bazel | 5 - pkg/ddl/schemaver/syncer_test.go | 93 +++----- pkg/ddl/serverstate/BUILD.bazel | 4 - pkg/ddl/serverstate/syncer_test.go | 47 +--- .../framework/integrationtests/BUILD.bazel | 2 + .../framework/integrationtests/bench_test.go | 6 +- pkg/domain/BUILD.bazel | 6 +- pkg/domain/domain.go | 83 ++----- pkg/domain/domain_test.go | 16 -- pkg/owner/BUILD.bazel | 5 - pkg/owner/manager.go | 142 ++++++----- pkg/owner/manager_test.go | 222 +++++++----------- pkg/owner/mock.go | 16 +- pkg/session/BUILD.bazel | 1 + pkg/session/bootstrap.go | 3 +- pkg/store/BUILD.bazel | 13 +- pkg/store/driver/BUILD.bazel | 2 + pkg/store/driver/main_test.go | 6 +- pkg/store/etcd.go | 85 +++++++ pkg/store/etcd_test.go | 48 ++++ pkg/testkit/BUILD.bazel | 2 + pkg/testkit/mockstore.go | 21 +- pkg/util/etcd.go | 7 + tests/realtikvtest/BUILD.bazel | 1 + tests/realtikvtest/testkit.go | 11 +- 39 files changed, 636 insertions(+), 434 deletions(-) create mode 100644 pkg/ddl/owner_mgr.go create mode 100644 pkg/ddl/owner_mgr_test.go create mode 100644 pkg/store/etcd.go create mode 100644 pkg/store/etcd_test.go diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index 2e035aaf2ea76..c6f7b693d6e3d 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//br/pkg/utils", "//br/pkg/version", "//pkg/config", + "//pkg/ddl", "//pkg/domain", "//pkg/kv", "@com_github_docker_go_units//:go-units", diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 4d92c447e2bc8..35d0abf6ea3a9 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/tikv/client-go/v2/oracle" @@ -194,6 +195,9 @@ func NewMgr( return nil, errors.Trace(err) } + if config.GetGlobalConfig().Store != "tikv" { + config.GetGlobalConfig().Store = "tikv" + } // Disable GC because TiDB enables GC already. 
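Editor's note: the forced store type just above is load-bearing. ddl.StartOwnerManager, introduced later in this patch (pkg/ddl/owner_mgr.go), returns early unless the global config reports a "tikv" store, so BR must pin the value before bootstrapping. A minimal sketch of the bootstrap order this implies for embedders; the function name is illustrative, the calls are the patch's own API:

package main

import (
	"context"

	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/session"
)

func bootstrap(ctx context.Context, store kv.Storage) error {
	// StartOwnerManager (added below in pkg/ddl/owner_mgr.go) is a no-op
	// unless the global config says the store is "tikv".
	config.GetGlobalConfig().Store = "tikv"
	if err := ddl.StartOwnerManager(ctx, store); err != nil {
		return err
	}
	// BootstrapSession builds the domain, whose DDL borrows the manager
	// started above instead of creating its own.
	_, err := session.BootstrapSession(store)
	return err
}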
path := fmt.Sprintf( "tikv://%s?disableGC=true&keyspaceName=%s", @@ -292,6 +296,7 @@ func (mgr *Mgr) Close() { if mgr.dom != nil { mgr.dom.Close() } + ddl.CloseOwnerManager() tikv.StoreShuttingDown(1) _ = mgr.storage.Close() } diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index a86ef312d1804..9514ae4f5f7a1 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -107,6 +107,9 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error { if existDom != nil { return nil } + if err := ddl.StartOwnerManager(context.Background(), store); err != nil { + return errors.Trace(err) + } dom, err := session.GetDomain(store) if err != nil { return err diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 6009eb47a6c70..8f5f14753eac3 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -920,7 +920,11 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI) advancer := streamhelper.NewCheckpointAdvancer(env) advancer.UpdateConfig(cfg.AdvancerCfg) - advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration) + ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI) + defer func() { + ownerMgr.Close() + }() + advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration) loop, err := advancerd.Begin(ctx) if err != nil { return err diff --git a/cmd/benchdb/BUILD.bazel b/cmd/benchdb/BUILD.bazel index 6e8acbd0bd3ff..a5275dbc84d66 100644 --- a/cmd/benchdb/BUILD.bazel +++ b/cmd/benchdb/BUILD.bazel @@ -6,6 +6,8 @@ go_library( importpath = "github.com/pingcap/tidb/cmd/benchdb", visibility = ["//visibility:private"], deps = [ + "//pkg/config", + "//pkg/ddl", "//pkg/parser/terror", "//pkg/session", "//pkg/session/types", diff --git a/cmd/benchdb/main.go b/cmd/benchdb/main.go index 8d95bee890ab9..43432f6fab828 100644 --- a/cmd/benchdb/main.go +++ b/cmd/benchdb/main.go @@ -24,6 +24,8 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/session" sessiontypes "github.com/pingcap/tidb/pkg/session/types" @@ -96,11 +98,16 @@ func newBenchDB() *benchDB { // Create TiKV store and disable GC as we will trigger GC manually. store, err := store.New("tikv://" + *addr + "?disableGC=true") terror.MustNil(err) + // maybe close below components, but it's for test anyway. 
+ ctx := context.Background() + config.GetGlobalConfig().Store = "tikv" + err = ddl.StartOwnerManager(ctx, store) + terror.MustNil(err) _, err = session.BootstrapSession(store) terror.MustNil(err) se, err := session.CreateSession(store) terror.MustNil(err) - _, err = se.ExecuteInternal(context.Background(), "use test") + _, err = se.ExecuteInternal(ctx, "use test") terror.MustNil(err) return &benchDB{ diff --git a/cmd/ddltest/BUILD.bazel b/cmd/ddltest/BUILD.bazel index bf3fce97cf57a..6dce632241496 100644 --- a/cmd/ddltest/BUILD.bazel +++ b/cmd/ddltest/BUILD.bazel @@ -14,7 +14,9 @@ go_test( race = "on", shard_count = 6, deps = [ + "//dumpling/context", "//pkg/config", + "//pkg/ddl", "//pkg/domain", "//pkg/kv", "//pkg/parser/model", diff --git a/cmd/ddltest/ddl_test.go b/cmd/ddltest/ddl_test.go index 15997dad72864..525ec722e810c 100644 --- a/cmd/ddltest/ddl_test.go +++ b/cmd/ddltest/ddl_test.go @@ -33,7 +33,9 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/dumpling/context" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" @@ -95,12 +97,13 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) { s.quit = make(chan struct{}) + config.GetGlobalConfig().Store = "tikv" s.store, err = store.New(fmt.Sprintf("tikv://%s%s", *etcd, *tikvPath)) require.NoError(t, err) // Make sure the schema lease of this session is equal to other TiDB servers'. session.SetSchemaLease(time.Duration(*lease) * time.Second) - + require.NoError(t, ddl.StartOwnerManager(context.Background(), s.store)) s.dom, err = session.BootstrapSession(s.store) require.NoError(t, err) @@ -118,6 +121,7 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) { err = domain.GetDomain(s.ctx).DDL().Stop() require.NoError(t, err) config.GetGlobalConfig().Instance.TiDBEnableDDL.Store(false) + ddl.CloseOwnerManager() session.ResetStoreForWithTiKVTest(s.store) s.dom.Close() require.NoError(t, s.store.Close()) diff --git a/cmd/tidb-server/main.go b/cmd/tidb-server/main.go index 9e6bfc78f5771..178f68bfaf27b 100644 --- a/cmd/tidb-server/main.go +++ b/cmd/tidb-server/main.go @@ -317,7 +317,7 @@ func main() { keyspaceName := keyspace.GetKeyspaceNameBySettings() executor.Start() resourcemanager.InstanceResourceManager.Start() - storage, dom := createStoreAndDomain(keyspaceName) + storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName) svr := createServer(storage, dom) exited := make(chan struct{}) @@ -397,7 +397,7 @@ func registerStores() { terror.MustNil(err) } -func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) { +func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) { cfg := config.GetGlobalConfig() var fullPath string if keyspaceName == "" { @@ -411,6 +411,8 @@ func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) { copr.GlobalMPPFailedStoreProber.Run() mppcoordmanager.InstanceMPPCoordinatorManager.Run() // Bootstrap a session to load information schema. + err = ddl.StartOwnerManager(context.Background(), storage) + terror.MustNil(err) dom, err := session.BootstrapSession(storage) terror.MustNil(err) return storage, dom @@ -859,7 +861,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server { svr, err := server.NewServer(cfg, driver) // Both domain and storage have started, so we have to clean them before exiting. 
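Editor's note: tidb-server pairs the startup above with a symmetric teardown, renamed below to closeDDLOwnerMgrDomainAndStorage. A sketch of that reverse order, minus the TiKV shutdown flag and the coprocessor/MPP bookkeeping; shutdown is hypothetical glue, the three calls are the real ones:

package main

import (
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/kv"
	"go.uber.org/zap"
)

func shutdown(storage kv.Storage, dom *domain.Domain) {
	dom.Close()             // DDL breaks its campaign loop but keeps the owner key
	ddl.CloseOwnerManager() // releases the shared owner.Manager and its etcd client
	if err := storage.Close(); err != nil {
		log.Error("close storage failed", zap.Error(err))
	}
}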
if err != nil { - closeDomainAndStorage(storage, dom) + closeDDLOwnerMgrDomainAndStorage(storage, dom) log.Fatal("failed to create the server", zap.Error(err), zap.Stack("stack")) } svr.SetDomain(dom) @@ -893,9 +895,10 @@ func setupTracing() { opentracing.SetGlobalTracer(tracer) } -func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) { +func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) { tikv.StoreShuttingDown(1) dom.Close() + ddl.CloseOwnerManager() copr.GlobalMPPFailedStoreProber.Stop() mppcoordmanager.InstanceMPPCoordinatorManager.Stop() err := storage.Close() @@ -918,7 +921,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) { // See https://github.com/pingcap/tidb/issues/40038 for details. svr.KillSysProcesses() plugin.Shutdown(context.Background()) - closeDomainAndStorage(storage, dom) + closeDDLOwnerMgrDomainAndStorage(storage, dom) disk.CleanUp() closeStmtSummary() topsql.Close() diff --git a/pkg/autoid_service/autoid.go b/pkg/autoid_service/autoid.go index 005d6783b9739..b5435e055f0e7 100644 --- a/pkg/autoid_service/autoid.go +++ b/pkg/autoid_service/autoid.go @@ -379,8 +379,8 @@ func MockForTest(store kv.Storage) autoid.AutoIDAllocClient { // Close closes the Service and clean up resource. func (s *Service) Close() { - if s.leaderShip != nil && s.leaderShip.IsOwner() { - s.leaderShip.Cancel() + if s.leaderShip != nil { + s.leaderShip.Close() } } diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 2016d1dd5c3f2..9a0c65f0b34b4 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "modify_column.go", "multi_schema_change.go", "options.go", + "owner_mgr.go", "partition.go", "placement_policy.go", "reorg.go", @@ -133,6 +134,7 @@ go_library( "//pkg/sessiontxn", "//pkg/statistics", "//pkg/statistics/handle", + "//pkg/store", "//pkg/store/driver/txn", "//pkg/store/helper", "//pkg/table", @@ -251,6 +253,7 @@ go_test( "multi_schema_change_test.go", "mv_index_test.go", "options_test.go", + "owner_mgr_test.go", "partition_test.go", "placement_policy_ddl_test.go", "placement_policy_test.go", @@ -356,6 +359,7 @@ go_test( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_tests_v3//integration", "@org_golang_google_grpc//:grpc", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 8f3fa816e1466..119ea18e74043 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -73,7 +73,8 @@ const ( ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock" // addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd. addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_" - ddlPrompt = "ddl" + // Prompt is the prompt for ddl owner manager. + Prompt = "ddl" batchAddingJobs = 100 @@ -638,19 +639,21 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) { o(opt) } - id := uuid.New().String() + var id string var manager owner.Manager var schemaVerSyncer schemaver.Syncer var serverStateSyncer serverstate.Syncer var deadLockCkr util.DeadTableLockChecker if etcdCli := opt.EtcdCli; etcdCli == nil { + id = uuid.New().String() // The etcdCli is nil if the store is localstore which is only used for testing. // So we use mockOwnerManager and memSyncer. 
manager = owner.NewMockManager(ctx, id, opt.Store, DDLOwnerKey) schemaVerSyncer = schemaver.NewMemSyncer() serverStateSyncer = serverstate.NewMemSyncer() } else { - manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey) + id = globalOwnerManager.ID() + manager = globalOwnerManager.OwnerManager() schemaVerSyncer = schemaver.NewEtcdSyncer(etcdCli, id) serverStateSyncer = serverstate.NewEtcdSyncer(etcdCli, util.ServerGlobalState) deadLockCkr = util.NewDeadTableLockChecker(etcdCli) @@ -1003,7 +1006,10 @@ func (d *ddl) close() { startTime := time.Now() d.cancel() d.wg.Wait() - d.ownerManager.Cancel() + // when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager, + // when run with uni-store BreakCampaignLoop is same as Close. + // hope we can unify it after refactor to let some components only start once. + d.ownerManager.BreakCampaignLoop() d.schemaVerSyncer.Close() // d.delRangeMgr using sessions from d.sessPool. diff --git a/pkg/ddl/owner_mgr.go b/pkg/ddl/owner_mgr.go new file mode 100644 index 0000000000000..7e19be9cc7c3a --- /dev/null +++ b/pkg/ddl/owner_mgr.go @@ -0,0 +1,97 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/owner" + storepkg "github.com/pingcap/tidb/pkg/store" + "github.com/pingcap/tidb/pkg/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +var globalOwnerManager = &ownerManager{} + +// StartOwnerManager starts a global DDL owner manager. +func StartOwnerManager(ctx context.Context, store kv.Storage) error { + return globalOwnerManager.Start(ctx, store) +} + +// CloseOwnerManager closes the global DDL owner manager. +func CloseOwnerManager() { + globalOwnerManager.Close() +} + +// ownerManager is used to manage lifecycle of a global DDL owner manager which +// we only want it to init session once, to avoid DDL owner change after upgrade. +type ownerManager struct { + etcdCli *clientv3.Client + id string + ownerMgr owner.Manager + started bool +} + +// Start starts the TiDBInstance. +func (om *ownerManager) Start(ctx context.Context, store kv.Storage) error { + // BR might start domain multiple times, we need to avoid it. when BR have refactored + // this part, we can remove this. + if om.started { + return nil + } + if config.GetGlobalConfig().Store != "tikv" { + return nil + } + cli, err := storepkg.NewEtcdCli(store) + if err != nil { + return errors.Trace(err) + } + failpoint.InjectCall("injectEtcdClient", &cli) + if cli == nil { + return errors.New("etcd client is nil, maybe the server is not started with PD") + } + om.id = uuid.New().String() + om.etcdCli = cli + om.ownerMgr = owner.NewOwnerManager(ctx, om.etcdCli, Prompt, om.id, DDLOwnerKey) + om.started = true + return nil +} + +// Close closes the TiDBInstance. 
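Editor's note: the om.started guard in Start above makes repeated calls safe, which matters because BR can drive bootstrap more than once. A hedged illustration, assuming store is a TiKV-backed kv.Storage:

package main

import (
	"context"

	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/kv"
)

func startTwice(ctx context.Context, store kv.Storage) error {
	// First call: builds the etcd client and the owner.Manager.
	if err := ddl.StartOwnerManager(ctx, store); err != nil {
		return err
	}
	// Second call: returns nil immediately because om.started is already true.
	return ddl.StartOwnerManager(ctx, store)
}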
+func (om *ownerManager) Close() { + if om.ownerMgr != nil { + om.ownerMgr.Close() + } + if om.etcdCli != nil { + if err := om.etcdCli.Close(); err != nil { + logutil.BgLogger().Error("close etcd client failed", zap.Error(err)) + } + } + om.started = false +} + +func (om *ownerManager) ID() string { + return om.id +} + +func (om *ownerManager) OwnerManager() owner.Manager { + return om.ownerMgr +} diff --git a/pkg/ddl/owner_mgr_test.go b/pkg/ddl/owner_mgr_test.go new file mode 100644 index 0000000000000..27e9fe5e021d8 --- /dev/null +++ b/pkg/ddl/owner_mgr_test.go @@ -0,0 +1,57 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/integration" +) + +func TestOwnerManager(t *testing.T) { + bak := config.GetGlobalConfig().Store + t.Cleanup(func() { + config.GetGlobalConfig().Store = bak + globalOwnerManager = &ownerManager{} + }) + config.GetGlobalConfig().Store = "unistore" + globalOwnerManager = &ownerManager{} + ctx := context.Background() + require.NoError(t, StartOwnerManager(ctx, nil)) + require.Nil(t, globalOwnerManager.etcdCli) + require.Nil(t, globalOwnerManager.ownerMgr) + require.Empty(t, globalOwnerManager.id) + CloseOwnerManager() + + integration.BeforeTestExternal(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + cli := cluster.RandClient() + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/injectEtcdClient", func(cliP **clientv3.Client) { + *cliP = cli + }) + config.GetGlobalConfig().Store = "tikv" + require.NoError(t, StartOwnerManager(ctx, nil)) + require.Same(t, cli, globalOwnerManager.etcdCli) + require.NotEmpty(t, globalOwnerManager.id) + require.NotNil(t, globalOwnerManager.ownerMgr) + CloseOwnerManager() + cluster.TakeClient(0) +} diff --git a/pkg/ddl/schemaver/BUILD.bazel b/pkg/ddl/schemaver/BUILD.bazel index db28e7939d2b7..eae7a867057cf 100644 --- a/pkg/ddl/schemaver/BUILD.bazel +++ b/pkg/ddl/schemaver/BUILD.bazel @@ -36,20 +36,15 @@ go_test( flaky = True, shard_count = 5, deps = [ - "//pkg/ddl", "//pkg/ddl/util", "//pkg/domain/infosync", - "//pkg/infoschema", "//pkg/parser/terror", - "//pkg/session", "//pkg/sessionctx/variable", - "//pkg/store/mockstore", "//pkg/util", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@io_etcd_go_etcd_api_v3//mvccpb", - "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_server_v3//etcdserver", "@io_etcd_go_etcd_tests_v3//integration", "@org_golang_google_grpc//codes", diff --git a/pkg/ddl/schemaver/syncer_test.go b/pkg/ddl/schemaver/syncer_test.go index 1e8e285cb1f4e..a62bd24250b13 100644 --- a/pkg/ddl/schemaver/syncer_test.go +++ b/pkg/ddl/schemaver/syncer_test.go @@ -23,18 +23,13 @@ import ( 
"time" "github.com/pingcap/errors" - . "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/schemaver" util2 "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/integration" "google.golang.org/grpc/codes" @@ -42,7 +37,6 @@ import ( ) const minInterval = 10 * time.Nanosecond // It's used to test timeout. -const testLease = 5 * time.Millisecond func TestSyncerSimple(t *testing.T) { variable.EnableMDL.Store(false) @@ -57,59 +51,31 @@ func TestSyncerSimple(t *testing.T) { schemaver.CheckVersFirstWaitTime = origin }() - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { require.NoError(t, store.Close()) }() - domain, err := session.GetDomain(store) - require.NoError(t, err) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) cli := cluster.RandClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ic := infoschema.NewCache(nil, 2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d, _ := NewDDL( - ctx, - WithEtcdClient(cli), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic), - WithSchemaLoader(domain), - ) - go func() { - require.NoError(t, d.OwnerManager().CampaignOwner()) + syncers := make([]schemaver.Syncer, 0, 2) + for i := 0; i < 2; i++ { + id := strconv.Itoa(i + 1) + schemaVerSyncer := schemaver.NewEtcdSyncer(cli, id) + require.NoError(t, schemaVerSyncer.Init(ctx)) + syncers = append(syncers, schemaVerSyncer) + } + defer func() { + for _, syncer := range syncers { + syncer.Close() + } }() - defer d.OwnerManager().Cancel() - - // for init function - require.NoError(t, d.SchemaSyncer().Init(ctx)) - resp, err := cli.Get(ctx, util2.DDLAllSchemaVersions, clientv3.WithPrefix()) - require.NoError(t, err) - - defer d.SchemaSyncer().Close() - - key := util2.DDLAllSchemaVersions + "/" + d.OwnerManager().ID() - checkRespKV(t, 1, key, schemaver.InitialVersion, resp.Kvs...) - - ic2 := infoschema.NewCache(nil, 2) - ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d1, _ := NewDDL( - ctx, - WithEtcdClient(cli), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic2), - WithSchemaLoader(domain), - ) - go func() { - require.NoError(t, d1.OwnerManager().CampaignOwner()) - }() - defer d1.OwnerManager().Cancel() - require.NoError(t, d1.SchemaSyncer().Init(ctx)) - defer d.SchemaSyncer().Close() + for i := range syncers { + id := strconv.Itoa(i + 1) + key := util2.DDLAllSchemaVersions + "/" + id + resp, err := cli.Get(ctx, key) + require.NoError(t, err) + checkRespKV(t, 1, key, schemaver.InitialVersion, resp.Kvs...) 
+ } // for watchCh var wg util.WaitGroupWrapper @@ -117,7 +83,7 @@ func TestSyncerSimple(t *testing.T) { var checkErr string wg.Run(func() { select { - case resp := <-d.SchemaSyncer().GlobalVersionCh(): + case resp := <-syncers[0].GlobalVersionCh(): if len(resp.Events) < 1 { checkErr = "get chan events count less than 1" return @@ -130,7 +96,7 @@ func TestSyncerSimple(t *testing.T) { }) // for update latestSchemaVersion - require.NoError(t, d.SchemaSyncer().OwnerUpdateGlobalVersion(ctx, currentVer)) + require.NoError(t, syncers[0].OwnerUpdateGlobalVersion(ctx, currentVer)) wg.Wait() @@ -138,34 +104,35 @@ func TestSyncerSimple(t *testing.T) { // for CheckAllVersions childCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) - require.Error(t, d.SchemaSyncer().WaitVersionSynced(childCtx, 0, currentVer)) + require.Error(t, syncers[0].WaitVersionSynced(childCtx, 0, currentVer)) cancel() // for UpdateSelfVersion - require.NoError(t, d.SchemaSyncer().UpdateSelfVersion(context.Background(), 0, currentVer)) - require.NoError(t, d1.SchemaSyncer().UpdateSelfVersion(context.Background(), 0, currentVer)) + require.NoError(t, syncers[0].UpdateSelfVersion(context.Background(), 0, currentVer)) + require.NoError(t, syncers[1].UpdateSelfVersion(context.Background(), 0, currentVer)) childCtx, cancel = context.WithTimeout(ctx, minInterval) defer cancel() - err = d1.SchemaSyncer().UpdateSelfVersion(childCtx, 0, currentVer) + err := syncers[1].UpdateSelfVersion(childCtx, 0, currentVer) require.True(t, isTimeoutError(err)) // for CheckAllVersions - require.NoError(t, d.SchemaSyncer().WaitVersionSynced(context.Background(), 0, currentVer-1)) - require.NoError(t, d.SchemaSyncer().WaitVersionSynced(context.Background(), 0, currentVer)) + require.NoError(t, syncers[0].WaitVersionSynced(context.Background(), 0, currentVer-1)) + require.NoError(t, syncers[0].WaitVersionSynced(context.Background(), 0, currentVer)) childCtx, cancel = context.WithTimeout(ctx, minInterval) defer cancel() - err = d.SchemaSyncer().WaitVersionSynced(childCtx, 0, currentVer) + err = syncers[0].WaitVersionSynced(childCtx, 0, currentVer) require.True(t, isTimeoutError(err)) // for Close - resp, err = cli.Get(context.Background(), key) + key := util2.DDLAllSchemaVersions + "/1" + resp, err := cli.Get(context.Background(), key) require.NoError(t, err) currVer := fmt.Sprintf("%v", currentVer) checkRespKV(t, 1, key, currVer, resp.Kvs...) - d.SchemaSyncer().Close() + syncers[0].Close() resp, err = cli.Get(context.Background(), key) require.NoError(t, err) require.Len(t, resp.Kvs, 0) diff --git a/pkg/ddl/serverstate/BUILD.bazel b/pkg/ddl/serverstate/BUILD.bazel index 9c983d8036b30..e9e1cf420e802 100644 --- a/pkg/ddl/serverstate/BUILD.bazel +++ b/pkg/ddl/serverstate/BUILD.bazel @@ -30,13 +30,9 @@ go_test( flaky = True, deps = [ ":serverstate", - "//pkg/ddl", "//pkg/ddl/schemaver", "//pkg/ddl/util", - "//pkg/infoschema", - "//pkg/session", "//pkg/sessionctx/variable", - "//pkg/store/mockstore", "//pkg/util", "@com_github_stretchr_testify//require", "@io_etcd_go_etcd_api_v3//mvccpb", diff --git a/pkg/ddl/serverstate/syncer_test.go b/pkg/ddl/serverstate/syncer_test.go index 0c24430ce6bc9..a6c181f2754b5 100644 --- a/pkg/ddl/serverstate/syncer_test.go +++ b/pkg/ddl/serverstate/syncer_test.go @@ -20,14 +20,10 @@ import ( "testing" "time" - . 
"github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/schemaver" "github.com/pingcap/tidb/pkg/ddl/serverstate" util2 "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/mvccpb" @@ -59,42 +55,23 @@ func TestStateSyncerSimple(t *testing.T) { schemaver.CheckVersFirstWaitTime = origin }() - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { require.NoError(t, store.Close()) }() - domain, err := session.GetDomain(store) - require.NoError(t, err) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) cli := cluster.RandClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ic := infoschema.NewCache(nil, 2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d, _ := NewDDL( - ctx, - WithEtcdClient(cli), - WithStore(store), - WithLease(5*time.Millisecond), - WithInfoCache(ic), - WithSchemaLoader(domain), - ) var wg util.WaitGroupWrapper - wg.Run(func() { - require.NoError(t, d.OwnerManager().CampaignOwner()) - }) - defer d.OwnerManager().Cancel() + serverStateSyncer := serverstate.NewEtcdSyncer(cli, util2.ServerGlobalState) // TODO: We can remove it when we call it in newDDL. - require.NoError(t, d.StateSyncer().Init(ctx)) + require.NoError(t, serverStateSyncer.Init(ctx)) // for GetGlobalState // for the initial state stateInfo := &serverstate.StateInfo{State: serverstate.StateNormalRunning} - respState, err := d.StateSyncer().GetGlobalState(ctx) + respState, err := serverStateSyncer.GetGlobalState(ctx) require.Nil(t, err) require.Equal(t, stateInfo, respState) - require.False(t, d.StateSyncer().IsUpgradingState()) + require.False(t, serverStateSyncer.IsUpgradingState()) // for watchCh var checkErr string stateInfo.State = serverstate.StateUpgrading @@ -102,25 +79,25 @@ func TestStateSyncerSimple(t *testing.T) { require.Nil(t, err) checkValue := func() { select { - case resp := <-d.StateSyncer().WatchChan(): + case resp := <-serverStateSyncer.WatchChan(): if len(resp.Events) < 1 { checkErr = "get chan events count less than 1" return } checkRespKV(t, 1, util2.ServerGlobalState, string(stateInfoByte), resp.Events[0].Kv) if stateInfo.State == serverstate.StateUpgrading { - require.False(t, d.StateSyncer().IsUpgradingState()) + require.False(t, serverStateSyncer.IsUpgradingState()) } else { - require.True(t, d.StateSyncer().IsUpgradingState()) + require.True(t, serverStateSyncer.IsUpgradingState()) } // for GetGlobalState - respState, err := d.StateSyncer().GetGlobalState(ctx) + respState, err := serverStateSyncer.GetGlobalState(ctx) require.Nil(t, err) require.Equal(t, stateInfo, respState) if stateInfo.State == serverstate.StateUpgrading { - require.True(t, d.StateSyncer().IsUpgradingState()) + require.True(t, serverStateSyncer.IsUpgradingState()) } else { - require.False(t, d.StateSyncer().IsUpgradingState()) + require.False(t, serverStateSyncer.IsUpgradingState()) } case <-time.After(3 * time.Second): checkErr = "get update state failed" @@ -131,7 +108,7 @@ func TestStateSyncerSimple(t *testing.T) { // for update UpdateGlobalState // for StateUpgrading wg.Run(checkValue) - require.NoError(t, d.StateSyncer().UpdateGlobalState(ctx, &serverstate.StateInfo{State: serverstate.StateUpgrading})) + 
require.NoError(t, serverStateSyncer.UpdateGlobalState(ctx, &serverstate.StateInfo{State: serverstate.StateUpgrading})) wg.Wait() require.Equal(t, "", checkErr) // for StateNormalRunning @@ -139,7 +116,7 @@ func TestStateSyncerSimple(t *testing.T) { stateInfoByte, err = stateInfo.Marshal() require.Nil(t, err) wg.Run(checkValue) - require.NoError(t, d.StateSyncer().UpdateGlobalState(ctx, &serverstate.StateInfo{State: serverstate.StateNormalRunning})) + require.NoError(t, serverStateSyncer.UpdateGlobalState(ctx, &serverstate.StateInfo{State: serverstate.StateNormalRunning})) wg.Wait() require.Equal(t, "", checkErr) } diff --git a/pkg/disttask/framework/integrationtests/BUILD.bazel b/pkg/disttask/framework/integrationtests/BUILD.bazel index b9f8bc2d4a502..89dc8f8b5e4c9 100644 --- a/pkg/disttask/framework/integrationtests/BUILD.bazel +++ b/pkg/disttask/framework/integrationtests/BUILD.bazel @@ -18,6 +18,8 @@ go_test( race = "off", shard_count = 23, deps = [ + "//pkg/config", + "//pkg/ddl", "//pkg/disttask/framework/handle", "//pkg/disttask/framework/proto", "//pkg/disttask/framework/scheduler", diff --git a/pkg/disttask/framework/integrationtests/bench_test.go b/pkg/disttask/framework/integrationtests/bench_test.go index fd0b0d325b077..756cb7d7015e5 100644 --- a/pkg/disttask/framework/integrationtests/bench_test.go +++ b/pkg/disttask/framework/integrationtests/bench_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" @@ -113,11 +115,13 @@ func prepareForBenchTest(b *testing.B) { var err error store, err := d.Open("tikv://" + *testkit.WithTiKV) require.NoError(b, err) - + config.GetGlobalConfig().Store = "tikv" + require.NoError(b, ddl.StartOwnerManager(context.Background(), store)) var dom *domain.Domain dom, err = session.BootstrapSession(store) defer func() { dom.Close() + ddl.CloseOwnerManager() err := store.Close() require.NoError(b, err) view.Stop() diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index fea17d5c870e1..a7d37251cc81b 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "//pkg/statistics/handle/initstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/util", + "//pkg/store", "//pkg/store/helper", "//pkg/ttl/ttlworker", "//pkg/types", @@ -109,9 +110,6 @@ go_library( "@com_github_tikv_pd_client//resource_group/controller", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", - "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//backoff", - "@org_golang_google_grpc//keepalive", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], @@ -136,7 +134,7 @@ go_test( ], embed = [":domain"], flaky = True, - shard_count = 31, + shard_count = 30, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index d8e76396de26f..f4e4796e4c38b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -74,6 +74,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/initstats" statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/store" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/ttl/ttlworker" "github.com/pingcap/tidb/pkg/types" @@ -104,9 +105,6 @@ import ( 
"go.etcd.io/etcd/client/v3/concurrency" atomicutil "go.uber.org/atomic" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/keepalive" ) var ( @@ -193,6 +191,7 @@ type Domain struct { expiredTimeStamp types.Time } + brOwnerMgr owner.Manager logBackupAdvancer *daemon.OwnerDaemon historicalStatsWorker *HistoricalStatsWorker ttlJobManager atomic.Pointer[ttlworker.JobManager] @@ -1226,8 +1225,8 @@ func (do *Domain) Close() { } do.releaseServerID(context.Background()) close(do.exit) - if do.etcdClient != nil { - terror.Log(errors.Trace(do.etcdClient.Close())) + if do.brOwnerMgr != nil { + do.brOwnerMgr.Close() } do.runawayManager.Stop() @@ -1243,6 +1242,9 @@ func (do *Domain) Close() { } do.cancelFns.mu.Unlock() do.wg.Wait() + if do.etcdClient != nil { + terror.Log(errors.Trace(do.etcdClient.Close())) + } do.sysSessionPool.Close() variable.UnregisterStatistics(do.BindHandle()) if do.onClose != nil { @@ -1315,62 +1317,6 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura const serverIDForStandalone = 1 // serverID for standalone deployment. -// NewEtcdCli creates a new clientv3.Client from store if the store support it. -// the returned client might be nil. -// TODO currently uni-store/mock-tikv/tikv all implements EtcdBackend while they don't support actually. -// refactor this part. -func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) { - etcdStore, addrs, err := getEtcdAddrs(store) - if err != nil { - return nil, err - } - if len(addrs) == 0 { - return nil, nil - } - cli, err := newEtcdCli(addrs, etcdStore) - if err != nil { - return nil, err - } - return cli, nil -} - -func getEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) { - etcdStore, ok := store.(kv.EtcdBackend) - if !ok { - return nil, nil, nil - } - addrs, err := etcdStore.EtcdAddrs() - if err != nil { - return nil, nil, err - } - return etcdStore, addrs, nil -} - -func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { - cfg := config.GetGlobalConfig() - etcdLogCfg := zap.NewProductionConfig() - etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) - backoffCfg := backoff.DefaultConfig - backoffCfg.MaxDelay = 3 * time.Second - cli, err := clientv3.New(clientv3.Config{ - LogConfig: &etcdLogCfg, - Endpoints: addrs, - AutoSyncInterval: 30 * time.Second, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{ - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoffCfg, - }), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, - Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, - }), - }, - TLS: ebd.TLSConfig(), - }) - return cli, err -} - // Init initializes a domain. after return, session can be used to do DMLs but not // DDLs which can be used after domain Start. 
func (do *Domain) Init( @@ -1379,12 +1325,12 @@ func (do *Domain) Init( ) error { do.sysExecutorFactory = sysExecutorFactory perfschema.Init() - etcdStore, addrs, err := getEtcdAddrs(do.store) + etcdStore, addrs, err := store.GetEtcdAddrs(do.store) if err != nil { return errors.Trace(err) } if len(addrs) > 0 { - cli, err2 := newEtcdCli(addrs, etcdStore) + cli, err2 := store.NewEtcdCliWithAddrs(addrs, etcdStore) if err2 != nil { return errors.Trace(err2) } @@ -1394,7 +1340,7 @@ func (do *Domain) Init( do.autoidClient = autoid.NewClientDiscover(cli) - unprefixedEtcdCli, err2 := newEtcdCli(addrs, etcdStore) + unprefixedEtcdCli, err2 := store.NewEtcdCliWithAddrs(addrs, etcdStore) if err2 != nil { return errors.Trace(err2) } @@ -1593,7 +1539,8 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { return err } adv := streamhelper.NewCheckpointAdvancer(env) - do.logBackupAdvancer = daemon.New(adv, streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient), adv.Config().TickDuration) + do.brOwnerMgr = streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient) + do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration) loop, err := do.logBackupAdvancer.Begin(ctx) if err != nil { return err @@ -2088,7 +2035,7 @@ func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) { for { select { case <-do.exit: - owner.Cancel() + owner.Close() return case <-bindWorkerTicker.C: bindHandle := do.BindHandle() @@ -2486,7 +2433,7 @@ func (do *Domain) disableStatsOwner() error { func quitStatsOwner(do *Domain, mgr owner.Manager) { <-do.exit - mgr.Cancel() + mgr.Close() } // StartLoadStatsSubWorkers starts sub workers with new sessions to load stats concurrently. @@ -2600,7 +2547,7 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats") statsHandle.FlushStats() logutil.BgLogger().Info("updateStatsWorker ready to release owner") - do.statsOwner.Cancel() + do.statsOwner.Close() ch <- struct{}{} }() select { diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index f575d8702b305..92eff2e83dd98 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -486,19 +486,3 @@ func TestDeferFn(t *testing.T) { require.True(t, d) require.Len(t, df.data, 1) } - -func TestNewEtcdCliGetEtcdAddrs(t *testing.T) { - etcdStore, addrs, err := getEtcdAddrs(nil) - require.NoError(t, err) - require.Empty(t, addrs) - require.Nil(t, etcdStore) - - etcdStore, addrs, err = getEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}}) - require.NoError(t, err) - require.Equal(t, []string{"localhost:2379"}, addrs) - require.NotNil(t, etcdStore) - - cli, err := NewEtcdCli(nil) - require.NoError(t, err) - require.Nil(t, cli) -} diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel index 20d8e5ac8debe..e494e0f54666f 100644 --- a/pkg/owner/BUILD.bazel +++ b/pkg/owner/BUILD.bazel @@ -39,12 +39,7 @@ go_test( flaky = True, shard_count = 10, deps = [ - "//pkg/ddl", - "//pkg/infoschema", - "//pkg/kv", "//pkg/parser/terror", - "//pkg/store/mockstore", - "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/util", "//pkg/util/logutil", diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 4db5d655dafaa..88aaccbae9a64 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -57,16 +57,23 @@ type Manager interface { GetOwnerID(ctx context.Context) (string, error) // SetOwnerOpValue updates the owner op value. 
SetOwnerOpValue(ctx context.Context, op OpType) error
- // CampaignOwner campaigns the owner.
+ // CampaignOwner campaigns the owner. It starts a background goroutine that
+ // campaigns in a loop and calls the listener's methods when this node
+ // becomes or retires as the owner.
CampaignOwner(...int) error
- // ResignOwner lets the owner start a new election.
- ResignOwner(ctx context.Context) error
- // Cancel cancels this etcd ownerManager.
- Cancel()
- // RequireOwner requires the ownerManager is owner.
- RequireOwner(ctx context.Context) error
- // CampaignCancel cancels one etcd campaign
+ // CampaignCancel cancels the current etcd campaign and also closes the
+ // underlying etcd session. After this method is called, the manager can
+ // be used to campaign for the owner again.
CampaignCancel()
+ // BreakCampaignLoop breaks the campaign loop and calls the related listener
+ // methods. The underlying etcd session and the campaign key remain, so if
+ // an instance was the owner before the break, it will still be the owner
+ // after it campaigns again.
+ BreakCampaignLoop()
+ // ResignOwner resigns and starts a new election if this node is the owner.
+ ResignOwner(ctx context.Context) error
+ // Close closes the manager; after Close, no methods may be called.
+ Close()
// SetListener sets the listener, set before CampaignOwner.
SetListener(listener Listener)
// ForceToBeOwner restarts the owner election and tries to be the new owner by
@@ -129,27 +136,24 @@ type ownerManager struct {
logPrefix string
logCtx context.Context
etcdCli *clientv3.Client
- cancel context.CancelFunc
elec atomic.Pointer[concurrency.Election]
sessionLease *atomicutil.Int64
wg sync.WaitGroup
campaignCancel context.CancelFunc

- listener Listener
- forceOwnerSession *concurrency.Session
+ listener Listener
+ etcdSes *concurrency.Session
}

// NewOwnerManager creates a new Manager.
func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager {
logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id)
- ctx, cancelFunc := context.WithCancel(ctx)
return &ownerManager{
etcdCli: etcdCli,
id: id,
key: key,
ctx: ctx,
prompt: prompt,
- cancel: cancelFunc,
logPrefix: logPrefix,
logCtx: logutil.WithKeyValue(context.Background(), "owner info", logPrefix),
sessionLease: atomicutil.NewInt64(0),
@@ -166,15 +170,10 @@ func (m *ownerManager) IsOwner() bool {
return m.elec.Load() != nil
}

-// Cancel implements Manager.Cancel interface.
-func (m *ownerManager) Cancel() {
- m.cancel()
- m.wg.Wait()
-}
-
-// RequireOwner implements Manager.RequireOwner interface.
-func (*ownerManager) RequireOwner(_ context.Context) error {
- return nil
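Editor's note on the reshaped interface: Cancel and RequireOwner are gone, and the three remaining teardown paths have distinct semantics. An illustrative contrast; a real caller picks exactly one of these:

package ownerexample

import "github.com/pingcap/tidb/pkg/owner"

func teardownPaths(mgr owner.Manager) {
	// Stop campaigning but keep the etcd session and the owner key: after a
	// later CampaignOwner, a previous owner is still the owner. This is what
	// ddl.close() uses, per the comment in pkg/ddl/ddl.go above.
	mgr.BreakCampaignLoop()

	// Stop campaigning and close the etcd session; the manager may campaign
	// again afterwards, starting a fresh election.
	mgr.CampaignCancel()

	// Final shutdown; no methods may be called after this.
	mgr.Close()
}

+// Close implements Manager.Close interface.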
+func (m *ownerManager) Close() { + // same as CampaignCancel + m.CampaignCancel() } func (m *ownerManager) SetListener(listener Listener) { @@ -184,12 +183,9 @@ func (m *ownerManager) SetListener(listener Listener) { func (m *ownerManager) ForceToBeOwner(context.Context) error { logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) logutil.BgLogger().Info("force to be owner", zap.String("ownerInfo", logPrefix)) - session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL) - if err != nil { + if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ManagerSessionTTL); err != nil { return errors.Trace(err) } - m.forceOwnerSession = session - m.sessionLease.Store(int64(m.forceOwnerSession.Lease())) // due to issue https://github.com/pingcap/tidb/issues/54689, if the cluster // version before upgrade don't have fix, when retire owners runs on older version @@ -209,7 +205,7 @@ func (m *ownerManager) ForceToBeOwner(context.Context) error { // we need to sleep in every retry, as other TiDB nodes will start campaign // immediately after we delete their key. time.Sleep(WaitTimeOnForceOwner) - if err = m.tryToBeOwnerOnce(); err != nil { + if err := m.tryToBeOwnerOnce(); err != nil { logutil.Logger(m.logCtx).Warn("failed to retire owner on older version", zap.Error(err)) continue } @@ -219,7 +215,7 @@ func (m *ownerManager) ForceToBeOwner(context.Context) error { } func (m *ownerManager) tryToBeOwnerOnce() error { - lease := m.forceOwnerSession.Lease() + lease := m.etcdSes.Lease() keyPrefix := m.key + "/" getResp, err := m.etcdCli.Get(m.ctx, keyPrefix, clientv3.WithPrefix()) @@ -257,7 +253,7 @@ func (m *ownerManager) tryToBeOwnerOnce() error { // the owner, so we add a timeout to avoid blocking. ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout) defer cancel() - elec := concurrency.NewElection(m.forceOwnerSession, m.key) + elec := concurrency.NewElection(m.etcdSes, m.key) if err = elec.Campaign(ctx, m.id); err != nil { return errors.Trace(err) } @@ -295,20 +291,20 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { ttl = withTTL[0] } logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) - logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) - campaignSession := m.forceOwnerSession - if campaignSession == nil { - session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) - if err != nil { + if m.etcdSes == nil { + logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) + if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ttl); err != nil { return errors.Trace(err) } - m.sessionLease.Store(int64(session.Lease())) - campaignSession = session + } else { + logutil.BgLogger().Info("start campaign owner with existing session", + zap.String("ownerInfo", logPrefix), + zap.String("lease", util2.FormatLeaseID(m.etcdSes.Lease()))) } m.wg.Add(1) var campaignContext context.Context campaignContext, m.campaignCancel = context.WithCancel(m.ctx) - go m.campaignLoop(campaignContext, campaignSession) + go m.campaignLoop(campaignContext) return nil } @@ -349,11 +345,18 @@ func (m *ownerManager) RetireOwner() { // CampaignCancel implements Manager.CampaignCancel interface. 
func (m *ownerManager) CampaignCancel() { - m.campaignCancel() + m.BreakCampaignLoop() + m.closeSession() +} + +func (m *ownerManager) BreakCampaignLoop() { + if m.campaignCancel != nil { + m.campaignCancel() + } m.wg.Wait() } -func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) { +func (m *ownerManager) campaignLoop(campaignContext context.Context) { defer func() { m.campaignCancel() if r := recover(); r != nil { @@ -363,25 +366,28 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession m.wg.Done() }() - logPrefix := m.logPrefix logCtx := m.logCtx var err error + leaseNotFoundCh := make(chan struct{}) for { if err != nil { metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc() } select { - case <-etcdSession.Done(): - logutil.Logger(logCtx).Info("etcd session is done, creates a new one") - leaseID := etcdSession.Lease() - etcdSession, err = util2.NewSession(campaignContext, logPrefix, m.etcdCli, util2.NewSessionRetryUnlimited, ManagerSessionTTL) - if err != nil { - logutil.Logger(logCtx).Info("break campaign loop, NewSession failed", zap.Error(err)) - m.revokeSession(logPrefix, leaseID) + case <-m.etcdSes.Done(): + logutil.Logger(logCtx).Info("etcd session done, refresh it") + if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { + logutil.Logger(logCtx).Info("break campaign loop, refresh session failed", zap.Error(err2)) return } - m.sessionLease.Store(int64(etcdSession.Lease())) + case <-leaseNotFoundCh: + logutil.Logger(logCtx).Info("meet lease not found error, refresh session") + if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { + logutil.Logger(logCtx).Info("break campaign loop, refresh session failed", zap.Error(err2)) + return + } + leaseNotFoundCh = make(chan struct{}) case <-campaignContext.Done(): failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { if v.(string) == "delOwnerKeyAndNotOwner" { @@ -390,7 +396,6 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession } }) logutil.Logger(logCtx).Info("break campaign loop, context is done") - m.revokeSession(logPrefix, etcdSession.Lease()) return default: } @@ -398,14 +403,11 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession // The etcd server deletes this session's lease ID, but etcd session doesn't find it. // In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound. 
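Editor's note on the lease-not-found handling that follows: the loop body cannot rebuild the session in place, so it closes a trigger channel that the select at the top of the next iteration turns into a session refresh. A distilled, self-contained version of that control flow, with hypothetical stand-ins (campaignOnce for elec.Campaign, isLeaseNotFound for the rpctypes.ErrLeaseNotFound check, refreshSession for the real method):

func loopSketch(refreshSession func() error, campaignOnce func() error,
	isLeaseNotFound func(error) bool, done <-chan struct{}) {
	leaseNotFound := make(chan struct{})
	for {
		select {
		case <-leaseNotFound:
			// Refresh happens here, at the top of the next iteration.
			if refreshSession() != nil {
				return
			}
			leaseNotFound = make(chan struct{}) // re-arm the trigger
		case <-done:
			return
		default:
		}
		if err := campaignOnce(); err != nil && isLeaseNotFound(err) {
			close(leaseNotFound) // signal ourselves; handled next iteration
			continue
		}
	}
}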
if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { - if etcdSession != nil { - err = etcdSession.Close() - logutil.Logger(logCtx).Info("etcd session encounters the error of lease not found, closes it", zap.Error(err)) - } + close(leaseNotFoundCh) continue } - elec := concurrency.NewElection(etcdSession, m.key) + elec := concurrency.NewElection(m.etcdSes, m.key) err = elec.Campaign(campaignContext, m.id) if err != nil { logutil.Logger(logCtx).Info("failed to campaign", zap.Error(err)) @@ -418,16 +420,42 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession } m.toBeOwner(elec) - err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev) + err = m.watchOwner(campaignContext, m.etcdSes, ownerKey, currRev) logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err)) m.RetireOwner() metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc() - logutil.Logger(logCtx).Warn("is not the owner") + logutil.Logger(logCtx).Info("is not the owner") + } +} + +func (m *ownerManager) closeSession() { + if m.etcdSes != nil { + if err := m.etcdSes.Close(); err != nil { + logutil.Logger(m.logCtx).Info("etcd session close failed", zap.Error(err)) + } + m.etcdSes = nil + } +} + +func (m *ownerManager) refreshSession(retryCnt, ttl int) error { + m.closeSession() + // Note: we must use manager's context to create session. If we use campaign + // context and the context is cancelled, the created session cannot be closed + // as session close depends on the context. + // One drawback is that when you want to break the campaign loop, and the campaign + // loop is refreshing the session, it might wait for a long time to return, it + // should be fine as long as network is ok, and acceptable to wait when not. + sess, err2 := util2.NewSession(m.ctx, m.logPrefix, m.etcdCli, retryCnt, ttl) + if err2 != nil { + return errors.Trace(err2) } + m.etcdSes = sess + m.sessionLease.Store(int64(m.etcdSes.Lease())) + return nil } -func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) { +func (m *ownerManager) revokeSession(leaseID clientv3.LeaseID) { // Revoke the session lease. // If revoke takes longer than the ttl, lease is expired anyway. cancelCtx, cancel := context.WithTimeout(context.Background(), diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index 2955c550f82a7..61b5be6fceadf 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -25,13 +25,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - . 
"github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/owner" "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/store/mockstore" - "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" @@ -42,44 +37,20 @@ import ( "golang.org/x/exp/rand" ) -const testLease = 5 * time.Millisecond - type testInfo struct { - store kv.Storage cluster *integration.ClusterV3 client *clientv3.Client - ddl DDL } func newTestInfo(t *testing.T) *testInfo { - store, err := mockstore.NewMockStore() - require.NoError(t, err) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) - - cli := cluster.Client(0) - ic := infoschema.NewCache(nil, 2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d, _ := NewDDL( - context.Background(), - WithEtcdClient(cli), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic), - ) - + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) return &testInfo{ - store: store, cluster: cluster, - client: cli, - ddl: d, + client: cluster.Client(0), } } func (ti *testInfo) Close(t *testing.T) { - err := ti.ddl.Stop() - require.NoError(t, err) - err = ti.store.Close() - require.NoError(t, err) ti.cluster.Terminate(t) } @@ -101,41 +72,42 @@ func TestSingle(t *testing.T) { integration.BeforeTestExternal(t) tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl + client := tInfo.client defer tInfo.Close(t) - ownerManager := d.OwnerManager() + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) + ownerMgr.SetListener(lis) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) require.True(t, lis.val.Load()) // test for newSession failed ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - manager := owner.NewOwnerManager(ctx, client, "ddl", "ddl_id", DDLOwnerKey) + ownerMgr2 := owner.NewOwnerManager(ctx, client, "ddl", "2", "/owner/key") + defer ownerMgr2.Close() cancel() - err := manager.CampaignOwner() + err := ownerMgr2.CampaignOwner() comment := fmt.Sprintf("campaigned result don't match, err %v", err) require.True(t, terror.ErrorEqual(err, context.Canceled) || terror.ErrorEqual(err, context.DeadlineExceeded), comment) - isOwner = checkOwner(d, true) + isOwner = checkOwner(ownerMgr, true) require.True(t, isOwner) // The test is used to exit campaign loop. - ownerManager.Cancel() - isOwner = checkOwner(d, false) + ownerMgr.Close() + isOwner = checkOwner(ownerMgr, false) require.False(t, isOwner) require.False(t, lis.val.Load()) time.Sleep(200 * time.Millisecond) // err is ok to be not nil since we canceled the manager. 
- ownerID, _ := manager.GetOwnerID(ctx) + ownerID, _ := ownerMgr2.GetOwnerID(ctx) require.Equal(t, "", ownerID) - op, _ := owner.GetOwnerOpValue(ctx, client, DDLOwnerKey, "log prefix") + op, _ := owner.GetOwnerOpValue(ctx, client, "/owner/key", "log prefix") require.Equal(t, op, owner.OpNone) } @@ -148,37 +120,38 @@ func TestSetAndGetOwnerOpValue(t *testing.T) { tInfo := newTestInfo(t) defer tInfo.Close(t) - require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) - isOwner := checkOwner(tInfo.ddl, true) + ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) // test set/get owner info - manager := tInfo.ddl.OwnerManager() - ownerID, err := manager.GetOwnerID(context.Background()) + ownerID, err := ownerMgr.GetOwnerID(context.Background()) require.NoError(t, err) - require.Equal(t, tInfo.ddl.GetID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.Equal(t, ownerMgr.ID(), ownerID) + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.False(t, op.IsSyncedUpgradingState()) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) require.True(t, op.IsSyncedUpgradingState()) // update the same as the original value - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) require.True(t, op.IsSyncedUpgradingState()) // test del owner key when SetOwnerOpValue require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("delOwnerKeyAndNotOwner")`)) - err = manager.SetOwnerOpValue(context.Background(), owner.OpNone) + err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpNone) require.Error(t, err, "put owner key failed, cmp is false") - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") require.NotNil(t, err) require.Equal(t, concurrency.ErrElectionNoLeader.Error(), err.Error()) require.Equal(t, op, owner.OpNone) @@ -186,17 +159,17 @@ func TestSetAndGetOwnerOpValue(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) // Let ddl run for the owner again. 
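Editor's note: all the tests in this file now share the same slimmed-down shape, an embedded etcd cluster plus a bare owner.Manager, with no mock store or DDL instance. A sketch of that harness (the helper name is illustrative):

package ownertest

import (
	"context"
	"testing"

	"github.com/pingcap/tidb/pkg/owner"
	"go.etcd.io/etcd/tests/v3/integration"
)

func newHarness(t *testing.T) (*integration.ClusterV3, owner.Manager) {
	integration.BeforeTestExternal(t)
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	mgr := owner.NewOwnerManager(context.Background(), cluster.Client(0), "ddl", "1", "/owner/key")
	return cluster, mgr
}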
- require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) - isOwner = checkOwner(tInfo.ddl, true) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner = checkOwner(ownerMgr, true) require.True(t, isOwner) // Mock the manager become not owner because the owner is deleted(like TTL is timeout). // And then the manager campaigns the owner again, and become the owner. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey", `return("onlyDelOwnerKey")`)) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.Error(t, err, "put owner key failed, cmp is false") - isOwner = checkOwner(tInfo.ddl, true) + isOwner = checkOwner(ownerMgr, true) require.True(t, isOwner) - op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockDelOwnerKey")) @@ -209,24 +182,23 @@ func TestGetOwnerOpValueBeforeSet(t *testing.T) { } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp", `return(true)`)) - _, dom := testkit.CreateMockStoreAndDomain(t) - ddl := dom.DDL() - require.NoError(t, ddl.OwnerManager().CampaignOwner()) - isOwner := checkOwner(ddl, true) + ownerMgr := owner.NewMockManager(context.Background(), "1", nil, "/owner/key") + defer ownerMgr.Close() + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) // test set/get owner info - manager := ddl.OwnerManager() - ownerID, err := manager.GetOwnerID(context.Background()) + ownerID, err := ownerMgr.GetOwnerID(context.Background()) require.NoError(t, err) - require.Equal(t, ddl.GetID(), ownerID) - op, err := owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") + require.Equal(t, ownerMgr.ID(), ownerID) + op, err := owner.GetOwnerOpValue(context.Background(), nil, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpNone) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/owner/MockNotSetOwnerOp")) - err = manager.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) + err = ownerMgr.SetOwnerOpValue(context.Background(), owner.OpSyncUpgradingState) require.NoError(t, err) - op, err = owner.GetOwnerOpValue(context.Background(), nil, DDLOwnerKey, "log prefix") + op, err = owner.GetOwnerOpValue(context.Background(), nil, "/owner/key", "log prefix") require.NoError(t, err) require.Equal(t, op, owner.OpSyncUpgradingState) } @@ -244,62 +216,43 @@ func TestCluster(t *testing.T) { }() tInfo := newTestInfo(t) - store, cluster, d := tInfo.store, tInfo.cluster, tInfo.ddl defer tInfo.Close(t) - require.NoError(t, d.OwnerManager().CampaignOwner()) + ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key") + require.NoError(t, ownerMgr.CampaignOwner()) - isOwner := checkOwner(d, true) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) - cli1 := cluster.Client(1) - ic2 := infoschema.NewCache(nil, 2) - ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d1, _ := NewDDL( - context.Background(), - WithEtcdClient(cli1), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic2), - ) - require.NoError(t, 
d1.OwnerManager().CampaignOwner()) - - isOwner = checkOwner(d1, false) + ownerMgr2 := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "2", "/owner/key") + require.NoError(t, ownerMgr2.CampaignOwner()) + + isOwner = checkOwner(ownerMgr2, false) require.False(t, isOwner) // Delete the leader key; d1 becomes the owner. - cliRW := cluster.Client(2) - err := deleteLeader(cliRW, DDLOwnerKey) + err := deleteLeader(tInfo.client, "/owner/key") require.NoError(t, err) - isOwner = checkOwner(d, false) + isOwner = checkOwner(ownerMgr, false) require.False(t, isOwner) - d.OwnerManager().Cancel() + ownerMgr.Close() // d3 (not the owner) stops - cli3 := cluster.Client(3) - ic3 := infoschema.NewCache(nil, 2) - ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d3, _ := NewDDL( - context.Background(), - WithEtcdClient(cli3), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic3), - ) - require.NoError(t, d3.OwnerManager().CampaignOwner()) - - isOwner = checkOwner(d3, false) + ownerMgr3 := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "3", "/owner/key") + require.NoError(t, ownerMgr3.CampaignOwner()) + + isOwner = checkOwner(ownerMgr3, false) require.False(t, isOwner) - d3.OwnerManager().Cancel() + ownerMgr3.Close() // Cancel the owner context; afterwards there is no owner. - d1.OwnerManager().Cancel() + ownerMgr2.Close() - logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id") + logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", "/owner/key", "useless id") logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id") + _, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, tInfo.client, "/owner/key", "useless id") require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) - op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix) + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, "/owner/key", logPrefix) require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) require.Equal(t, op, owner.OpNone) } @@ -311,18 +264,19 @@ func TestWatchOwner(t *testing.T) { integration.BeforeTestExternal(t) tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl + client := tInfo.client defer tInfo.Close(t) - ownerManager := d.OwnerManager() + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) + ownerMgr.SetListener(lis) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) // get the owner id. ctx := context.Background() - id, err := ownerManager.GetOwnerID(ctx) + id, err := ownerMgr.GetOwnerID(ctx) require.NoError(t, err) // create etcd session. @@ -330,7 +284,7 @@ func TestWatchOwner(t *testing.T) { require.NoError(t, err) // test the GetOwnerKeyInfo() - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, "/owner/key", id) require.NoError(t, err) // watch the ownerKey.
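
The rewritten tests above all follow the same pattern: instead of reaching through a DDL instance for its owner manager, they construct an owner.Manager directly on top of an etcd client. A minimal sketch of that lifecycle, using the same type/ID/key arguments as the tests; the helper name is illustrative only, and `cli` is assumed to be an already-connected etcd client:

package main

import (
	"context"

	"github.com/pingcap/tidb/pkg/owner"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// campaignOnce sketches the decoupled lifecycle: build the manager from an
// etcd client, campaign, check ownership, and always Close (which replaces
// the old Cancel) so the campaign session is released.
func campaignOnce(cli *clientv3.Client) error {
	ownerMgr := owner.NewOwnerManager(context.Background(), cli, "ddl", "1", "/owner/key")
	defer ownerMgr.Close()
	if err := ownerMgr.CampaignOwner(); err != nil {
		return err
	}
	_ = ownerMgr.IsOwner() // true once this instance wins the election
	return nil
}
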
@@ -339,7 +293,7 @@ func TestWatchOwner(t *testing.T) { watchDone := make(chan bool) watched := false go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + watchErr := owner.WatchOwnerForTest(ctx, ownerMgr, session, ownerKey, currRevision) require.NoError(t, watchErr) watchDone <- true }() @@ -351,14 +305,14 @@ func TestWatchOwner(t *testing.T) { require.False(t, watched) // delete the owner, so the DELETE event can be watched. - err = deleteLeader(client, DDLOwnerKey) + err = deleteLeader(client, "/owner/key") require.NoError(t, err) watched = <-watchDone require.True(t, watched) // the ownerKey has been deleted; watching the ownerKey again still observes the event. go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + watchErr := owner.WatchOwnerForTest(ctx, ownerMgr, session, ownerKey, currRevision) require.NoError(t, watchErr) watchDone <- true }() @@ -374,47 +328,47 @@ func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) { integration.BeforeTestExternal(t) tInfo := newTestInfo(t) - client, d := tInfo.client, tInfo.ddl + client := tInfo.client defer tInfo.Close(t) - ownerManager := d.OwnerManager() + ownerMgr := owner.NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() lis := &listener{} - ownerManager.SetListener(lis) - require.NoError(t, ownerManager.CampaignOwner()) - isOwner := checkOwner(d, true) + ownerMgr.SetListener(lis) + require.NoError(t, ownerMgr.CampaignOwner()) + isOwner := checkOwner(ownerMgr, true) require.True(t, isOwner) // get the owner id. ctx := context.Background() - id, err := ownerManager.GetOwnerID(ctx) + id, err := ownerMgr.GetOwnerID(ctx) require.NoError(t, err) session, err := concurrency.NewSession(client) require.NoError(t, err) // get the owner key information. - ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id) + ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, "/owner/key", id) require.NoError(t, err) // delete the owner key - err = deleteLeader(client, DDLOwnerKey) + err = deleteLeader(client, "/owner/key") require.NoError(t, err) // watch the ownerKey with the current revision. watchDone := make(chan bool) go func() { - watchErr := owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision) + watchErr := owner.WatchOwnerForTest(ctx, ownerMgr, session, ownerKey, currRevision) require.NoError(t, watchErr) watchDone <- true }() <-watchDone } -func checkOwner(d DDL, fbVal bool) (isOwner bool) { - manager := d.OwnerManager() +func checkOwner(ownerMgr owner.Manager, fbVal bool) (isOwner bool) { // Wait up to 30 seconds to // make sure that campaigning for the owner is completed.
for i := 0; i < 6000; i++ { time.Sleep(5 * time.Millisecond) - isOwner = manager.IsOwner() + isOwner = ownerMgr.IsOwner() if isOwner == fbVal { break } @@ -446,13 +400,13 @@ func TestImmediatelyCancel(t *testing.T) { integration.BeforeTestExternal(t) tInfo := newTestInfo(t) - d := tInfo.ddl defer tInfo.Close(t) - ownerManager := d.OwnerManager() + ownerMgr := owner.NewOwnerManager(context.Background(), tInfo.client, "ddl", "1", "/owner/key") + defer ownerMgr.Close() for i := 0; i < 10; i++ { - err := ownerManager.CampaignOwner() + err := ownerMgr.CampaignOwner() require.NoError(t, err) - ownerManager.CampaignCancel() + ownerMgr.CampaignCancel() } } diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go index f653b51e0e119..6ea2c8f6b3baa 100644 --- a/pkg/owner/mock.go +++ b/pkg/owner/mock.go @@ -106,8 +106,8 @@ func (m *mockManager) RetireOwner() { } } -// Cancel implements Manager.Cancel interface. -func (m *mockManager) Cancel() { +// Close implements Manager.Close interface. +func (m *mockManager) Close() { m.cancel() m.wg.Wait() logutil.BgLogger().Info("owner manager is canceled", @@ -171,11 +171,6 @@ func (m *mockManager) ResignOwner(_ context.Context) error { return nil } -// RequireOwner implements Manager.RequireOwner interface. -func (*mockManager) RequireOwner(context.Context) error { - return nil -} - // SetListener implements Manager.SetListener interface. func (m *mockManager) SetListener(listener Listener) { m.listener = listener @@ -190,6 +185,13 @@ func (m *mockManager) CampaignCancel() { m.campaignDone <- struct{}{} } +func (m *mockManager) BreakCampaignLoop() { + // in uni-store, which is mostly used in tests, there is no need to make sure the + // campaign session is created only once, so we can just call Close, but doing so + // DOES violate the contract of the Manager interface.
+ m.Close() +} + func mockDelOwnerKey(mockCal, ownerKey string, m *ownerManager) error { checkIsOwner := func(m *ownerManager, checkTrue bool) error { // 5s diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 51c5b5e23c739..cb05016d148c1 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -82,6 +82,7 @@ go_library( "//pkg/statistics/handle/syncload", "//pkg/statistics/handle/usage", "//pkg/statistics/handle/usage/indexusage", + "//pkg/store", "//pkg/store/driver/error", "//pkg/store/helper", "//pkg/store/mockstore", diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index eaa5779a47126..9845605414ec8 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" sessiontypes "github.com/pingcap/tidb/pkg/session/types" "github.com/pingcap/tidb/pkg/sessionctx/variable" + storepkg "github.com/pingcap/tidb/pkg/store" "github.com/pingcap/tidb/pkg/table/tables" timertable "github.com/pingcap/tidb/pkg/timer/tablestore" "github.com/pingcap/tidb/pkg/types" @@ -1438,7 +1439,7 @@ var ( ) func acquireLock(store kv.Storage) (func(), error) { - etcdCli, err := domain.NewEtcdCli(store) + etcdCli, err := storepkg.NewEtcdCli(store) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/BUILD.bazel b/pkg/store/BUILD.bazel index a62f479d4d58e..895eed9005d9c 100644 --- a/pkg/store/BUILD.bazel +++ b/pkg/store/BUILD.bazel @@ -2,15 +2,23 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "store", - srcs = ["store.go"], + srcs = [ + "etcd.go", + "store.go", + ], importpath = "github.com/pingcap/tidb/pkg/store", visibility = ["//visibility:public"], deps = [ + "//pkg/config", "//pkg/kv", "//pkg/util", "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/pdpb", + "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", + "@org_golang_google_grpc//keepalive", "@org_uber_go_zap//:zap", ], ) @@ -20,12 +28,13 @@ go_test( timeout = "short", srcs = [ "batch_coprocessor_test.go", + "etcd_test.go", "main_test.go", "store_test.go", ], embed = [":store"], flaky = True, - shard_count = 23, + shard_count = 24, deps = [ "//pkg/domain", "//pkg/kv", diff --git a/pkg/store/driver/BUILD.bazel b/pkg/store/driver/BUILD.bazel index a220d9fb786a4..46d90bca7ab50 100644 --- a/pkg/store/driver/BUILD.bazel +++ b/pkg/store/driver/BUILD.bazel @@ -45,6 +45,8 @@ go_test( flaky = True, shard_count = 8, deps = [ + "//pkg/config", + "//pkg/ddl", "//pkg/domain", "//pkg/kv", "//pkg/meta/model", diff --git a/pkg/store/driver/main_test.go b/pkg/store/driver/main_test.go index ca425561c5245..22505e72c772f 100644 --- a/pkg/store/driver/main_test.go +++ b/pkg/store/driver/main_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/session" @@ -61,7 +63,8 @@ func createTiKVStore(t *testing.T) (kv.Storage, *domain.Domain) { var d TiKVDriver store, err := d.Open(fmt.Sprintf("tikv://%s", *pdAddrs)) require.NoError(t, err) - + config.GetGlobalConfig().Store = "tikv" + require.NoError(t, ddl.StartOwnerManager(context.Background(), store)) // clear storage txn, err := store.Begin() require.NoError(t, err) @@ -80,6 +83,7 @@ func createTiKVStore(t *testing.T) (kv.Storage, *domain.Domain) { t.Cleanup(func() { dom.Close() + 
ddl.CloseOwnerManager() require.NoError(t, store.Close()) }) diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go new file mode 100644 index 0000000000000..dc53a23abcb16 --- /dev/null +++ b/pkg/store/etcd.go @@ -0,0 +1,85 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package store + +import ( + "time" + + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/kv" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/keepalive" +) + +// NewEtcdCli creates a new clientv3.Client from store if the store supports it. +// The returned client might be nil. +// TODO: currently uni-store/mock-tikv/tikv all implement EtcdBackend even though they don't actually support it; +// refactor this part. +func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) { + etcdStore, addrs, err := GetEtcdAddrs(store) + if err != nil { + return nil, err + } + if len(addrs) == 0 { + return nil, nil + } + cli, err := NewEtcdCliWithAddrs(addrs, etcdStore) + if err != nil { + return nil, err + } + return cli, nil +} + +// GetEtcdAddrs gets the etcd addrs from store if the store supports it. +func GetEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) { + etcdStore, ok := store.(kv.EtcdBackend) + if !ok { + return nil, nil, nil + } + addrs, err := etcdStore.EtcdAddrs() + if err != nil { + return nil, nil, err + } + return etcdStore, addrs, nil +} + +// NewEtcdCliWithAddrs creates a new clientv3.Client with the specified addrs and etcd backend. +func NewEtcdCliWithAddrs(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { + cfg := config.GetGlobalConfig() + etcdLogCfg := zap.NewProductionConfig() + etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + backoffCfg := backoff.DefaultConfig + backoffCfg.MaxDelay = 3 * time.Second + cli, err := clientv3.New(clientv3.Config{ + LogConfig: &etcdLogCfg, + Endpoints: addrs, + AutoSyncInterval: 30 * time.Second, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoffCfg, + }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, + Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, + }), + }, + TLS: ebd.TLSConfig(), + }) + return cli, err +} diff --git a/pkg/store/etcd_test.go b/pkg/store/etcd_test.go new file mode 100644 index 0000000000000..ef83a6596b676 --- /dev/null +++ b/pkg/store/etcd_test.go @@ -0,0 +1,48 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package store + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +type mockEtcdBackend struct { + kv.Storage + kv.EtcdBackend + pdAddrs []string +} + +func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) { + return mebd.pdAddrs, nil +} + +func TestNewEtcdCliGetEtcdAddrs(t *testing.T) { + etcdStore, addrs, err := GetEtcdAddrs(nil) + require.NoError(t, err) + require.Empty(t, addrs) + require.Nil(t, etcdStore) + + etcdStore, addrs, err = GetEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}}) + require.NoError(t, err) + require.Equal(t, []string{"localhost:2379"}, addrs) + require.NotNil(t, etcdStore) + + cli, err := NewEtcdCli(nil) + require.NoError(t, err) + require.Nil(t, cli) +} diff --git a/pkg/testkit/BUILD.bazel b/pkg/testkit/BUILD.bazel index 90f87f5d49084..93e29f8184c73 100644 --- a/pkg/testkit/BUILD.bazel +++ b/pkg/testkit/BUILD.bazel @@ -14,6 +14,8 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/testkit", visibility = ["//visibility:public"], deps = [ + "//pkg/config", + "//pkg/ddl", "//pkg/ddl/schematracker", "//pkg/domain", "//pkg/expression", diff --git a/pkg/testkit/mockstore.go b/pkg/testkit/mockstore.go index 30e6ca06ced28..4f1a23c5cb521 100644 --- a/pkg/testkit/mockstore.go +++ b/pkg/testkit/mockstore.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/schematracker" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" @@ -53,11 +55,13 @@ func CreateMockStore(t testing.TB, opts ...mockstore.MockTiKVStoreOption) kv.Sto var err error store, err := d.Open("tikv://" + *WithTiKV) require.NoError(t, err) - + config.GetGlobalConfig().Store = "tikv" + require.NoError(t, ddl.StartOwnerManager(context.Background(), store)) var dom *domain.Domain dom, err = session.BootstrapSession(store) t.Cleanup(func() { dom.Close() + ddl.CloseOwnerManager() err := store.Close() require.NoError(t, err) view.Stop() @@ -154,16 +158,6 @@ func (d *DistExecutionContext) TriggerOwnerChange() { } } -// AddDomain add 1 domain which is not ddl owner. -func (d *DistExecutionContext) AddDomain() { - d.mu.Lock() - defer d.mu.Unlock() - dom := bootstrap4DistExecution(d.t, d.Store, 500*time.Millisecond) - dom.InfoSyncer().SetSessionManager(d.domains[0].InfoSyncer().GetSessionManager()) - dom.DDL().OwnerManager().RetireOwner() - d.domains = append(d.domains, dom) -} - // Close cleans up running goroutines and releases the resources used. func (d *DistExecutionContext) Close() { d.t.Cleanup(func() { @@ -191,11 +185,6 @@ func (d *DistExecutionContext) GetDomain(idx int) *domain.Domain { return d.domains[idx] } -// GetDomainCnt get domain count. -func (d *DistExecutionContext) GetDomainCnt() int { - return len(d.domains) -} - // NewDistExecutionContext creates a DistExecutionContext for testing.
func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext { return NewDistExecutionContextWithLease(t, serverNum, 500*time.Millisecond) diff --git a/pkg/util/etcd.go b/pkg/util/etcd.go index dd8597c1e7f59..060947744d562 100644 --- a/pkg/util/etcd.go +++ b/pkg/util/etcd.go @@ -16,6 +16,7 @@ package util import ( "context" + "fmt" "math" "time" @@ -101,3 +102,9 @@ func contextDone(ctx context.Context, err error) error { return nil } + +// FormatLeaseID formats a lease ID as a hex string, the same way etcdctl does. +// see https://github.com/etcd-io/etcd/blob/995027f5c1363404e86f7a858ea2833df01f0954/etcdctl/ctlv3/command/printer_simple.go#L118 +func FormatLeaseID(id clientv3.LeaseID) string { + return fmt.Sprintf("%016x", id) +} diff --git a/tests/realtikvtest/BUILD.bazel b/tests/realtikvtest/BUILD.bazel index 9a50f776291e2..6d7f294c9fd8f 100644 --- a/tests/realtikvtest/BUILD.bazel +++ b/tests/realtikvtest/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", + "//pkg/ddl", "//pkg/domain", "//pkg/kv", "//pkg/session", diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index 89415ef8501d8..7c05c134dab5e 100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -17,6 +17,7 @@ package realtikvtest import ( + "context" "flag" "fmt" "strings" @@ -25,6 +26,7 @@ import ( "time" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/session" @@ -120,15 +122,21 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt if *WithRealTiKV { var d driver.TiKVDriver + storeBak := config.GetGlobalConfig().Store config.UpdateGlobal(func(conf *config.Config) { conf.TxnLocalLatches.Enabled = false conf.KeyspaceName = *KeyspaceName + conf.Store = "tikv" }) store, err = d.Open(*TiKVPath) require.NoError(t, err) - + require.NoError(t, ddl.StartOwnerManager(context.Background(), store)) dom, err = session.BootstrapSession(store) require.NoError(t, err) + // TestGetTSFailDirtyState depends on the dirty state to work, i.e. a + // special branch in uni-store; otherwise it causes a DATA RACE, so we need to + // switch back to make sure it works, see https://github.com/pingcap/tidb/issues/57221 + config.GetGlobalConfig().Store = storeBak sm := testkit.MockSessionManager{} dom.InfoSyncer().SetSessionManager(&sm) tk := testkit.NewTestKit(t, store) @@ -160,6 +168,7 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt t.Cleanup(func() { dom.Close() + ddl.CloseOwnerManager() require.NoError(t, store.Close()) transaction.PrewriteMaxBackoff.Store(20000) view.Stop()
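
Taken together, the testkit and realtikvtest changes above pin down the setup order the global owner manager expects: mark the store type, start the process-level DDL owner manager before bootstrapping the session, and tear down in reverse order. A condensed sketch of that wiring; the helper name and error handling are illustrative, not part of this patch:

package main

import (
	"context"

	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/session"
)

// bootWithGlobalOwnerManager mirrors the harness setup: StartOwnerManager
// must run before BootstrapSession, and CloseOwnerManager after dom.Close().
func bootWithGlobalOwnerManager(store kv.Storage) (cleanup func(), err error) {
	config.GetGlobalConfig().Store = "tikv"
	if err := ddl.StartOwnerManager(context.Background(), store); err != nil {
		return nil, err
	}
	dom, err := session.BootstrapSession(store)
	if err != nil {
		ddl.CloseOwnerManager()
		return nil, err
	}
	return func() {
		dom.Close()
		ddl.CloseOwnerManager()
	}, nil
}
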
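On the etcd side, the new pkg/store/etcd.go gives callers one place to obtain a client regardless of backend: for backends without etcd, NewEtcdCli reports success with a nil client (as etcd_test.go asserts), so callers must treat nil as "no etcd" rather than as an error. A hedged usage sketch; the function below is illustrative only:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/kv"
	storepkg "github.com/pingcap/tidb/pkg/store"
)

// printEtcdEndpoints tolerates stores without an etcd backend, for which
// NewEtcdCli returns (nil, nil) rather than an error.
func printEtcdEndpoints(store kv.Storage) error {
	cli, err := storepkg.NewEtcdCli(store)
	if err != nil {
		return err
	}
	if cli == nil {
		fmt.Println("store has no etcd backend")
		return nil
	}
	defer func() { _ = cli.Close() }()
	fmt.Println(cli.Endpoints())
	return nil
}

Relatedly, util.FormatLeaseID renders a lease the way etcdctl prints one: zero-padded 16-digit lowercase hex, so lease ID 0x1234 comes out as "0000000000001234".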