From 95378e5bedef02de814d6a8c10d1a273ce437c2f Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 28 Sep 2023 18:18:21 +0800 Subject: [PATCH] br: check the correct changefeed info when restore/import data (#47322) close pingcap/tiflow#9807 --- br/pkg/task/stream.go | 14 ++++++++------ br/pkg/utils/cdc.go | 29 +++++++++++++++++++++++++++-- br/pkg/utils/cdc_test.go | 17 +++++++++++++++++ executor/importer/precheck_test.go | 2 +- 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 56a7c238bc013..ba633947e4e5e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1076,12 +1076,14 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. } // check cdc changefeed - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) - if err != nil { - return err - } - if !nameSet.Empty() { - return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + if cfg.CheckRequirements { + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + } } return nil } diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index ab655f826fbb0..69e029135879c 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strings" "github.com/pingcap/errors" @@ -68,6 +69,14 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam return nil, errors.Trace(err) } + // cluster id should be valid in + // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 + reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") + if err != nil { + log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) + reg = nil + } + for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ k := kv.Key[len(CDCPrefix):] @@ -75,10 +84,25 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam if !found { continue } - if !isActiveCDCChangefeed(kv.Value) { + // example: clusterAndNamespace normally is / + // but in migration scenario it become __backup__. we need handle it + // see https://github.com/pingcap/tiflow/issues/9807 + clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + if !found { + // ignore __backup__ or other formats continue } + if reg != nil { + matched := reg.Match(clusterID) + if !matched { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + } + nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) } if len(nameSet) == 0 { @@ -119,7 +143,8 @@ func isActiveCDCChangefeed(jsonBytes []byte) bool { return false } switch s.State { - case "normal", "stopped", "error": + // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview + case "normal", "stopped", "error", "warning": return true default: return false diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index 1032693a0d3aa..3b5644c22631d 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -85,4 +85,21 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { require.False(t, nameSet.Empty()) require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", nameSet.MessageToUser()) + + _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) + require.NoError(t, err) + + // ignore __backup__ changefeed + checkEtcdPut( + "/tidb/cdc/__backup__/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + // ignore cluster id only changefeed + checkEtcdPut( + "/tidb/cdc/5402613591834624000/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.True(t, nameSet.Empty()) } diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go index 6b795e7181eef..2643c585d3594 100644 --- a/executor/importer/precheck_test.go +++ b/executor/importer/precheck_test.go @@ -137,7 +137,7 @@ func TestCheckRequirements(t *testing.T) { _, err = etcdCli.Delete(ctx, pitrKey) require.NoError(t, err) // example: /tidb/cdc///changefeed/info/ - cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf" + cdcKey := utils.CDCPrefix + "testcluster/test_ns/changefeed/info/test_cf" _, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`) require.NoError(t, err) err = c.CheckRequirements(ctx, conn)