Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM gc(ticdc): gc service id refactor #5444

Merged
merged 67 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
383fdf7
etcd key add cluster id and namespace
sdojjy May 11, 2022
3ec7f1d
Merge branch 'master' into modify-etcd-key-base
sdojjy May 11, 2022
b996be8
fix ut
sdojjy May 11, 2022
d299306
fix ut
sdojjy May 11, 2022
477cdf0
fix ut
sdojjy May 11, 2022
7649b48
fix lint
sdojjy May 11, 2022
37bc91b
fix ut
sdojjy May 11, 2022
08e6c30
fix integration_test
sdojjy May 11, 2022
7208645
fix integration test
sdojjy May 12, 2022
4f4efc9
remove taskStatus key
sdojjy May 12, 2022
bf4279c
fix lint
sdojjy May 12, 2022
1c1d01f
Merge branch 'master' into modify-etcd-key-base
sdojjy May 12, 2022
a31920d
cli support cdc cluster id
sdojjy May 12, 2022
67b27f1
cli support cdc cluster id
sdojjy May 12, 2022
00e1c56
Merge branch 'decupple-tidb' into modify-etcd-key-base
sdojjy May 12, 2022
bfa9d75
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 12, 2022
23f8396
Merge branch 'master' into modify-etcd-key-base
sdojjy May 13, 2022
687abbb
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
71e7ece
fix ut
sdojjy May 13, 2022
e83ade5
Merge remote-tracking branch 'origin/modify-etcd-key-base' into modif…
sdojjy May 13, 2022
1485b74
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
fd3be43
fix ut
sdojjy May 13, 2022
ce7c397
fix ut
sdojjy May 13, 2022
b26384e
fix ut
sdojjy May 13, 2022
db6f24d
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
2a83553
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
82a433d
Merge remote-tracking branch 'upstream/master' into modify-etcd-key-base
sdojjy May 13, 2022
8fd38fa
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 13, 2022
e91250c
Merge remote-tracking branch 'origin/cli-support-cdc-cluster-id' into…
sdojjy May 13, 2022
4a88493
fix ut
sdojjy May 13, 2022
5dc16f1
fix integration test
sdojjy May 13, 2022
653e0ac
fix ut
sdojjy May 13, 2022
fe3a426
fix ut
sdojjy May 13, 2022
fef6dd1
fix lint
sdojjy May 13, 2022
cf6251c
Merge branch 'master' into modify-etcd-key-base
sdojjy May 14, 2022
5bc40b3
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 14, 2022
b48140a
refactor gc service id format
sdojjy May 16, 2022
e93eb8c
refactor gc service id format
sdojjy May 17, 2022
6d334b8
fix ut
sdojjy May 17, 2022
5e51703
Merge branch 'cli-support-cdc-cluster-id' into gc-manager-refactor
sdojjy May 17, 2022
519df53
Merge branch 'master' into modify-etcd-key-base
sdojjy May 17, 2022
bc3738c
Merge branch 'modify-etcd-key-base' into cli-support-cdc-cluster-id
sdojjy May 17, 2022
c141149
Merge branch 'cli-support-cdc-cluster-id' into gc-manager-refactor
sdojjy May 17, 2022
3d899f4
fix ut
sdojjy May 17, 2022
780e02e
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 17, 2022
2f91f20
fix ut
sdojjy May 17, 2022
37e84c6
Merge branch 'master' into gc-manager-refactor
sdojjy May 17, 2022
8f1684e
fix lint
sdojjy May 17, 2022
b3c036b
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 18, 2022
9994417
Merge branch 'master' into gc-manager-refactor
sdojjy May 18, 2022
f9a9611
Merge branch 'master' into cli-support-cdc-cluster-id
sdojjy May 18, 2022
24d0122
refactor gc service id format
sdojjy May 16, 2022
d41dcde
refactor gc service id format
sdojjy May 17, 2022
bc785b8
fix ut
sdojjy May 17, 2022
3c494a4
fix ut
sdojjy May 17, 2022
8114eb2
sink(ticdc): make mysql sink support split transactions (#5281)
CharlesCheung96 May 17, 2022
ec7d396
fix lint
sdojjy May 17, 2022
2c40521
config(ticdc): Add alias for "dispatcher" in dispatch rules (#5441)
zhaoxinyu May 17, 2022
e6a313c
sink/codec(cdc): refactor Avro codec (#5339)
zhangyangyu May 17, 2022
f0e56aa
Optimist: return ConflictNone if a conflict DDL has no conflict with …
GMHDBJD May 17, 2022
729693a
Merge branch 'gc-manager-refactor' of github.com:sdojjy/ticdc into gc…
sdojjy May 18, 2022
53256a2
Merge remote-tracking branch 'upstream/master' into cli-support-cdc-c…
sdojjy May 18, 2022
8976857
Merge branch 'master' into gc-manager-refactor
sdojjy May 18, 2022
a6cb567
Merge branch 'cli-support-cdc-cluster-id' into gc-manager-refactor
sdojjy May 18, 2022
047237b
fix integration get_safepoint func
sdojjy May 18, 2022
b3d6e7b
Merge branch 'master' into gc-manager-refactor
sdojjy May 18, 2022
b344270
Merge branch 'decupple-tidb' into gc-manager-refactor
sdojjy May 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func verifyCreateChangefeedConfig(
if err := gc.EnsureChangefeedStartTsSafety(
ctx,
upStream.PDClient,
capture.EtcdClient.GetEnsureGCServiceID(),
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Capture) reset(ctx context.Context) error {
if c.UpstreamManager != nil {
c.UpstreamManager.Close()
}
c.UpstreamManager = upstream.NewManager(ctx)
c.UpstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
err = c.UpstreamManager.Add(upstream.DefaultUpstreamID, c.pdEnpoints, conf.Security)
if err != nil {
return errors.Annotate(
Expand Down
3 changes: 2 additions & 1 deletion cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestReset(t *testing.T) {
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
client, err := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
require.Nil(t, err)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
Expand Down
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ LOOP:
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, c.upStream.PDClient, c.state.ID, ensureTTL, checkpointTs)
ctx, c.upStream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(),
c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 5 additions & 29 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -237,6 +236,7 @@ func TestInitialize(t *testing.T) {
tester.MustApplyPatches()

// initialize
ctx.GlobalVars().EtcdClient = &etcd.CDCEtcdClient{}
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
require.Equal(t, state.Status.CheckpointTs, ctx.ChangefeedVars().Info.StartTs)
Expand Down Expand Up @@ -271,20 +271,8 @@ func TestExecDDL(t *testing.T) {
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf.upStream.KVStorage = helper.Storage()
Expand Down Expand Up @@ -361,20 +349,8 @@ func TestEmitCheckpointTs(t *testing.T) {
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf.upStream.KVStorage = helper.Storage()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(1))
require.Equal(t, serviceID, gc.CDCServiceSafePointID)
require.Equal(t, serviceID, etcd.GcServiceIDForTest())
ch <- struct{}{}
return 0, nil
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(19))
require.Equal(t, serviceID, gc.CDCServiceSafePointID)
require.Equal(t, serviceID, etcd.GcServiceIDForTest())
ch <- struct{}{}
return 0, nil
}
Expand Down
60 changes: 50 additions & 10 deletions cdc/scheduler/internal/base/processor_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -47,9 +48,10 @@ const (
// TODO add a real unit test with mock components for the agent alone,
// which might require refactoring some existing components.
type agentTestSuite struct {
cluster *p2p.MockCluster
etcdClient *clientv3.Client
etcdKVClient *mockEtcdKVClient
cluster *p2p.MockCluster
etcdClient *clientv3.Client
etcdKVClient *mockEtcdKVClient
etcdClusterClient *mockEtcdClusterClient

tableExecutor *MockTableExecutor
dispatchResponseCh chan *protocol.DispatchTableResponseMessage
Expand All @@ -68,7 +70,7 @@ type agentTestSuite struct {

func newAgentTestSuite(t *testing.T) *agentTestSuite {
ctx, cancel := context.WithCancel(context.Background())
etcdCli, KVCli := newMockEtcdClientForAgentTests(ctx)
etcdCli, KVCli, clusterCli := newMockEtcdClientForAgentTests(ctx)

cluster := p2p.NewMockCluster(t, agentTestMockNodeNum)
ownerMessageServer := cluster.Nodes[ownerCaptureID].Server
Expand All @@ -77,9 +79,10 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
require.NotNil(t, ownerMessageClient)

ret := &agentTestSuite{
cluster: cluster,
etcdClient: etcdCli,
etcdKVClient: KVCli,
cluster: cluster,
etcdClient: etcdCli,
etcdKVClient: KVCli,
etcdClusterClient: clusterCli,

// The channel sizes 1024 should be more than sufficient for these tests.
// Full channels will result in panics to make the cases fail.
Expand Down Expand Up @@ -153,7 +156,8 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
}

func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) {
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
cdcEtcdClient, err := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
require.Nil(t, err)
messageServer := s.cluster.Nodes["capture-1"].Server
messageRouter := s.cluster.Nodes["capture-1"].Router
s.tableExecutor = NewMockTableExecutor(t)
Expand Down Expand Up @@ -193,11 +197,15 @@ func (s *agentTestSuite) Close() {
// NOTE: The mock client does not have any useful internal logic.
// It only supports GET operations and any output should be supplied by
// calling the mock.Mock methods embedded in the mock client.
func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client, *mockEtcdKVClient) {
func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client,
*mockEtcdKVClient, *mockEtcdClusterClient,
) {
cli := clientv3.NewCtxClient(ctx)
mockKVCli := &mockEtcdKVClient{}
cli.KV = mockKVCli
return cli, mockKVCli
mockClusterCli := &mockEtcdClusterClient{}
cli.Cluster = mockClusterCli
return cli, mockKVCli, mockClusterCli
}

type mockEtcdKVClient struct {
Expand All @@ -214,6 +222,22 @@ func (c *mockEtcdKVClient) Get(ctx context.Context, key string, opts ...clientv3
return resp, args.Error(1)
}

type mockEtcdClusterClient struct {
clientv3.Cluster // embeds a null implementation of the Etcd Cluster client
mock.Mock
}

func (c *mockEtcdClusterClient) MemberList(
ctx context.Context,
) (*clientv3.MemberListResponse, error) {
args := c.Called(ctx)
resp := (*clientv3.MemberListResponse)(nil)
if args.Get(0) != nil {
resp = args.Get(0).(*clientv3.MemberListResponse)
}
return resp, args.Error(1)
}

func TestAgentBasics(t *testing.T) {
suite := newAgentTestSuite(t)
defer suite.Close()
Expand All @@ -229,6 +253,10 @@ func TestAgentBasics(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -339,6 +367,10 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -404,6 +436,10 @@ func TestAgentTolerateClientClosed(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -449,6 +485,10 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

agent, err := suite.CreateAgent(t)
require.NoError(t, err)
Expand Down
7 changes: 5 additions & 2 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}

cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"wrapper etcd client")
}
s.etcdClient = &cdcEtcdClient

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func newServer(t *testing.T) *testServer {
DialTimeout: 5 * time.Second,
})
require.Nil(t, err)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
etcdClient, err := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
require.Nil(t, err)
s.server.etcdClient = &etcdClient

s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) })
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,9 @@ func (o *createChangefeedOptions) validateStartTs(ctx context.Context) error {
// Ensure the start ts is validate in the next 1 hour.
const ensureTTL = 60 * 60.
return gc.EnsureChangefeedStartTsSafety(
ctx, o.pdClient, model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs)
ctx, o.pdClient,
o.etcdClient.GetEnsureGCServiceID(),
model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs)
}

// validateTargetTs checks if targetTs is a valid value.
Expand Down
10 changes: 7 additions & 3 deletions pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/factory"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/spf13/cobra"
pd "github.com/tikv/pd/client"
Expand All @@ -25,7 +26,8 @@ import (
// unsafeDeleteServiceGcSafepointOptions defines flags
// for the `cli unsafe delete-service-gc-safepoint` command.
type unsafeDeleteServiceGcSafepointOptions struct {
pdClient pd.Client
pdClient pd.Client
etcdClient *etcd.CDCEtcdClient
}

// newUnsafeDeleteServiceGcSafepointOptions creates new unsafeDeleteServiceGcSafepointOptions
Expand All @@ -43,14 +45,16 @@ func (o *unsafeDeleteServiceGcSafepointOptions) complete(f factory.Factory) erro

o.pdClient = pdClient

return nil
o.etcdClient, err = f.EtcdClient()
return err
}

// run runs the `cli unsafe delete-service-gc-safepoint` command.
func (o *unsafeDeleteServiceGcSafepointOptions) run(cmd *cobra.Command) error {
ctx := context.GetDefaultContext()

err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID)
err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient,
o.etcdClient.GetGCServiceID())
if err == nil {
cmd.Println("CDC service GC safepoint truncated in PD!")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/cli/cli_unsafe_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (o *unsafeResetOptions) run(cmd *cobra.Command) error {
return errors.Trace(err)
}

err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID)
err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, o.etcdClient.GetGCServiceID())
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

client := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID())
return &client, nil
client, err := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID())
return &client, err
}

// PdClient creates new pd client.
Expand Down
3 changes: 3 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func NewContext4Test(baseCtx context.Context, withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
EtcdClient: &etcd.CDCEtcdClient{
ClusterID: etcd.DefaultCDCClusterID,
},
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
Expand Down
Loading