Skip to content

Commit

Permalink
DNM *(ticdc): etcd key add cluster id and namespace (#5399)
Browse files Browse the repository at this point in the history
* etcd key add cluster id and namespace
  • Loading branch information
sdojjy committed May 24, 2022
1 parent 41fcb9f commit c1996c0
Show file tree
Hide file tree
Showing 27 changed files with 369 additions and 501 deletions.
2 changes: 1 addition & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) {
resp, err := cli.Client.Get(ctx, etcd.EtcdKeyBase, clientv3.WithPrefix())
resp, err := cli.Client.Get(ctx, etcd.BaseKey(), clientv3.WithPrefix())
if err != nil {
fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error())
return
Expand Down
7 changes: 4 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Capture) reset(ctx context.Context) error {
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey())

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
Expand Down Expand Up @@ -412,7 +412,8 @@ func (c *Capture) runEtcdWorker(
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState)
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client,
etcd.BaseKey(), reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -577,7 +578,7 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo,
return nil, err
}

ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey)
ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package owner
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"os"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
Expand Down Expand Up @@ -197,7 +199,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
info = ctx.ChangefeedVars().Info
return info, true, nil
})
tester.MustUpdate("/tidb/cdc/capture/"+ctx.GlobalVars().CaptureInfo.ID, []byte(`{"id":"`+ctx.GlobalVars().CaptureInfo.ID+`","address":"127.0.0.1:8300"}`))
tester.MustUpdate(fmt.Sprintf("%s/capture/%s",
etcd.DefaultClusterAndMetaPrefix, ctx.GlobalVars().CaptureInfo.ID),
[]byte(`{"id":"`+ctx.GlobalVars().CaptureInfo.ID+`","address":"127.0.0.1:8300"}`))
tester.MustApplyPatches()
captures := map[model.CaptureID]*model.CaptureInfo{ctx.GlobalVars().CaptureInfo.ID: ctx.GlobalVars().CaptureInfo}
return cf, state, captures, tester
Expand Down
12 changes: 9 additions & 3 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package owner

import (
"fmt"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -308,12 +310,16 @@ func TestChangefeedStatusNotExist(t *testing.T) {
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, map[string]string{
"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `
fmt.Sprintf("%s/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
etcd.DefaultClusterAndMetaPrefix): `
{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
"address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" +
fmt.Sprintf("%s/changefeed/info/",
etcd.DefaultClusterAndNamespacePrefix) +
ctx.ChangefeedVars().ID.ID: changefeedInfo,
"/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
fmt.Sprintf("%s/owner/156579d017f84a68",
etcd.DefaultClusterAndMetaPrefix,
): "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
})
manager.Tick(state)
require.False(t, manager.ShouldRunning())
Expand Down
17 changes: 13 additions & 4 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ func TestCheckClusterVersion(t *testing.T) {
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()

tester.MustUpdate("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300","version":"v6.0.0"}`))
tester.MustUpdate(fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225",
etcd.DefaultClusterAndMetaPrefix),
[]byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225",
"address":"127.0.0.1:8300","version":"v6.0.0"}`))

changefeedID := model.DefaultChangeFeedID("test-changefeed")
changefeedInfo := &model.ChangeFeedInfo{
Expand All @@ -296,7 +299,9 @@ func TestCheckClusterVersion(t *testing.T) {
require.Nil(t, err)
require.NotContains(t, owner.changefeeds, changefeedID)

tester.MustUpdate("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225",
tester.MustUpdate(fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225",
etcd.DefaultClusterAndMetaPrefix,
),
[]byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300","version":"`+ctx.GlobalVars().CaptureInfo.Version+`"}`))

