Skip to content

Commit

Permalink
fix: when register branch session, global session can not change (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown authored Jun 12, 2022
1 parent 8dd789e commit 11ea3dd
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions pkg/dt/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchS
return err
}

gs, err := s.GetGlobalSession(ctx, branchSession.XID)
gs, modRevision, err := s.getGlobalSession(ctx, branchSession.XID)
if err != nil {
if errors.Is(err, err2.CouldNotFoundGlobalTransaction) {
return err2.GlobalTransactionFinished
Expand All @@ -118,20 +118,20 @@ func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchS
}

txn := s.client.Txn(ctx)
comparisons := make([]clientv3.Cmp, 0)
ops := make([]clientv3.Op, 0)

comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(branchSession.XID), "=", modRevision))
ops = append(ops, clientv3.OpPut(branchSession.BranchID, string(data)))
// 全局事务关联的事务分支
// branch transactions of global transaction
globalBranchKey := fmt.Sprintf("bs/%s/%d", branchSession.XID, branchSession.BranchSessionID)
ops = append(ops, clientv3.OpPut(globalBranchKey, branchSession.BranchID))

if branchSession.Type == api.AT && branchSession.LockKey != "" {
rowKeys := misc.CollectRowKeys(branchSession.LockKey, branchSession.ResourceID)

var cmpSlice []clientv3.Cmp
for _, rowKey := range rowKeys {
cmpSlice = append(cmpSlice, notFound(rowKey))
comparisons = append(comparisons, notFound(rowKey))
}
txn = txn.If(cmpSlice...)

for _, rowKey := range rowKeys {
lockKey := fmt.Sprintf("lk/%s/%s", branchSession.XID, rowKey)
Expand All @@ -140,6 +140,7 @@ func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchS
}
}

txn = txn.If(comparisons...)
txn.Then(ops...)

txnResp, err := txn.Commit()
Expand Down Expand Up @@ -169,7 +170,11 @@ func (s *store) GlobalCommit(ctx context.Context, xid string) (api.GlobalSession
return api.Begin, err
}
if gs.Status > api.Begin {
return gs.Status, err2.GlobalTransactionFinished
if gs.Status == api.Committing {
return gs.Status, nil
} else {
return gs.Status, err2.GlobalTransactionFinished
}
}
gs.Status = api.Committing
data, err := gs.Marshal()
Expand Down Expand Up @@ -211,7 +216,11 @@ func (s *store) GlobalRollback(ctx context.Context, xid string) (api.GlobalSessi
return api.Begin, err
}
if gs.Status > api.Begin {
return gs.Status, err2.GlobalTransactionFinished
if gs.Status == api.Rollbacking {
return gs.Status, nil
} else {
return gs.Status, err2.GlobalTransactionFinished
}
}
gs.Status = api.Rollbacking
data, err := gs.Marshal()
Expand Down Expand Up @@ -245,19 +254,25 @@ func (s *store) GlobalRollback(ctx context.Context, xid string) (api.GlobalSessi
}

func (s *store) GetGlobalSession(ctx context.Context, xid string) (*api.GlobalSession, error) {
gs, _, err := s.getGlobalSession(ctx, xid)
return gs, err
}

func (s *store) getGlobalSession(ctx context.Context, xid string) (*api.GlobalSession, int64, error) {
resp, err := s.client.Get(ctx, xid, clientv3.WithSerializable())
if err != nil {
return nil, err
return nil, 0, err
}
if len(resp.Kvs) == 0 {
return nil, err2.CouldNotFoundGlobalTransaction
return nil, 0, err2.CouldNotFoundGlobalTransaction
}

globalSession := &api.GlobalSession{}
err = globalSession.Unmarshal(resp.Kvs[0].Value)
if err != nil {
return nil, err
return nil, 0, err
}
return globalSession, nil
return globalSession, resp.Kvs[0].ModRevision, nil
}

func (s *store) ListGlobalSession(ctx context.Context, applicationID string) ([]*api.GlobalSession, error) {
Expand Down

0 comments on commit 11ea3dd

Please sign in to comment.