Skip to content

Commit

Permalink
feature: support leader election, only leader can commit and rollback (
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown authored Apr 28, 2022
1 parent b89c672 commit d7ab60b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
22 changes: 13 additions & 9 deletions pkg/dt/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,19 @@ func InitDistributedTransactionManager(conf *config.DistributedTransaction, stor
globalSessionQueue: workqueue.NewDelayingQueue(),
branchSessionQueue: workqueue.New(),
}
if err := manager.processGlobalSessions(); err != nil {
panic(err)
}
if err := manager.processBranchSessions(); err != nil {
panic(err)
}
go manager.processGlobalSessionQueue()
go manager.processBranchSessionQueue()
go manager.watchBranchSession()
go func() {
if storageDriver.LeaderElection(manager.applicationID) {
if err := manager.processGlobalSessions(); err != nil {
log.Fatal(err)
}
if err := manager.processBranchSessions(); err != nil {
log.Fatal(err)
}
go manager.processGlobalSessionQueue()
go manager.processBranchSessionQueue()
go manager.watchBranchSession()
}
}()
}

func GetDistributedTransactionManager() *DistributedTransactionManager {
Expand Down
19 changes: 18 additions & 1 deletion pkg/dt/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"

"github.com/cectc/dbpack/pkg/dt/api"
"github.com/cectc/dbpack/pkg/dt/storage"
Expand All @@ -39,17 +40,23 @@ const (

type store struct {
client *clientv3.Client
session *concurrency.Session
initGlobalSessionRevision int64
initBranchSessionRevision int64
}

func NewEtcdStore(config clientv3.Config) *store {
client, err := clientv3.New(config)
if err != nil {
panic(err)
log.Fatal(err)
}
session, err := concurrency.NewSession(client)
if err != nil {
log.Fatal(err)
}
return &store{
client: client,
session: session,
initGlobalSessionRevision: 0,
initBranchSessionRevision: 0,
}
Expand All @@ -68,6 +75,16 @@ type watchChan struct {
isGlobalSession bool
}

func (s *store) LeaderElection(applicationID string) bool {
e := concurrency.NewElection(s.session, fmt.Sprintf("%s/leader-election/", applicationID))
ctx := context.Background()
// Elect a leader (or wait that the leader resign)
if err := e.Campaign(ctx, "e"); err != nil {
log.Fatal(err)
}
return true
}

func (s *store) AddGlobalSession(ctx context.Context, globalSession *api.GlobalSession) error {
data, err := globalSession.Marshal()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/dt/storage/storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

type Driver interface {
LeaderElection(applicationID string) bool
AddGlobalSession(ctx context.Context, globalSession *api.GlobalSession) error
AddBranchSession(ctx context.Context, branchSession *api.BranchSession) error
GlobalCommit(ctx context.Context, xid string) (api.GlobalSession_GlobalStatus, error)
Expand Down

0 comments on commit d7ab60b

Please sign in to comment.