Skip to content

Commit

Permalink
Merge branch 'master' into skip
Browse files Browse the repository at this point in the history
  • Loading branch information
hehechen authored Jan 31, 2023
2 parents e4b2441 + 0d47a5e commit a36fe93
Show file tree
Hide file tree
Showing 31 changed files with 669 additions and 43 deletions.
11 changes: 11 additions & 0 deletions bindinfo/bind_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ type BindRecord struct {
Bindings []Binding
}

// Copy returns a new BindRecord that carries the same OriginalSQL and Db as
// the receiver, together with a freshly allocated Bindings slice holding the
// same elements, so appending to or reassigning entries of the copy's slice
// does not touch the receiver's slice.
func (br *BindRecord) Copy() *BindRecord {
	bindings := make([]Binding, len(br.Bindings))
	copy(bindings, br.Bindings)
	return &BindRecord{
		OriginalSQL: br.OriginalSQL,
		Db:          br.Db,
		Bindings:    bindings,
	}
}

// HasEnabledBinding checks if there are any enabled bindings in bind record.
func (br *BindRecord) HasEnabledBinding() bool {
for _, binding := range br.Bindings {
Expand Down
60 changes: 50 additions & 10 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,32 +343,64 @@ type MaxError struct {
// In TiDB backend, this also includes all possible SQL errors raised from INSERT,
// such as unique key conflict when `on-duplicate` is set to `error`.
// When tolerated, the row causing the error will be skipped, and adds 1 to the counter.
// The default value is zero, which means that such errors are not tolerated.
Type atomic.Int64 `toml:"type" json:"type"`

// Conflict is the maximum number of unique key conflicts in local backend accepted.
// When tolerated, every pair of conflict adds 1 to the counter.
// Those pairs will NOT be deleted from the target. Conflict resolution is performed separately.
// TODO Currently this is hard-coded to infinity.
Conflict atomic.Int64 `toml:"conflict" json:"-"`
// The default value is max int64, which means conflict errors will be recorded as much as possible.
// Sometimes the actual number of conflict records logged will be greater than the value configured here,
// because conflict error data is recorded batch by batch.
// If the limit is reached within a single batch, the entire batch of records will be persisted before an error is reported.
Conflict atomic.Int64 `toml:"conflict" json:"conflict"`
}

func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
defaultValMap := map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": math.MaxInt64,
}
// set default value first
cfg.Syntax.Store(defaultValMap["syntax"])
cfg.Charset.Store(defaultValMap["charset"])
cfg.Type.Store(defaultValMap["type"])
cfg.Conflict.Store(defaultValMap["conflict"])
switch val := v.(type) {
case int64:
// ignore val that is smaller than 0
if val < 0 {
val = 0
if val >= 0 {
// only set type error
cfg.Type.Store(val)
}
cfg.Syntax.Store(0)
cfg.Charset.Store(math.MaxInt64)
cfg.Type.Store(val)
cfg.Conflict.Store(math.MaxInt64)
return nil
case map[string]interface{}:
// TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful.
// support stuff like `max-error = { charset = 1000, type = 1000 }`.
getVal := func(k string, v interface{}) int64 {
defaultVal, ok := defaultValMap[k]
if !ok {
return 0
}
iVal, ok := v.(int64)
if !ok || iVal < 0 {
return defaultVal
}
return iVal
}
for k, v := range val {
switch k {
case "type":
cfg.Type.Store(getVal(k, v))
case "conflict":
cfg.Conflict.Store(getVal(k, v))
}
}
return nil
default:
return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v)
}
return errors.Errorf("invalid max-error '%v', should be an integer", v)
}

// DuplicateResolutionAlgorithm is the config type of how to resolve duplicates.
Expand Down Expand Up @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error {
unusedGlobalKeyStrs[key.String()] = struct{}{}
}

