Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor Allocator.Alloc() interface for separating auto IDs #27616

Merged
merged 9 commits into from
Aug 28, 2021
9 changes: 5 additions & 4 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ func BuildBackupRangeAndSchema(
continue
}

idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.SequenceType)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.AutoRandomType)

tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -294,6 +290,11 @@ func BuildBackupRangeAndSchema(
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) {
DB: dbInfo,
}
// Get the next AutoIncID
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, false, autoid.RowIDAllocType)
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID(table.Info.ID)
c.Assert(err, IsNil, Commentf("Error allocate next auto id"))
c.Assert(autoIncID, Equals, uint64(globalAutoID))
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func getCurrentTable(d *ddl, schemaID, tableID int64) (table.Table, error) {
if err != nil {
return nil, errors.Trace(err)
}
alloc := autoid.NewAllocator(d.store, schemaID, false, autoid.RowIDAllocType)
alloc := autoid.NewAllocator(d.store, schemaID, tblInfo.ID, false, autoid.RowIDAllocType)
tbl, err := table.TableFromMeta(autoid.NewAllocators(alloc), tblInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func testGetTableWithError(d *ddl, schemaID, tableID int64) (table.Table, error)
if tblInfo == nil {
return nil, errors.New("table not found")
}
alloc := autoid.NewAllocator(d.store, schemaID, false, autoid.RowIDAllocType)
alloc := autoid.NewAllocator(d.store, schemaID, tblInfo.ID, false, autoid.RowIDAllocType)
tbl, err := table.TableFromMeta(autoid.NewAllocators(alloc), tblInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3838,7 +3838,7 @@ func (s *testSuite) TestCheckIndex(c *C) {
c.Assert(err, IsNil)
tbInfo := tbl.Meta()

alloc := autoid.NewAllocator(s.store, dbInfo.ID, false, autoid.RowIDAllocType)
alloc := autoid.NewAllocator(s.store, dbInfo.ID, tbInfo.ID, false, autoid.RowIDAllocType)
tb, err := tables.TableFromMeta(autoid.NewAllocators(alloc), tbInfo)
c.Assert(err, IsNil)

Expand Down
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
tableInfo := e.Table.Meta()
increment := e.ctx.GetSessionVars().AutoIncrementIncrement
offset := e.ctx.GetSessionVars().AutoIncrementOffset
_, autoRandomID, err := alloc.Alloc(ctx, tableInfo.ID, 1, int64(increment), int64(offset))
_, autoRandomID, err := alloc.Alloc(ctx, 1, int64(increment), int64(offset))
if err != nil {
return 0, err
}
Expand Down
7 changes: 4 additions & 3 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,13 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
if len(allocs) == 0 {
allocs = autoid.NewAllocatorsFromTblInfo(b.store, dbInfo.ID, tblInfo)
} else {
tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version)
switch tp {
case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType)
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer)
allocs = append(allocs, newAlloc)
case model.ActionRebaseAutoRandomBase:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType)
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer)
allocs = append(allocs, newAlloc)
case model.ActionModifyColumn:
// Change column attribute from auto_increment to auto_random.
Expand All @@ -373,7 +374,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
allocs = allocs.Filter(func(a autoid.Allocator) bool {
return a.GetType() != autoid.AutoIncrementType && a.GetType() != autoid.RowIDAllocType
})
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType)
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer)
allocs = append(allocs, newAlloc)
}
}
Expand Down
36 changes: 25 additions & 11 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (a AllocatorType) String() string {
// CustomAutoIncCacheOption is one kind of AllocOption to customize the allocator step length.
type CustomAutoIncCacheOption int64

// ApplyOn is implement the AllocOption interface.
// ApplyOn implements the AllocOption interface.
func (step CustomAutoIncCacheOption) ApplyOn(alloc *allocator) {
if step == 0 {
return
Expand All @@ -109,6 +109,14 @@ func (step CustomAutoIncCacheOption) ApplyOn(alloc *allocator) {
alloc.customStep = true
}

// AllocOptionTableInfoVersion is used to pass the TableInfo.Version to the allocator.
type AllocOptionTableInfoVersion uint64

// ApplyOn implements the AllocOption interface.
func (v AllocOptionTableInfoVersion) ApplyOn(alloc *allocator) {
alloc.tbVersion = uint64(v)
}

// AllocOption is a interface to define allocator custom options coming in future.
type AllocOption interface {
ApplyOn(*allocator)
Expand All @@ -124,7 +132,7 @@ type Allocator interface {
// The returned range is (min, max]:
// case increment=1 & offset=1: you can derive the ids like min+1, min+2... max.
// case increment=x & offset=y: you firstly need to seek to firstID by `SeekToFirstAutoIDXXX`, then derive the IDs like firstID, firstID + increment * 2... in the caller.
Alloc(ctx context.Context, tableID int64, n uint64, increment, offset int64) (int64, int64, error)
Alloc(ctx context.Context, n uint64, increment, offset int64) (int64, int64, error)

// AllocSeqCache allocs sequence batch value cached in table level(rather than in alloc), the returned range covering
// the size of sequence cache with it's increment. The returned round indicates the sequence cycle times if it is with
Expand Down Expand Up @@ -187,6 +195,8 @@ type allocator struct {
store kv.Storage
// dbID is current database's ID.
dbID int64
tbID int64
tbVersion uint64
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
isUnsigned bool
lastAllocTime time.Time
step int64
Expand Down Expand Up @@ -461,10 +471,12 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 {
}

// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator {
func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
allocType AllocatorType, opts ...AllocOption) Allocator {
alloc := &allocator{
store: store,
dbID: dbID,
tbID: tbID,
isUnsigned: isUnsigned,
step: step,
lastAllocTime: time.Now(),
Expand All @@ -477,10 +489,11 @@ func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool, allocType Alloc
}

// NewSequenceAllocator returns a new sequence value generator on the store.
func NewSequenceAllocator(store kv.Storage, dbID int64, info *model.SequenceInfo) Allocator {
func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.SequenceInfo) Allocator {
return &allocator{
store: store,
dbID: dbID,
tbID: tbID,
// Sequence allocator is always signed.
isUnsigned: false,
lastAllocTime: time.Now(),
Expand All @@ -494,20 +507,21 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T
var allocs []Allocator
dbID := tblInfo.GetDBID(schemaID)
idCacheOpt := CustomAutoIncCacheOption(tblInfo.AutoIdCache)
tblVer := AllocOptionTableInfoVersion(tblInfo.Version)

hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
if hasRowID || hasAutoIncID {
alloc := NewAllocator(store, dbID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt)
alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer)
allocs = append(allocs, alloc)
}
hasAutoRandID := tblInfo.ContainsAutoRandomBits()
if hasAutoRandID {
alloc := NewAllocator(store, dbID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt)
alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer)
allocs = append(allocs, alloc)
}
if tblInfo.IsSequence() {
allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.Sequence))
allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.ID, tblInfo.Sequence))
}
return NewAllocators(allocs...)
}
Expand All @@ -525,8 +539,8 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T
// but actually we don't care about it, all we need is to calculate the new autoID corresponding to the
// increment and offset at this time now. To simplify the rule is like (ID - offset) % increment = 0,
// so the first autoID should be 9, then add increment to it to get 13.
func (alloc *allocator) Alloc(ctx context.Context, tableID int64, n uint64, increment, offset int64) (int64, int64, error) {
if tableID == 0 {
func (alloc *allocator) Alloc(ctx context.Context, n uint64, increment, offset int64) (int64, int64, error) {
if alloc.tbID == 0 {
return 0, 0, errInvalidTableID.GenWithStackByArgs("Invalid tableID")
}
if n == 0 {
Expand All @@ -540,9 +554,9 @@ func (alloc *allocator) Alloc(ctx context.Context, tableID int64, n uint64, incr
alloc.mu.Lock()
defer alloc.mu.Unlock()
if alloc.isUnsigned {
return alloc.alloc4Unsigned(ctx, tableID, n, increment, offset)
return alloc.alloc4Unsigned(ctx, alloc.tbID, n, increment, offset)
}
return alloc.alloc4Signed(ctx, tableID, n, increment, offset)
return alloc.alloc4Signed(ctx, alloc.tbID, n, increment, offset)
}

func (alloc *allocator) AllocSeqCache(tableID int64) (int64, int64, int64, error) {
Expand Down
Loading