Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix incorrect _tidb_rowid allocator value after import for table with AUTO_ID_CACHE=1 #46171

Merged
merged 11 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type panickingAllocator struct {
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDat
}
}
if IsAutoIncCol(col.ToInfo()) {
// same as RowIDAllocType, since SepAutoInc is always false when initializing allocators of Table.
alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoIncrementType)
if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil {
return value, errors.Trace(err)
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_test(
name = "common_test",
timeout = "short",
srcs = [
"common_test.go",
"errors_test.go",
"main_test.go",
"once_error_test.go",
Expand All @@ -104,15 +105,23 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
"//ddl",
"//errno",
"//kv",
"//meta",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//store/driver/error",
"//store/mockstore",
"//testkit/testsetup",
"//util/dbutil",
"//util/mock",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
47 changes: 37 additions & 10 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,42 @@
// AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
return alloc.Alloc(ctx, uint64(n), 1, 1)
// there might be 2 allocators when tblInfo.SepAutoInc is true, and in this case
// RowIDAllocType will be the last one.
// we return the value of last Alloc as autoIDBase and autoIDMax, i.e. the value
// either comes from RowIDAllocType or AutoRandomType.
for _, alloc := range allocators {
autoIDBase, autoIDMax, err = alloc.Alloc(ctx, uint64(n), 1, 1)
if err != nil {
return 0, 0, err
}

Check warning on line 67 in br/pkg/lightning/common/common.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/common/common.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}
return
}

// RebaseGlobalAutoID rebase the autoID base to newBase.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
tblInfo *model.TableInfo) error {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return err
}
return alloc.Rebase(ctx, newBase, false)
for _, alloc := range allocators {
err = alloc.Rebase(ctx, newBase, false)
if err != nil {
return err
}

Check warning on line 83 in br/pkg/lightning/common/common.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/common/common.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoid.Allocator, error) {
// GetGlobalAutoIDAlloc returns the autoID allocators for a table.
// export it for testing.
func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if store == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
Expand All @@ -89,20 +107,29 @@
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
hasAutoRandID := tblInfo.ContainsAutoRandomBits()

// Current TiDB has some limitations for auto ID.
// TiDB version <= 6.4.0 has some limitations for auto ID.
// 1. Auto increment ID and auto row ID are using the same RowID allocator.
// See https://github.com/pingcap/tidb/issues/982.
// 2. Auto random column must be a clustered primary key. That is to say,
// there is no implicit row ID for tables with auto random column.
// 3. There is at most one auto column in a table.
// Therefore, we assume there is only one auto column in a table and use RowID allocator if possible.
//
// Since TiDB 6.5.0, row ID and auto ID are using different allocators when tblInfo.SepAutoInc is true
switch {
case hasRowID || hasAutoIncID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer), nil
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coverage false alarm, those lines has covered by TestAllocGlobalAutoID

// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer), nil
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
}
Expand Down
161 changes: 161 additions & 0 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common_test

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/mockstore"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
)

func newTableInfo(t *testing.T,
dbID, tableID int64,
createTableSql string, kvStore kv.Storage,
) *model.TableInfo {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt(createTableSql, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbID, tableInfo)
})
require.NoError(t, err)
return tableInfo
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})

cases := []struct {
tableID int64
createTableSQL string
expectErrStr string
expectAllocatorTypes []autoid.AllocatorType
}{
// autoID, autoIncrID = false, false
{
tableID: 11,
createTableSQL: "create table t11 (a int primary key clustered)",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
{
tableID: 12,
createTableSQL: "create table t12 (a int primary key clustered) AUTO_ID_CACHE 1",
expectErrStr: "has no auto ID",
expectAllocatorTypes: nil,
},
// autoID, autoIncrID = true, false
{
tableID: 21,
createTableSQL: "create table t21 (a int)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 22,
createTableSQL: "create table t22 (a int) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
// autoID, autoIncrID = false, true
{
tableID: 31,
createTableSQL: "create table t31 (a int primary key clustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 32,
createTableSQL: "create table t32 (a int primary key clustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoID, autoIncrID = true, true
{
tableID: 41,
createTableSQL: "create table t41 (a int primary key nonclustered auto_increment)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.RowIDAllocType},
},
{
tableID: 42,
createTableSQL: "create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType},
},
// autoRandomID
{
tableID: 51,
createTableSQL: "create table t51 (a bigint primary key auto_random)",
expectErrStr: "",
expectAllocatorTypes: []autoid.AllocatorType{autoid.AutoRandomType},
},
}
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
// all allocators are rebased and allocated
for _, alloc := range allocators {
base2, max2, err := alloc.Alloc(ctx, 100, 1, 1)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(223), base2, c.tableID)
require.Equal(t, int64(323), max2, c.tableID)
}
} else {
require.ErrorContains(t, err, c.expectErrStr, c.tableID)
}
var allocatorTypes []autoid.AllocatorType
for _, alloc := range allocators {
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}
33 changes: 18 additions & 15 deletions br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,45 @@ type metaMgrSuite struct {
checksumMgr *testChecksumMgr
}

func newTableRestore(t *testing.T, kvStore kv.Storage) *TableImporter {
func newTableRestore(t *testing.T,
db, table string,
dbID, tableID int64,
createTableSQL string, kvStore kv.Storage,
) *TableImporter {
p := parser.New()
se := tmock.NewContext()

node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", "utf8mb4", "utf8mb4_bin")
node, err := p.ParseOneStmt(createTableSQL, "utf8mb4", "utf8mb4_bin")
require.NoError(t, err)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), int64(1))
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), tableID)
require.NoError(t, err)
tableInfo.State = model.StatePublic

schema := "test"
tb := "t1"
ti := &checkpoints.TidbTableInfo{
ID: tableInfo.ID,
DB: schema,
Name: tb,
DB: db,
Name: table,
Core: tableInfo,
}
dbInfo := &checkpoints.TidbDBInfo{
ID: 1,
Name: schema,
ID: dbID,
Name: db,
Tables: map[string]*checkpoints.TidbTableInfo{
tb: ti,
table: ti,
},
}

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil {
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
return m.CreateTableOrView(dbInfo.ID, ti.Core)
})
require.NoError(t, err)

tableName := common.UniqueTable(schema, tb)
tableName := common.UniqueTable(db, table)
logger := log.With(zap.String("table", tableName))

return &TableImporter{
Expand All @@ -93,9 +95,10 @@ func newMetaMgrSuite(t *testing.T) *metaMgrSuite {

var s metaMgrSuite
s.mgr = &dbTableMetaMgr{
session: db,
taskID: 1,
tr: newTableRestore(t, kvStore),
session: db,
taskID: 1,
tr: newTableRestore(t, "test", "t1", 1, 1,
"CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", kvStore),
tableName: common.UniqueTable("test", TableMetaTableName),
needChecksum: true,
}
Expand Down
17 changes: 15 additions & 2 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,21 @@ func (tr *TableImporter) postProcess(
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.RowIDAllocType).Base())+1)
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)

if err == nil && isLocalBackend(rc.cfg) {
// for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid,
// especially when a table has auto_increment non-clustered PK, it will use both allocators.
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
Loading
Loading