Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add integration test for ingest corner case test #52734

Merged
merged 9 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//pkg/config",
"//pkg/ddl",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
5 changes: 0 additions & 5 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/cdcutil"
Expand Down Expand Up @@ -469,10 +468,6 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
return errors.New("Unable to create task about log-backup. " +
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
}
if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
return errors.Annotate(berrors.ErrUnknown,
"Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
}

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion br/tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ This folder contains all tests which relies on external processes such as TiDB.
br/tests/download_integration_test_binaries.sh
mkdir -p bin && mv third_bin/* bin/
rm -rf third_bin/
make #make tidb
make failpoint-enable && make && make failpoint-disable #make tidb
```

2. The following programs must be installed:
Expand Down
28 changes: 28 additions & 0 deletions br/tests/br_pitr_failpoint/check/check_ingest_repair.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

# Verifies, on the restored cluster, that the index built by the ingest-mode
# `ADD INDEX` is present and consistent with the table data.
# NOTE(review): relies on `run_sql` / `check_contains` / `check_not_contains`
# being provided by the test harness environment -- they are not defined here.

# check index schema
## check table test.pairs
# Index i1 is declared on (y, z), so both columns must be listed.
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# check index data
# The prepared data always contains one explicit (y, z) = (0, 0) row, so a
# lookup forced through index i1 must never report a count of zero.
run_sql "select count(*) AS RESCNT from test.pairs use index(i1) where y = 0 and z = 0;"
check_not_contains "RESCNT: 0"
# Cross-check index entries against the row data.
run_sql "admin check table test.pairs;"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE test.pairs ADD INDEX i1(y, z);
3 changes: 3 additions & 0 deletions br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE test.pairs(x int auto_increment primary key, y int DEFAULT RAND(), z int DEFAULT RAND());
INSERT INTO test.pairs (y,z) VALUES (0,0);
INSERT INTO test.pairs VALUES (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),();
183 changes: 183 additions & 0 deletions br/tests/br_pitr_failpoint/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
# Absolute directory of this script; quoted so that paths containing spaces
# work, and `&&` so a failing `cd` does not silently yield the caller's cwd.
CUR=$(cd "$(dirname "$0")" && pwd)

# const value
PREFIX="pitr_backup_failpoint" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
# Signal files: each failpoint below keeps the `add index` job blocked until
# the corresponding file exists.
hint_sig_file_public=$TEST_DIR/hint_sig_file_public
hint_sig_file_history=$TEST_DIR/hint_sig_file_history

# inject some failpoints for TiDB-server
# - create-index-stuck-before-public:     blocks before the index state change is persisted
# - create-index-stuck-before-ddlhistory: blocks before the finished job is handed to HandleJobDone
export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-public=return(\"$hint_sig_file_public\");\
github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-ddlhistory=return(\"$hint_sig_file_history\")"

# start a new cluster
echo "restart a services"
restart_services

# prepare the data
echo "prepare the data"
run_sql_file "$CUR/prepare_data/ingest_repair.sql"

# prepare the intersect data
# Runs in the background because the statement is expected to stall on the
# failpoints above (presumably this file holds the `ADD INDEX` -- confirm).
run_sql_file "$CUR/intersect_data/ingest_repair1.sql" &
sql_pid=$!

# start the log backup task
echo "start log task"
run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"

# wait_for_add_index_job <job_state> <schema_state>
#
# Polls `ADMIN SHOW DDL JOBS` once per second until the ingest-mode `add index`
# job on test.pairs is reported with the given STATE and SCHEMA_STATE.
# Aborts the whole test with exit code 1 after more than 50 unsuccessful polls.
# NOTE(review): assumes `run_sql` writes its result into $res_file -- confirm.
wait_for_add_index_job() {
    local job_state="$1"
    local schema_state="$2"
    local retry_cnt=0

    while true; do
        run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = '$job_state' AND SCHEMA_STATE = '$schema_state' AND JOB_TYPE = 'add index /* ingest */';"
        # A "1. row" marker means at least one matching job was returned.
        if grep -Fq "1. row" "$res_file"; then
            return 0
        fi

        retry_cnt=$((retry_cnt+1))
        if [ "$retry_cnt" -gt 50 ]; then
            echo 'the wait lag is too large'
            exit 1
        fi

        sleep 1
    done
}

# wait until the index creation is running
wait_for_add_index_job 'running' 'write reorganization'

# run snapshot backup 1 -- before the index becomes public
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-1"

# advance the progress of index creation, make the index become public
touch $hint_sig_file_public

# wait until the index creation is done
wait_for_add_index_job 'done' 'public'

# run snapshot backup 2 -- before the ddl history is generated
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-2"

# advance the progress of index creation, generate ddl history
touch $hint_sig_file_history

