Skip to content

Commit

Permalink
Merge branch 'master' into optmize-heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Apr 24, 2024
2 parents f67c8a8 + 1e65f9d commit 15b1979
Show file tree
Hide file tree
Showing 45 changed files with 380 additions and 324 deletions.
16 changes: 3 additions & 13 deletions .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,11 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.21'
- name: Checkout code
uses: actions/checkout@v3
- name: Restore cache
uses: actions/cache@v3
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
path: |
~/go/pkg/mod
~/.cache/go-build
**/.dashboard_download_cache
key: ${{ runner.os }}-golang-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-golang
go-version: '1.21'
- name: Make Check
run: |
SWAGGER=1 make build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/label.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
add_labels:
runs-on: ubuntu-latest
steps:
- uses: actions/github-script@v4
- uses: actions/github-script@v7
name: Add labels
with:
script: |
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pd-docker-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
strategy:
fail-fast: true
steps:
- uses: actions/setup-go@v3
- name: Checkout code
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- name: Checkout code
uses: actions/checkout@v3
- name: Make
run: make docker-image
26 changes: 9 additions & 17 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,11 @@ jobs:
outputs:
job-total: 13
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.21'
- name: Checkout code
uses: actions/checkout@v3
- name: Restore cache
uses: actions/cache@v3
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
path: |
~/go/pkg/mod
~/.cache/go-build
**/.tools
**/.dashboard_download_cache
key: ${{ runner.os }}-go-${{ matrix.worker_id }}-${{ hashFiles('**/go.sum') }}
go-version: '1.21'
- name: Make Test
env:
WORKER_ID: ${{ matrix.worker_id }}
Expand All @@ -53,20 +44,21 @@ jobs:
mv covprofile covprofile_$WORKER_ID
sed -i "/failpoint_binding/d" covprofile_$WORKER_ID
- name: Upload coverage result ${{ matrix.worker_id }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: cover-reports
name: cover-reports-${{ matrix.worker_id }}
path: covprofile_${{ matrix.worker_id }}
report-coverage:
needs: chunks
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Download chunk report
uses: actions/download-artifact@v2
uses: actions/download-artifact@v4
with:
name: cover-reports
pattern: cover-reports-*
merge-multiple: true
- name: Merge
env:
TOTAL_JOBS: ${{needs.chunks.outputs.job-total}}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/tso-consistency-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ jobs:
tso-consistency-test:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- name: Checkout code
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- name: Checkout code
uses: actions/checkout@v3
- name: Make TSO Consistency Test
run: make test-tso-consistency
6 changes: 3 additions & 3 deletions .github/workflows/tso-function-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ jobs:
tso-function-test:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- name: Checkout code
uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- name: Checkout code
uses: actions/checkout@v3
- name: Make TSO Function Test
run: make test-tso-function
33 changes: 14 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,34 +798,19 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
defer span.Finish()
}

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.tryDone(err)
}
return req
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
return c.dispatchTSORequestWithRetry(ctx, dcLocation)
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture {
var (
retryable bool
err error
req *tsoRequest
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
Expand All @@ -838,12 +823,22 @@ func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
// Get a new request from the pool if it's nil or not from the current pool.
if req == nil || req.pool != tsoClient.tsoReqPool {
req = tsoClient.getTSORequest(ctx, dcLocation)
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return err
if err != nil {
if req == nil {
return newTSORequestFastFail(err)
}
req.tryDone(err)
}
return req
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down
10 changes: 5 additions & 5 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
google.golang.org/grpc v1.62.1
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9
)

Expand All @@ -34,11 +34,11 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
24 changes: 12 additions & 12 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -110,8 +110,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -122,8 +122,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -144,16 +144,16 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9/go.mod h1:j5uROIAAgi3YmtiETMt1LW0d/lHqQ7wwrIY4uGRXLQ4=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
61 changes: 29 additions & 32 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,6 @@ type TSOClient interface {
GetMinTS(ctx context.Context) (int64, int64, error)
}

type tsoRequest struct {
start time.Time
clientCtx context.Context
requestCtx context.Context
done chan error
physical int64
logical int64
dcLocation string
}

var tsoReqPool = sync.Pool{
New: func() any {
return &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
}
},
}

func (req *tsoRequest) tryDone(err error) {
select {
case req.done <- err:
default:
}
}

type tsoClient struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -84,6 +57,8 @@ type tsoClient struct {
// tso allocator leader is switched.
tsoAllocServingURLSwitchedCallback []func()

// tsoReqPool is the pool to recycle `*tsoRequest`.
tsoReqPool *sync.Pool
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
Expand All @@ -104,11 +79,20 @@ func newTSOClient(
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
ctx: ctx,
cancel: cancel,
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
tsoReqPool: &sync.Pool{
New: func() any {
return &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
}
},
},
checkTSDeadlineCh: make(chan struct{}),
checkTSODispatcherCh: make(chan struct{}, 1),
updateTSOConnectionCtxsCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -155,6 +139,19 @@ func (c *tsoClient) Close() {
log.Info("tso client is closed")
}

func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := c.tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
req.start = time.Now()
req.pool = c.tsoReqPool
req.requestCtx = ctx
req.clientCtx = c.ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
func (c *tsoClient) GetTSOAllocators() *sync.Map {
return &c.tsoAllocators
Expand Down
32 changes: 0 additions & 32 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,38 +115,6 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
return false, nil
}

// TSFuture is a future which promises to return a TSO.
type TSFuture interface {
// Wait gets the physical and logical time, it would block caller if data is not available yet.
Wait() (int64, int64, error)
}

func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
// If tso command duration is observed very high, the reason could be it
// takes too long for Wait() be called.
start := time.Now()
cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
if err != nil {
cmdFailDurationTSO.Observe(time.Since(req.start).Seconds())
return 0, 0, err
}
physical, logical = req.physical, req.logical
now := time.Now()
cmdDurationWait.Observe(now.Sub(start).Seconds())
cmdDurationTSO.Observe(now.Sub(req.start).Seconds())
return
case <-req.requestCtx.Done():
return 0, 0, errors.WithStack(req.requestCtx.Err())
case <-req.clientCtx.Done():
return 0, 0, errors.WithStack(req.clientCtx.Err())
}
}

func (c *tsoClient) updateTSODispatcher() {
// Set up the new TSO dispatcher and batch controller.
c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool {
Expand Down
Loading

0 comments on commit 15b1979

Please sign in to comment.