Skip to content

Commit

Permalink
br: add integration test for ingest corner case test (#52734) (#52767)
Browse files Browse the repository at this point in the history
close #52733
  • Loading branch information
ti-chi-bot authored Apr 24, 2024
1 parent 85ae939 commit 4546180
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 33 deletions.
1 change: 0 additions & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//pkg/config",
"//pkg/ddl",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
5 changes: 0 additions & 5 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/cdcutil"
Expand Down Expand Up @@ -469,10 +468,6 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
return errors.New("Unable to create task about log-backup. " +
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}
if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
return errors.Annotate(berrors.ErrUnknown,
"Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
}

return nil
}
Expand Down
28 changes: 28 additions & 0 deletions br/tests/br_pitr_failpoint/check/check_ingest_repair.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

# check index schema
## check table test.pairs
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# check index data
run_sql "select count(*) AS RESCNT from test.pairs use index(i1) where y = 0 and z = 0;"
check_not_contains "RESCNT: 0"
run_sql "admin check table test.pairs;"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE test.pairs ADD INDEX i1(y, z);
3 changes: 3 additions & 0 deletions br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE test.pairs(x int auto_increment primary key, y int DEFAULT RAND(), z int DEFAULT RAND());
INSERT INTO test.pairs (y,z) VALUES (0,0);
INSERT INTO test.pairs VALUES (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),();
183 changes: 183 additions & 0 deletions br/tests/br_pitr_failpoint/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

# const value
PREFIX="pitr_backup_failpoint" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
hint_sig_file_public=$TEST_DIR/hint_sig_file_public
hint_sig_file_history=$TEST_DIR/hint_sig_file_history

# inject some failpoints for TiDB-server
export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-public=return(\"$hint_sig_file_public\");\
github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-ddlhistory=return(\"$hint_sig_file_history\")"

# start a new cluster
echo "restart a services"
restart_services

# prepare the data
echo "prepare the data"
run_sql_file $CUR/prepare_data/ingest_repair.sql

# prepare the intersect data
run_sql_file $CUR/intersect_data/ingest_repair1.sql &
sql_pid=$!

# start the log backup task
echo "start log task"
run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"

# wait until the index creation is running
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
if grep -Fq "1. row" $res_file; then
break
fi

retry_cnt=$((retry_cnt+1))
if [ "$retry_cnt" -gt 50 ]; then
echo 'the wait lag is too large'
exit 1
fi

sleep 1
done

# run snapshot backup 1 -- before the index becomes public
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-1"

# advance the progress of index creation, make the index become public
touch $hint_sig_file_public

# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
if grep -Fq "1. row" $res_file; then
break
fi

retry_cnt=$((retry_cnt+1))
if [ "$retry_cnt" -gt 50 ]; then
echo 'the wait lag is too large'
exit 1
fi

sleep 1
done

# run snapshot backup 2 -- before the ddl history is generated
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-2"

# advance the progress of index creation, generate ddl history
touch $hint_sig_file_history

# wait index creation done
wait $sql_pid

# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
if grep -Fq "1. row" $res_file; then
break
fi

retry_cnt=$((retry_cnt+1))
if [ "$retry_cnt" -gt 50 ]; then
echo 'the wait lag is too large'
exit 1
fi

sleep 1
done

# clean the failpoints
export GO_FAILPOINTS=""

# check something in the upstream
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# wait checkpoint advance
echo "wait checkpoint advance"
sleep 10
current_ts=$(echo $(($(date +%s%3N) << 18)))
echo "current ts: $current_ts"
i=0
while true; do
# extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
echo "log backup status: $log_backup_status"
checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
echo "checkpoint ts: $checkpoint_ts"

# check whether the checkpoint ts is a number
if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
# check whether the checkpoint has advanced
if [ $checkpoint_ts -gt $current_ts ]; then
echo "the checkpoint has advanced"
break
fi
# the checkpoint hasn't advanced
echo "the checkpoint hasn't advanced"
i=$((i+1))
if [ "$i" -gt 50 ]; then
echo 'the checkpoint lag is too large'
exit 1
fi
sleep 10
else
# unknown status, maybe somewhere is wrong
echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
exit 1
fi
done

# start a new cluster
echo "restart a services"
restart_services

# PITR restore - 1
echo "run pitr 1 -- before the index becomes public"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-1" > $res_file 2>&1

# check something in downstream cluster
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash $CUR/check/check_ingest_repair.sh

# Clean the data
run_sql "DROP DATABASE test; CREATE DATABASE test;"

# PITR restore - 2
echo "run pitr 2 -- before the index becomes public"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-2" > $res_file 2>&1

# check something in downstream cluster
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash $CUR/check/check_ingest_repair.sh
3 changes: 3 additions & 0 deletions br/tests/config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ data-encryption-method = "aes256-ctr"
[security.encryption.master-key]
type = "file"
path = "/tmp/backup_restore_test/master-key-file"

[log-backup]
max-flush-interval = "50s"
2 changes: 1 addition & 1 deletion br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr br_pitr_failpoint'
["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats'
Expand Down
16 changes: 16 additions & 0 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -918,6 +919,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
if job.IsDone() {
job.State = model.JobStateSynced
}
// Inject the failpoint to prevent the progress of index creation.
failpoint.Inject("create-index-stuck-before-ddlhistory", func(v failpoint.Value) {
if sigFile, ok := v.(string); ok && job.Type == model.ActionAddIndex {
for {
time.Sleep(1 * time.Second)
if _, err := os.Stat(sigFile); err != nil {
if os.IsNotExist(err) {
continue
}
failpoint.Return(0, errors.Trace(err))
}
break
}
}
})
err = w.HandleJobDone(d, job, t)
return 0, err
}
Expand Down
42 changes: 16 additions & 26 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,22 @@ SwitchIndexState:
indexInfo.State = model.StatePublic
}

// Inject the failpoint to prevent the progress of index creation.
failpoint.Inject("create-index-stuck-before-public", func(v failpoint.Value) {
if sigFile, ok := v.(string); ok {
for {
time.Sleep(1 * time.Second)
if _, err := os.Stat(sigFile); err != nil {
if os.IsNotExist(err) {
continue
}
failpoint.Return(ver, errors.Trace(err))
}
break
}
}
})

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != model.StatePublic)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -858,32 +874,6 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
return nil
}

// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
func IngestJobsNotExisted(ctx sessionctx.Context) bool {
se := sess.NewSession(ctx)
template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;"
sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey)
rows, err := se.Execute(context.Background(), sql, "check-pitr")
if err != nil {
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
return false
}
for _, row := range rows {
jobBinary := row.GetBytes(0)
runJob := model.Job{}
err := runJob.Decode(jobBinary)
if err != nil {
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
return false
}
// Check whether this add index job is using lightning to do the backfill work.
if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return false
}
}
return true
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
Expand Down

0 comments on commit 4546180

Please sign in to comment.