diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index ad49ec5a6c2b9..6e86849858d4e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -68,6 +68,7 @@ import ( "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "google.golang.org/grpc" @@ -1476,13 +1477,18 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t return err } + tableIDs := physicalTableIDs(tbl.Meta()) + keyInTable := func(key []byte) bool { + return slices.Contains(tableIDs, tablecodec.DecodeTableID(key)) + } + errLimiter := rate.NewLimiter(1, 1) pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows") err = local.errorMgr.ResolveAllConflictKeys( ctx, tableName, pool, func(ctx context.Context, handleRows [][2][]byte) error { for { - err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder) + err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable) if err == nil { return nil } @@ -1505,7 +1511,13 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t return errors.Trace(err) } -func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, handleRows [][2][]byte, decoder *kv.TableKVDecoder) (err error) { +func (local *local) deleteDuplicateRows( + ctx context.Context, + logger *log.Task, + handleRows [][2][]byte, + decoder *kv.TableKVDecoder, + keyInTable func(key []byte) bool, +) (err error) { // Starts a Delete transaction. txn, err := local.tikvCli.Begin() if err != nil { @@ -1530,6 +1542,12 @@ func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, h // (if the number of duplicates is small this should fit entirely in memory) // (Txn's MemBuf's bufferSizeLimit is currently infinity) for _, handleRow := range handleRows { + // Skip the row key if it's not in the table. + // This can happen if the table has been recreated or truncated, + // and the duplicate key is from the old table. + if !keyInTable(handleRow[0]) { + continue + } logger.Debug("[resolve-dupe] found row to resolve", logutil.Key("handle", handleRow[0]), logutil.Key("row", handleRow[1])) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 74ada7f8b1c6c..93ae41a5d7697 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -775,12 +775,14 @@ func (tr *TableRestore) postProcess( if e != nil { tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e)) return false, e - } else { - hasDupe = hasDupe || hasRemoteDupe } - if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil { - tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err)) - return false, err + hasDupe = hasDupe || hasRemoteDupe + + if hasDupe { + if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil { + tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err)) + return false, err + } } } diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh new file mode 100644 index 0000000000000..2a88f0e0cac8e --- /dev/null +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -0,0 +1,65 @@ +#!/bin/bash +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/EnableTestAPI=return" +export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=sleep(10000)" + +run_lightning --backend='local' & +shpid="$!" +pid= + +ensure_lightning_is_started() { + for _ in {0..60}; do + pid=$(pstree -p "$shpid" | grep -Eo "tidb-lightning\.\([0-9]*\)" | grep -Eo "[0-9]*") || true + [ -n "$pid" ] && break + sleep 1 + done + if [ -z "$pid" ]; then + echo "lightning doesn't start successfully, please check the log" >&2 + exit 1 + fi + echo "lightning is started, pid is $pid" +} + +ready_for_import_engine() { + for _ in {0..60}; do + grep -Fq "start import engine" "$TEST_DIR"/lightning.log && return + sleep 1 + done + echo "lightning doesn't start import engine, please check the log" >&2 + exit 1 +} + +ensure_lightning_is_started +ready_for_import_engine + +run_curl "https://${PD_ADDR}/pd/api/v1/config/cluster-version" + +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length') +if [ "$length" != "1" ]; then + echo "region-label key-range schedule rules should be 1, but got $length" >&2 + exit 1 +fi + +wait "$shpid" + +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length') +if [ -n "$length" ] && [ "$length" -ne 0 ]; then + echo "region-label key-range schedule rules should be 0, but got $length" >&2 + exit 1 +fi diff --git a/br/tests/lightning_issue_40657/config.toml b/br/tests/lightning_issue_40657/config.toml new file mode 100644 index 0000000000000..74561bc05f026 --- /dev/null +++ b/br/tests/lightning_issue_40657/config.toml @@ -0,0 +1,6 @@ +[tikv-importer] +backend = "local" +duplicate-resolution = "remove" + +[mydumper.csv] +header = true diff --git a/br/tests/lightning_issue_40657/data1/test.t-schema.sql b/br/tests/lightning_issue_40657/data1/test.t-schema.sql new file mode 100644 index 0000000000000..ef7136b531abc --- /dev/null +++ b/br/tests/lightning_issue_40657/data1/test.t-schema.sql @@ -0,0 +1,6 @@ +CREATE TABLE `t` ( + `id` int(11) NOT NULL, + `name` varchar(255) DEFAULT NULL, + PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */, + UNIQUE KEY `uni_name` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/br/tests/lightning_issue_40657/data1/test.t.0.csv b/br/tests/lightning_issue_40657/data1/test.t.0.csv new file mode 100644 index 0000000000000..2987cee08b206 --- /dev/null +++ b/br/tests/lightning_issue_40657/data1/test.t.0.csv @@ -0,0 +1,6 @@ +id,name +1,"aaa01" +2,"aaa02" +3,"aaa03" +4,"aaa04" +5,"aaa04" diff --git a/br/tests/lightning_issue_40657/data2/test.t-schema.sql b/br/tests/lightning_issue_40657/data2/test.t-schema.sql new file mode 100644 index 0000000000000..ef7136b531abc --- /dev/null +++ b/br/tests/lightning_issue_40657/data2/test.t-schema.sql @@ -0,0 +1,6 @@ +CREATE TABLE `t` ( + `id` int(11) NOT NULL, + `name` varchar(255) DEFAULT NULL, + PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */, + UNIQUE KEY `uni_name` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/br/tests/lightning_issue_40657/data2/test.t.0.csv b/br/tests/lightning_issue_40657/data2/test.t.0.csv new file mode 100644 index 0000000000000..f64aebd0630d9 --- /dev/null +++ b/br/tests/lightning_issue_40657/data2/test.t.0.csv @@ -0,0 +1,6 @@ +id,name +1,"aaa01" +2,"aaa02" +3,"aaa03" +4,"aaa04" +5,"aaa05" diff --git a/br/tests/lightning_issue_40657/run.sh b/br/tests/lightning_issue_40657/run.sh new file mode 100644 index 0000000000000..a20600b79d14b --- /dev/null +++ b/br/tests/lightning_issue_40657/run.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +check_cluster_version 5 2 0 'duplicate detection' || exit 0 + +run_lightning -d "tests/$TEST_NAME/data1" +run_sql 'admin check table test.t' +run_sql 'select count(*) from test.t' +check_contains 'count(*): 3' +run_sql 'select count(*) from lightning_task_info.conflict_error_v1' +check_contains 'count(*): 2' + +run_sql 'truncate table test.t' +run_lightning -d "tests/$TEST_NAME/data2" +run_sql 'admin check table test.t' +run_sql 'select count(*) from test.t' +check_contains 'count(*): 5' diff --git a/br/tests/lightning_reload_cert/run.sh b/br/tests/lightning_reload_cert/run.sh index e06ef8d7fbf51..be0c5ff40421e 100644 --- a/br/tests/lightning_reload_cert/run.sh +++ b/br/tests/lightning_reload_cert/run.sh @@ -29,7 +29,7 @@ shpid="$!" sleep 15 ok=0 for _ in {0..60}; do - if grep -Fq "connection closed before server preface received" "$TEST_DIR"/lightning.log; then + if grep -Fq "connection error" "$TEST_DIR"/lightning.log; then ok=1 break fi