// check the tick is not skipped and the changefeed will be handled normally
Expand Down Expand Up @@ -388,7 +393,9 @@ func TestUpdateGCSafePoint(t *testing.T) {
}
changefeedID1 := model.DefaultChangeFeedID("test-changefeed1")
tester.MustUpdate(
fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1.ID),
fmt.Sprintf("%s/changefeed/info/%s",
etcd.DefaultClusterAndNamespacePrefix,
changefeedID1.ID),
[]byte(`{"config":{"cyclic-replication":{}},"state":"failed"}`))
tester.MustApplyPatches()
state.Changefeeds[changefeedID1].PatchStatus(
Expand Down Expand Up @@ -429,7 +436,9 @@ func TestUpdateGCSafePoint(t *testing.T) {
// add another changefeed, it must update GC safepoint.
changefeedID2 := model.DefaultChangeFeedID("test-changefeed2")
tester.MustUpdate(
fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID2.ID),
fmt.Sprintf("%s/changefeed/info/%s",
etcd.DefaultClusterAndNamespacePrefix,
changefeedID2.ID),
[]byte(`{"config":{"cyclic-replication":{}},"state":"normal"}`))
tester.MustApplyPatches()
state.Changefeeds[changefeedID1].PatchStatus(
Expand Down
5 changes: 4 additions & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,7 +65,9 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
fmt.Sprintf("/tidb/cdc/capture/%s", ctx.GlobalVars().CaptureInfo.ID): string(captureInfoBytes),
fmt.Sprintf("%s/capture/%s",
etcd.DefaultClusterAndMetaPrefix,
ctx.GlobalVars().CaptureInfo.ID): string(captureInfoBytes),
})
}