iterateUnusedKeys:
for _, key := range unusedConfigKeys {
keyStr := key.String()
switch keyStr {
// these keys are not counted as decoded by toml decoder, but actually they are decoded,
// because the corresponding unmarshal logic handles these keys' decoding in a custom way
case "lightning.max-error.type",
"lightning.max-error.conflict":
continue iterateUnusedKeys
}
if _, found := unusedGlobalKeyStrs[keyStr]; found {
bothUnused = append(bothUnused, keyStr)
} else {
Expand Down
121 changes: 121 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"flag"
"fmt"
"math"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) {
require.Regexp(t, "time: unknown unit .?x.? in duration .?13x20s.?", err.Error())
}

// TestMaxErrorUnmarshal checks how the `max-error` setting of config.Lightning
// is decoded from TOML, both in its scalar form (`max-error = N`) and in its
// table form (`[max-error]` / inline table).
//
// Each case runs as its own subtest named after CaseName, so a failing case
// does not prevent the remaining cases from running.
func TestMaxErrorUnmarshal(t *testing.T) {
	type testCase struct {
		// TOMLStr is the TOML document fed to the decoder.
		TOMLStr string
		// ExpectedValues maps each counter name to the value expected after decoding.
		ExpectedValues map[string]int64
		// ExpectErrStr, when non-empty, is the exact error message expected from decoding.
		ExpectErrStr string
		// CaseName is used as the subtest name.
		CaseName string
	}
	for _, tc := range []*testCase{
		{
			// A non-negative scalar sets only the type threshold.
			TOMLStr: `max-error = 123`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     123,
				"conflict": math.MaxInt64,
			},
			CaseName: "Normal_Int",
		},
		{
			// A negative scalar is ignored; every counter keeps its default.
			TOMLStr: `max-error = -123`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": math.MaxInt64,
			},
			CaseName: "Abnormal_Negative_Int",
		},
		{
			TOMLStr:      `max-error = "abcde"`,
			ExpectErrStr: "invalid max-error 'abcde', should be an integer or a map of string:int64",
			CaseName:     "Abnormal_String",
		},
		{
			// Only type and conflict are taken from the table; syntax and
			// charset keep their defaults even when present.
			TOMLStr: `[max-error]
syntax = 1
charset = 2
type = 3
conflict = 4
`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     3,
				"conflict": 4,
			},
			CaseName: "Normal_Map_All_Set",
		},
		{
			TOMLStr: `[max-error]
conflict = 1000
`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
			CaseName: "Normal_Map_Partial_Set",
		},
		{
			TOMLStr: `max-error = { conflict = 1000, type = 123 }`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     123,
				"conflict": 1000,
			},
			CaseName: "Normal_OneLineMap_Partial_Set",
		},
		{
			// Unknown keys are ignored without an error.
			TOMLStr: `[max-error]
conflict = 1000
not_exist = 123
`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
			CaseName: "Normal_Map_Partial_Set_Invalid_Key",
		},
		{
			// A negative value inside the table falls back to the key's default.
			TOMLStr: `[max-error]
conflict = 1000
type = -123
`,
			ExpectedValues: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
			CaseName: "Normal_Map_Partial_Set_Invalid_Value",
		},
		{
			// A bare word is not a valid TOML value, so decoding itself fails.
			TOMLStr: `[max-error]
conflict = 1000
type = abc
`,
			ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`,
			CaseName:     "Normal_Map_Partial_Set_Invalid_ValueType",
		},
	} {
		t.Run(tc.CaseName, func(t *testing.T) {
			targetLightningCfg := new(config.Lightning)
			err := toml.Unmarshal([]byte(tc.TOMLStr), targetLightningCfg)
			if len(tc.ExpectErrStr) > 0 {
				require.Error(t, err)
				require.Equal(t, tc.ExpectErrStr, err.Error())
				return
			}
			require.NoError(t, err)
			require.Equal(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load())
			require.Equal(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load())
			require.Equal(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load())
			require.Equal(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load())
		})
	}
}

func TestDurationMarshalJSON(t *testing.T) {
duration := config.Duration{}
err := duration.UnmarshalText([]byte("13m20s"))
Expand Down
31 changes: 22 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError(
if em.remainingError.Type.Dec() < 0 {
threshold := em.configError.Type.Load()
if threshold > 0 {
encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'",
encodeErr = errors.Annotatef(encodeErr,
"The number of type errors exceeds the threshold configured by `max-error.type`: '%d'",
em.configError.Type.Load())
}
return encodeErr
Expand Down Expand Up @@ -241,25 +242,28 @@ func (em *ErrorManager) RecordDataConflictError(
tableName string,
conflictInfos []DataConflictInfo,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold)
// Still need to record this batch of conflict records, and then return this error at the end.
// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

func (em *ErrorManager) RecordIndexConflictError(
Expand All @@ -290,25 +297,28 @@ func (em *ErrorManager) RecordIndexConflictError(
conflictInfos []DataConflictInfo,
rawHandles, rawRows [][]byte,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold)
// Still need to record this batch of conflict records, and then return this error at the end.
// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -331,7 +341,10 @@ func (em *ErrorManager) RecordIndexConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

// ResolveAllConflictKeys query all conflicting rows (handle and their
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE testtbl (
id INTEGER PRIMARY KEY,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
16 changes: 16 additions & 0 deletions br/tests/lightning_config_max_error/data/mytest.testtbl.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
id,val1
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
6,"aaa06"
7,"aaa07"
8,"aaa08"
9,"aaa09"
10,"aaa10"
1,"bbb01"
2,"bbb02"
3,"bbb03"
4,"bbb04"
5,"bbb05"
8 changes: 8 additions & 0 deletions br/tests/lightning_config_max_error/err_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning.max-error]
conflict = 4

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
8 changes: 8 additions & 0 deletions br/tests/lightning_config_max_error/normal_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning.max-error]
conflict = 20

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning]
max-error = 0 # this actually sets the type error

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
Loading

0 comments on commit a36fe93

Please sign in to comment.