diff --git a/Makefile b/Makefile index 06bf3f9d564d8..e00542be65752 100644 --- a/Makefile +++ b/Makefile @@ -327,6 +327,7 @@ build_for_br_integration_test: $(GOBUILD) $(RACE_FLAG) -o bin/gc br/tests/br_z_gc_safepoint/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/oauth br/tests/br_gcs/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/rawkv br/tests/br_rawkv/*.go && \ + $(GOBUILD) $(RACE_FLAG) -o bin/txnkv br/tests/br_txn/*.go && \ $(GOBUILD) $(RACE_FLAG) -o bin/parquet_gen br/tests/lightning_checkpoint_parquet/*.go \ ) || (make failpoint-disable && exit 1) @make failpoint-disable diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 40ab734cbcd52..7fe38c2bb7f32 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -87,7 +87,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) !cfg.LogProgress) // RawKV restore does not need to rewrite keys. - err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, true) + err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, false) if err != nil { return errors.Trace(err) } diff --git a/br/tests/br_txn/BUILD.bazel b/br/tests/br_txn/BUILD.bazel new file mode 100644 index 0000000000000..911f746718a6e --- /dev/null +++ b/br/tests/br_txn/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "br_txn_lib", + srcs = ["client.go"], + importpath = "github.com/pingcap/tidb/br/tests/br_txn", + visibility = ["//visibility:private"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//txnkv", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "br_txn", + embed = [":br_txn_lib"], + visibility = ["//visibility:public"], +) diff --git a/br/tests/br_txn/client.go b/br/tests/br_txn/client.go new file mode 100644 index 0000000000000..6f0bf414e207a --- /dev/null +++ b/br/tests/br_txn/client.go @@ -0,0 +1,268 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "flag" + "fmt" + "hash/crc64" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/txnkv" + "go.uber.org/zap" +) + +var ( + ca = flag.String("ca", "", "CA certificate path for TLS connection") + cert = flag.String("cert", "", "certificate path for TLS connection") + key = flag.String("key", "", "private key path for TLS connection") + pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD") + runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan', 'delete'") + startKeyStr = flag.String("start-key", "", "Start key in hex") + endKeyStr = flag.String("end-key", "", "End key in hex") + keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode") + concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen") + duration = flag.Int("duration", 10, "duration(second) of rand-gen") +) + +func createClient(addr string) (*txnkv.Client, error) { + if *ca != "" { + conf := config.GetGlobalConfig() + conf.Security.ClusterSSLCA = *ca + conf.Security.ClusterSSLCert = *cert + conf.Security.ClusterSSLKey = *key + config.StoreGlobalConfig(conf) + } + + cli, err := txnkv.NewClient([]string{addr}) + return cli, errors.Trace(err) +} + +func main() { + flag.Parse() + + startKey := []byte(*startKeyStr) + endKey := []byte(*endKeyStr) + if len(endKey) == 0 { + log.Panic("Empty endKey is not supported yet") + } + + if *runMode == "test-rand-key" { + testRandKey(startKey, endKey, *keyMaxLen) + return + } + + client, err := createClient(*pdAddr) + if err != nil { + log.Panic("Failed to create client", zap.String("pd", *pdAddr), zap.Error(err)) + } + + switch *runMode { + case "rand-gen": + err = randGenWithDuration(client, startKey, endKey, *keyMaxLen, *concurrency, *duration) + case "checksum": + err = checksum(client, startKey, endKey) + case "delete": + err = deleteRange(client, startKey, endKey) + } + + if err != nil { + log.Panic("Error", zap.Error(err)) + } +} + +func randGenWithDuration(client *txnkv.Client, startKey, endKey []byte, + maxLen int, concurrency int, duration int) error { + var err error + ok := make(chan struct{}) + go func() { + err = randGen(client, startKey, endKey, maxLen, concurrency) + ok <- struct{}{} + }() + select { + case <-time.After(time.Second * time.Duration(duration)): + case <-ok: + } + return errors.Trace(err) +} + +func randGen(client *txnkv.Client, startKey, endKey []byte, maxLen int, concurrency int) error { + log.Info("Start rand-gen", zap.Int("maxlen", maxLen), + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + log.Info("Rand-gen will keep running. Please Ctrl+C to stop manually.") + + // Cannot generate shorter key than commonPrefix + commonPrefixLen := 0 + for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) && + startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ { + continue + } + + if maxLen < commonPrefixLen { + return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen) + } + + const batchSize = 32 + + errCh := make(chan error, concurrency) + for i := maxLen; i <= maxLen+concurrency; i++ { + go func(i int) { + for { + txn, err := client.Begin() + if err != nil { + errCh <- errors.Trace(err) + } + for j := 0; j < batchSize; j++ { + key := randKey(startKey, endKey, i) + // append index to avoid write conflict + key = appendIndex(key, i) + value := randValue() + err = txn.Set(key, value) + if err != nil { + errCh <- errors.Trace(err) + } + } + err = txn.Commit(context.TODO()) + if err != nil { + errCh <- errors.Trace(err) + } + } + }(i) + } + + err := <-errCh + if err != nil { + return errors.Trace(err) + } + + return nil +} + +func testRandKey(startKey, endKey []byte, maxLen int) { + for { + k := randKey(startKey, endKey, maxLen) + if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 { + panic(hex.EncodeToString(k)) + } + } +} + +//nolint:gosec +func randKey(startKey, endKey []byte, maxLen int) []byte { +Retry: + for { // Regenerate on fail + result := make([]byte, 0, maxLen) + + upperUnbounded := false + lowerUnbounded := false + + for i := 0; i < maxLen; i++ { + upperBound := 256 + if !upperUnbounded { + if i >= len(endKey) { + // The generated key is the same as endKey which is invalid. Regenerate it. + continue Retry + } + upperBound = int(endKey[i]) + 1 + } + + lowerBound := 0 + if !lowerUnbounded { + if i >= len(startKey) { + lowerUnbounded = true + } else { + lowerBound = int(startKey[i]) + } + } + + if lowerUnbounded { + if rand.Intn(257) == 0 { + return result + } + } + + value := rand.Intn(upperBound - lowerBound) + value += lowerBound + + if value < upperBound-1 { + upperUnbounded = true + } + if value > lowerBound { + lowerUnbounded = true + } + + result = append(result, uint8(value)) + } + + return result + } +} + +//nolint:gosec +func appendIndex(key []byte, i int) []byte { + return append(key, uint8(i)) +} + +//nolint:gosec +func randValue() []byte { + result := make([]byte, 0, 512) + for i := 0; i < 512; i++ { + value := rand.Intn(257) + if value == 256 { + if i > 0 { + return result + } + value-- + } + result = append(result, uint8(value)) + } + return result +} + +func checksum(client *txnkv.Client, startKey, endKey []byte) error { + log.Info("Start checkcum on range", + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + + txn, err := client.Begin() + if err != nil { + return errors.Trace(err) + } + iter, err := txn.Iter(startKey, endKey) + if err != nil { + return errors.Trace(err) + } + + digest := crc64.New(crc64.MakeTable(crc64.ECMA)) + + var res uint64 + + for iter.Valid() { + err := iter.Next() + if err != nil { + return errors.Trace(err) + } + if len(iter.Key()) == 0 { + break + } + _, _ = digest.Write(iter.Key()) + _, _ = digest.Write(iter.Value()) + res ^= digest.Sum64() + } + _ = txn.Commit(context.TODO()) + + log.Info("Checksum result", zap.Uint64("checksum", res)) + fmt.Printf("Checksum result: %016x\n", res) + return nil +} + +func deleteRange(client *txnkv.Client, startKey, endKey []byte) error { + log.Info("Start delete data in range", + zap.String("startkey", hex.EncodeToString(startKey)), zap.String("endkey", hex.EncodeToString(endKey))) + _, err := client.DeleteRange(context.TODO(), startKey, endKey, *concurrency) + return err +} diff --git a/br/tests/br_txn/run.sh b/br/tests/br_txn/run.sh new file mode 100755 index 0000000000000..8b15f78764af4 --- /dev/null +++ b/br/tests/br_txn/run.sh @@ -0,0 +1,136 @@ +#!/bin/sh +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +# restart service without tiflash +source $( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/../_utils/run_services +start_services --no-tiflash + +BACKUP_DIR=$TEST_DIR/"txn_backup" +BACKUP_FULL=$TEST_DIR/"txnkv-full" + +checksum() { + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode checksum --start-key $1 --end-key $2 | grep result | tail -n 1 | awk '{print $3}' +} + +fail_and_exit() { + echo "TEST: [$TEST_NAME] failed!" + exit 1 +} + +clean() { + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode delete --start-key $1 --end-key $2 +} + +test_full_txnkv_encryption() { + check_range_start="hello" + check_range_end="world" + + rm -rf $BACKUP_FULL + + checksum_full=$(checksum $check_range_start $check_range_end) + # backup current state of key-values + run_br --pd $PD_ADDR backup txn -s "local://$BACKUP_FULL" --crypter.method "aes128-ctr" --crypter.key "0123456789abcdef0123456789abcdef" + + clean $check_range_start $check_range_end + # Ensure the data is deleted + checksum_new=$(checksum $check_range_start $check_range_end) + if [ "$checksum_new" == "$checksum_full" ];then + echo "failed to delete data in range in encryption" + fail_and_exit + fi + + run_br --pd $PD_ADDR restore txn -s "local://$BACKUP_FULL" --crypter.method "aes128-ctr" --crypter.key "0123456789abcdef0123456789abcdef" + checksum_new=$(checksum $check_range_start $check_range_end) + if [ "$checksum_new" != "$checksum_full" ];then + echo "failed to restore" + fail_and_exit + fi +} + +run_test() { + if [ -z "$1" ];then + echo "run test" + else + export GO_FAILPOINTS="$1" + echo "run test with failpoints: $GO_FAILPOINTS" + fi + + rm -rf $BACKUP_DIR + clean "hello" "world" + + # generate txn kv randomly in range[start-key, end-key) in 10s + bin/txnkv --pd $PD_ADDR \ + --ca "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/br.pem" \ + --key "$TEST_DIR/certs/br.key" \ + --mode rand-gen --start-key "hello" --end-key "world" --duration 10 + + checksum_ori=$(checksum "hello" "world") + + # backup txnkv + echo "backup start..." + run_br --pd $PD_ADDR backup txn -s "local://$BACKUP_DIR" + + # delete data in range[start-key, end-key) + clean "hello" "world" + # Ensure the data is deleted + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_empty" ];then + echo "failed to delete data in range after backup" + fail_and_exit + fi + + # restore rawkv + echo "restore start..." + run_br --pd $PD_ADDR restore txn -s "local://$BACKUP_DIR" + + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_ori" ];then + echo "checksum failed after restore" + fail_and_exit + fi + + test_full_txnkv_encryption + + # delete data in range[start-key, end-key) + clean "hello" "world" + # Ensure the data is deleted + checksum_new=$(checksum "hello" "world") + + if [ "$checksum_new" != "$checksum_empty" ];then + echo "failed to delete data in range" + fail_and_exit + fi + + export GO_FAILPOINTS="" +} + +# delete data in range[start-key, end-key) +clean "hello" "world" +checksum_empty=$(checksum "hello" "world") +run_test "" diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index 58fe387d4be6a..ca66c8d5013ce 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -9,6 +9,7 @@ set -eo pipefail # Step 1 CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) group=$1 + export COV_DIR="/tmp/group_cover" rm -rf COV_DIR mkdir $COV_DIR @@ -23,7 +24,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'