# wait index creation done
wait $sql_pid

# wait until the DDL job is synced
wait_for_add_index_job 'synced' 'public'

# clean the failpoints
# NOTE(review): exporting an empty value only affects processes started after
# this point (e.g. by the later restart_services); presumably the TiDB that is
# already running keeps the failpoints it was started with -- confirm.
export GO_FAILPOINTS=""

# check something in the upstream
# Index i1 is created on (y, z); both columns must be listed before the
# restore phase starts.
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# Block until the log backup checkpoint has moved past "now".
echo "wait checkpoint advance"
sleep 10
# Millisecond wall-clock time shifted left by 18 bits, compared directly with
# the checkpoint value reported by `br log status`.
current_ts=$(echo $(($(date +%s%3N) << 18)))
echo "current ts: $current_ts"
i=0
while true; do
    # Read the task status as JSON; jq yields the checkpoint only when the task
    # reports no errors, otherwise nothing at all.
    log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
    echo "log backup status: $log_backup_status"
    checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
    echo "checkpoint ts: $checkpoint_ts"

    # Guard: anything that is not a positive integer (including an empty value)
    # means the status could not be trusted, so give up immediately.
    if ! [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
        echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
        exit 1
    fi

    # Done as soon as the checkpoint is newer than the reference timestamp.
    if [ $checkpoint_ts -gt $current_ts ]; then
        echo "the checkpoint has advanced"
        break
    fi

    # Still behind: count the attempt and retry after a pause.
    echo "the checkpoint hasn't advanced"
    i=$((i+1))
    if (( i > 50 )); then
        echo 'the checkpoint lag is too large'
        exit 1
    fi
    sleep 10
done

# start a new cluster
# The restores below target a fresh (downstream) cluster.
echo "restart a services"
restart_services

# PITR restore - 1
# Base snapshot full-1 was taken while the add-index job was still running.
echo "run pitr 1 -- before the index becomes public"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-1" > "$res_file" 2>&1

# check something in downstream cluster
# NOTE(review): check_contains presumably inspects $res_file, which is why the
# br output is redirected there -- confirm.
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash "$CUR/check/check_ingest_repair.sh"

# Clean the data
run_sql "DROP DATABASE test; CREATE DATABASE test;"

# PITR restore - 2
# Base snapshot full-2 was taken after the index became public but before the
# DDL history was generated.
echo "run pitr 2 -- before the ddl history is generated"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-2" > "$res_file" 2>&1

# check something in downstream cluster
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash "$CUR/check/check_ingest_repair.sh"
3 changes: 3 additions & 0 deletions br/tests/config/tikv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ data-encryption-method = "aes256-ctr"
[security.encryption.master-key]
type = "file"
path = "/tmp/backup_restore_test/master-key-file"

[log-backup]
max-flush-interval = "50s"
2 changes: 1 addition & 1 deletion br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr br_pitr_failpoint'
["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash'
["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'
Expand Down
16 changes: 16 additions & 0 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -907,6 +908,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
if job.IsDone() {
job.State = model.JobStateSynced
}
// Inject the failpoint to prevent the progress of index creation.
failpoint.Inject("create-index-stuck-before-ddlhistory", func(v failpoint.Value) {
if sigFile, ok := v.(string); ok && job.Type == model.ActionAddIndex {
for {
time.Sleep(1 * time.Second)
if _, err := os.Stat(sigFile); err != nil {
if os.IsNotExist(err) {
continue
}
failpoint.Return(0, errors.Trace(err))
}
break
}
}
})
err = w.HandleJobDone(d, job, t)
return 0, err
}
Expand Down
42 changes: 16 additions & 26 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,22 @@ SwitchIndexState:
indexInfo.State = model.StatePublic
}

// Inject the failpoint to prevent the progress of index creation.
failpoint.Inject("create-index-stuck-before-public", func(v failpoint.Value) {
if sigFile, ok := v.(string); ok {
for {
time.Sleep(1 * time.Second)
if _, err := os.Stat(sigFile); err != nil {
if os.IsNotExist(err) {
continue
}
failpoint.Return(ver, errors.Trace(err))
}
break
}
}
})

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != model.StatePublic)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -862,32 +878,6 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
return nil
}

// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
func IngestJobsNotExisted(ctx sessionctx.Context) bool {
se := sess.NewSession(ctx)
template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;"
sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey)
rows, err := se.Execute(context.Background(), sql, "check-pitr")
if err != nil {
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
return false
}
for _, row := range rows {
jobBinary := row.GetBytes(0)
runJob := model.Job{}
err := runJob.Decode(jobBinary)
if err != nil {
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
return false
}
// Check whether this add index job is using lightning to do the backfill work.
if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return false
}
}
return true
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
Expand Down
Loading