Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb into load-data-remote
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Jan 18, 2023
2 parents 80dcd60 + 1f44d20 commit 489df74
Show file tree
Hide file tree
Showing 105 changed files with 7,301 additions and 5,399 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
version = "v2.0.5-0.20230110071533-f313ddf58d73",
sum = "h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=",
version = "v2.0.5-0.20230112062023-fe5b35c5f5dc",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
11 changes: 11 additions & 0 deletions bindinfo/session_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,14 @@ func TestPreparedStmt(t *testing.T) {
require.Len(t, tk.Session().GetSessionVars().StmtCtx.IndexNames, 1)
require.Equal(t, "t:idx_c", tk.Session().GetSessionVars().StmtCtx.IndexNames[0])
}

func TestSetVarBinding(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int, b varchar(20))")
tk.MustExec("insert into t1 values (1, '111111111111111')")
tk.MustExec("insert into t1 values (2, '222222222222222')")
tk.MustExec("create binding for select group_concat(b) from test.t1 using select /*+ SET_VAR(group_concat_max_len = 4) */ group_concat(b) from test.t1 ;")
tk.MustQuery("select group_concat(b) from test.t1").Check(testkit.Rows("1111"))
}
6 changes: 5 additions & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type tidbSession struct {

// GetDomain implements glue.Glue.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
initStatsSe, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
se, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -74,7 +78,7 @@ func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
// create stats handler for backup and restore.
err = dom.UpdateTableStatsLoop(se)
err = dom.UpdateTableStatsLoop(se, initStatsSe)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
key := GlobalCheckpointOf(taskName)
resp, err := t.KV.Get(ctx, key)
if err != nil {
Expand All @@ -211,7 +211,7 @@ func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName st
func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
oldValue, err := t.GetGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}
Expand Down
34 changes: 2 additions & 32 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error
clientv3.OpDelete(CheckPointsOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(Pause(taskName)),
clientv3.OpDelete(LastErrorPrefixOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(GlobalCheckpointOf(taskName)),
clientv3.OpDelete(StorageCheckpointOf(taskName), clientv3.WithPrefix()),
).
Commit()
if err != nil {
Expand Down Expand Up @@ -372,28 +374,6 @@ func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
resp, err := t.cli.KV.Get(ctx, key)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}
if resp.Count != 1 {
return 0, nil
}
kv := resp.Kvs[0]
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)",
store,
len(kv.Value),
redact.Key(kv.Value))
}
nextBackupTS := binary.BigEndian.Uint64(kv.Value)
return nextBackupTS, nil
}

// GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.
func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
checkPointMap, err := t.NextBackupTSList(ctx)
Expand Down Expand Up @@ -422,16 +402,6 @@ func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
return mathutil.Max(checkpoint, ts), nil
}

// Step forwards the progress (next_backup_ts) of some region.
// The task should be done by TiKV. This function should only be used for test cases.
func (t *Task) Step(ctx context.Context, store uint64, ts uint64) error {
_, err := t.cli.KV.Put(ctx, CheckPointOf(t.Info.Name, store), string(encodeUint64(ts)))
if err != nil {
return errors.Annotatef(err, "failed forward the progress of %s to %d", t.Info.Name, ts)
}
return nil
}