Expand Down
17 changes: 9 additions & 8 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ func initProcessor4Test(ctx cdcContext.Context, t *testing.T) (*processor, *orch
captureID := ctx.GlobalVars().CaptureInfo.ID
changefeedID := ctx.ChangefeedVars().ID
return p, orchestrator.NewReactorStateTester(t, p.changefeed, map[string]string{
"/tidb/cdc/capture/" +
captureID: `{"id":"` + captureID + `","address":"127.0.0.1:8300"}`,
"/tidb/cdc/changefeed/info/" +
changefeedID.ID: changefeedInfo,
"/tidb/cdc/job/" +
ctx.ChangefeedVars().ID.ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
"/tidb/cdc/task/status/" +
captureID + "/" + changefeedID.ID: `{"tables":{},"operation":null,"admin-job-type":0}`,
fmt.Sprintf("%s/capture/%s",
etcd.DefaultClusterAndMetaPrefix,
captureID): `{"id":"` + captureID + `","address":"127.0.0.1:8300"}`,
fmt.Sprintf("%s/changefeed/info/%s",
etcd.DefaultClusterAndNamespacePrefix,
changefeedID.ID): changefeedInfo,
fmt.Sprintf("%s/job/%s",
etcd.DefaultClusterAndNamespacePrefix,
ctx.ChangefeedVars().ID.ID): `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
})
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/base/processor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewAgent(
etcdCliCtx, cancel := context.WithTimeout(ctx, getOwnerFromEtcdTimeout)
defer cancel()
ownerCaptureID, err := ret.etcdClient.
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey)
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey())
if err != nil {
if err != concurrency.ErrElectionNoLeader {
return nil, errors.Trace(err)
Expand Down
14 changes: 7 additions & 7 deletions cdc/scheduler/internal/base/processor_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ func TestAgentBasics(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey, mock.Anything).
etcd.CaptureOwnerKey(), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey),
Key: []byte(etcd.CaptureOwnerKey()),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {

// Empty response implies no owner.
suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey, mock.Anything).
etcd.CaptureOwnerKey(), mock.Anything).
Return(&clientv3.GetResponse{}, nil)

// Test Point 1: Create an agent.
Expand Down Expand Up @@ -393,11 +393,11 @@ func TestAgentTolerateClientClosed(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey, mock.Anything).
etcd.CaptureOwnerKey(), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey),
Key: []byte(etcd.CaptureOwnerKey()),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -438,11 +438,11 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey, mock.Anything).
etcd.CaptureOwnerKey(), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey),
Key: []byte(etcd.CaptureOwnerKey()),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/cli/cli_capture_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func listCaptures(ctx context.Context, etcdClient *etcd.CDCEtcdClient) ([]*captu
return nil, err
}

ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey)
ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
if err != nil && errors.Cause(err) != concurrency.ErrElectionNoLeader {
return nil, err
}
Expand Down
14 changes: 3 additions & 11 deletions pkg/cmd/cli/cli_changefeed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error {
return err
}

taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, o.changefeedID)
taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx,
model.DefaultChangeFeedID(o.changefeedID))
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return err
}
Expand All @@ -124,18 +125,9 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error {
count += pinfo.Count
}

processorInfos, err := o.etcdClient.GetAllTaskStatus(ctx, o.changefeedID)
if err != nil {
return err
}

taskStatus := make([]captureTaskStatus, 0, len(processorInfos))
for captureID, status := range processorInfos {
taskStatus = append(taskStatus, captureTaskStatus{CaptureID: captureID, TaskStatus: status})
}
taskStatus := make([]captureTaskStatus, 0)

meta := &cfMeta{Info: info, Status: status, Count: count, TaskStatus: taskStatus}

return util.JSONPrint(cmd, meta)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/cli/cli_changefeed_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (o *statisticsChangefeedOptions) runCliWithEtcdClient(ctx context.Context,
return err
}

taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx, o.changefeedID)
taskPositions, err := o.etcdClient.GetAllTaskPositions(ctx,
model.DefaultChangeFeedID(o.changefeedID))
if err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/cmd/cli/cli_processor_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ func (o *queryProcessorOptions) addFlags(cmd *cobra.Command) {

// run cli cmd with etcd client
func (o *queryProcessorOptions) runCliWithEtcdClient(ctx context.Context, cmd *cobra.Command) error {
_, status, err := o.etcdClient.GetTaskStatus(ctx, o.changefeedID, o.captureID)
if err != nil && cerror.ErrTaskStatusNotExists.Equal(err) {
return err
}
status := &model.TaskStatus{}

_, position, err := o.etcdClient.GetTaskPosition(ctx, o.changefeedID, o.captureID)
_, position, err := o.etcdClient.GetTaskPosition(ctx,
model.DefaultChangeFeedID(o.changefeedID), o.captureID)
if err != nil && cerror.ErrTaskPositionNotExists.Equal(err) {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestParseCfg(t *testing.T) {
ServerWorkerPoolSize: 4,
},
},
ClusterID: "default",
}, o.serverConfig)
}

Expand Down Expand Up @@ -356,6 +357,7 @@ server-worker-pool-size = 16
ServerWorkerPoolSize: 16,
},
},
ClusterID: "default",
}, o.serverConfig)
}

Expand Down Expand Up @@ -499,6 +501,7 @@ cert-allowed-cn = ["dd","ee"]
ServerWorkerPoolSize: 4,
},
},
ClusterID: "default",
}, o.serverConfig)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ const (
"server-ack-interval": 100000000,
"server-worker-pool-size": 4
}
}
},
"cluster-id": "default"
}`

testCfgTestReplicaConfigMarshal1 = `{
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ var defaultServerConfig = &ServerConfig{
},
Messages: defaultMessageConfig.Clone(),
},
ClusterID: "default",
}

// ServerConfig represents a config for server
Expand All @@ -155,6 +156,7 @@ type ServerConfig struct {
PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"`
KVClient *KVClientConfig `toml:"kv-client" json:"kv-client"`
Debug *DebugConfig `toml:"debug" json:"debug"`
ClusterID string `toml:"cluster-id" json:"cluster-id"`
}

// Marshal returns the json marshal format of a ServerConfig
Expand Down
Loading

0 comments on commit c1996c0

Please sign in to comment.