Skip to content

Commit

Permalink
support tcc branch commit & rollback (#41)
Browse files Browse the repository at this point in the history
* support tcc branch commit & rollback
  • Loading branch information
bohehe authored May 9, 2022
1 parent c0bfdf9 commit feab7ae
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 17 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/gobuffalo/envy v1.7.0 // indirect
github.com/gobuffalo/packd v0.3.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3yg
github.com/go-playground/overalls v0.0.0-20180201144345-22ec1a223b7c/go.mod h1:UqxAgEOt89sCiXlrc/ycnx00LVvUO/eS8tMUkWX4R7w=
github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY=
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
Expand Down Expand Up @@ -1118,6 +1120,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
Expand Down
5 changes: 0 additions & 5 deletions pkg/filter/dt/context.go → pkg/dt/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ import (
"github.com/cectc/dbpack/pkg/log"
)

var (
VarHost = "host"
VarQueryString = "queryString"
)

type RequestContext struct {
ActionContext map[string]string
Headers []byte
Expand Down
106 changes: 104 additions & 2 deletions pkg/dt/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package dt

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/go-resty/resty/v2"
"k8s.io/client-go/util/workqueue"

"github.com/cectc/dbpack/pkg/config"
Expand All @@ -32,9 +36,22 @@ import (
"github.com/cectc/dbpack/pkg/resource"
)

const DefaultRetryDeadThreshold = 130 * 1000
const (
// CommitRequestPath represents for tcc commit request path
CommitRequestPath = "tcc_commit_request_path"

var manager *DistributedTransactionManager
// RollbackRequestPath represents for tcc rollback request path
RollbackRequestPath = "tcc_rollback_request_path"

// DefaultRetryDeadThreshold is max retry milliseconds
DefaultRetryDeadThreshold = 130 * 1000
)

var (
VarHost = "host"
VarQueryString = "queryString"
manager *DistributedTransactionManager
)

func InitDistributedTransactionManager(conf *config.DistributedTransaction, storageDriver storage.Driver) {
if conf.RetryDeadThreshold == 0 {
Expand Down Expand Up @@ -143,6 +160,9 @@ func (manager *DistributedTransactionManager) IsLockable(ctx context.Context, re
}

func (manager *DistributedTransactionManager) branchCommit(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
if bs.Type == api.TCC {
return manager.tccBranchCommit(bs)
}
db := resource.GetDBManager().GetDB(bs.ResourceID)
if db == nil {
return 0, fmt.Errorf("DB resource is not exist, db name: %s", bs.ResourceID)
Expand All @@ -159,6 +179,9 @@ func (manager *DistributedTransactionManager) branchCommit(bs *api.BranchSession
}

func (manager *DistributedTransactionManager) branchRollback(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
if bs.Type == api.TCC {
return manager.tccBranchRollback(bs)
}
status, lockKeys, err := manager._branchRollback(bs)
if len(lockKeys) > 0 {
if _, err := manager.storageDriver.ReleaseLockKeys(context.Background(), bs.ResourceID, lockKeys); err != nil {
Expand Down Expand Up @@ -368,3 +391,82 @@ func isGlobalSessionTimeout(gs *api.GlobalSession) bool {
func (manager *DistributedTransactionManager) IsRollingBackDead(bs *api.BranchSession) bool {
return (misc.CurrentTimeMillis() - uint64(bs.BeginTime)) > uint64(manager.retryDeadThreshold)
}

func (manager *DistributedTransactionManager) tccBranchCommit(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
requestContext := &RequestContext{
ActionContext: make(map[string]string),
Headers: []byte{},
Body: []byte{},
}
err := requestContext.Decode(bs.ApplicationData)
if err != nil {
return api.PhaseTwoCommitting, fmt.Errorf("error decoding bs.ApplicationData: %v", err)
}

resp, err := manager.doHttpRequest(requestContext, true)
if err != nil {
return api.PhaseTwoCommitting, fmt.Errorf("error doHttpRequest for tccBranchCommit: %v", err)
}
if resp.StatusCode() != http.StatusOK {
return api.PhaseTwoCommitting, fmt.Errorf("error tccBranchCommit response code %d", resp.StatusCode())
}
return api.Complete, nil
}

func (manager *DistributedTransactionManager) tccBranchRollback(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
requestContext := &RequestContext{
ActionContext: make(map[string]string),
Headers: []byte{},
Body: []byte{},
}
err := requestContext.Decode(bs.ApplicationData)
if err != nil {
return api.PhaseTwoRollbacking, fmt.Errorf("error decoding bs.ApplicationData: %v", err)
}

resp, err := manager.doHttpRequest(requestContext, false)
if err != nil {
return api.PhaseTwoRollbacking, fmt.Errorf("error doHttpRequest for tccBranchRollback: %v", err)
}
if resp.StatusCode() != http.StatusOK {
return api.PhaseTwoRollbacking, fmt.Errorf("error tccBranchRollback response code %d", resp.StatusCode())
}
return api.Complete, nil
}

func (manager *DistributedTransactionManager) doHttpRequest(requestContext *RequestContext, commit bool) (*resty.Response, error) {
var (
host string
path string
queryString string
)
host = requestContext.ActionContext[VarHost]
if commit {
path = requestContext.ActionContext[CommitRequestPath]
} else {
path = requestContext.ActionContext[RollbackRequestPath]
}

u := url.URL{
Scheme: "http",
Path: path,
Host: host,
}
queryString, ok := requestContext.ActionContext[VarQueryString]
if ok {
u.RawQuery = queryString
}

client := resty.New()
request := client.R()

headers := make(map[string]string)
err := json.Unmarshal(requestContext.Headers, &headers)
if err != nil {
return nil, fmt.Errorf("error json.Unmarshal requestContext.Headers: %v", err)
}
request.SetHeaders(headers)
request.SetBody(requestContext.Body)

return request.Post(u.String())
}
15 changes: 5 additions & 10 deletions pkg/filter/dt/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ import (
"github.com/cectc/dbpack/pkg/log"
)

const (
CommitRequestPath = "tcc_commit_request_path"
RollbackRequestPath = "tcc_rollback_request_path"
)

// handleHttp1GlobalBegin return bool, represent whether continue
func (f *_httpFilter) handleHttp1GlobalBegin(ctx *fasthttp.RequestCtx, transactionInfo *TransactionInfo) (bool, error) {
// todo support transaction isolation level
Expand Down Expand Up @@ -76,18 +71,18 @@ func (f *_httpFilter) handleHttp1BranchRegister(ctx *fasthttp.RequestCtx, tccRes

bodyBytes := ctx.PostBody()

requestContext := &RequestContext{
requestContext := &dt.RequestContext{
ActionContext: make(map[string]string),
Headers: ctx.Request.Header.Header(),
Body: bodyBytes,
}

requestContext.ActionContext[VarHost] = f.conf.BackendHost
requestContext.ActionContext[CommitRequestPath] = tccResource.CommitRequestPath
requestContext.ActionContext[RollbackRequestPath] = tccResource.RollbackRequestPath
requestContext.ActionContext[dt.VarHost] = f.conf.BackendHost
requestContext.ActionContext[dt.CommitRequestPath] = tccResource.CommitRequestPath
requestContext.ActionContext[dt.RollbackRequestPath] = tccResource.RollbackRequestPath
queryString := ctx.Request.RequestURI()
if string(queryString) != "" {
requestContext.ActionContext[VarQueryString] = string(queryString)
requestContext.ActionContext[dt.VarQueryString] = string(queryString)
}

data, err := requestContext.Encode()
Expand Down

0 comments on commit feab7ae

Please sign in to comment.