func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error {
_, err := t.cli.KV.Put(ctx, GlobalCheckpointOf(t.Info.Name), string(encodeUint64(ts)))
if err != nil {
Expand Down
122 changes: 81 additions & 41 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"testing"

"github.com/pingcap/errors"
backup "github.com/pingcap/kvproto/pkg/brpb"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -138,11 +138,11 @@ func TestIntegration(t *testing.T) {
defer etcd.Server.Stop()
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli) })
t.Run("testGetGlobalCheckPointTS", func(t *testing.T) { testGetGlobalCheckPointTS(t, metaCli) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -210,31 +210,6 @@ func testBasic(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Et
rangeIsEmpty(t, []byte(streamhelper.RangesOf(taskName)), etcd)
}

func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
ctx := context.Background()
taskName := "many_tables"
taskInfo := simpleTask(taskName, 65)
defer func() {
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
}()

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
task, err := metaCli.GetTask(ctx, taskName)
require.NoError(t, err)
require.NoError(t, task.Step(ctx, 1, 41))
require.NoError(t, task.Step(ctx, 1, 42))
require.NoError(t, task.Step(ctx, 2, 40))
rs, err := task.Ranges(ctx)
require.NoError(t, err)
require.Equal(t, simpleRanges(65), rs)
store1Checkpoint, err := task.MinNextBackupTS(ctx, 1)
require.NoError(t, err)
require.Equal(t, store1Checkpoint, uint64(42))
store2Checkpoint, err := task.MinNextBackupTS(ctx, 2)
require.NoError(t, err)
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient) {
var (
taskName = "my_task"
Expand Down Expand Up @@ -298,7 +273,7 @@ func testGetGlobalCheckPointTS(t *testing.T, metaCli streamhelper.MetaDataClient
require.NoError(t, err)
}

task := streamhelper.NewTask(&metaCli, backup.StreamBackupTaskInfo{Name: taskName})
task := streamhelper.NewTask(&metaCli, backuppb.StreamBackupTaskInfo{Name: taskName})
task.UploadGlobalCheckpoint(ctx, 1003)

globalTS, err := task.GetGlobalCheckPointTS(ctx)
Expand Down Expand Up @@ -343,21 +318,86 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
task := "simple"
req := require.New(t)
getCheckpoint := func() uint64 {
resp, err := metaCli.KV.Get(ctx, streamhelper.GlobalCheckpointOf(task))
req.NoError(err)
if len(resp.Kvs) == 0 {
return 0
}
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}

req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(5, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(0, ts)
}

func testStoptask(t *testing.T, metaCli streamhelper.AdvancerExt) {
var (
ctx = context.Background()
taskName = "stop_task"
req = require.New(t)
taskInfo = streamhelper.TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Name: taskName,
StartTs: 0,
},
}
storeID = "5"
storageCheckpoint = make([]byte, 8)
)

// put task
req.NoError(metaCli.PutTask(ctx, taskInfo))
t2, err := metaCli.GetTask(ctx, taskName)
req.NoError(err)
req.EqualValues(taskInfo.PBInfo.Name, t2.Info.Name)

// upload global checkpoint
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, taskName, 100))
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(100, ts)

//upload storage checkpoint
key := path.Join(streamhelper.StorageCheckpointOf(taskName), storeID)
binary.BigEndian.PutUint64(storageCheckpoint, 90)
_, err = metaCli.Put(ctx, key, string(storageCheckpoint))
req.NoError(err)

task := streamhelper.NewTask(&metaCli.MetaDataClient, taskInfo.PBInfo)
ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 90)

// pause task
req.NoError(metaCli.PauseTask(ctx, taskName))
resp, err := metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(1, len(resp.Kvs))

// stop task
err = metaCli.DeleteTask(ctx, taskName)
req.NoError(err)

// check task and other meta infomations not existed
_, err = metaCli.GetTask(ctx, taskName)
req.Error(err)

ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 0)

ts, err = metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(0, ts)

resp, err = metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(0, len(resp.Kvs))
}
18 changes: 2 additions & 16 deletions br/pkg/streamhelper/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func RangeKeyOf(name string, startKey []byte) string {
return RangesOf(name) + string(startKey)
}

func writeUint64(buf *bytes.Buffer, num uint64) {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
buf.Write(items[:])
}

func encodeUint64(num uint64) []byte {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
Expand All @@ -83,25 +77,17 @@ func CheckPointsOf(task string) string {
}

// GlobalCheckpointOf returns the path to the "global" checkpoint of some task.
// Normally it would be <prefix>/checkpoint/<task-name>/central_globa.
func GlobalCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}

// StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task.
// Normally it would be <prefix>/storage-checkpoint/<task>.
func StorageCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, storageCheckPoint, task)
}

// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {
buf := bytes.NewBuffer(nil)
buf.WriteString(strings.TrimSuffix(path.Join(streamKeyPrefix, taskCheckpointPath, task), "/"))
buf.WriteRune('/')
writeUint64(buf, store)
return buf.String()
}

// Pause returns the path for pausing the task.
// Normally it would be <prefix>/pause/<task-name>.
func Pause(task string) string {
Expand Down
Binary file added cmd/pluginpkg/pluginpkg
Binary file not shown.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ type Config struct {
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
// TiDBEnableExitCheck indicates whether exit-checking in domain for background process
TiDBEnableExitCheck bool `toml:"tidb-enable-exit-check" json:"tidb-enable-exit-check"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
Expand Down Expand Up @@ -1000,6 +1002,7 @@ var defaultConf = Config{
DisaggregatedTiFlash: false,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
}

var (
Expand Down
4 changes: 4 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if tblInfo.Partition != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("table is partition table"))
}

changingCol := modifyInfo.changingCol
if changingCol == nil {
Expand Down
Loading

0 comments on commit 489df74

Please sign in to comment.