Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM *(ticdc): cli support cdc cluster #5412

Merged
merged 46 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
383fdf7
etcd key add cluster id and namespace
sdojjy May 11, 2022
3ec7f1d
Merge branch 'master' into modify-etcd-key-base
sdojjy May 11, 2022
b996be8
fix ut
sdojjy May 11, 2022
d299306
fix ut
sdojjy May 11, 2022
477cdf0
fix ut
sdojjy May 11, 2022
7649b48
fix lint
sdojjy May 11, 2022
37bc91b
fix ut
sdojjy May 11, 2022
08e6c30
fix integration_test
sdojjy May 11, 2022
7208645
fix integration test
sdojjy May 12, 2022
4f4efc9
remove taskStatus key
sdojjy May 12, 2022
bf4279c
fix lint
sdojjy May 12, 2022
1c1d01f
Merge branch 'master' into modify-etcd-key-base
sdojjy May 12, 2022
a31920d
cli support cdc cluster id
sdojjy May 12, 2022
67b27f1
cli support cdc cluster id
sdojjy May 12, 2022
00e1c56
Merge branch 'decupple-tidb' into modify-etcd-key-base
sdojjy May 12, 2022
bfa9d75
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 12, 2022
23f8396
Merge branch 'master' into modify-etcd-key-base
sdojjy May 13, 2022
687abbb
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
71e7ece
fix ut
sdojjy May 13, 2022
e83ade5
Merge remote-tracking branch 'origin/modify-etcd-key-base' into modif…
sdojjy May 13, 2022
1485b74
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
fd3be43
fix ut
sdojjy May 13, 2022
ce7c397
fix ut
sdojjy May 13, 2022
b26384e
fix ut
sdojjy May 13, 2022
db6f24d
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
2a83553
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
82a433d
Merge remote-tracking branch 'upstream/master' into modify-etcd-key-base
sdojjy May 13, 2022
8fd38fa
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
e91250c
Merge remote-tracking branch 'origin/cli-support-cdc-cluster-id' into…
sdojjy May 13, 2022
4a88493
fix ut
sdojjy May 13, 2022
5dc16f1
fix integration test
sdojjy May 13, 2022
653e0ac
fix ut
sdojjy May 13, 2022
fe3a426
fix ut
sdojjy May 13, 2022
fef6dd1
fix lint
sdojjy May 13, 2022
cf6251c
Merge branch 'master' into modify-etcd-key-base
sdojjy May 14, 2022
5bc40b3
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 14, 2022
6d334b8
fix ut
sdojjy May 17, 2022
519df53
Merge branch 'master' into modify-etcd-key-base
sdojjy May 17, 2022
bc3738c
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 17, 2022
780e02e
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 17, 2022
b3c036b
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 18, 2022
f9a9611
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 18, 2022
53256a2
Merge remote-tracking branch 'upstream/master' into cli-support-cdc-c…
sdojjy May 18, 2022
c7332b8
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 18, 2022
a87257c
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 19, 2022
f80c47a
Merge branch 'decupple-tidb' into cli-support-cdc-cluster-id
sdojjy May 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) {
resp, err := cli.Client.Get(ctx, etcd.BaseKey(), clientv3.WithPrefix())
resp, err := cli.Client.Get(ctx,
etcd.BaseKey(cli.ClusterID), clientv3.WithPrefix())
if err != nil {
fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error())
return
Expand Down
15 changes: 9 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Capture) reset(ctx context.Context) error {
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey())
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey(c.EtcdClient.ClusterID))

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
Expand Down Expand Up @@ -279,7 +279,7 @@ func (c *Capture) run(stdCtx context.Context) error {
conf := config.GetGlobalServerConfig()
processorFlushInterval := time.Duration(conf.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState()
globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand Down Expand Up @@ -380,7 +380,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.UpstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()
globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand All @@ -389,7 +389,9 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
c.MessageRouter.RemovePeer(captureID)
})

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String())
err = c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.ClusterID),
ownerFlushInterval, util.RoleOwner.String())
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand All @@ -413,7 +415,7 @@ func (c *Capture) runEtcdWorker(
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client,
etcd.BaseKey(), reactor, reactorState)
etcd.BaseKey(c.EtcdClient.ClusterID), reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -578,7 +580,8 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo,
return nil, err
}

ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
ownerID, err := c.EtcdClient.GetOwnerID(ctx,
etcd.CaptureOwnerKey(c.EtcdClient.ClusterID))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReset(t *testing.T) {
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
return &mockScheduler{}, nil
}
cf.upStream = upStream
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down
18 changes: 12 additions & 6 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
func TestHandleJob(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -105,7 +106,8 @@ func TestHandleJob(t *testing.T) {
func TestMarkFinished(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -133,7 +135,8 @@ func TestMarkFinished(t *testing.T) {
func TestCleanUpInfos(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -165,7 +168,8 @@ func TestCleanUpInfos(t *testing.T) {
func TestHandleError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -228,7 +232,8 @@ func TestHandleError(t *testing.T) {
func TestHandleFastFailError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -308,7 +313,8 @@ func TestChangefeedStatusNotExist(t *testing.T) {
`
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, map[string]string{
fmt.Sprintf("%s/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
etcd.DefaultClusterAndMetaPrefix): `
Expand Down
15 changes: 12 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeCapture,
CaptureID: ctx.GlobalVars().CaptureInfo.ID,
}
Expand All @@ -91,6 +92,7 @@ func TestCreateRemoveChangefeed(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -160,6 +162,7 @@ func TestStopChangefeed(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -209,6 +212,7 @@ func TestFixChangefeedState(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -248,6 +252,7 @@ func TestFixChangefeedSinkProtocol(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -288,6 +293,7 @@ func TestCheckClusterVersion(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -369,7 +375,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// no changefeed, the gc safe point should be max uint64
Expand Down Expand Up @@ -488,6 +494,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
changefeedStr, err := cfInfo1.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf1,
}
Expand All @@ -505,6 +512,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
Version: " v0.0.1-test-only",
}
cdcKey = etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeCapture,
CaptureID: captureInfo.ID,
}
Expand All @@ -522,6 +530,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
changefeedStr1, err := cfInfo2.Marshal()
require.Nil(t, err)
cdcKey = etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf2,
}
Expand Down Expand Up @@ -565,7 +574,7 @@ WorkLoop:
}

func TestCalculateGCSafepointTs(t *testing.T) {
state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
Expand Down
11 changes: 7 additions & 4 deletions cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
checkpointTs: replicaInfo.StartTs,
}, nil
})
s.state = orchestrator.NewGlobalState()
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
Expand All @@ -83,7 +83,8 @@ func TestChangefeed(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an inactive changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
_, err = s.manager.Tick(ctx, s.state)
s.tester.MustApplyPatches()
require.Nil(t, err)
Expand Down Expand Up @@ -135,7 +136,8 @@ func TestDebugInfo(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an active changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
s.state.Changefeeds[changefeedID].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return &model.ChangeFeedInfo{
Expand Down Expand Up @@ -189,7 +191,8 @@ func TestClose(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an active changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
s.state.Changefeeds[changefeedID].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return &model.ChangeFeedInfo{
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func initProcessor4Test(ctx cdcContext.Context, t *testing.T) (*processor, *orch
checkpointTs: replicaInfo.StartTs,
}, nil
})
p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
p.changefeed = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID)
captureID := ctx.GlobalVars().CaptureInfo.ID
changefeedID := ctx.ChangefeedVars().ID
return p, orchestrator.NewReactorStateTester(t, p.changefeed, map[string]string{
Expand Down Expand Up @@ -578,6 +579,7 @@ func TestSchemaGC(t *testing.T) {

func updateChangeFeedPosition(t *testing.T, tester *orchestrator.ReactorStateTester, cfID model.ChangeFeedID, resolvedTs, checkpointTs model.Ts) {
key := etcd.CDCKey{
ClusterID: etcd.DefaultCDCClusterID,
Tp: etcd.CDCKeyTypeChangeFeedStatus,
ChangefeedID: cfID,
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/base/processor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewAgent(
etcdCliCtx, cancel := context.WithTimeout(ctx, getOwnerFromEtcdTimeout)
defer cancel()
ownerCaptureID, err := ret.etcdClient.
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey())
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID))
if err != nil {
if err != concurrency.ErrElectionNoLeader {
return nil, errors.Trace(err)
Expand Down
17 changes: 9 additions & 8 deletions cdc/scheduler/internal/base/processor_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
}

func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) {
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient)
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
messageServer := s.cluster.Nodes["capture-1"].Server
messageRouter := s.cluster.Nodes["capture-1"].Router
s.tableExecutor = NewMockTableExecutor(t)
Expand Down Expand Up @@ -219,11 +219,11 @@ func TestAgentBasics(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {

// Empty response implies no owner.
suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{}, nil)

// Test Point 1: Create an agent.
Expand Down Expand Up @@ -393,11 +393,12 @@ func TestAgentTolerateClientClosed(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(
etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -438,11 +439,11 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}

cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli)
cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
s.etcdClient = &cdcEtcdClient

err = s.initDir(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newServer(t *testing.T) *testServer {
DialTimeout: 5 * time.Second,
})
require.Nil(t, err)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
s.server.etcdClient = &etcdClient

s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) })
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/cli/cli_capture_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func listCaptures(ctx context.Context, etcdClient *etcd.CDCEtcdClient) ([]*captu
return nil, err
}

ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
ownerID, err := etcdClient.GetOwnerID(ctx,
etcd.CaptureOwnerKey(etcdClient.ClusterID))
if err != nil && errors.Cause(err) != concurrency.ErrElectionNoLeader {
return nil, err
}
Expand Down
Loading