From 1468e8da887b859d4d8c34825aa8e8b4a5332c13 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 14:40:53 +0800 Subject: [PATCH 01/11] br: check the correct changefeed info when restore --- br/pkg/task/stream.go | 14 ++++++++------ br/pkg/utils/cdc.go | 25 ++++++++++++++++++++++++- br/pkg/utils/cdc_test.go | 28 +++++++++++++++++++++++++--- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 56a7c238bc013..ba633947e4e5e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1076,12 +1076,14 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. } // check cdc changefeed - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) - if err != nil { - return err - } - if !nameSet.Empty() { - return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + if cfg.CheckRequirements { + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + } } return nil } diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index ab655f826fbb0..556f1bc79360f 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strings" "github.com/pingcap/errors" @@ -62,7 +63,7 @@ func (s *CDCNameSet) MessageToUser() string { // for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { nameSet := make(map[string][]string, 1) - // check etcd KV of CDC >= v6.2 + // check etcd KV of CDC >= v5.2 resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) @@ -75,6 +76,28 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam if !found { continue } + // example: clusterAndNamespace normally is / + // but in migration scenario it become __backup__. we need handle it + // see https://github.com/pingcap/tiflow/issues/9807 + clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + if !found { + // ignore __backup__ or other formats + continue + } + + // Generate a random cluster ID in pd + // r := rand.New(rand.NewSource(time.Now().UnixNano())) + // ts := uint64(time.Now().Unix()) + // clusterID := (ts << 32) + uint64(r.Uint32()) + matched, err := regexp.Match("^[0-9]+$", clusterID) + if err != nil { + log.L().Warn("failed to parse cluster id, skip it", zap.String("cluster ID", string(clusterID)), zap.Error(err)) + continue + } + if !matched { + continue + } + if !isActiveCDCChangefeed(kv.Value) { continue } diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index 1032693a0d3aa..0c9ad25e09bdc 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -49,11 +49,11 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( - "/tidb/cdc/default/default/changefeed/info/test", + "/tidb/cdc/5402613591834624000/default/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut( - "/tidb/cdc/default/default/changefeed/info/test-1", + "/tidb/cdc/5402613591834624000/default/changefeed/info/test-1", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") @@ -64,7 +64,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", + require.Equal(t, "found CDC changefeed(s): cluster/namespace: 5402613591834624000/default changefeed(s): [test], ", nameSet.MessageToUser()) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) @@ -85,4 +85,26 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { require.False(t, nameSet.Empty()) require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", nameSet.MessageToUser()) + + _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) + require.NoError(t, err) + + // ignore __backup__ changefeed + checkEtcdPut( + "/tidb/cdc/__backup__/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + // ignore cluster id only changefeed + checkEtcdPut( + "/tidb/cdc/5402613591834624000/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + // ignore no cluster id changefeed + checkEtcdPut( + "/tidb/cdc/xxx/default/changefeed/info/test1", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.True(t, nameSet.Empty()) } From 3ceb144ff994e956917dedc55a5e294acb6b612b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 14:45:38 +0800 Subject: [PATCH 02/11] update --- br/pkg/utils/cdc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 556f1bc79360f..9dbcb9eea3092 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -63,7 +63,7 @@ func (s *CDCNameSet) MessageToUser() string { // for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { nameSet := make(map[string][]string, 1) - // check etcd KV of CDC >= v5.2 + // check etcd KV of CDC >= v6.2 resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) From 369eda0f9210539dd54c4fc6e18dea23a961b7a1 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 15:28:08 +0800 Subject: [PATCH 03/11] update --- br/pkg/utils/cdc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 9dbcb9eea3092..9837199223699 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -89,11 +89,12 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam // r := rand.New(rand.NewSource(time.Now().UnixNano())) // ts := uint64(time.Now().Unix()) // clusterID := (ts << 32) + uint64(r.Uint32()) - matched, err := regexp.Match("^[0-9]+$", clusterID) + reg, err := regexp.Compile("^[0-9]+$") if err != nil { log.L().Warn("failed to parse cluster id, skip it", zap.String("cluster ID", string(clusterID)), zap.Error(err)) continue } + matched := reg.Match(clusterID) if !matched { continue } From 504aa2d900e6e8b729492ddd0f78ffedee172160 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 16:10:13 +0800 Subject: [PATCH 04/11] update --- br/pkg/utils/cdc.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 9837199223699..0faf85ad51600 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -69,6 +69,16 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam return nil, errors.Trace(err) } + // Generate a random cluster ID in pd + // r := rand.New(rand.NewSource(time.Now().UnixNano())) + // ts := uint64(time.Now().Unix()) + // clusterID := (ts << 32) + uint64(r.Uint32()) + reg, err := regexp.Compile("^[0-9]+$") + if err != nil { + log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) + reg = nil + } + for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ k := kv.Key[len(CDCPrefix):] @@ -85,22 +95,14 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam continue } - // Generate a random cluster ID in pd - // r := rand.New(rand.NewSource(time.Now().UnixNano())) - // ts := uint64(time.Now().Unix()) - // clusterID := (ts << 32) + uint64(r.Uint32()) - reg, err := regexp.Compile("^[0-9]+$") - if err != nil { - log.L().Warn("failed to parse cluster id, skip it", zap.String("cluster ID", string(clusterID)), zap.Error(err)) - continue - } - matched := reg.Match(clusterID) - if !matched { - continue - } - - if !isActiveCDCChangefeed(kv.Value) { - continue + if reg != nil { + matched := reg.Match(clusterID) + if !matched { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } } nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) From b08534b93054f5e160a59e1b3af7214d0361343d Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 17:43:03 +0800 Subject: [PATCH 05/11] fix test --- executor/importer/precheck_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go index 6b795e7181eef..2fea681cb0800 100644 --- a/executor/importer/precheck_test.go +++ b/executor/importer/precheck_test.go @@ -137,7 +137,7 @@ func TestCheckRequirements(t *testing.T) { _, err = etcdCli.Delete(ctx, pitrKey) require.NoError(t, err) // example: /tidb/cdc///changefeed/info/ - cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf" + cdcKey := utils.CDCPrefix + "5402613591834624000/test_ns/changefeed/info/test_cf" _, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`) require.NoError(t, err) err = c.CheckRequirements(ctx, conn) From fa63ec04841427c5ea9e47a503c4b8c94315af27 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 27 Sep 2023 18:02:11 +0800 Subject: [PATCH 06/11] fix test --- br/pkg/lightning/importer/precheck_impl_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/importer/precheck_impl_test.go b/br/pkg/lightning/importer/precheck_impl_test.go index a7839ba821afd..66a7fa509512e 100644 --- a/br/pkg/lightning/importer/precheck_impl_test.go +++ b/br/pkg/lightning/importer/precheck_impl_test.go @@ -630,16 +630,16 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) } // TiCDC >= v6.2 - checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc//__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( - "/tidb/cdc/default/default/changefeed/info/test", + "/tidb/cdc/5402613591834624000/default/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut( - "/tidb/cdc/default/default/changefeed/info/test-1", + "/tidb/cdc/5402613591834624000/default/changefeed/info/test-1", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") @@ -651,7 +651,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], \n"+ + "found CDC changefeed(s): cluster/namespace: 5402613591834624000/default changefeed(s): [test], \n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message) From 3f89da2a51b1e1dec15adfe9de639cf6339adcf5 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 11:20:53 +0800 Subject: [PATCH 07/11] address comment --- br/pkg/lightning/importer/precheck_impl_test.go | 6 +++--- br/pkg/utils/cdc.go | 2 +- br/pkg/utils/cdc_test.go | 11 +++-------- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/br/pkg/lightning/importer/precheck_impl_test.go b/br/pkg/lightning/importer/precheck_impl_test.go index 66a7fa509512e..3042391c4e562 100644 --- a/br/pkg/lightning/importer/precheck_impl_test.go +++ b/br/pkg/lightning/importer/precheck_impl_test.go @@ -635,11 +635,11 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( - "/tidb/cdc/5402613591834624000/default/changefeed/info/test", + "/tidb/cdc/default/default/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut( - "/tidb/cdc/5402613591834624000/default/changefeed/info/test-1", + "/tidb/cdc/default/default/changefeed/info/test-1", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") @@ -651,7 +651,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC changefeed(s): cluster/namespace: 5402613591834624000/default changefeed(s): [test], \n"+ + "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], \n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 0faf85ad51600..a17b6ad0b2167 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -73,7 +73,7 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam // r := rand.New(rand.NewSource(time.Now().UnixNano())) // ts := uint64(time.Now().Unix()) // clusterID := (ts << 32) + uint64(r.Uint32()) - reg, err := regexp.Compile("^[0-9]+$") + reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") if err != nil { log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) reg = nil diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index 0c9ad25e09bdc..3b5644c22631d 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -49,11 +49,11 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( - "/tidb/cdc/5402613591834624000/default/changefeed/info/test", + "/tidb/cdc/default/default/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut( - "/tidb/cdc/5402613591834624000/default/changefeed/info/test-1", + "/tidb/cdc/default/default/changefeed/info/test-1", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") @@ -64,7 +64,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: 5402613591834624000/default changefeed(s): [test], ", + require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", nameSet.MessageToUser()) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) @@ -99,11 +99,6 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { "/tidb/cdc/5402613591834624000/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) - // ignore no cluster id changefeed - checkEtcdPut( - "/tidb/cdc/xxx/default/changefeed/info/test1", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, - ) nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) require.NoError(t, err) require.True(t, nameSet.Empty()) From e06940336cba4d4643508f1e6e77cfa835cda93e Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 11:22:01 +0800 Subject: [PATCH 08/11] update --- br/pkg/lightning/importer/precheck_impl_test.go | 2 +- executor/importer/precheck_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/importer/precheck_impl_test.go b/br/pkg/lightning/importer/precheck_impl_test.go index 3042391c4e562..a7839ba821afd 100644 --- a/br/pkg/lightning/importer/precheck_impl_test.go +++ b/br/pkg/lightning/importer/precheck_impl_test.go @@ -630,7 +630,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) } // TiCDC >= v6.2 - checkEtcdPut("/tidb/cdc//__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go index 2fea681cb0800..6b795e7181eef 100644 --- a/executor/importer/precheck_test.go +++ b/executor/importer/precheck_test.go @@ -137,7 +137,7 @@ func TestCheckRequirements(t *testing.T) { _, err = etcdCli.Delete(ctx, pitrKey) require.NoError(t, err) // example: /tidb/cdc///changefeed/info/ - cdcKey := utils.CDCPrefix + "5402613591834624000/test_ns/changefeed/info/test_cf" + cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf" _, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`) require.NoError(t, err) err = c.CheckRequirements(ctx, conn) From 2807c6b15f9df77381ac9482275f43de9b1456d1 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 13:26:03 +0800 Subject: [PATCH 09/11] update --- br/pkg/utils/cdc.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index a17b6ad0b2167..4d1b83917af01 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -69,10 +69,8 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam return nil, errors.Trace(err) } - // Generate a random cluster ID in pd - // r := rand.New(rand.NewSource(time.Now().UnixNano())) - // ts := uint64(time.Now().Unix()) - // clusterID := (ts << 32) + uint64(r.Uint32()) + // cluster id should be valid in + // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") if err != nil { log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) From aef5d0c277f8775968db0d2cfaa8d4834ed7fa7b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 14:02:30 +0800 Subject: [PATCH 10/11] address comment --- br/pkg/utils/cdc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 4d1b83917af01..69e029135879c 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -143,7 +143,8 @@ func isActiveCDCChangefeed(jsonBytes []byte) bool { return false } switch s.State { - case "normal", "stopped", "error": + // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview + case "normal", "stopped", "error", "warning": return true default: return false From cf3112c407f8554bc63ca566e127364d12f9a892 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 16:12:14 +0800 Subject: [PATCH 11/11] fix test --- executor/importer/precheck_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go index 6b795e7181eef..2643c585d3594 100644 --- a/executor/importer/precheck_test.go +++ b/executor/importer/precheck_test.go @@ -137,7 +137,7 @@ func TestCheckRequirements(t *testing.T) { _, err = etcdCli.Delete(ctx, pitrKey) require.NoError(t, err) // example: /tidb/cdc///changefeed/info/ - cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf" + cdcKey := utils.CDCPrefix + "testcluster/test_ns/changefeed/info/test_cf" _, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`) require.NoError(t, err) err = c.CheckRequirements(ctx, conn)