diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 881ad795f0405..176ac3119bf08 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1056,7 +1056,20 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } if len(tasks) > 0 { - return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) + return errors.Errorf("log backup task is running: %s, "+ + "please stop the task before restore, and after PITR operation finished, "+ + "create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) + } + + // check cdc changefeed + if cfg.CheckRequirements { + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + } } return nil } diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index b455237dbf506..d119c77364e1b 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "utils", srcs = [ "backoff.go", + "cdc.go", "db.go", "dyn_pprof_other.go", "dyn_pprof_unix.go", @@ -51,6 +52,7 @@ go_library( "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//backoff", "@org_golang_google_grpc//codes", @@ -71,6 +73,7 @@ go_test( timeout = "short", srcs = [ "backoff_test.go", + "cdc_test.go", "db_test.go", "env_test.go", "json_test.go", @@ -112,6 +115,8 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_tests_v3//integration", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go new file mode 100644 index 0000000000000..69e029135879c --- /dev/null +++ b/br/pkg/utils/cdc.go @@ -0,0 +1,152 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "regexp" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + CDCPrefix = "/tidb/cdc/" + ChangefeedPath = "/changefeed/info/" + CDCPrefixV61 = "/tidb/cdc/changefeed/info/" +) + +// CDCNameSet saves CDC changefeed's information. +// nameSet maps `cluster/namespace` to `changefeed`s +type CDCNameSet struct { + nameSet map[string][]string +} + +// that the nameSet is empty means no changefeed exists. +func (s *CDCNameSet) Empty() bool { + return len(s.nameSet) == 0 +} + +// MessageToUser convert the map `nameSet` to a readable message to user. +func (s *CDCNameSet) MessageToUser() string { + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") + for clusterID, captureIDs := range s.nameSet { + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) + changefeedMsgBuf.WriteString(", ") + } + return changefeedMsgBuf.String() +} + +// GetCDCChangefeedNameSet gets CDC changefeed information and wraps them to a map +// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ +// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ +func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { + nameSet := make(map[string][]string, 1) + // check etcd KV of CDC >= v6.2 + resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + + // cluster id should be valid in + // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 + reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") + if err != nil { + log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) + reg = nil + } + + for _, kv := range resp.Kvs { + // example: /tidb/cdc///changefeed/info/ + k := kv.Key[len(CDCPrefix):] + clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath)) + if !found { + continue + } + // example: clusterAndNamespace normally is / + // but in migration scenario it become __backup__. we need handle it + // see https://github.com/pingcap/tiflow/issues/9807 + clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + if !found { + // ignore __backup__ or other formats + continue + } + + if reg != nil { + matched := reg.Match(clusterID) + if !matched { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + } + + nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) + } + if len(nameSet) == 0 { + // check etcd KV of CDC <= v6.1 + resp, err = cli.Get(ctx, CDCPrefixV61, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + for _, kv := range resp.Kvs { + // example: /tidb/cdc/changefeed/info/ + k := kv.Key[len(CDCPrefixV61):] + if len(k) == 0 { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + + nameSet[""] = append(nameSet[""], string(k)) + } + } + + return &CDCNameSet{nameSet}, nil +} + +type onlyState struct { + State string `json:"state"` +} + +func isActiveCDCChangefeed(jsonBytes []byte) bool { + s := onlyState{} + err := json.Unmarshal(jsonBytes, &s) + if err != nil { + // maybe a compatible issue, skip this key + log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", + zap.ByteString("value", jsonBytes), + zap.Error(err)) + return false + } + switch s.State { + // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview + case "normal", "stopped", "error", "warning": + return true + default: + return false + } +} diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go new file mode 100644 index 0000000000000..3b5644c22631d --- /dev/null +++ b/br/pkg/utils/cdc_test.go @@ -0,0 +1,105 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils_test + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/integration" +) + +func TestGetCDCChangefeedNameSet(t *testing.T) { + integration.BeforeTestExternal(t) + testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer testEtcdCluster.Terminate(t) + + ctx := context.Background() + cli := testEtcdCluster.RandClient() + checkEtcdPut := func(key string, vals ...string) { + val := "" + if len(vals) == 1 { + val = vals[0] + } + _, err := cli.Put(ctx, key, val) + require.NoError(t, err) + } + + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.True(t, nameSet.Empty()) + + // TiCDC >= v6.2 + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test-1", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") + checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1") + checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") + checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") + + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.False(t, nameSet.Empty()) + require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", + nameSet.MessageToUser()) + + _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) + require.NoError(t, err) + + // TiCDC <= v6.1 + checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d") + checkEtcdPut( + "/tidb/cdc/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"stopped","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut("/tidb/cdc/job/test") + checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") + checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test") + + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.False(t, nameSet.Empty()) + require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", + nameSet.MessageToUser()) + + _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) + require.NoError(t, err) + + // ignore __backup__ changefeed + checkEtcdPut( + "/tidb/cdc/__backup__/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + // ignore cluster id only changefeed + checkEtcdPut( + "/tidb/cdc/5402613591834624000/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.True(t, nameSet.Empty()) +}