Skip to content

Commit

Permalink
Merge branch 'master' into topsql-pubsub-datasink
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 20, 2021
2 parents 9c9ea90 + 87ab28e commit 7984f19
Show file tree
Hide file tree
Showing 82 changed files with 1,429 additions and 1,146 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ ignore:
- "executor/seqtest/.*"
- "metrics/.*"
- "expression/generator/.*"
- "br/pkg/mock/.*"

6 changes: 3 additions & 3 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
}

exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time, create_time`, updateTime)
if err != nil {
return err
Expand Down Expand Up @@ -697,7 +697,7 @@ func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
tables: make(map[stmtctx.TableEntry]struct{}),
}
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
if err != nil {
logutil.BgLogger().Warn("[sql-bind] failed to parse query for mysql.capture_plan_baselines_blacklist load", zap.Error(err))
return
Expand Down Expand Up @@ -923,7 +923,7 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB

propRangeIndex = "tikv.range_index"
Expand Down Expand Up @@ -782,7 +783,12 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := regionSplitSize * 4 / 3
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
regionMaxSize = regionSplitSize * 4 / 3
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/common/storage_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ import (
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"golang.org/x/sys/unix"
)

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
var stat unix.Statfs_t
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})

var stat unix.Statfs_t
err = unix.Statfs(dir, &stat)
if err != nil {
return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/storage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -33,6 +34,10 @@ var (

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})
r, _, e := getDiskFreeSpaceExW.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))),
uintptr(unsafe.Pointer(&size.Available)),
Expand Down
36 changes: 17 additions & 19 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,33 +464,31 @@ func (rc *Controller) localResource(sourceSize int64) error {
if err != nil {
return errors.Trace(err)
}
localAvailable := storageSize.Available
localAvailable := int64(storageSize.Available)

var message string
var passed bool
switch {
case localAvailable > uint64(sourceSize):
case localAvailable > sourceSize:
message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s",
units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable)))
passed = true
case int64(rc.cfg.TikvImporter.DiskQuota) > localAvailable:
message = fmt.Sprintf("local disk space may not enough to finish import, estimate sorted data size is %s,"+
" but local available is %s, please set `tikv-importer.disk-quota` to a smaller value than %s"+
" or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(localAvailable)))
passed = false
log.L().Error(message)
default:
if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) {
message = fmt.Sprintf("local disk space may not enough to finish import"+
"estimate sorted data size is %s, but local available is %s,"+
"you need a smaller number for tikv-importer.disk-quota (%s) to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = false
log.L().Error(message)
} else {
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
rc.checkTemplate.Collect(Critical, passed, message)
return nil
Expand Down
50 changes: 50 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"

"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
Expand Down Expand Up @@ -401,5 +402,54 @@ func (s *checkInfoSuite) TestCheckCSVHeader(c *C) {
c.Assert(rc.checkTemplate.FailedCount(ca.level), Equals, 1)
}
}
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
dir := c.MkDir()
mockStore, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/common/GetStorageSize", "return(2048)")
c.Assert(err, IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/common/GetStorageSize")
}()

cfg := config.NewConfig()
cfg.Mydumper.SourceDir = dir
cfg.TikvImporter.SortedKVDir = dir
cfg.TikvImporter.Backend = "local"
rc := &Controller{
cfg: cfg,
store: mockStore,
ioWorkers: worker.NewPool(context.Background(), 1, "io"),
}

// 1. source-size is smaller than disk-size, won't trigger error information
rc.checkTemplate = NewSimpleTemplate()
err = rc.localResource(1000)
c.Assert(err, IsNil)
tmpl := rc.checkTemplate.(*SimpleTemplate)
c.Assert(tmpl.warnFailedCount, Equals, 1)
c.Assert(tmpl.criticalFailedCount, Equals, 0)
c.Assert(tmpl.normalMsgs[1], Matches, "local disk resources are rich, estimate sorted data size 1000B, local available is 2KiB")

// 2. source-size is bigger than disk-size, with default disk-quota will trigger a critical error
rc.checkTemplate = NewSimpleTemplate()
err = rc.localResource(4096)
c.Assert(err, IsNil)
tmpl = rc.checkTemplate.(*SimpleTemplate)
c.Assert(tmpl.warnFailedCount, Equals, 1)
c.Assert(tmpl.criticalFailedCount, Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "local disk space may not enough to finish import, estimate sorted data size is 4KiB, but local available is 2KiB, please set `tikv-importer.disk-quota` to a smaller value than 2KiB or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports")

// 3. source-size is bigger than disk-size, with a vaild disk-quota will trigger a warning
rc.checkTemplate = NewSimpleTemplate()
rc.cfg.TikvImporter.DiskQuota = config.ByteSize(1024)
err = rc.localResource(4096)
c.Assert(err, IsNil)
tmpl = rc.checkTemplate.(*SimpleTemplate)
c.Assert(tmpl.warnFailedCount, Equals, 1)
c.Assert(tmpl.criticalFailedCount, Equals, 0)
c.Assert(tmpl.normalMsgs[1], Matches, "local disk space may not enough to finish import, estimate sorted data size is 4KiB, but local available is 2KiB,we will use disk-quota \\(size: 1KiB\\) to finish imports, which may slow down import")
}
21 changes: 11 additions & 10 deletions br/pkg/lightning/restore/check_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type SimpleTemplate struct {
count int
warnFailedCount int
criticalFailedCount int
failedMsg []string
normalMsgs []string // only used in unit test now
criticalMsgs []string
t table.Writer
}

Expand All @@ -65,16 +66,12 @@ func NewSimpleTemplate() Template {
{Name: "Passed", WidthMax: 6},
})
return &SimpleTemplate{
0,
0,
0,
make([]string, 0),
t,
t: t,
}
}

func (c *SimpleTemplate) FailedMsg() string {
return strings.Join(c.failedMsg, ";\n")
return strings.Join(c.criticalMsgs, ";\n")
}

func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
Expand All @@ -87,7 +84,11 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
c.warnFailedCount++
}
}
c.failedMsg = append(c.failedMsg, msg)
if !passed && t == Critical {
c.criticalMsgs = append(c.criticalMsgs, msg)
} else {
c.normalMsgs = append(c.normalMsgs, msg)
}
c.t.AppendRow(table.Row{c.count, msg, t, passed})
c.t.AppendSeparator()
}
Expand All @@ -108,7 +109,7 @@ func (c *SimpleTemplate) FailedCount(t CheckType) int {

func (c *SimpleTemplate) Output() string {
c.t.SetAllowedRowLength(170)
c.t.SetRowPainter(table.RowPainter(func(row table.Row) text.Colors {
c.t.SetRowPainter(func(row table.Row) text.Colors {
if passed, ok := row[3].(bool); ok {
if !passed {
if typ, ok := row[2].(CheckType); ok {
Expand All @@ -122,7 +123,7 @@ func (c *SimpleTemplate) Output() string {
}
}
return nil
}))
})
res := c.t.Render()
summary := "\n"
if c.criticalFailedCount > 0 {
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,8 +1926,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if !taskExist && rc.taskMgr != nil {
rc.taskMgr.CleanupTask(ctx)
}
return errors.Errorf("tidb-lightning check failed."+
" Please fix the failed check(s):\n %s", rc.checkTemplate.FailedMsg())
return errors.Errorf("tidb-lightning pre-check failed: %s", rc.checkTemplate.FailedMsg())
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,8 @@ func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64)
threshold := totalRawFileSize / 512
threshold = utils.NextPowerOfTwo(threshold)
if threshold < compactionLowerThreshold {
// disable compaction if threshold is smaller than lower bound
threshold = 0
// too may small SST files will cause inaccuracy of region range estimation,
threshold = compactionLowerThreshold
} else if threshold > compactionUpperThreshold {
threshold = compactionUpperThreshold
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (r *testStorageSuite) TestCreateStorage(c *C) {
c.Assert(s3, NotNil)
c.Assert(s3.Bucket, Equals, "bucket2")
c.Assert(s3.Prefix, Equals, "prefix")
c.Assert(s3.Endpoint, Equals, "https://s3.example.com/")
c.Assert(s3.Endpoint, Equals, "https://s3.example.com")
c.Assert(s3.ForcePathStyle, IsFalse)

// nolint:lll
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error {
return errors.Annotate(berrors.ErrStorageInvalidConfig, "secret_access_key not found")
}

s3.Endpoint = options.Endpoint
s3.Endpoint = strings.TrimSuffix(options.Endpoint, "/")
s3.Region = options.Region
// StorageClass, SSE and ACL are acceptable to be empty
s3.StorageClass = options.StorageClass
Expand Down Expand Up @@ -189,6 +189,7 @@ func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
options.Endpoint = strings.TrimSuffix(options.Endpoint, "/")
options.Region, err = flags.GetString(s3RegionOption)
if err != nil {
return errors.Trace(err)
Expand Down
9 changes: 5 additions & 4 deletions cmd/explaintest/r/new_character_set_builtin.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@sql_mode = '';
drop table if exists t;
create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
insert into t values ('一二三', '一二三', '一二三');
Expand Down Expand Up @@ -244,17 +245,17 @@ insert into t values ('65'), ('123456'), ('123456789');
select char(a using gbk), char(a using utf8), char(a) from t;
char(a using gbk) char(a using utf8) char(a)
A A A
釦 �@ �@
NULL [� [�
釦  �@
[ [ [�
select char(12345678 using gbk);
char(12345678 using gbk)
糰N
set @@tidb_enable_vectorized_expression = true;
select char(a using gbk), char(a using utf8), char(a) from t;
char(a using gbk) char(a using utf8) char(a)
A A A
釦 �@ �@
NULL [� [�
釦  �@
[ [ [�
select char(12345678 using gbk);
char(12345678 using gbk)
糰N
Expand Down
1 change: 1 addition & 0 deletions cmd/explaintest/t/new_character_set_builtin.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set @@sql_mode = '';
-- test for builtin function hex(), length(), ascii(), octet_length()
drop table if exists t;
create table t (a char(20) charset utf8mb4, b char(20) charset gbk, c binary(20));
Expand Down
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ func (w *worker) doModifyColumnTypeWithData(
}
defer w.sessPool.put(ctx)

stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), valStr)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(context.Background(), valStr)
if err != nil {
job.State = model.JobStateCancelled
failpoint.Return(ver, err)
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
}
}
buf.WriteString(" limit 1")
stmt, err := sctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, buf.String(), paramsList...)
stmt, err := sctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(ctx, buf.String(), paramsList...)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions ddl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,6 @@ var (
errDependentByFunctionalIndex = dbterror.ClassDDL.NewStd(mysql.ErrDependentByFunctionalIndex)
// errFunctionalIndexOnBlob when the expression of expression index returns blob or text.
errFunctionalIndexOnBlob = dbterror.ClassDDL.NewStd(mysql.ErrFunctionalIndexOnBlob)
// ErrIncompatibleTiFlashAndPlacement when placement and tiflash replica options are set at the same time
ErrIncompatibleTiFlashAndPlacement = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Placement and tiflash replica options cannot be set at the same time", nil))
)
4 changes: 2 additions & 2 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
}
defer w.sessPool.put(ctx)

stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(w.ddlJobCtx, sql, paramList...)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(w.ddlJobCtx, sql, paramList...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1569,7 +1569,7 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
paramList := make([]interface{}, 0, 4)
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...)
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParamsInternal(...)
// So we write it to the origin sql string here.
if index == 0 {
buf.WriteString("select 1 from %n.%n where ")
Expand Down
Loading

0 comments on commit 7984f19

Please sign in to comment.