Skip to content

Commit

Permalink
DNM gc(ticdc): gc service id refactor (#5444)
Browse files Browse the repository at this point in the history
* refactor gc service id format
  • Loading branch information
sdojjy committed May 24, 2022
1 parent 3425318 commit 953077d
Show file tree
Hide file tree
Showing 24 changed files with 165 additions and 91 deletions.
1 change: 1 addition & 0 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func verifyCreateChangefeedConfig(
if err := gc.EnsureChangefeedStartTsSafety(
ctx,
upStream.PDClient,
capture.EtcdClient.GetEnsureGCServiceID(),
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Capture) reset(ctx context.Context) error {
if c.UpstreamManager != nil {
c.UpstreamManager.Close()
}
c.UpstreamManager = upstream.NewManager(ctx)
c.UpstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
err = c.UpstreamManager.Add(upstream.DefaultUpstreamID, c.pdEnpoints, conf.Security)
if err != nil {
return errors.Annotate(
Expand Down
3 changes: 2 additions & 1 deletion cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestReset(t *testing.T) {
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
client, err := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
require.Nil(t, err)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
Expand Down
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ LOOP:
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, c.upStream.PDClient, c.state.ID, ensureTTL, checkpointTs)
ctx, c.upStream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(),
c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 5 additions & 29 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -237,6 +236,7 @@ func TestInitialize(t *testing.T) {
tester.MustApplyPatches()

// initialize
ctx.GlobalVars().EtcdClient = &etcd.CDCEtcdClient{}
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
require.Equal(t, state.Status.CheckpointTs, ctx.ChangefeedVars().Info.StartTs)
Expand Down Expand Up @@ -271,20 +271,8 @@ func TestExecDDL(t *testing.T) {
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf.upStream.KVStorage = helper.Storage()
Expand Down Expand Up @@ -361,20 +349,8 @@ func TestEmitCheckpointTs(t *testing.T) {
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, state, captures, tester := createChangefeed4Test(ctx, t)
cf.upStream.KVStorage = helper.Storage()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(1))
require.Equal(t, serviceID, gc.CDCServiceSafePointID)
require.Equal(t, serviceID, etcd.GcServiceIDForTest())
ch <- struct{}{}
return 0, nil
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(19))
require.Equal(t, serviceID, gc.CDCServiceSafePointID)
require.Equal(t, serviceID, etcd.GcServiceIDForTest())
ch <- struct{}{}
return 0, nil
}
Expand Down
60 changes: 50 additions & 10 deletions cdc/scheduler/internal/base/processor_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -47,9 +48,10 @@ const (
// TODO add a real unit test with mock components for the agent alone,
// which might require refactoring some existing components.
type agentTestSuite struct {
cluster *p2p.MockCluster
etcdClient *clientv3.Client
etcdKVClient *mockEtcdKVClient
cluster *p2p.MockCluster
etcdClient *clientv3.Client
etcdKVClient *mockEtcdKVClient
etcdClusterClient *mockEtcdClusterClient

tableExecutor *MockTableExecutor
dispatchResponseCh chan *protocol.DispatchTableResponseMessage
Expand All @@ -68,7 +70,7 @@ type agentTestSuite struct {

func newAgentTestSuite(t *testing.T) *agentTestSuite {
ctx, cancel := context.WithCancel(context.Background())
etcdCli, KVCli := newMockEtcdClientForAgentTests(ctx)
etcdCli, KVCli, clusterCli := newMockEtcdClientForAgentTests(ctx)

cluster := p2p.NewMockCluster(t, agentTestMockNodeNum)
ownerMessageServer := cluster.Nodes[ownerCaptureID].Server
Expand All @@ -77,9 +79,10 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
require.NotNil(t, ownerMessageClient)

ret := &agentTestSuite{
cluster: cluster,
etcdClient: etcdCli,
etcdKVClient: KVCli,
cluster: cluster,
etcdClient: etcdCli,
etcdKVClient: KVCli,
etcdClusterClient: clusterCli,

// The channel sizes 1024 should be more than sufficient for these tests.
// Full channels will result in panics to make the cases fail.
Expand Down Expand Up @@ -153,7 +156,8 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
}

func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) {
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
cdcEtcdClient, err := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
require.Nil(t, err)
messageServer := s.cluster.Nodes["capture-1"].Server
messageRouter := s.cluster.Nodes["capture-1"].Router
s.tableExecutor = NewMockTableExecutor(t)
Expand Down Expand Up @@ -193,11 +197,15 @@ func (s *agentTestSuite) Close() {
// NOTE: The mock client does not have any useful internal logic.
// It only supports GET operations and any output should be supplied by
// calling the mock.Mock methods embedded in the mock client.
func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client, *mockEtcdKVClient) {
func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client,
*mockEtcdKVClient, *mockEtcdClusterClient,
) {
cli := clientv3.NewCtxClient(ctx)
mockKVCli := &mockEtcdKVClient{}
cli.KV = mockKVCli
return cli, mockKVCli
mockClusterCli := &mockEtcdClusterClient{}
cli.Cluster = mockClusterCli
return cli, mockKVCli, mockClusterCli
}

type mockEtcdKVClient struct {
Expand All @@ -214,6 +222,22 @@ func (c *mockEtcdKVClient) Get(ctx context.Context, key string, opts ...clientv3
return resp, args.Error(1)
}

type mockEtcdClusterClient struct {
clientv3.Cluster // embeds a null implementation of the Etcd Cluster client
mock.Mock
}

func (c *mockEtcdClusterClient) MemberList(
ctx context.Context,
) (*clientv3.MemberListResponse, error) {
args := c.Called(ctx)
resp := (*clientv3.MemberListResponse)(nil)
if args.Get(0) != nil {
resp = args.Get(0).(*clientv3.MemberListResponse)
}
return resp, args.Error(1)
}

func TestAgentBasics(t *testing.T) {
suite := newAgentTestSuite(t)
defer suite.Close()
Expand All @@ -229,6 +253,10 @@ func TestAgentBasics(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -339,6 +367,10 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -404,6 +436,10 @@ func TestAgentTolerateClientClosed(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
Expand Down Expand Up @@ -449,6 +485,10 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
},
},
}, nil)
suite.etcdClusterClient.On("MemberList", mock.Anything).
Return(&clientv3.MemberListResponse{
Header: &etcdserverpb.ResponseHeader{ClusterId: 1},
}, nil)

agent, err := suite.CreateAgent(t)
require.NoError(t, err)
Expand Down
7 changes: 5 additions & 2 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}

cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"wrapper etcd client")
}
s.etcdClient = &cdcEtcdClient

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func newServer(t *testing.T) *testServer {
DialTimeout: 5 * time.Second,
})
require.Nil(t, err)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
etcdClient, err := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
require.Nil(t, err)
s.server.etcdClient = &etcdClient

s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) })
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ func (o *createChangefeedOptions) validateStartTs(ctx context.Context) error {
// Ensure the start ts is validate in the next 1 hour.
const ensureTTL = 60 * 60.
return gc.EnsureChangefeedStartTsSafety(
ctx, o.pdClient, model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs)
ctx, o.pdClient,
o.etcdClient.GetEnsureGCServiceID(),
model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs)
}

// validateTargetTs checks if targetTs is a valid value.
Expand Down
10 changes: 7 additions & 3 deletions pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/factory"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/spf13/cobra"
pd "github.com/tikv/pd/client"
Expand All @@ -25,7 +26,8 @@ import (
// unsafeDeleteServiceGcSafepointOptions defines flags
// for the `cli unsafe delete-service-gc-safepoint` command.
type unsafeDeleteServiceGcSafepointOptions struct {
pdClient pd.Client
pdClient pd.Client
etcdClient *etcd.CDCEtcdClient
}

// newUnsafeDeleteServiceGcSafepointOptions creates new unsafeDeleteServiceGcSafepointOptions
Expand All @@ -43,14 +45,16 @@ func (o *unsafeDeleteServiceGcSafepointOptions) complete(f factory.Factory) erro

o.pdClient = pdClient

return nil
o.etcdClient, err = f.EtcdClient()
return err
}

// run runs the `cli unsafe delete-service-gc-safepoint` command.
func (o *unsafeDeleteServiceGcSafepointOptions) run(cmd *cobra.Command) error {
ctx := context.GetDefaultContext()

err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID)
err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient,
o.etcdClient.GetGCServiceID())
if err == nil {
cmd.Println("CDC service GC safepoint truncated in PD!")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/cli/cli_unsafe_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (o *unsafeResetOptions) run(cmd *cobra.Command) error {
return errors.Trace(err)
}

err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID)
err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, o.etcdClient.GetGCServiceID())
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

client := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID())
return &client, nil
client, err := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID())
return &client, err
}

// PdClient creates new pd client.
Expand Down
3 changes: 3 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func NewContext4Test(baseCtx context.Context, withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
EtcdClient: &etcd.CDCEtcdClient{
ClusterID: etcd.DefaultCDCClusterID,
},
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
Expand Down
Loading

0 comments on commit 953077d

Please sign in to comment.