Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into region-health
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Apr 30, 2024
2 parents 3acec5e + 8ee18a4 commit c6d2fb9
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 121 deletions.
68 changes: 41 additions & 27 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe
return req
}

// getTSODispatcher returns the TSO dispatcher registered for the given
// dc-location. The boolean result reports whether a non-nil dispatcher
// was found in the map.
func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) {
	if value, ok := c.tsoDispatcher.Load(dcLocation); ok && value != nil {
		return value.(*tsoDispatcher), true
	}
	return nil, false
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
func (c *tsoClient) GetTSOAllocators() *sync.Map {
return &c.tsoAllocators
Expand Down Expand Up @@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error {
log.Info("[tso] switch dc tso global allocator serving url",
zap.String("dc-location", globalDCLocation),
zap.String("new-url", url))
c.scheduleUpdateTSOConnectionCtxs()
c.scheduleCheckTSODispatcher()
return nil
}
Expand Down Expand Up @@ -333,40 +342,41 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc strin
// while a new daemon will be created also to switch back to a normal leader connection ASAP the
// connection comes back to normal.
func (c *tsoClient) tryConnectToTSO(
dispatcherCtx context.Context,
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
var (
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
)
updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Store(newURL, connectionCtx)
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
connectionCtxs.LoadOrStore(newURL, connectionCtx)
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
// retry several times before falling back to the follower when the network problem happens
)

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
// Retry several times before falling back to the follower when the network problem happens
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if _, ok := connectionCtxs.Load(url); ok {
return nil
}
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
Expand All @@ -392,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO(
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
case <-ctx.Done():
return err
case <-ticker.C:
}
Expand All @@ -409,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO(
}

// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
if err == nil {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
Expand All @@ -429,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO(

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
func (c *tsoClient) tryConnectToTSOWithProxy(
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
Expand All @@ -455,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy",
Expand Down
Loading

0 comments on commit c6d2fb9

Please sign in to comment.