diff --git a/cdc/api/status.go b/cdc/api/status.go index 79d9da7537d..1678333f8e9 100644 --- a/cdc/api/status.go +++ b/cdc/api/status.go @@ -48,7 +48,7 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) { } func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) { - resp, err := cli.Client.Get(ctx, etcd.EtcdKeyBase, clientv3.WithPrefix()) + resp, err := cli.Client.Get(ctx, etcd.BaseKey(), clientv3.WithPrefix()) if err != nil { fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error()) return diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 872f0d6c3f1..03f14c3e217 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -141,7 +141,7 @@ func (c *Capture) reset(ctx context.Context) error { _ = c.session.Close() } c.session = sess - c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) + c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey()) if c.tableActorSystem != nil { c.tableActorSystem.Stop() @@ -412,7 +412,8 @@ func (c *Capture) runEtcdWorker( timerInterval time.Duration, role string, ) error { - etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState) + etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, + etcd.BaseKey(), reactor, reactorState) if err != nil { return errors.Trace(err) } @@ -577,7 +578,7 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, return nil, err } - ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey) + ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey()) if err != nil { return nil, err } diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 06cffdf3f5c..b701484572f 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -16,6 +16,7 @@ package owner import ( "context" "encoding/json" + "fmt" "io/ioutil" "math" "os" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tiflow/cdc/scheduler" "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" @@ -197,7 +199,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) ( info = ctx.ChangefeedVars().Info return info, true, nil }) - tester.MustUpdate("/tidb/cdc/capture/"+ctx.GlobalVars().CaptureInfo.ID, []byte(`{"id":"`+ctx.GlobalVars().CaptureInfo.ID+`","address":"127.0.0.1:8300"}`)) + tester.MustUpdate(fmt.Sprintf("%s/capture/%s", + etcd.DefaultClusterAndMetaPrefix, ctx.GlobalVars().CaptureInfo.ID), + []byte(`{"id":"`+ctx.GlobalVars().CaptureInfo.ID+`","address":"127.0.0.1:8300"}`)) tester.MustApplyPatches() captures := map[model.CaptureID]*model.CaptureInfo{ctx.GlobalVars().CaptureInfo.ID: ctx.GlobalVars().CaptureInfo} return cf, state, captures, tester diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index e9fea98d0ad..5fda9e6c3ee 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -14,12 +14,14 @@ package owner import ( + "fmt" "testing" "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/stretchr/testify/require" ) @@ -308,12 +310,16 @@ func TestChangefeedStatusNotExist(t *testing.T) { manager := newFeedStateManager4Test() state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, map[string]string{ - "/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": ` + fmt.Sprintf("%s/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", + etcd.DefaultClusterAndMetaPrefix): ` {"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", "address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`, - "/tidb/cdc/changefeed/info/" + + fmt.Sprintf("%s/changefeed/info/", + etcd.DefaultClusterAndNamespacePrefix) + ctx.ChangefeedVars().ID.ID: changefeedInfo, - "/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", + fmt.Sprintf("%s/owner/156579d017f84a68", + etcd.DefaultClusterAndMetaPrefix, + ): "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", }) manager.Tick(state) require.False(t, manager.ShouldRunning()) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index f2bec3bfdcf..c524a044ed8 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -275,7 +275,10 @@ func TestCheckClusterVersion(t *testing.T) { ctx, cancel := cdcContext.WithCancel(ctx) defer cancel() - tester.MustUpdate("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300","version":"v6.0.0"}`)) + tester.MustUpdate(fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + etcd.DefaultClusterAndMetaPrefix), + []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225", +"address":"127.0.0.1:8300","version":"v6.0.0"}`)) changefeedID := model.DefaultChangeFeedID("test-changefeed") changefeedInfo := &model.ChangeFeedInfo{ @@ -296,7 +299,9 @@ func TestCheckClusterVersion(t *testing.T) { require.Nil(t, err) require.NotContains(t, owner.changefeeds, changefeedID) - tester.MustUpdate("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + tester.MustUpdate(fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + etcd.DefaultClusterAndMetaPrefix, + ), []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300","version":"`+ctx.GlobalVars().CaptureInfo.Version+`"}`)) // check the tick is not skipped and the changefeed will be handled normally @@ -388,7 +393,9 @@ func TestUpdateGCSafePoint(t *testing.T) { } changefeedID1 := model.DefaultChangeFeedID("test-changefeed1") tester.MustUpdate( - fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1.ID), + fmt.Sprintf("%s/changefeed/info/%s", + etcd.DefaultClusterAndNamespacePrefix, + changefeedID1.ID), []byte(`{"config":{"cyclic-replication":{}},"state":"failed"}`)) tester.MustApplyPatches() state.Changefeeds[changefeedID1].PatchStatus( @@ -429,7 +436,9 @@ func TestUpdateGCSafePoint(t *testing.T) { // add another changefeed, it must update GC safepoint. changefeedID2 := model.DefaultChangeFeedID("test-changefeed2") tester.MustUpdate( - fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID2.ID), + fmt.Sprintf("%s/changefeed/info/%s", + etcd.DefaultClusterAndNamespacePrefix, + changefeedID2.ID), []byte(`{"config":{"cyclic-replication":{}},"state":"normal"}`)) tester.MustApplyPatches() state.Changefeeds[changefeedID1].PatchStatus( diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 62dfb7f76d3..9c3e4f91161 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" @@ -64,7 +65,9 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) { captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() require.Nil(t, err) s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{ - fmt.Sprintf("/tidb/cdc/capture/%s", ctx.GlobalVars().CaptureInfo.ID): string(captureInfoBytes), + fmt.Sprintf("%s/capture/%s", + etcd.DefaultClusterAndMetaPrefix, + ctx.GlobalVars().CaptureInfo.ID): string(captureInfoBytes), }) } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 2561a3d5d9a..4606c3e07a7 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -114,14 +114,15 @@ func initProcessor4Test(ctx cdcContext.Context, t *testing.T) (*processor, *orch captureID := ctx.GlobalVars().CaptureInfo.ID changefeedID := ctx.ChangefeedVars().ID return p, orchestrator.NewReactorStateTester(t, p.changefeed, map[string]string{ - "/tidb/cdc/capture/" + - captureID: `{"id":"` + captureID + `","address":"127.0.0.1:8300"}`, - "/tidb/cdc/changefeed/info/" + - changefeedID.ID: changefeedInfo, - "/tidb/cdc/job/" + - ctx.ChangefeedVars().ID.ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`, - "/tidb/cdc/task/status/" + - captureID + "/" + changefeedID.ID: `{"tables":{},"operation":null,"admin-job-type":0}`, + fmt.Sprintf("%s/capture/%s", + etcd.DefaultClusterAndMetaPrefix, + captureID): `{"id":"` + captureID + `","address":"127.0.0.1:8300"}`, + fmt.Sprintf("%s/changefeed/info/%s", + etcd.DefaultClusterAndNamespacePrefix, + changefeedID.ID): changefeedInfo, + fmt.Sprintf("%s/job/%s", + etcd.DefaultClusterAndNamespacePrefix, + ctx.ChangefeedVars().ID.ID): `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`, }) } diff --git a/cdc/scheduler/internal/base/processor_agent.go b/cdc/scheduler/internal/base/processor_agent.go index c49c276e4c1..f51b733e6e1 100644 --- a/cdc/scheduler/internal/base/processor_agent.go +++ b/cdc/scheduler/internal/base/processor_agent.go @@ -122,7 +122,7 @@ func NewAgent( etcdCliCtx, cancel := context.WithTimeout(ctx, getOwnerFromEtcdTimeout) defer cancel() ownerCaptureID, err := ret.etcdClient. - GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey) + GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey()) if err != nil { if err != concurrency.ErrElectionNoLeader { return nil, errors.Trace(err) diff --git a/cdc/scheduler/internal/base/processor_agent_test.go b/cdc/scheduler/internal/base/processor_agent_test.go index ec03d5a3302..2e58dc8750e 100644 --- a/cdc/scheduler/internal/base/processor_agent_test.go +++ b/cdc/scheduler/internal/base/processor_agent_test.go @@ -219,11 +219,11 @@ func TestAgentBasics(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey, mock.Anything). + etcd.CaptureOwnerKey(), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey), + Key: []byte(etcd.CaptureOwnerKey()), Value: []byte(ownerCaptureID), ModRevision: 1, }, @@ -337,7 +337,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) { // Empty response implies no owner. suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey, mock.Anything). + etcd.CaptureOwnerKey(), mock.Anything). Return(&clientv3.GetResponse{}, nil) // Test Point 1: Create an agent. @@ -393,11 +393,11 @@ func TestAgentTolerateClientClosed(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey, mock.Anything). + etcd.CaptureOwnerKey(), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey), + Key: []byte(etcd.CaptureOwnerKey()), Value: []byte(ownerCaptureID), ModRevision: 1, }, @@ -438,11 +438,11 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey, mock.Anything). + etcd.CaptureOwnerKey(), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey), + Key: []byte(etcd.CaptureOwnerKey()), Value: []byte(ownerCaptureID), ModRevision: 1, }, diff --git a/pkg/cmd/cli/cli_capture_list.go b/pkg/cmd/cli/cli_capture_list.go index 1f62a4491e1..9b77594a4bf 100644 --- a/pkg/cmd/cli/cli_capture_list.go +++ b/pkg/cmd/cli/cli_capture_list.go @@ -96,7 +96,7 @@ func listCaptures(ctx context.Context, etcdClient *etcd.CDCEtcdClient) ([]*captu return nil, err } - ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey) + ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey()) if err != nil && errors.Cause(err) != concurrency.ErrElectionNoLeader { return nil, err } diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index d5766c5e5a1..23fe166b267 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -114,7 +114,8 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { return err } - taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, o.changefeedID) + taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, + model.DefaultChangeFeedID(o.changefeedID)) if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err } @@ -124,18 +125,9 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { count += pinfo.Count } - processorInfos, err := o.etcdClient.GetAllTaskStatus(ctx, o.changefeedID) - if err != nil { - return err - } - - taskStatus := make([]captureTaskStatus, 0, len(processorInfos)) - for captureID, status := range processorInfos { - taskStatus = append(taskStatus, captureTaskStatus{CaptureID: captureID, TaskStatus: status}) - } + taskStatus := make([]captureTaskStatus, 0) meta := &cfMeta{Info: info, Status: status, Count: count, TaskStatus: taskStatus} - return util.JSONPrint(cmd, meta) } diff --git a/pkg/cmd/cli/cli_changefeed_statistics.go b/pkg/cmd/cli/cli_changefeed_statistics.go index 243a2e68437..ec86ba5880d 100644 --- a/pkg/cmd/cli/cli_changefeed_statistics.go +++ b/pkg/cmd/cli/cli_changefeed_statistics.go @@ -120,7 +120,8 @@ func (o *statisticsChangefeedOptions) runCliWithEtcdClient(ctx context.Context, return err } - taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, o.changefeedID) + taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, + model.DefaultChangeFeedID(o.changefeedID)) if err != nil { return err } diff --git a/pkg/cmd/cli/cli_processor_query.go b/pkg/cmd/cli/cli_processor_query.go index 85a1eceaad2..acb25bdcb30 100644 --- a/pkg/cmd/cli/cli_processor_query.go +++ b/pkg/cmd/cli/cli_processor_query.go @@ -99,12 +99,10 @@ func (o *queryProcessorOptions) addFlags(cmd *cobra.Command) { // run cli cmd with etcd client func (o *queryProcessorOptions) runCliWithEtcdClient(ctx context.Context, cmd *cobra.Command) error { - _, status, err := o.etcdClient.GetTaskStatus(ctx, o.changefeedID, o.captureID) - if err != nil && cerror.ErrTaskStatusNotExists.Equal(err) { - return err - } + status := &model.TaskStatus{} - _, position, err := o.etcdClient.GetTaskPosition(ctx, o.changefeedID, o.captureID) + _, position, err := o.etcdClient.GetTaskPosition(ctx, + model.DefaultChangeFeedID(o.changefeedID), o.captureID) if err != nil && cerror.ErrTaskPositionNotExists.Equal(err) { return err } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 99e795509e6..1de79aec9a6 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -210,6 +210,7 @@ func TestParseCfg(t *testing.T) { ServerWorkerPoolSize: 4, }, }, + ClusterID: "default", }, o.serverConfig) } @@ -356,6 +357,7 @@ server-worker-pool-size = 16 ServerWorkerPoolSize: 16, }, }, + ClusterID: "default", }, o.serverConfig) } @@ -499,6 +501,7 @@ cert-allowed-cn = ["dd","ee"] ServerWorkerPoolSize: 4, }, }, + ClusterID: "default", }, o.serverConfig) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 7f4efa79759..8aeaf36eecf 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -137,7 +137,8 @@ const ( "server-ack-interval": 100000000, "server-worker-pool-size": 4 } - } + }, + "cluster-id": "default" }` testCfgTestReplicaConfigMarshal1 = `{ diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 864b9ad361b..029c2c42bc5 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -129,6 +129,7 @@ var defaultServerConfig = &ServerConfig{ }, Messages: defaultMessageConfig.Clone(), }, + ClusterID: "default", } // ServerConfig represents a config for server @@ -155,6 +156,7 @@ type ServerConfig struct { PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"` KVClient *KVClientConfig `toml:"kv-client" json:"kv-client"` Debug *DebugConfig `toml:"debug" json:"debug"` + ClusterID string `toml:"cluster-id" json:"cluster-id"` } // Marshal returns the json marshal format of a ServerConfig diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 7171f149a46..300a1d7a6b8 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -36,49 +36,49 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) -const ( - // CaptureOwnerKey is the capture owner path that is saved to etcd - CaptureOwnerKey = EtcdKeyBase + "/owner" - // CaptureInfoKeyPrefix is the capture info path that is saved to etcd - CaptureInfoKeyPrefix = EtcdKeyBase + "/capture" - // TaskKeyPrefix is the prefix of task keys - TaskKeyPrefix = EtcdKeyBase + "/task" - // TaskStatusKeyPrefix is the prefix of task status keys - TaskStatusKeyPrefix = TaskKeyPrefix + "/status" - // TaskPositionKeyPrefix is the prefix of task position keys - TaskPositionKeyPrefix = TaskKeyPrefix + "/position" - // JobKeyPrefix is the prefix of job keys - JobKeyPrefix = EtcdKeyBase + "/job" -) +// CaptureOwnerKey is the capture owner path that is saved to etcd +func CaptureOwnerKey() string { + return BaseKey() + metaPrefix + "/owner" +} + +// CaptureInfoKeyPrefix is the capture info path that is saved to etcd +func CaptureInfoKeyPrefix() string { + return BaseKey() + metaPrefix + captureKey +} + +// TaskPositionKeyPrefix is the prefix of task position keys +func TaskPositionKeyPrefix(namespace string) string { + return NamespacedPrefix(namespace) + taskPositionKey +} + +// JobKeyPrefix is the prefix of job keys +func JobKeyPrefix(namespace string) string { + return NamespacedPrefix(namespace) + jobKey +} // GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config -func GetEtcdKeyChangeFeedList() string { - return fmt.Sprintf("%s/changefeed/info", EtcdKeyBase) +func GetEtcdKeyChangeFeedList(namespace string) string { + return fmt.Sprintf("%s/changefeed/info", NamespacedPrefix(namespace)) } // GetEtcdKeyChangeFeedInfo returns the key of a changefeed config func GetEtcdKeyChangeFeedInfo(changefeedID model.ChangeFeedID) string { - return fmt.Sprintf("%s/%s", GetEtcdKeyChangeFeedList(), changefeedID.ID) + return fmt.Sprintf("%s/%s", GetEtcdKeyChangeFeedList(changefeedID.Namespace), changefeedID.ID) } // GetEtcdKeyTaskPosition returns the key of a task position -func GetEtcdKeyTaskPosition(changefeedID, captureID string) string { - return TaskPositionKeyPrefix + "/" + captureID + "/" + changefeedID +func GetEtcdKeyTaskPosition(changefeedID model.ChangeFeedID, captureID string) string { + return TaskPositionKeyPrefix(changefeedID.Namespace) + "/" + captureID + "/" + changefeedID.ID } // GetEtcdKeyCaptureInfo returns the key of a capture info func GetEtcdKeyCaptureInfo(id string) string { - return CaptureInfoKeyPrefix + "/" + id -} - -// GetEtcdKeyTaskStatus returns the key for the task status -func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string { - return TaskStatusKeyPrefix + "/" + captureID + "/" + changeFeedID + return CaptureInfoKeyPrefix() + "/" + id } // GetEtcdKeyJob returns the key for a job status func GetEtcdKeyJob(changeFeedID model.ChangeFeedID) string { - return JobKeyPrefix + "/" + changeFeedID.ID + return JobKeyPrefix(changeFeedID.Namespace) + "/" + changeFeedID.ID } // CDCEtcdClient is a wrap of etcd client @@ -106,13 +106,13 @@ func (c CDCEtcdClient) Close() error { // ClearAllCDCInfo delete all keys created by CDC func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error { - _, err := c.Client.Delete(ctx, EtcdKeyBase, clientv3.WithPrefix()) + _, err := c.Client.Delete(ctx, BaseKey(), clientv3.WithPrefix()) return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } // GetAllCDCInfo get all keys created by CDC func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) { - resp, err := c.Client.Get(ctx, EtcdKeyBase, clientv3.WithPrefix()) + resp, err := c.Client.Get(ctx, BaseKey(), clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -124,7 +124,8 @@ func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) ( int64, map[model.ChangeFeedID]*mvccpb.KeyValue, error, ) { - key := GetEtcdKeyChangeFeedList() + // todo: support namespace + key := GetEtcdKeyChangeFeedList(model.DefaultNamespace) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -192,7 +193,8 @@ func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) ( map[model.ChangeFeedID]*model.ChangeFeedStatus, error, ) { - key := JobKeyPrefix + // todo: support namespace + key := JobKeyPrefix(model.DefaultNamespace) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -232,7 +234,7 @@ func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, // GetCaptures returns kv revision and CaptureInfo list func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error) { - key := CaptureInfoKeyPrefix + key := CaptureInfoKeyPrefix() resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -276,7 +278,7 @@ func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *mod // GetCaptureLeases returns a map mapping from capture ID to its lease func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error) { - key := CaptureInfoKeyPrefix + key := CaptureInfoKeyPrefix() resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -358,7 +360,9 @@ func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, // GetProcessors queries all processors of the cdc cluster, // and returns a slice of ProcInfoSnap(without table info) func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { - resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix()) + // todo: support namespace + resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(model.DefaultNamespace), + clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -382,64 +386,13 @@ func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap return infos, nil } -// GetAllTaskStatus queries all task status of a changefeed, and returns a map -// mapping from captureID to TaskStatus -func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error) { - resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) - } - pinfo := make(map[string]*model.TaskStatus, resp.Count) - for _, rawKv := range resp.Kvs { - changeFeed, err := extractKeySuffix(string(rawKv.Key)) - if err != nil { - return nil, err - } - endIndex := len(rawKv.Key) - len(changeFeed) - 1 - captureID, err := extractKeySuffix(string(rawKv.Key[0:endIndex])) - if err != nil { - return nil, err - } - if changeFeed != changefeedID { - continue - } - info := &model.TaskStatus{} - err = info.Unmarshal(rawKv.Value) - if err != nil { - return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("failed to unmarshal task status: %s", err) - } - info.ModRevision = rawKv.ModRevision - pinfo[captureID] = info - } - return pinfo, nil -} - -// GetTaskStatus queries task status from etcd, returns -// - ModRevision of the given key -// - *model.TaskStatus unmarshalled from the value -// - error if error happens -func (c CDCEtcdClient) GetTaskStatus( - ctx context.Context, - changefeedID string, - captureID string, -) (int64, *model.TaskStatus, error) { - key := GetEtcdKeyTaskStatus(changefeedID, captureID) - resp, err := c.Client.Get(ctx, key) - if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) - } - if resp.Count == 0 { - return 0, nil, cerror.ErrTaskStatusNotExists.GenWithStackByArgs(key) - } - info := &model.TaskStatus{} - err = info.Unmarshal(resp.Kvs[0].Value) - return resp.Kvs[0].ModRevision, info, errors.Trace(err) -} - // GetAllTaskPositions queries all task positions of a changefeed, and returns a map // mapping from captureID to TaskPositions -func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error) { - resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix, clientv3.WithPrefix()) +func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, + changefeedID model.ChangeFeedID, +) (map[string]*model.TaskPosition, error) { + resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(changefeedID.Namespace), + clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -454,7 +407,7 @@ func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID str if err != nil { return nil, err } - if changeFeed != changefeedID { + if changeFeed != changefeedID.ID { continue } info := &model.TaskPosition{} @@ -473,7 +426,7 @@ func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID str // - error if error happens func (c CDCEtcdClient) GetTaskPosition( ctx context.Context, - changefeedID string, + changefeedID model.ChangeFeedID, captureID string, ) (int64, *model.TaskPosition, error) { key := GetEtcdKeyTaskPosition(changefeedID, captureID) @@ -523,7 +476,7 @@ func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, erro // GetOwnerRevision gets the Etcd revision for the elected owner. func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error) { - resp, err := c.Client.Get(ctx, CaptureOwnerKey, clientv3.WithFirstCreate()...) + resp, err := c.Client.Get(ctx, CaptureOwnerKey(), clientv3.WithFirstCreate()...) if err != nil { return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -582,7 +535,7 @@ func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error) { } // extractKeySuffix extracts the suffix of an etcd key, such as extracting -// "6a6c6dd290bc8732" from /tidb/cdc/changefeed/info/6a6c6dd290bc8732 +// "6a6c6dd290bc8732" from /tidb/cdc/cluster/namespace/changefeed/info/6a6c6dd290bc8732 func extractKeySuffix(key string) (string, error) { subs := strings.Split(key, "/") if len(subs) < 2 { diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 66093579afb..c5b34f345a5 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -369,7 +369,7 @@ func TestGetOwnerRevision(t *testing.T) { sess, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(10 /* seconds */)) require.Nil(t, err) - election := concurrency.NewElection(sess, CaptureOwnerKey) + election := concurrency.NewElection(sess, CaptureOwnerKey()) mockCaptureID := fmt.Sprintf("capture-%d", i) diff --git a/pkg/etcd/etcdkey.go b/pkg/etcd/etcdkey.go index 622391afbc9..04a1250aab1 100644 --- a/pkg/etcd/etcdkey.go +++ b/pkg/etcd/etcdkey.go @@ -14,26 +14,32 @@ package etcd import ( + "fmt" "strings" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) const ( - // EtcdKeyBase is the common prefix of the keys in CDC - EtcdKeyBase = "/tidb/cdc" - ownerKey = "/owner" - captureKey = "/capture" + metaPrefix = "/__cdc_meta__" - taskKey = "/task" - taskWorkloadKey = taskKey + "/workload" - taskStatusKey = taskKey + "/status" - taskPositionKey = taskKey + "/position" + ownerKey = "/owner" + captureKey = "/capture" + taskPositionKey = "/task/position" changefeedInfoKey = "/changefeed/info" jobKey = "/job" + + // DeletionCounterKey is the key path for the counter of deleted keys + DeletionCounterKey = metaPrefix + "/meta/ticdc-delete-etcd-key-count" + + // DefaultClusterAndNamespacePrefix is the default prefix of changefeed data + DefaultClusterAndNamespacePrefix = "/tidb/cdc/default/default" + // DefaultClusterAndMetaPrefix is the default prefix of cluster mata + DefaultClusterAndMetaPrefix = "/tidb/cdc/default" + metaPrefix ) // CDCKeyType is the type of etcd key @@ -47,10 +53,6 @@ const ( CDCKeyTypeChangefeedInfo CDCKeyTypeChangeFeedStatus CDCKeyTypeTaskPosition - // Deprecated: No longer used. Kept for compatibility. - CDCKeyTypeTaskStatus - // Deprecated: No longer used. Kept for compatibility. - CDCKeyTypeTaskWorkload ) // CDCKey represents a etcd key which is defined by TiCDC @@ -82,66 +84,82 @@ type CDCKey struct { ChangefeedID model.ChangeFeedID CaptureID string OwnerLeaseID string + ClusterID string +} + +// BaseKey is the common prefix of the keys with cluster id in CDC +func BaseKey() string { + clusterID := config.GetGlobalServerConfig().ClusterID + return fmt.Sprintf("/tidb/cdc/%s", clusterID) +} + +// NamespacedPrefix returns the etcd prefix of changefeed data +func NamespacedPrefix(namespace string) string { + return BaseKey() + "/" + namespace } // Parse parses the given etcd key func (k *CDCKey) Parse(key string) error { - if !strings.HasPrefix(key, EtcdKeyBase) { + if !strings.HasPrefix(key, BaseKey()) { return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) } - key = key[len(EtcdKeyBase):] - switch { - case strings.HasPrefix(key, ownerKey): - k.Tp = CDCKeyTypeOwner - k.CaptureID = "" - key = key[len(ownerKey):] - if len(key) > 0 { - key = key[1:] - } - k.OwnerLeaseID = key - case strings.HasPrefix(key, captureKey): - k.Tp = CDCKeyTypeCapture - k.CaptureID = key[len(captureKey)+1:] - k.OwnerLeaseID = "" - case strings.HasPrefix(key, changefeedInfoKey): - k.Tp = CDCKeyTypeChangefeedInfo - k.CaptureID = "" - k.ChangefeedID = model.DefaultChangeFeedID(key[len(changefeedInfoKey)+1:]) - k.OwnerLeaseID = "" - case strings.HasPrefix(key, jobKey): - k.Tp = CDCKeyTypeChangeFeedStatus - k.CaptureID = "" - k.ChangefeedID = model.DefaultChangeFeedID(key[len(jobKey)+1:]) - k.OwnerLeaseID = "" - case strings.HasPrefix(key, taskStatusKey): - splitKey := strings.SplitN(key[len(taskStatusKey)+1:], "/", 2) - if len(splitKey) != 2 { + key = key[len("/tidb/cdc"):] + parts := strings.Split(key, "/") + k.ClusterID = parts[1] + key = key[len(k.ClusterID)+1:] + if strings.HasPrefix(key, metaPrefix) { + key = key[len(metaPrefix):] + switch { + case strings.HasPrefix(key, ownerKey): + k.Tp = CDCKeyTypeOwner + k.CaptureID = "" + key = key[len(ownerKey):] + if len(key) > 0 { + key = key[1:] + } + k.OwnerLeaseID = key + case strings.HasPrefix(key, captureKey): + k.Tp = CDCKeyTypeCapture + k.CaptureID = key[len(captureKey)+1:] + k.OwnerLeaseID = "" + default: return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) } - k.Tp = CDCKeyTypeTaskStatus - k.CaptureID = splitKey[0] - k.ChangefeedID = model.DefaultChangeFeedID(splitKey[1]) - k.OwnerLeaseID = "" - case strings.HasPrefix(key, taskPositionKey): - splitKey := strings.SplitN(key[len(taskPositionKey)+1:], "/", 2) - if len(splitKey) != 2 { + } else { + namespace := parts[2] + key = key[len(namespace)+1:] + switch { + case strings.HasPrefix(key, changefeedInfoKey): + k.Tp = CDCKeyTypeChangefeedInfo + k.CaptureID = "" + k.ChangefeedID = model.ChangeFeedID{ + Namespace: namespace, + ID: key[len(changefeedInfoKey)+1:], + } + k.OwnerLeaseID = "" + case strings.HasPrefix(key, jobKey): + k.Tp = CDCKeyTypeChangeFeedStatus + k.CaptureID = "" + k.ChangefeedID = model.ChangeFeedID{ + Namespace: namespace, + ID: key[len(jobKey)+1:], + } + k.OwnerLeaseID = "" + case strings.HasPrefix(key, taskPositionKey): + splitKey := strings.SplitN(key[len(taskPositionKey)+1:], "/", 2) + if len(splitKey) != 2 { + return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) + } + k.Tp = CDCKeyTypeTaskPosition + k.CaptureID = splitKey[0] + k.ChangefeedID = model.ChangeFeedID{ + Namespace: namespace, + ID: splitKey[1], + } + k.OwnerLeaseID = "" + default: return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) } - k.Tp = CDCKeyTypeTaskPosition - k.CaptureID = splitKey[0] - k.ChangefeedID = model.DefaultChangeFeedID(splitKey[1]) - k.OwnerLeaseID = "" - case strings.HasPrefix(key, taskWorkloadKey): - splitKey := strings.SplitN(key[len(taskWorkloadKey)+1:], "/", 2) - if len(splitKey) != 2 { - return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) - } - k.Tp = CDCKeyTypeTaskWorkload - k.CaptureID = splitKey[0] - k.ChangefeedID = model.DefaultChangeFeedID(splitKey[1]) - k.OwnerLeaseID = "" - default: - return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) } return nil } @@ -150,21 +168,20 @@ func (k *CDCKey) String() string { switch k.Tp { case CDCKeyTypeOwner: if len(k.OwnerLeaseID) == 0 { - return EtcdKeyBase + ownerKey + return BaseKey() + metaPrefix + ownerKey } - return EtcdKeyBase + ownerKey + "/" + k.OwnerLeaseID + return BaseKey() + metaPrefix + ownerKey + "/" + k.OwnerLeaseID case CDCKeyTypeCapture: - return EtcdKeyBase + captureKey + "/" + k.CaptureID + return BaseKey() + metaPrefix + captureKey + "/" + k.CaptureID case CDCKeyTypeChangefeedInfo: - return EtcdKeyBase + changefeedInfoKey + "/" + k.ChangefeedID.ID + return NamespacedPrefix(k.ChangefeedID.Namespace) + changefeedInfoKey + + "/" + k.ChangefeedID.ID case CDCKeyTypeChangeFeedStatus: - return EtcdKeyBase + jobKey + "/" + k.ChangefeedID.ID + return NamespacedPrefix(k.ChangefeedID.Namespace) + jobKey + + "/" + k.ChangefeedID.ID case CDCKeyTypeTaskPosition: - return EtcdKeyBase + taskPositionKey + "/" + k.CaptureID + "/" + k.ChangefeedID.ID - case CDCKeyTypeTaskStatus: - return EtcdKeyBase + taskStatusKey + "/" + k.CaptureID + "/" + k.ChangefeedID.ID - case CDCKeyTypeTaskWorkload: - return EtcdKeyBase + taskWorkloadKey + "/" + k.CaptureID + "/" + k.ChangefeedID.ID + return NamespacedPrefix(k.ChangefeedID.Namespace) + taskPositionKey + + "/" + k.CaptureID + "/" + k.ChangefeedID.ID } log.Panic("unreachable") return "" diff --git a/pkg/etcd/etcdkey_test.go b/pkg/etcd/etcdkey_test.go index 540c180b7ae..3283850964c 100644 --- a/pkg/etcd/etcdkey_test.go +++ b/pkg/etcd/etcdkey_test.go @@ -14,6 +14,7 @@ package etcd import ( + "fmt" "testing" "github.com/pingcap/tiflow/cdc/model" @@ -25,68 +26,71 @@ func TestEtcdKey(t *testing.T) { key string expected *CDCKey }{{ - key: "/tidb/cdc/owner/223176cb44d20a13", + key: fmt.Sprintf("%s/owner/223176cb44d20a13", DefaultClusterAndMetaPrefix), expected: &CDCKey{ Tp: CDCKeyTypeOwner, OwnerLeaseID: "223176cb44d20a13", + ClusterID: "default", }, }, { - key: "/tidb/cdc/owner", + key: fmt.Sprintf("%s/owner", DefaultClusterAndMetaPrefix), expected: &CDCKey{ Tp: CDCKeyTypeOwner, OwnerLeaseID: "", + ClusterID: "default", }, }, { - key: "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + key: fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + DefaultClusterAndMetaPrefix), expected: &CDCKey{ Tp: CDCKeyTypeCapture, CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", + ClusterID: "default", }, }, { - key: "/tidb/cdc/changefeed/info/test-_@#$%changefeed", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test-_@#$%changefeed", expected: &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: model.DefaultChangeFeedID("test-_@#$%changefeed"), + ClusterID: "default", }, }, { - key: "/tidb/cdc/changefeed/info/test/changefeed", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test/changefeed", expected: &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: model.DefaultChangeFeedID("test/changefeed"), + ClusterID: "default", }, }, { - key: "/tidb/cdc/job/test-changefeed", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/job/test-changefeed", expected: &CDCKey{ Tp: CDCKeyTypeChangeFeedStatus, ChangefeedID: model.DefaultChangeFeedID("test-changefeed"), + ClusterID: "default", }, }, { - key: "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-changefeed", + key: "/tidb/cdc/default/name/task" + + "/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-changefeed", expected: &CDCKey{ - Tp: CDCKeyTypeTaskPosition, - ChangefeedID: model.DefaultChangeFeedID("test-changefeed"), - CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", + Tp: CDCKeyTypeTaskPosition, + ChangefeedID: model.ChangeFeedID{ + Namespace: "name", + ID: "test-changefeed", + }, + CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", + ClusterID: "default", }, }, { - key: "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test/changefeed", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test/changefeed", expected: &CDCKey{ Tp: CDCKeyTypeTaskPosition, ChangefeedID: model.DefaultChangeFeedID("test/changefeed"), CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", - }, - }, { - key: "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-changefeed", - expected: &CDCKey{ - Tp: CDCKeyTypeTaskStatus, - ChangefeedID: model.DefaultChangeFeedID("test-changefeed"), - CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", - }, - }, { - key: "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-changefeed", - expected: &CDCKey{ - Tp: CDCKeyTypeTaskWorkload, - ChangefeedID: model.DefaultChangeFeedID("test-changefeed"), - CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", + ClusterID: "default", }, }} for _, tc := range testcases { @@ -103,19 +107,16 @@ func TestEtcdKeyParseError(t *testing.T) { key string error bool }{{ - key: "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test/changefeed", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test/changefeed", error: false, }, { - key: "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/", error: false, }, { - key: "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225", - error: true, - }, { - key: "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225", - error: true, - }, { - key: "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225", + key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225", error: true, }, { key: "/tidb/cd", diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 64acf720c79..9b135669817 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -42,7 +42,6 @@ const ( // When EtcdWorker commits a txn to etcd or ticks its reactor // takes more than etcdWorkerLogsWarnDuration, it will print a log etcdWorkerLogsWarnDuration = 1 * time.Second - deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" ) // EtcdWorker handles all interactions with Etcd @@ -397,12 +396,17 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } if hasDelete { - opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) + opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+etcd.DeletionCounterKey, + fmt.Sprint(worker.deleteCounter+1))) } if worker.deleteCounter > 0 { - cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter))) + cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+ + etcd.DeletionCounterKey), + "=", fmt.Sprint(worker.deleteCounter))) } else if worker.deleteCounter == 0 { - cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(worker.prefix.String()+deletionCounterKey), "=", 0)) + cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(worker.prefix.String()+ + etcd.DeletionCounterKey), + "=", 0)) } else { panic("unreachable") } @@ -488,7 +492,7 @@ func (worker *EtcdWorker) cleanUp() { } func (worker *EtcdWorker) isDeleteCounterKey(key []byte) bool { - return string(key) == worker.prefix.String()+deletionCounterKey + return string(key) == worker.prefix.String()+etcd.DeletionCounterKey } func (worker *EtcdWorker) handleDeleteCounter(value []byte) { diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index 2ea04a60f2e..a62792579b1 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -89,9 +89,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro s.Captures[k.CaptureID] = &newCaptureInfo case etcd.CDCKeyTypeChangefeedInfo, etcd.CDCKeyTypeChangeFeedStatus, - etcd.CDCKeyTypeTaskPosition, - etcd.CDCKeyTypeTaskStatus, - etcd.CDCKeyTypeTaskWorkload: + etcd.CDCKeyTypeTaskPosition: changefeedState, exist := s.Changefeeds[k.ChangefeedID] if !exist { if value == nil { @@ -141,11 +139,6 @@ type ChangefeedReactorState struct { Status *model.ChangeFeedStatus TaskPositions map[model.CaptureID]*model.TaskPosition - // Deprecated: No longer used, kept for compatibility. - TaskStatuses map[model.CaptureID]*model.TaskStatus - // Deprecated: No longer used, kept for compatibility. - Workloads map[model.CaptureID]model.TaskWorkload - pendingPatches []DataPatch skipPatchesInThisTick bool } @@ -155,8 +148,6 @@ func NewChangefeedReactorState(id model.ChangeFeedID) *ChangefeedReactorState { return &ChangefeedReactorState{ ID: id, TaskPositions: make(map[model.CaptureID]*model.TaskPosition), - TaskStatuses: make(map[model.CaptureID]*model.TaskStatus), - Workloads: make(map[model.CaptureID]model.TaskWorkload), } } @@ -208,28 +199,6 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er position := new(model.TaskPosition) s.TaskPositions[key.CaptureID] = position e = position - case etcd.CDCKeyTypeTaskStatus: - if key.ChangefeedID != s.ID { - return nil - } - if value == nil { - delete(s.TaskStatuses, key.CaptureID) - return nil - } - status := new(model.TaskStatus) - s.TaskStatuses[key.CaptureID] = status - e = status - case etcd.CDCKeyTypeTaskWorkload: - if key.ChangefeedID != s.ID { - return nil - } - if value == nil { - delete(s.Workloads, key.CaptureID) - return nil - } - workload := make(model.TaskWorkload) - s.Workloads[key.CaptureID] = workload - e = &workload default: return nil } @@ -246,7 +215,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er // Exist returns false if all keys of this changefeed in ETCD is not exist func (s *ChangefeedReactorState) Exist() bool { - return s.Info != nil || s.Status != nil || len(s.TaskPositions) != 0 || len(s.TaskStatuses) != 0 || len(s.Workloads) != 0 + return s.Info != nil || s.Status != nil || len(s.TaskPositions) != 0 } // Active return true if the changefeed is ready to be processed diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 4d2e9d11d44..03d2aa8e97b 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -33,7 +33,9 @@ func TestCheckCaptureAlive(t *testing.T) { stateTester := NewReactorStateTester(t, state, nil) state.CheckCaptureAlive("6bbc01c8-0605-4f86-a0f9-b3119109b225") require.Contains(t, stateTester.ApplyPatches().Error(), "[CDC:ErrLeaseExpired]") - err := stateTester.Update("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`)) + err := stateTester.Update(fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix)+ + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`)) require.Nil(t, err) state.CheckCaptureAlive("6bbc01c8-0605-4f86-a0f9-b3119109b225") stateTester.MustApplyPatches() @@ -99,19 +101,19 @@ func TestChangefeedStateUpdate(t *testing.T) { { // common case changefeedID: "test1", updateKey: []string{ - "/tidb/cdc/changefeed/info/test1", - "/tidb/cdc/job/test1", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", }, updateValue: []string{ changefeedInfo, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, - `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"45":{"workload":1}}`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, }, expected: ChangefeedReactorState{ @@ -135,43 +137,33 @@ func TestChangefeedStateUpdate(t *testing.T) { }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281}, - TaskStatuses: map[model.CaptureID]*model.TaskStatus{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": { - Tables: map[int64]*model.TableReplicaInfo{45: {StartTs: 421980685886554116}}, - }, - }, TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": {CheckPointTs: 421980720003809281, ResolvedTs: 421980720003809281}, }, - Workloads: map[model.CaptureID]model.TaskWorkload{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": {45: {Workload: 1}}, - }, }, }, { // test multiple capture changefeedID: "test1", updateKey: []string{ - "/tidb/cdc/changefeed/info/test1", - "/tidb/cdc/job/test1", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/task/position/666777888/test1", - "/tidb/cdc/task/status/666777888/test1", - "/tidb/cdc/task/workload/666777888/test1", - "/tidb/cdc/capture/666777888", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/666777888/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/666777888", }, updateValue: []string{ changefeedInfo, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, - `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"45":{"workload":1}}`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, `{"checkpoint-ts":11332244,"resolved-ts":312321,"count":8,"error":null}`, - `{"tables":{"46":{"start-ts":412341234,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"46":{"workload":3}}`, `{"id":"666777888","address":"127.0.0.1:8300"}`, }, expected: ChangefeedReactorState{ @@ -195,51 +187,38 @@ func TestChangefeedStateUpdate(t *testing.T) { }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281}, - TaskStatuses: map[model.CaptureID]*model.TaskStatus{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": { - Tables: map[int64]*model.TableReplicaInfo{45: {StartTs: 421980685886554116}}, - }, - "666777888": { - Tables: map[int64]*model.TableReplicaInfo{46: {StartTs: 412341234}}, - }, - }, TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": {CheckPointTs: 421980720003809281, ResolvedTs: 421980720003809281}, "666777888": {CheckPointTs: 11332244, ResolvedTs: 312321, Count: 8}, }, - Workloads: map[model.CaptureID]model.TaskWorkload{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": {45: {Workload: 1}}, - "666777888": {46: {Workload: 3}}, - }, }, }, { // testing changefeedID not match changefeedID: "test1", updateKey: []string{ - "/tidb/cdc/changefeed/info/test1", - "/tidb/cdc/job/test1", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/changefeed/info/test-fake", - "/tidb/cdc/job/test-fake", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test-fake", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test-fake", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test-fake", }, updateValue: []string{ changefeedInfo, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, - `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"45":{"workload":1}}`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, `fake value`, `fake value`, `fake value`, - `fake value`, - `fake value`, }, expected: ChangefeedReactorState{ ID: model.DefaultChangeFeedID("test1"), @@ -262,52 +241,39 @@ func TestChangefeedStateUpdate(t *testing.T) { }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713, ResolvedTs: 421980720003809281}, - TaskStatuses: map[model.CaptureID]*model.TaskStatus{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": { - Tables: map[int64]*model.TableReplicaInfo{45: {StartTs: 421980685886554116}}, - }, - }, TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": {CheckPointTs: 421980720003809281, ResolvedTs: 421980720003809281}, }, - Workloads: map[model.CaptureID]model.TaskWorkload{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": {45: {Workload: 1}}, - }, }, }, { // testing value is nil changefeedID: "test1", updateKey: []string{ - "/tidb/cdc/changefeed/info/test1", - "/tidb/cdc/job/test1", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/task/position/666777888/test1", - "/tidb/cdc/task/status/666777888/test1", - "/tidb/cdc/task/workload/666777888/test1", - "/tidb/cdc/changefeed/info/test1", - "/tidb/cdc/job/test1", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/task/workload/666777888/test1", - "/tidb/cdc/task/status/666777888/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/666777888/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/changefeed/info/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/job/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", }, updateValue: []string{ changefeedInfo, `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, `{"checkpoint-ts":421980720003809281,"resolved-ts":421980720003809281,"count":0,"error":null}`, - `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"45":{"workload":1}}`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, `{"checkpoint-ts":11332244,"resolved-ts":312321,"count":8,"error":null}`, - `{"tables":{"46":{"start-ts":412341234,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"46":{"workload":3}}`, - ``, - ``, ``, ``, ``, @@ -316,39 +282,12 @@ func TestChangefeedStateUpdate(t *testing.T) { ``, }, expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), - Info: nil, - Status: nil, - TaskStatuses: map[model.CaptureID]*model.TaskStatus{}, + ID: model.DefaultChangeFeedID("test1"), + Info: nil, + Status: nil, TaskPositions: map[model.CaptureID]*model.TaskPosition{ "666777888": {CheckPointTs: 11332244, ResolvedTs: 312321, Count: 8}, }, - Workloads: map[model.CaptureID]model.TaskWorkload{}, - }, - }, - { // testing the same key case - changefeedID: "test1", - updateKey: []string{ - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/status/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - }, - updateValue: []string{ - `{"tables":{"45":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - `{"tables":{"46":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - ``, - `{"tables":{"47":{"start-ts":421980685886554116,"mark-table-id":0}},"operation":null,"admin-job-type":0}`, - }, - expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), - TaskStatuses: map[model.CaptureID]*model.TaskStatus{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": { - Tables: map[int64]*model.TableReplicaInfo{47: {StartTs: 421980685886554116}}, - }, - }, - TaskPositions: map[model.CaptureID]*model.TaskPosition{}, - Workloads: map[model.CaptureID]model.TaskWorkload{}, }, }, } @@ -504,20 +443,25 @@ func TestGlobalStateUpdate(t *testing.T) { }{ { // common case updateKey: []string{ - "/tidb/cdc/owner/22317526c4fc9a37", - "/tidb/cdc/owner/22317526c4fc9a38", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test2", - "/tidb/cdc/task/workload/55551111/test2", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/owner/22317526c4fc9a37", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/owner/22317526c4fc9a38", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test2", }, updateValue: []string{ `6bbc01c8-0605-4f86-a0f9-b3119109b225`, `55551111`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, - `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, - `{"45":{"workload":1}}`, - `{"46":{"workload":1}}`, + `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713, +"admin-job-type":0}`, + `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713, +"admin-job-type":0}`, }, expected: GlobalReactorState{ Owner: map[string]struct{}{"22317526c4fc9a37": {}, "22317526c4fc9a38": {}}, @@ -527,20 +471,18 @@ func TestGlobalStateUpdate(t *testing.T) { }}, Changefeeds: map[model.ChangeFeedID]*ChangefeedReactorState{ model.DefaultChangeFeedID("test1"): { - ID: model.DefaultChangeFeedID("test1"), - TaskStatuses: map[string]*model.TaskStatus{}, + ID: model.DefaultChangeFeedID("test1"), TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": {CheckPointTs: 421980719742451713, ResolvedTs: 421980720003809281}, }, - Workloads: map[string]model.TaskWorkload{}, }, model.DefaultChangeFeedID("test2"): { - ID: model.DefaultChangeFeedID("test2"), - TaskStatuses: map[string]*model.TaskStatus{}, - TaskPositions: map[model.CaptureID]*model.TaskPosition{}, - Workloads: map[model.CaptureID]model.TaskWorkload{ - "6bbc01c8-0605-4f86-a0f9-b3119109b225": {45: {Workload: 1}}, - "55551111": {46: {Workload: 1}}, + ID: model.DefaultChangeFeedID("test2"), + TaskPositions: map[model.CaptureID]*model.TaskPosition{ + "6bbc01c8-0605-4f86-a0f9-b3119109b225": { + CheckPointTs: 421980719742451713, + ResolvedTs: 421980720003809281, + }, }, }, }, @@ -548,25 +490,31 @@ func TestGlobalStateUpdate(t *testing.T) { }, { // testing remove changefeed updateKey: []string{ - "/tidb/cdc/owner/22317526c4fc9a37", - "/tidb/cdc/owner/22317526c4fc9a38", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test2", - "/tidb/cdc/task/workload/55551111/test2", - "/tidb/cdc/owner/22317526c4fc9a37", - "/tidb/cdc/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", - "/tidb/cdc/task/workload/6bbc01c8-0605-4f86-a0f9-b3119109b225/test2", - "/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/owner/22317526c4fc9a37", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/owner/22317526c4fc9a38", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test2", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/owner/22317526c4fc9a37", + fmt.Sprintf("%s", etcd.DefaultClusterAndNamespacePrefix) + + "/task/position/6bbc01c8-0605-4f86-a0f9-b3119109b225/test1", + fmt.Sprintf("%s", etcd.DefaultClusterAndMetaPrefix) + + "/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", }, updateValue: []string{ `6bbc01c8-0605-4f86-a0f9-b3119109b225`, `55551111`, `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, - `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,"admin-job-type":0}`, - `{"45":{"workload":1}}`, - `{"46":{"workload":1}}`, - ``, + `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713, +"admin-job-type":0}`, + `{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713, +"admin-job-type":0}`, ``, ``, ``, @@ -576,11 +524,12 @@ func TestGlobalStateUpdate(t *testing.T) { Captures: map[model.CaptureID]*model.CaptureInfo{}, Changefeeds: map[model.ChangeFeedID]*ChangefeedReactorState{ model.DefaultChangeFeedID("test2"): { - ID: model.DefaultChangeFeedID("test2"), - TaskStatuses: map[string]*model.TaskStatus{}, - TaskPositions: map[model.CaptureID]*model.TaskPosition{}, - Workloads: map[model.CaptureID]model.TaskWorkload{ - "55551111": {46: {Workload: 1}}, + ID: model.DefaultChangeFeedID("test2"), + TaskPositions: map[model.CaptureID]*model.TaskPosition{ + "6bbc01c8-0605-4f86-a0f9-b3119109b225": { + CheckPointTs: 421980719742451713, + ResolvedTs: 421980720003809281, + }, }, }, }, @@ -623,11 +572,13 @@ func TestCaptureChangeHooks(t *testing.T) { captureInfoBytes, err := json.Marshal(captureInfo) require.Nil(t, err) - err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix+"/capture-1"), captureInfoBytes, false) + err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix()+"/capture-1"), + captureInfoBytes, false) require.Nil(t, err) require.Equal(t, callCount, 1) - err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix+"/capture-1"), nil /* delete */, false) + err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix()+"/capture-1"), + nil /* delete */, false) require.Nil(t, err) require.Equal(t, callCount, 2) } diff --git a/tests/integration_tests/capture_session_done_during_task/run.sh b/tests/integration_tests/capture_session_done_during_task/run.sh index 8c2c0eeab56..c4fd0284178 100644 --- a/tests/integration_tests/capture_session_done_during_task/run.sh +++ b/tests/integration_tests/capture_session_done_during_task/run.sh @@ -38,7 +38,7 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" fi - capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/capture --prefix | head -n 1) + capture_key=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | head -n 1) lease=$(ETCDCTL_API=3 etcdctl get $capture_key -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}') lease_hex=$(printf '%x\n' $lease) # revoke lease of etcd capture key to simulate etcd session done diff --git a/tests/integration_tests/capture_suicide_while_balance_table/run.sh b/tests/integration_tests/capture_suicide_while_balance_table/run.sh index 541bfe25f46..039fcddcf21 100644 --- a/tests/integration_tests/capture_suicide_while_balance_table/run.sh +++ b/tests/integration_tests/capture_suicide_while_balance_table/run.sh @@ -65,7 +65,7 @@ function run() { sleep 2 # revoke lease of etcd capture key to simulate etcd session done - lease=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/capture/${capture2_id} -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}') + lease=$(ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture/${capture2_id} -w json | grep -o 'lease":[0-9]*' | awk -F: '{print $2}') lease_hex=$(printf '%x\n' $lease) ETCDCTL_API=3 etcdctl lease revoke $lease_hex diff --git a/tests/utils/cdc_state_checker/cdc_monitor.go b/tests/utils/cdc_state_checker/cdc_monitor.go index 0c3480c9498..775ac59d007 100644 --- a/tests/utils/cdc_state_checker/cdc_monitor.go +++ b/tests/utils/cdc_state_checker/cdc_monitor.go @@ -74,7 +74,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti wrappedCli := etcd.Wrap(etcdCli, map[string]prometheus.Counter{}) reactor := &cdcMonitReactor{} initState := newCDCReactorState() - etcdWorker, err := orchestrator.NewEtcdWorker(wrappedCli, etcd.EtcdKeyBase, reactor, initState) + etcdWorker, err := orchestrator.NewEtcdWorker(wrappedCli, etcd.BaseKey(), reactor, initState) if err != nil { return nil, errors.Trace(err) } diff --git a/tests/utils/cdc_state_checker/state.go b/tests/utils/cdc_state_checker/state.go index 004d3cb7aa9..cff39664e18 100644 --- a/tests/utils/cdc_state_checker/state.go +++ b/tests/utils/cdc_state_checker/state.go @@ -35,10 +35,11 @@ type cdcReactorState struct { } var ( - captureRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.CaptureInfoKeyPrefix) + "/(.+)") - changefeedRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.JobKeyPrefix) + "/(.+)") - positionRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.TaskPositionKeyPrefix) + "/(.+?)/(.+)") - statusRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.TaskStatusKeyPrefix) + "/(.+?)/(.+)") + captureRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.CaptureInfoKeyPrefix()) + "/(.+)") + changefeedRegex = regexp.MustCompile(regexp. + QuoteMeta(etcd.JobKeyPrefix(model.DefaultNamespace)) + "/(.+)") + positionRegex = regexp.MustCompile(regexp. + QuoteMeta(etcd.TaskPositionKeyPrefix(model.DefaultNamespace)) + "/(.+?)/(.+)") ) func newCDCReactorState() *cdcReactorState { @@ -51,7 +52,7 @@ func newCDCReactorState() *cdcReactorState { } func (s *cdcReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { - if key.String() == etcd.CaptureOwnerKey { + if key.String() == etcd.CaptureOwnerKey() { if value == nil { log.Info("Owner lost", zap.String("oldOwner", s.Owner)) return nil @@ -182,55 +183,6 @@ func (s *cdcReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er return nil } - if matches := statusRegex.FindSubmatch(key.Bytes()); matches != nil { - captureID := string(matches[1]) - changefeedID := model.DefaultChangeFeedID(string(matches[2])) - - if value == nil { - log.Info("Status deleted", - zap.String("captureID", captureID), - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID), - zap.Reflect("oldStatus", s.TaskStatuses[changefeedID][captureID])) - - delete(s.TaskStatuses[changefeedID], captureID) - if len(s.TaskStatuses[changefeedID]) == 0 { - delete(s.TaskStatuses, changefeedID) - } - - return nil - } - - var newTaskStatus model.TaskStatus - err := json.Unmarshal(value, &newTaskStatus) - if err != nil { - return errors.Trace(err) - } - - if _, ok := s.TaskStatuses[changefeedID]; !ok { - s.TaskStatuses[changefeedID] = make(map[model.CaptureID]*model.TaskStatus) - } - - if status, ok := s.TaskStatuses[changefeedID][captureID]; ok { - log.Info("Status updated", - zap.String("captureID", captureID), - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID), - zap.Reflect("oldStatus", status), - zap.Reflect("newStatus", newTaskStatus)) - } else { - log.Info("Status updated", - zap.String("captureID", captureID), - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID), - zap.Reflect("newStatus", newTaskStatus)) - } - - s.TaskStatuses[changefeedID][captureID] = &newTaskStatus - - return nil - } - log.Debug("Etcd operation ignored", zap.String("key", key.String()), zap.ByteString("value", value)) return nil }