Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support mysql compatible auto_increment, the client side changes #38809

Merged
merged 22 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package schematracker
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -553,15 +554,37 @@ func (d Checker) MoveJobFromTable2Queue() error {
// StorageDDLInjector wraps kv.Storage to inject checker to domain's DDL in bootstrap time.
type StorageDDLInjector struct {
kv.Storage
kv.EtcdBackend
Injector func(ddl.DDL) *Checker
}

var _ kv.EtcdBackend = StorageDDLInjector{}

// EtcdAddrs implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) EtcdAddrs() ([]string, error) {
return s.EtcdBackend.EtcdAddrs()
}

// TLSConfig implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) TLSConfig() *tls.Config {
return s.EtcdBackend.TLSConfig()
}

// StartGCWorker implements the kv.EtcdBackend interface.
func (s StorageDDLInjector) StartGCWorker() error {
return s.EtcdBackend.StartGCWorker()
}

// NewStorageDDLInjector creates a new StorageDDLInjector to inject Checker.
func NewStorageDDLInjector(s kv.Storage) kv.Storage {
return StorageDDLInjector{
ret := StorageDDLInjector{
Storage: s,
Injector: NewChecker,
}
if ebd, ok := s.(kv.EtcdBackend); ok {
ret.EtcdBackend = ebd
}
return ret
}

// UnwrapStorage unwraps StorageDDLInjector for one level.
Expand Down
3 changes: 2 additions & 1 deletion infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version)
switch tp {
case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer)
idCacheOpt := autoid.CustomAutoIncCacheOption(tblInfo.AutoIdCache)
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, tblVer, idCacheOpt)
allocs = append(allocs, newAlloc)
case model.ActionRebaseAutoRandomBase:
newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer)
Expand Down
6 changes: 6 additions & 0 deletions meta/autoid/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ go_library(
name = "autoid",
srcs = [
"autoid.go",
"autoid_service.go",
"errors.go",
"memid.go",
],
importpath = "github.com/pingcap/tidb/meta/autoid",
visibility = ["//visibility:public"],
deps = [
"//autoid_service",
"//errno",
"//kv",
"//meta",
Expand All @@ -24,8 +26,12 @@ go_library(
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/autoid",
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@org_uber_go_zap//:zap",
],
)
Expand Down
53 changes: 53 additions & 0 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
autoid "github.com/pingcap/tidb/autoid_service"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
tikvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -466,6 +468,7 @@ func (alloc *allocator) ForceRebase(requiredBase int64) error {
if requiredBase == -1 {
return ErrAutoincReadFailed.GenWithStack("Cannot force rebase the next global ID to '0'")
}

tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
alloc.mu.Lock()
defer alloc.mu.Unlock()
startTime := time.Now()
Expand Down Expand Up @@ -533,6 +536,47 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 {
return res
}

func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc {
ebd, ok := store.(kv.EtcdBackend)
if !ok {
// newSinglePointAlloc fail because not etcd background
// This could happen in the server package unit test
return nil
}

addrs, err := ebd.EtcdAddrs()
if err != nil {
panic(err)
}
spa := &singlePointAlloc{
dbID: dbID,
tblID: tblID,
isUnsigned: isUnsigned,
}
if len(addrs) > 0 {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
TLS: ebd.TLSConfig(),
})
if err != nil {
logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err))
return nil
}
spa.clientDiscover = clientDiscover{etcdCli: etcdCli}
} else {
spa.clientDiscover = clientDiscover{}
spa.mu.AutoIDAllocClient = autoid.MockForTest(store)
}

// mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
spa = nil
}
})
return spa
}

// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
allocType AllocatorType, opts ...AllocOption) Allocator {
Expand All @@ -548,6 +592,15 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool,
for _, fn := range opts {
fn.ApplyOn(alloc)
}

// Use the MySQL compatible AUTO_INCREMENT mode.
if allocType == RowIDAllocType && alloc.customStep && alloc.step == 1 {
alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned)
if alloc1 != nil {
return alloc1
}
}

return alloc
}

Expand Down
Loading