ddl: merge continuous key ranges in FLASHBACK CLUSTER job args #54914

Merged Jul 31, 2024 · 4 commits
99 changes: 83 additions & 16 deletions pkg/ddl/cluster.go
@@ -322,9 +322,9 @@ func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []i
return flashbackIDs
}

// GetTableDataKeyRanges get keyRanges by `flashbackIDs`.
// getTableDataKeyRanges get keyRanges by `flashbackIDs`.
// This func will return all flashback table data key ranges.
func GetTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
func getTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
var keyRanges []kv.KeyRange

nonFlashbackTableIDs = append(nonFlashbackTableIDs, -1)
@@ -349,10 +349,52 @@ func GetTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
return keyRanges
}

// GetFlashbackKeyRanges get keyRanges for flashback cluster.
type keyRangeMayExclude struct {
r kv.KeyRange
exclude bool
}

// appendContinuousKeyRanges merges not exclude continuous key ranges and appends
// to given []kv.KeyRange, assuming the gap between key ranges has no data.
//
// Precondition: schemaKeyRanges is sorted by start key. schemaKeyRanges are
// non-overlapping.
func appendContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
Contributor:

Suggested change:

func appendContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {
func mergeContinuousKeyRanges(result []kv.KeyRange, schemaKeyRanges []keyRangeMayExclude) []kv.KeyRange {

Contributor Author:

I want to mention that the input slice is appended to, so I prefer to keep the old name.

Contributor:

Maybe avoid modifying the input parameter; it's OK to leave it as is.
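
For the thread above, a minimal standalone sketch (not part of this PR) of the Go slice behavior being discussed: whether a helper that appends to a caller-provided slice writes into the caller's backing array depends on that slice's spare capacity, so callers must always use the returned slice. The helper name appendTwo and the int slices are purely illustrative.

package main

import "fmt"

// appendTwo mimics a helper that appends to its input slice and returns the
// extended slice, as appendContinuousKeyRanges does with `result`.
func appendTwo(dst []int) []int {
	return append(dst, 20, 30)
}

func main() {
	// Spare capacity: append writes into the caller's backing array and the
	// returned slice aliases it, but the caller's own length stays at 1.
	withCap := make([]int, 1, 4)
	withCap[0] = 10
	out := appendTwo(withCap)
	fmt.Println(out, len(withCap), cap(withCap)) // [10 20 30] 1 4

	// No spare capacity: append allocates a new backing array, so the
	// caller's slice is left untouched.
	tight := []int{10}
	out2 := appendTwo(tight)
	fmt.Println(tight, out2) // [10] [10 20 30]
}

Keeping "append" in the name signals that callers must use the returned slice, which matches the author's reasoning above.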

var (
continuousStart, continuousEnd kv.Key
)

for _, r := range schemaKeyRanges {
if r.exclude {
if continuousStart != nil {
result = append(result, kv.KeyRange{
StartKey: continuousStart,
EndKey: continuousEnd,
})
continuousStart = nil
}
continue
}

if continuousStart == nil {
continuousStart = r.r.StartKey
}
continuousEnd = r.r.EndKey
}

if continuousStart != nil {
result = append(result, kv.KeyRange{
StartKey: continuousStart,
EndKey: continuousEnd,
})
}
return result
}
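
To illustrate the merge implemented above, a small sketch in the spirit of the TestAppendContinuousKeyRanges case added in pkg/ddl/ddl_test.go below. It assumes it lives in package ddl, since appendContinuousKeyRanges and keyRangeMayExclude are unexported; the function name exampleContinuousMerge is hypothetical.

package ddl

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/kv"
)

// exampleContinuousMerge feeds three sorted, non-overlapping key ranges to the
// helper; the middle one is excluded (e.g. a system schema's meta range).
func exampleContinuousMerge() {
	input := []keyRangeMayExclude{
		{r: kv.KeyRange{StartKey: kv.Key{1}, EndKey: kv.Key{2}}},
		{r: kv.KeyRange{StartKey: kv.Key{3}, EndKey: kv.Key{4}}, exclude: true},
		{r: kv.KeyRange{StartKey: kv.Key{5}, EndKey: kv.Key{6}}},
	}
	merged := appendContinuousKeyRanges(nil, input)
	// The excluded range breaks the continuity, so two ranges remain:
	// [{1}, {2}) and [{5}, {6}). Without the exclusion, all three would
	// collapse into the single range [{1}, {6}).
	fmt.Println(len(merged)) // 2
}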

// getFlashbackKeyRanges get keyRanges for flashback cluster.
// It contains all non system table key ranges and meta data key ranges.
// The time complexity is O(nlogn).
func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.KeyRange, error) {
func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashbackTS uint64) ([]kv.KeyRange, error) {
is := sess.GetDomainInfoSchema().(infoschema.InfoSchema)
schemas := is.AllSchemas()

@@ -367,27 +409,52 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke
}

schemaIDs := make(map[int64]struct{})
excludeSchemaIDs := make(map[int64]struct{})
for _, schema := range schemas {
if !filter.IsSystemSchema(schema.Name.L) {
if filter.IsSystemSchema(schema.Name.L) {
excludeSchemaIDs[schema.ID] = struct{}{}
} else {
schemaIDs[schema.ID] = struct{}{}
}
}
for _, schema := range snapshotSchemas {
if !filter.IsSystemSchema(schema.Name.L) {
if filter.IsSystemSchema(schema.Name.L) {
excludeSchemaIDs[schema.ID] = struct{}{}
} else {
schemaIDs[schema.ID] = struct{}{}
}
}

// The meta data key ranges.
schemaKeyRanges := make([]keyRangeMayExclude, 0, len(schemaIDs)+len(excludeSchemaIDs))
for schemaID := range schemaIDs {
metaStartKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID))
metaEndKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID + 1))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
schemaKeyRanges = append(schemaKeyRanges, keyRangeMayExclude{
r: kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
},
exclude: false,
})
}
for schemaID := range excludeSchemaIDs {
metaStartKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID))
metaEndKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID + 1))
schemaKeyRanges = append(schemaKeyRanges, keyRangeMayExclude{
r: kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
},
exclude: true,
})
}

slices.SortFunc(schemaKeyRanges, func(a, b keyRangeMayExclude) int {
return bytes.Compare(a.r.StartKey, b.r.StartKey)
})

keyRanges = appendContinuousKeyRanges(keyRanges, schemaKeyRanges)

startKey := tablecodec.EncodeMetaKeyPrefix([]byte("DBs"))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: startKey,
@@ -396,11 +463,11 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke

var nonFlashbackTableIDs []int64
for _, db := range schemas {
tblInfos, err := is.SchemaTableInfos(context.Background(), db.Name)
if err != nil {
return nil, errors.Trace(err)
tbls, err2 := is.SchemaTableInfos(ctx, db.Name)
if err2 != nil {
return nil, errors.Trace(err2)
}
for _, table := range tblInfos {
for _, table := range tbls {
if !table.IsBaseTable() || table.ID > meta.MaxGlobalID {
continue
}
@@ -413,7 +480,7 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.Ke
}
}

return append(keyRanges, GetTableDataKeyRanges(nonFlashbackTableIDs)...), nil
return append(keyRanges, getTableDataKeyRanges(nonFlashbackTableIDs)...), nil
}

// SendPrepareFlashbackToVersionRPC prepares regions for flashback, the purpose is to put region into flashback state which region stop write
@@ -712,7 +779,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Trace(err)
}
job.Args[startTSOffset] = startTS
keyRanges, err = GetFlashbackKeyRanges(sess, flashbackTS)
keyRanges, err = getFlashbackKeyRanges(w.ctx, sess, flashbackTS)
if err != nil {
return ver, errors.Trace(err)
}
31 changes: 0 additions & 31 deletions pkg/ddl/cluster_test.go
@@ -21,14 +21,11 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
@@ -38,34 +35,6 @@
"github.com/tikv/client-go/v2/oracle"
)

func TestGetTableDataKeyRanges(t *testing.T) {
// case 1, empty flashbackIDs
keyRanges := ddl.GetTableDataKeyRanges([]int64{})
require.Len(t, keyRanges, 1)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 2, insert a execluded table ID
keyRanges = ddl.GetTableDataKeyRanges([]int64{3})
require.Len(t, keyRanges, 2)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 3, insert some execluded table ID
keyRanges = ddl.GetTableDataKeyRanges([]int64{3, 5, 9})
require.Len(t, keyRanges, 4)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(5))
require.Equal(t, keyRanges[2].StartKey, tablecodec.EncodeTablePrefix(6))
require.Equal(t, keyRanges[2].EndKey, tablecodec.EncodeTablePrefix(9))
require.Equal(t, keyRanges[3].StartKey, tablecodec.EncodeTablePrefix(10))
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originHook := dom.DDL().GetHook()
144 changes: 144 additions & 0 deletions pkg/ddl/ddl_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/mock"
@@ -294,3 +295,146 @@ func TestCheckDuplicateConstraint(t *testing.T) {
err = checkDuplicateConstraint(constrNames, "u1", ast.ConstraintUniq)
require.EqualError(t, err, "[ddl:1061]Duplicate key name 'u1'")
}

func TestGetTableDataKeyRanges(t *testing.T) {
// case 1, empty flashbackIDs
keyRanges := getTableDataKeyRanges([]int64{})
require.Len(t, keyRanges, 1)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 2, insert a execluded table ID
keyRanges = getTableDataKeyRanges([]int64{3})
require.Len(t, keyRanges, 2)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))

// case 3, insert some execluded table ID
keyRanges = getTableDataKeyRanges([]int64{3, 5, 9})
require.Len(t, keyRanges, 4)
require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(5))
require.Equal(t, keyRanges[2].StartKey, tablecodec.EncodeTablePrefix(6))
require.Equal(t, keyRanges[2].EndKey, tablecodec.EncodeTablePrefix(9))
require.Equal(t, keyRanges[3].StartKey, tablecodec.EncodeTablePrefix(10))
require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
}

func TestAppendContinuousKeyRanges(t *testing.T) {
cases := []struct {
input []keyRangeMayExclude
expect []kv.KeyRange
}{
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
},
[]kv.KeyRange{},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{2}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{4}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: false,
},
},
[]kv.KeyRange{
{StartKey: []byte{1}, EndKey: []byte{2}},
{StartKey: []byte{5}, EndKey: []byte{6}},
},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: false,
},
},
[]kv.KeyRange{{StartKey: []byte{5}, EndKey: []byte{6}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: true,
},
},
[]kv.KeyRange{{StartKey: []byte{1}, EndKey: []byte{2}}},
},
{
[]keyRangeMayExclude{
{
r: kv.KeyRange{StartKey: []byte{1}, EndKey: []byte{2}},
exclude: true,
},
{
r: kv.KeyRange{StartKey: []byte{3}, EndKey: []byte{4}},
exclude: false,
},
{
r: kv.KeyRange{StartKey: []byte{5}, EndKey: []byte{6}},
exclude: true,
},
},
[]kv.KeyRange{{StartKey: []byte{3}, EndKey: []byte{4}}},
},
}

for i, ca := range cases {
ranges := appendContinuousKeyRanges([]kv.KeyRange{}, ca.input)
require.Equal(t, ca.expect, ranges, "case %d", i)
}
}