From 87fe98c83e67020ba9f261cfa04dd3463ca769a4 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 17 Oct 2023 10:48:42 +0800 Subject: [PATCH 01/10] draft Signed-off-by: Leavrth --- br/pkg/stream/stream_status.go | 17 +++ br/pkg/stream/util.go | 4 + br/pkg/stream/util_test.go | 10 +- .../br_pitr/incremental_data/delete_range.sql | 124 ++++++++++++++++++ br/tests/br_pitr/run.sh | 87 ++++++++++++ 5 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 br/tests/br_pitr/incremental_data/delete_range.sql create mode 100644 br/tests/br_pitr/run.sh diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index 57b803d93d46d..a88ab4cb2b309 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "os" "regexp" "strconv" "sync" @@ -15,6 +16,7 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -419,6 +421,21 @@ func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) if err != nil { return err } + failpoint.Inject("only-checkpoint-ts-with-check", func(val failpoint.Value) { + if nowTs, ok := val.(string); ok && len(tasks) > 0 && len(nowTs) > 0 { + checkpointTime := oracle.GetTimeFromTS(tasks[0].globalCheckpoint) + nowTime, err := ParseDate(nowTs) + if err != nil { + failpoint.Return(err) + } + if checkpointTime.After(nowTime) { + os.Exit(50) + } else { + os.Exit(51) + } + failpoint.Return(nil) + } + }) ctl.printToView(tasks) return nil } diff --git a/br/pkg/stream/util.go b/br/pkg/stream/util.go index 10215a68df61d..12ff00ab1361f 100644 --- a/br/pkg/stream/util.go +++ b/br/pkg/stream/util.go @@ -13,6 +13,10 @@ func FormatDate(ts time.Time) string { return ts.Format(DATE_FORMAT) } +func ParseDate(date string) (time.Time, error) { + return time.Parse(DATE_FORMAT, date) +} + func 
IsMetaDBKey(key []byte) bool { return strings.HasPrefix(string(key), "mDB") } diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 2562c9ce15840..63222b8ed7b0c 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -23,12 +23,20 @@ func TestDateFormat(t *testing.T) { 434605479096221697, "2022-07-15 20:32:12.734 +0800", }, + { + 434605479096221697, + "2022-07-15 20:32:12 +0800", + }, } timeZone, _ := time.LoadLocation("Asia/Shanghai") for _, ca := range cases { - date := FormatDate(oracle.GetTimeFromTS(ca.ts).In(timeZone)) + ts := oracle.GetTimeFromTS(ca.ts).In(timeZone) + date := FormatDate(ts) require.Equal(t, ca.target, date) + ts2, err := ParseDate(date) + require.NoError(t, err) + require.Equal(t, ts, ts2.In(timeZone)) } } diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql new file mode 100644 index 0000000000000..835d8060b4622 --- /dev/null +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -0,0 +1,124 @@ +-- 1. Drop Schema +create database db_to_be_dropped; +create table db_to_be_dropped.t0(id int primary key, c int, name char(20)); +create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on db_to_be_dropped.t0 (name); +create index k2 on db_to_be_dropped.t0(c); +create index k1 on db_to_be_dropped.t1(name); +create index k2 on db_to_be_dropped.t1(c); +create index k3 on db_to_be_dropped.t1 (id, c); + +insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 2. 
Drop/Truncate Table +create database table_to_be_dropped_or_truncated; +create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3. 
Drop/Truncate Table Partition +create database partition_to_be_dropped_or_truncated; +create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +-- 4. 
Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped; +create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. Drop Table INDEXES +create database indexes_to_be_dropped; +create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped.t0 (name); +create index k2 on indexes_to_be_dropped.t0 (c); +create index k1 on indexes_to_be_dropped.t1 (name); +create index k2 on indexes_to_be_dropped.t1 (c); +create index k3 on indexes_to_be_dropped.t1 (id, c); + +insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. 
Drop Table Column/Columns +create database column_s_to_be_dropped; +create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped.t0_column (name); +create index k2 on column_s_to_be_dropped.t0_column (c); +create index k1 on column_s_to_be_dropped.t1_column (name); +create index k2 on column_s_to_be_dropped.t1_column (c); +create index k3 on column_s_to_be_dropped.t1_column (id, c); + +create index k1 on column_s_to_be_dropped.t0_columns (name); +create index k2 on column_s_to_be_dropped.t0_columns (c); +create index k1 on column_s_to_be_dropped.t1_columns (name); +create index k2 on column_s_to_be_dropped.t1_columns (c); +-- create index k3 on column_s_to_be_dropped.t1_columns (id, c); + +insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. 
Modify Table Column +create database column_to_be_modified; +create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified.t0 (name); +create index k2 on column_to_be_modified.t0 (c); +create index k1 on column_to_be_modified.t1 (name); +create index k2 on column_to_be_modified.t1 (c); +create index k3 on column_to_be_modified.t1 (id, c); + +insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); \ No newline at end of file diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh new file mode 100644 index 0000000000000..5006879e67852 --- /dev/null +++ b/br/tests/br_pitr/run.sh @@ -0,0 +1,87 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +CUR=$(cd `dirname $0`; pwd) +DB="$TEST_NAME" + +# const value +WAIT_DONE_CODE=50 +WAIT_NOT_DONE_CODE=51 + +# prepare the data +echo "prepare the data" +# ... 
+ +# start the log backup task +echo "start log task" +run_br --pd $PD_ADDR log start --task integration_test -s "local://$TEST_DIR/$TEST_NAME/log" + +# run snapshot backup +echo "run snapshot backup" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$TEST_NAME/full" + +# load the incremental data +echo "load the incremental data" +run_sql_file $CUR/data/delete_range.sql +# ... + +# wait checkpoint advance +echo "wait checkpoint advance" +OLD_GO_FAILPOINTS=$GO_FAILPOINTS +sleep 10 +now_time=$(date "+%Y-%m-%d %H:%M:%S %z") +echo "get the current time: $now_time" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/stream/only-checkpoint-ts-with-check=return(\"$now_time\")" +while true; do + # run br with failpoint to compare the current checkpoint ts with the current time + run_br --pd $PD_ADDR log status --task-name integration_test + exit_code=$? + echo "exit code: $exit_code" + + # the checkpoint has advanced + if [ $exit_code -eq $WAIT_DONE_CODE ]; then + break + fi + + # the checkpoint hasn't advanced + if [ $exit_code -eq $WAIT_NOT_DONE_CODE ]; then + sleep 10 + continue + fi + + # unknown status, maybe somewhere is wrong + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 +done +export GO_FAILPOINTS=$OLD_GO_FAILPOINTS + +# dump some info from upstream cluster +# ... + +# start a new cluster +echo "stop services" +stop_services +echo "clean up data" +rm -rf $TEST_DIR && mkdir -p $TEST_DIR +echo "start services" +start_services + +# PITR restore +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$TEST_NAME/log" --full-backup-storage "local://$TEST_DIR/$TEST_NAME/full" + +# check something in downstream cluster +# ... 
From 7e63a8f89b396e2fcb66f341ecd3d002203416e9 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 17 Oct 2023 15:46:30 +0800 Subject: [PATCH 02/10] draft Signed-off-by: Leavrth --- br/pkg/stream/stream_status.go | 1 + .../br_pitr/incremental_data/delete_range.sql | 137 +++--------------- .../br_pitr/prepare_data/delete_range.sql | 124 ++++++++++++++++ br/tests/br_pitr/run.sh | 28 ++-- 4 files changed, 158 insertions(+), 132 deletions(-) create mode 100644 br/tests/br_pitr/prepare_data/delete_range.sql diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index a88ab4cb2b309..bdfe0c314acba 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -424,6 +424,7 @@ func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) failpoint.Inject("only-checkpoint-ts-with-check", func(val failpoint.Value) { if nowTs, ok := val.(string); ok && len(tasks) > 0 && len(nowTs) > 0 { checkpointTime := oracle.GetTimeFromTS(tasks[0].globalCheckpoint) + log.Info("get checkpoint time", zap.Time("checkpoint ts", checkpointTime)) nowTime, err := ParseDate(nowTs) if err != nil { failpoint.Return(err) diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql index 835d8060b4622..1c59a94b7394a 100644 --- a/br/tests/br_pitr/incremental_data/delete_range.sql +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -1,124 +1,25 @@ -- 1. 
Drop Schema -create database db_to_be_dropped; -create table db_to_be_dropped.t0(id int primary key, c int, name char(20)); -create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on db_to_be_dropped.t0 (name); -create index k2 on db_to_be_dropped.t0(c); -create index k1 on db_to_be_dropped.t1(name); -create index k2 on db_to_be_dropped.t1(c); -create index k3 on db_to_be_dropped.t1 (id, c); - -insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); -insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +drop database db_to_be_dropped; -- 2. Drop/Truncate Table -create database table_to_be_dropped_or_truncated; -create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); -create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); -create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); -create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); -create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); -create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); -create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); -create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); - -create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); -create index k2 on 
table_to_be_dropped_or_truncated.t0_truncated (c); -create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); -create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); -create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); - -insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); -insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); - -insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); -insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); - +drop table table_to_be_dropped_or_truncated.t0_dropped; +drop table table_to_be_dropped_or_truncated.t1_dropped; +truncate table table_to_be_dropped_or_truncated.t0_truncated; +truncate table table_to_be_dropped_or_truncated.t1_truncated; -- 3. Drop/Truncate Table Partition -create database partition_to_be_dropped_or_truncated; -create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); -create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); -create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); -create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); -create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); -create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); -create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); -create index 
k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); - -create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); -create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); -create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); -create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); -create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); - -insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); -insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); - -insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); -insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; +alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0; -- 4. Drop Table Index/PrimaryKey -create database index_or_primarykey_to_be_dropped; -create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); -create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on index_or_primarykey_to_be_dropped.t0 (name); -create index k2 on index_or_primarykey_to_be_dropped.t0 (c); -create index k1 on index_or_primarykey_to_be_dropped.t1 (name); -create index k2 on index_or_primarykey_to_be_dropped.t1 (c); -create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); - -insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); -insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); --- 5. 
Drop Table INDEXES -create database indexes_to_be_dropped; -create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); -create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on indexes_to_be_dropped.t0 (name); -create index k2 on indexes_to_be_dropped.t0 (c); -create index k1 on indexes_to_be_dropped.t1 (name); -create index k2 on indexes_to_be_dropped.t1 (c); -create index k3 on indexes_to_be_dropped.t1 (id, c); - -insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); -insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +alter table index_or_primarykey_to_be_dropped.t0 drop index k1; +alter table index_or_primarykey_to_be_dropped.t1 drop index k1; +alter table index_or_primarykey_to_be_dropped.t0 drop primary key; +alter table index_or_primarykey_to_be_dropped.t1 drop primary key; +-- 5. Drop Table Indexes +alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; +alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; -- 6. 
Drop Table Column/Columns -create database column_s_to_be_dropped; -create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); -create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); -create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); -create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on column_s_to_be_dropped.t0_column (name); -create index k2 on column_s_to_be_dropped.t0_column (c); -create index k1 on column_s_to_be_dropped.t1_column (name); -create index k2 on column_s_to_be_dropped.t1_column (c); -create index k3 on column_s_to_be_dropped.t1_column (id, c); - -create index k1 on column_s_to_be_dropped.t0_columns (name); -create index k2 on column_s_to_be_dropped.t0_columns (c); -create index k1 on column_s_to_be_dropped.t1_columns (name); -create index k2 on column_s_to_be_dropped.t1_columns (c); --- create index k3 on column_s_to_be_dropped.t1_columns (id, c); - -insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); -insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); -insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); -insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +alter table column_s_to_be_dropped.t0_column drop column name; +alter table column_s_to_be_dropped.t1_column drop column name; +alter table column_s_to_be_dropped.t0_columns drop column name, drop column c; +alter table column_s_to_be_dropped.t1_columns drop column name, drop column c; -- 7. 
Modify Table Column -create database column_to_be_modified; -create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); -create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); - -create index k1 on column_to_be_modified.t0 (name); -create index k2 on column_to_be_modified.t0 (c); -create index k1 on column_to_be_modified.t1 (name); -create index k2 on column_to_be_modified.t1 (c); -create index k3 on column_to_be_modified.t1 (id, c); - -insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); -insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); \ No newline at end of file +alter table column_to_be_modified.t0 modify column name varchar(25); \ No newline at end of file diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql new file mode 100644 index 0000000000000..835d8060b4622 --- /dev/null +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -0,0 +1,124 @@ +-- 1. Drop Schema +create database db_to_be_dropped; +create table db_to_be_dropped.t0(id int primary key, c int, name char(20)); +create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on db_to_be_dropped.t0 (name); +create index k2 on db_to_be_dropped.t0(c); +create index k1 on db_to_be_dropped.t1(name); +create index k2 on db_to_be_dropped.t1(c); +create index k3 on db_to_be_dropped.t1 (id, c); + +insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 2. 
Drop/Truncate Table +create database table_to_be_dropped_or_truncated; +create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3. 
Drop/Truncate Table Partition +create database partition_to_be_dropped_or_truncated; +create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +-- 4. 
Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped; +create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. Drop Table INDEXES +create database indexes_to_be_dropped; +create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped.t0 (name); +create index k2 on indexes_to_be_dropped.t0 (c); +create index k1 on indexes_to_be_dropped.t1 (name); +create index k2 on indexes_to_be_dropped.t1 (c); +create index k3 on indexes_to_be_dropped.t1 (id, c); + +insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. 
Drop Table Column/Columns +create database column_s_to_be_dropped; +create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped.t0_column (name); +create index k2 on column_s_to_be_dropped.t0_column (c); +create index k1 on column_s_to_be_dropped.t1_column (name); +create index k2 on column_s_to_be_dropped.t1_column (c); +create index k3 on column_s_to_be_dropped.t1_column (id, c); + +create index k1 on column_s_to_be_dropped.t0_columns (name); +create index k2 on column_s_to_be_dropped.t0_columns (c); +create index k1 on column_s_to_be_dropped.t1_columns (name); +create index k2 on column_s_to_be_dropped.t1_columns (c); +-- create index k3 on column_s_to_be_dropped.t1_columns (id, c); + +insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. 
Modify Table Column +create database column_to_be_modified; +create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified.t0 (name); +create index k2 on column_to_be_modified.t0 (c); +create index k1 on column_to_be_modified.t1 (name); +create index k2 on column_to_be_modified.t1 (c); +create index k3 on column_to_be_modified.t1 (id, c); + +insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); \ No newline at end of file diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 5006879e67852..51c72fb5e653e 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -15,37 +15,39 @@ # limitations under the License. set -eu +. run_services CUR=$(cd `dirname $0`; pwd) -DB="$TEST_NAME" # const value WAIT_DONE_CODE=50 WAIT_NOT_DONE_CODE=51 +PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. # prepare the data echo "prepare the data" +run_sql_file $CUR/prepare_data/delete_range.sql # ... # start the log backup task echo "start log task" -run_br --pd $PD_ADDR log start --task integration_test -s "local://$TEST_DIR/$TEST_NAME/log" +run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log" # run snapshot backup echo "run snapshot backup" -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$TEST_NAME/full" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full" # load the incremental data echo "load the incremental data" -run_sql_file $CUR/data/delete_range.sql +run_sql_file $CUR/incremental_data/delete_range.sql # ... 
# wait checkpoint advance echo "wait checkpoint advance" -OLD_GO_FAILPOINTS=$GO_FAILPOINTS sleep 10 now_time=$(date "+%Y-%m-%d %H:%M:%S %z") echo "get the current time: $now_time" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/stream/only-checkpoint-ts-with-check=return(\"$now_time\")" +set +e # we need to get the exit code of br while true; do # run br with failpoint to compare the current checkpoint ts with the current time run_br --pd $PD_ADDR log status --task-name integration_test @@ -54,11 +56,13 @@ while true; do # the checkpoint has advanced if [ $exit_code -eq $WAIT_DONE_CODE ]; then + echo "the checkpoint has advanced" break fi # the checkpoint hasn't advanced if [ $exit_code -eq $WAIT_NOT_DONE_CODE ]; then + echo "the checkpoint hasn't advanced" sleep 10 continue fi @@ -67,21 +71,17 @@ while true; do echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" exit 1 done -export GO_FAILPOINTS=$OLD_GO_FAILPOINTS +set -e # dump some info from upstream cluster # ... # start a new cluster -echo "stop services" -stop_services -echo "clean up data" -rm -rf $TEST_DIR && mkdir -p $TEST_DIR -echo "start services" -start_services +echo "restart a services" +restart_services # PITR restore -run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$TEST_NAME/log" --full-backup-storage "local://$TEST_DIR/$TEST_NAME/full" +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" # check something in downstream cluster -# ... 
+run_sql "select * from mysql.delete_range" From 0c72098baeb758cb9858184a7a46b6de0ea480bb Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 18 Oct 2023 11:58:38 +0800 Subject: [PATCH 03/10] draft Signed-off-by: Leavrth --- br/pkg/stream/rewrite_meta_rawkv.go | 76 +++++++++++++++-------------- br/tests/br_pitr/run.sh | 11 ++++- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 55086f17d99d4..ef9a23bf66943 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -708,7 +708,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { dbReplace, exist := sr.DbMap[job.SchemaID] if !exist { // skip this mddljob, the same below - log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) + log.Warn("[rewrite delete range] try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) return nil } @@ -744,14 +744,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { newTableIDs := make([]int64, 0, len(tableIDs)) for tableID, tableReplace := range dbReplace.TableMap { if _, exist := argsSet[tableID]; !exist { - log.Debug("DropSchema: record a table, but it doesn't exist in job args", + log.Warn("[rewrite delete range] DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID)) continue } newTableIDs = append(newTableIDs, tableReplace.TableID) for partitionID, newPartitionID := range tableReplace.PartitionMap { if _, exist := argsSet[partitionID]; !exist { - log.Debug("DropSchema: record a partition, but it doesn't exist in job args", + log.Warn("[rewrite delete range] DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID)) continue } @@ -760,8 +760,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } if len(newTableIDs) != len(tableIDs) { - log.Debug( - 
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") + log.Warn( + "[rewrite delete range] DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") // only drop newTableIDs' ranges } @@ -774,7 +774,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTable, model.ActionTruncateTable: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", + log.Warn("[rewrite delete range] DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -787,18 +787,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } if len(physicalTableIDs) > 0 { - // delete partition id instead of table id - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + // delete partition id + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + log.Warn("[rewrite delete range] DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil } @@ -808,8 +809,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTablePartition, 
model.ActionTruncateTablePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug( - "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", + log.Warn( + "[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -818,18 +819,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( - "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + log.Warn( + "[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
@@ -837,7 +839,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", + log.Warn("[rewrite delete range] AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -856,8 +858,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( - "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", + log.Warn( + "[rewrite delete range] AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } @@ -871,7 +873,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropIndex, model.ActionDropPrimaryKey: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -890,7 +892,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } // len(indexIDs) = 1 @@ -913,7 +915,7 @@ func (sr *SchemasReplace) 
deleteRange(job *model.Job) error { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -922,7 +924,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -942,7 +944,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -951,7 +953,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -972,7 +974,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist 
:= dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -981,7 +983,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -1001,7 +1003,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -1010,7 +1012,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 51c72fb5e653e..557aaf14ff813 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -48,6 +48,7 @@ 
now_time=$(date "+%Y-%m-%d %H:%M:%S %z") echo "get the current time: $now_time" export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/stream/only-checkpoint-ts-with-check=return(\"$now_time\")" set +e # we need to get the exit code of br +i=0 while true; do # run br with failpoint to compare the current checkpoint ts with the current time run_br --pd $PD_ADDR log status --task-name integration_test @@ -63,6 +64,11 @@ while true; do # the checkpoint hasn't advanced if [ $exit_code -eq $WAIT_NOT_DONE_CODE ]; then echo "the checkpoint hasn't advanced" + i=$((i+1)) + if [ "$i" -gt 50 ]; then + echo 'the checkpoint lag is too large' + exit 1 + fi sleep 10 continue fi @@ -81,7 +87,10 @@ echo "restart a services" restart_services # PITR restore +echo "run pitr" run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" # check something in downstream cluster -run_sql "select * from mysql.delete_range" +echo "check something" +run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;" +check_contains "DELETE_RANGE_CNT: 44" From b94b1dd4d8091dcbf1a0f044a3dba73481af4ab5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 18 Oct 2023 13:49:22 +0800 Subject: [PATCH 04/10] add integration test Signed-off-by: Leavrth --- br/tests/br_pitr/run.sh | 11 ++++++++--- br/tests/run_group.sh | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 557aaf14ff813..c5605aac21cca 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -22,6 +22,7 @@ CUR=$(cd `dirname $0`; pwd) WAIT_DONE_CODE=50 WAIT_NOT_DONE_CODE=51 PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. 
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" # prepare the data echo "prepare the data" @@ -88,9 +89,13 @@ restart_services # PITR restore echo "run pitr" -run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 # check something in downstream cluster -echo "check something" +echo "check br log" +check_contains "restore log success summary" +# check_not_contains "rewrite delete range" +echo "" > $res_file +echo "check sql result" run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;" -check_contains "DELETE_RANGE_CNT: 44" +check_contains "DELETE_RANGE_CNT: 46" diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index 39068ab078427..8da15cab19a30 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -23,7 +23,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn' 
From dd9fb417fe88db88277c3895cc8a956d356b3d4c Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 18 Oct 2023 15:43:04 +0800 Subject: [PATCH 05/10] make bazel_prepare Signed-off-by: Leavrth --- br/pkg/stream/BUILD.bazel | 1 + br/tests/br_pitr/incremental_data/delete_range.sql | 2 +- br/tests/br_pitr/prepare_data/delete_range.sql | 2 +- br/tests/br_pitr/run.sh | 4 ++++ 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index fc013f01bd3a1..c9d1b2e1c6df6 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "@com_github_fatih_color//:color", "@com_github_klauspost_compress//zstd", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql index 1c59a94b7394a..f5afde943649e 100644 --- a/br/tests/br_pitr/incremental_data/delete_range.sql +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -22,4 +22,4 @@ alter table column_s_to_be_dropped.t1_column drop column name; alter table column_s_to_be_dropped.t0_columns drop column name, drop column c; alter table column_s_to_be_dropped.t1_columns drop column name, drop column c; -- 7. 
Modify Table Column -alter table column_to_be_modified.t0 modify column name varchar(25); \ No newline at end of file +alter table column_to_be_modified.t0 modify column name varchar(25); diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql index 835d8060b4622..e2a20be9e45fa 100644 --- a/br/tests/br_pitr/prepare_data/delete_range.sql +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -121,4 +121,4 @@ create index k2 on column_to_be_modified.t1 (c); create index k3 on column_to_be_modified.t1 (id, c); insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); -insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); \ No newline at end of file +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index c5605aac21cca..9fd0069b42527 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -24,6 +24,10 @@ WAIT_NOT_DONE_CODE=51 PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. 
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" +# start a new cluster +echo "restart a services" +restart_services + # prepare the data echo "prepare the data" run_sql_file $CUR/prepare_data/delete_range.sql From 4e5d41d0f8bd2eaa4cf2d24b43b02caac8b5688d Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 18 Oct 2023 16:23:12 +0800 Subject: [PATCH 06/10] make bazel happy Signed-off-by: Leavrth --- br/pkg/stream/stream_status.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index bdfe0c314acba..4d4a52d57e51d 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -431,10 +431,8 @@ func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) } if checkpointTime.After(nowTime) { os.Exit(50) - } else { - os.Exit(51) } - failpoint.Return(nil) + os.Exit(51) } }) ctl.printToView(tasks) From 37cc15fad79485425964ba47fe7d7a7f77fb9970 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 18 Oct 2023 17:00:59 +0800 Subject: [PATCH 07/10] make test happy Signed-off-by: Leavrth --- br/pkg/stream/util_test.go | 2 +- br/tests/br_full/run.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 63222b8ed7b0c..88a698312ace3 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -24,7 +24,7 @@ func TestDateFormat(t *testing.T) { "2022-07-15 20:32:12.734 +0800", }, { - 434605479096221697, + 434605478903808000, "2022-07-15 20:32:12 +0800", }, } diff --git a/br/tests/br_full/run.sh b/br/tests/br_full/run.sh index 78b28d51f8f4e..1753dfdc460d1 100755 --- a/br/tests/br_full/run.sh +++ b/br/tests/br_full/run.sh @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log" error_str="not read from or written to within the timeout period" unset BR_LOG_TO_TERM -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection 
refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")"" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log export GO_FAILPOINTS="" size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}') From 8a0a42e8dc70c6ae529109f33ceeec122a450a5c Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 20 Oct 2023 14:21:38 +0800 Subject: [PATCH 08/10] commit some suggestions Signed-off-by: Leavrth --- br/pkg/stream/rewrite_meta_rawkv.go | 50 +++++++++++++++-------------- br/pkg/stream/stream_status.go | 16 --------- br/pkg/stream/util.go | 4 --- br/pkg/stream/util_test.go | 6 +--- br/tests/br_full/run.sh | 2 +- br/tests/br_pitr/run.sh | 37 ++++++++------------- 6 files changed, 42 insertions(+), 73 deletions(-) diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index ef9a23bf66943..5366b60150d40 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -23,6 +23,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/pkg/kv" @@ -705,10 +706,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro } func (sr *SchemasReplace) deleteRange(job *model.Job) error { + lctx := 
logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range")) dbReplace, exist := sr.DbMap[job.SchemaID] if !exist { // skip this mddljob, the same below - log.Warn("[rewrite delete range] try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) + logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) return nil } @@ -744,14 +746,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { newTableIDs := make([]int64, 0, len(tableIDs)) for tableID, tableReplace := range dbReplace.TableMap { if _, exist := argsSet[tableID]; !exist { - log.Warn("[rewrite delete range] DropSchema: record a table, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID)) continue } newTableIDs = append(newTableIDs, tableReplace.TableID) for partitionID, newPartitionID := range tableReplace.PartitionMap { if _, exist := argsSet[partitionID]; !exist { - log.Warn("[rewrite delete range] DropSchema: record a partition, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID)) continue } @@ -760,8 +762,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } if len(newTableIDs) != len(tableIDs) { - log.Warn( - "[rewrite delete range] DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") + logutil.CL(lctx).Warn( + "DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") // only drop newTableIDs' ranges } @@ -774,7 +776,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTable, model.ActionTruncateTable: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] 
DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -792,7 +794,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range physicalTableIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } @@ -809,8 +811,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTablePartition, model.ActionTruncateTablePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn( - "[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn( + "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -823,8 +825,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range physicalTableIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn( - "[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", + logutil.CL(lctx).Warn( + "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } @@ -839,7 +841,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] AddIndex/AddPrimaryKey 
roll-back: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -858,8 +860,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn( - "[rewrite delete range] AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", + logutil.CL(lctx).Warn( + "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } @@ -873,7 +875,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropIndex, model.ActionDropPrimaryKey: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -892,7 +894,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } // len(indexIDs) = 1 @@ -915,7 +917,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing 
oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -924,7 +926,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -944,7 +946,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -953,7 +955,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -974,7 +976,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing 
oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -983,7 +985,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -1003,7 +1005,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -1012,7 +1014,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index 4d4a52d57e51d..57b803d93d46d 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "io" - "os" "regexp" "strconv" "sync" @@ -16,7 
+15,6 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -421,20 +419,6 @@ func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) if err != nil { return err } - failpoint.Inject("only-checkpoint-ts-with-check", func(val failpoint.Value) { - if nowTs, ok := val.(string); ok && len(tasks) > 0 && len(nowTs) > 0 { - checkpointTime := oracle.GetTimeFromTS(tasks[0].globalCheckpoint) - log.Info("get checkpoint time", zap.Time("checkpoint ts", checkpointTime)) - nowTime, err := ParseDate(nowTs) - if err != nil { - failpoint.Return(err) - } - if checkpointTime.After(nowTime) { - os.Exit(50) - } - os.Exit(51) - } - }) ctl.printToView(tasks) return nil } diff --git a/br/pkg/stream/util.go b/br/pkg/stream/util.go index 12ff00ab1361f..10215a68df61d 100644 --- a/br/pkg/stream/util.go +++ b/br/pkg/stream/util.go @@ -13,10 +13,6 @@ func FormatDate(ts time.Time) string { return ts.Format(DATE_FORMAT) } -func ParseDate(date string) (time.Time, error) { - return time.Parse(DATE_FORMAT, date) -} - func IsMetaDBKey(key []byte) bool { return strings.HasPrefix(string(key), "mDB") } diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 88a698312ace3..6dda62a04ad60 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -31,12 +31,8 @@ func TestDateFormat(t *testing.T) { timeZone, _ := time.LoadLocation("Asia/Shanghai") for _, ca := range cases { - ts := oracle.GetTimeFromTS(ca.ts).In(timeZone) - date := FormatDate(ts) + date := FormatDate(oracle.GetTimeFromTS(ca.ts).In(timeZone)) require.Equal(t, ca.target, date) - ts2, err := ParseDate(date) - require.NoError(t, err) - require.Equal(t, ts, ts2.In(timeZone)) } } diff --git a/br/tests/br_full/run.sh b/br/tests/br_full/run.sh index 1753dfdc460d1..78b28d51f8f4e 100755 --- a/br/tests/br_full/run.sh +++ 
b/br/tests/br_full/run.sh @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log" error_str="not read from or written to within the timeout period" unset BR_LOG_TO_TERM -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log export GO_FAILPOINTS="" size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}') diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 9fd0069b42527..31950d08101bd 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -19,8 +19,6 @@ set -eu CUR=$(cd `dirname $0`; pwd) # const value -WAIT_DONE_CODE=50 -WAIT_NOT_DONE_CODE=51 PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. 
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" @@ -49,25 +47,20 @@ run_sql_file $CUR/incremental_data/delete_range.sql # wait checkpoint advance echo "wait checkpoint advance" sleep 10 -now_time=$(date "+%Y-%m-%d %H:%M:%S %z") -echo "get the current time: $now_time" -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/stream/only-checkpoint-ts-with-check=return(\"$now_time\")" -set +e # we need to get the exit code of br +current_ts=$(echo $(($(date --date "2022-05-15 19:00:00.123 +0800" +%s%3N) << 18))) i=0 while true; do # run br with failpoint to compare the current checkpoint ts with the current time - run_br --pd $PD_ADDR log status --task-name integration_test - exit_code=$? - echo "exit code: $exit_code" - - # the checkpoint has advanced - if [ $exit_code -eq $WAIT_DONE_CODE ]; then - echo "the checkpoint has advanced" - break - fi + checkpoint_ts=$(run_br --pd $PD_ADDR log status --task-name integration_test | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end' | tail -n 1) - # the checkpoint hasn't advanced - if [ $exit_code -eq $WAIT_NOT_DONE_CODE ]; then + # check whether the checkpoint ts is a number + if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then + # check whether the checkpoint has advanced + if [ $checkpoint_ts -gt $current_ts ]; then + echo "the checkpoint has advanced" + break + fi + # the checkpoint hasn't advanced echo "the checkpoint hasn't advanced" i=$((i+1)) if [ "$i" -gt 50 ]; then @@ -75,14 +68,12 @@ while true; do exit 1 fi sleep 10 - continue + else + # unknown status, maybe somewhere is wrong + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 fi - - # unknown status, maybe somewhere is wrong - echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" - exit 1 done -set -e # dump some info from upstream cluster # ... 
From 71c7f8af95d7a7ddac0e262109da8f45ed9f7a01 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 20 Oct 2023 14:25:41 +0800 Subject: [PATCH 09/10] make bazel happy Signed-off-by: Leavrth --- br/pkg/stream/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index c9d1b2e1c6df6..fc013f01bd3a1 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "@com_github_fatih_color//:color", "@com_github_klauspost_compress//zstd", "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", From ccb8373394724d9030133e9f2f839c7b6ee12616 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 23 Oct 2023 10:11:34 +0800 Subject: [PATCH 10/10] fix integration test br_pitr Signed-off-by: Leavrth --- br/tests/br_pitr/run.sh | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 31950d08101bd..25a7fda5588f2 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -47,11 +47,15 @@ run_sql_file $CUR/incremental_data/delete_range.sql # wait checkpoint advance echo "wait checkpoint advance" sleep 10 -current_ts=$(echo $(($(date --date "2022-05-15 19:00:00.123 +0800" +%s%3N) << 18))) +current_ts=$(echo $(($(date +%s%3N) << 18))) +echo "current ts: $current_ts" i=0 while true; do - # run br with failpoint to compare the current checkpoint ts with the current time - checkpoint_ts=$(run_br --pd $PD_ADDR log status --task-name integration_test | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end' | tail -n 1) + # extract the checkpoint ts of the log backup task. 
If there is some error, the checkpoint ts should be empty + log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null) + echo "log backup status: $log_backup_status" + checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end') + echo "checkpoint ts: $checkpoint_ts" # check whether the checkpoint ts is a number if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then