Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support dynamic tso service #8517

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ func (k *serviceModeKeeper) close() {
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
k.tsoSvcDiscovery.Close()
if k.tsoSvcDiscovery != nil {
k.tsoSvcDiscovery.Close()
}
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
Expand Down Expand Up @@ -660,31 +662,30 @@ func (c *client) Close() {
}
}

func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
func (c *client) setServiceMode(newMode pdpb.ServiceMode, skipSameMode bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer using a more straightforward word.

Suggested change
func (c *client) setServiceMode(newMode pdpb.ServiceMode, skipSameMode bool) {
func (c *client) setServiceMode(newMode pdpb.ServiceMode, force bool) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the same as force.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not skipSameMode always?

c.Lock()
defer c.Unlock()

if c.option.useTSOServerProxy {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}

if newMode == c.serviceMode {
if skipSameMode && newMode == c.serviceMode {
return
}
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
c.setTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Set a new TSO client.
func (c *client) setTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
Expand Down Expand Up @@ -736,11 +737,11 @@ func (c *client) getTSOClient() *tsoClient {
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
// SetTSOClient sets the TSO client, only for test.
func (c *client) SetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
c.setTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
Expand Down
4 changes: 4 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
// NotFoundTSOErr indicates the tso address is not found.
NotFoundTSOErr = "not found tso address"
// MaximumRetriesExceededErr indicates the maximum retries exceeded.
MaximumRetriesExceededErr = "maximum number of retries exceeded"
)

// client errors
Expand Down
10 changes: 10 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ func IsLeaderChange(err error) bool {
strings.Contains(errMsg, NotPrimaryErr)
}

// IsServiceModeChange determines whether there is a service mode change.
func IsServiceModeChange(err error) bool {
if err == nil {
return false
}
errMsg := err.Error()
return strings.Contains(errMsg, NotFoundTSOErr) ||
strings.Contains(errMsg, MaximumRetriesExceededErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
3 changes: 3 additions & 0 deletions client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (*mockPDServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, er
// ScheduleCheckMemberChanged implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}

// ScheduleCheckServiceModeChanged implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) ScheduleCheckServiceModeChanged() {}

// CheckMemberChanged implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil }

Expand Down
45 changes: 33 additions & 12 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type ServiceDiscovery interface {
// among the leader/followers in a quorum-based cluster or among the primary/secondaries in a
// primary/secondary configured cluster.
ScheduleCheckMemberChanged()
// ScheduleCheckServiceModeChanged is used to trigger a check to see if the service mode is changed.
ScheduleCheckServiceModeChanged()
// CheckMemberChanged immediately check if there is any membership change among the leader/followers
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
Expand Down Expand Up @@ -420,7 +422,7 @@ type pdServiceDiscovery struct {
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
serviceModeUpdateCb func(pdpb.ServiceMode, bool)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []func()
// membersChangedCbs will be called after there is any membership change in the
Expand All @@ -433,7 +435,8 @@ type pdServiceDiscovery struct {
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc

checkMembershipCh chan struct{}
checkMembershipCh chan struct{}
checkServiceModeCh chan struct{}

wg *sync.WaitGroup
ctx context.Context
Expand All @@ -460,13 +463,14 @@ func NewDefaultPDServiceDiscovery(
func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
serviceModeUpdateCb func(pdpb.ServiceMode, bool),
updateKeyspaceIDCb updateKeyspaceIDFunc,
keyspaceID uint32,
urls []string, tlsCfg *tls.Config, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
checkServiceModeCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: wg,
Expand Down Expand Up @@ -506,7 +510,7 @@ func (c *pdServiceDiscovery) Init() error {
}
}

if err := c.checkServiceModeChanged(); err != nil {
if err := c.updateServiceMode(); err != nil {
log.Warn("[pd] failed to check service mode and will check later", zap.Error(err))
}

Expand Down Expand Up @@ -567,23 +571,31 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
failpoint.Inject("skipUpdateServiceMode", func() {
failpoint.Return()
})
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
failpoint.Return()
})

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
failpoint.Inject("fastUpdateServiceMode", func() {
ticker.Reset(10 * time.Millisecond)
})
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE, true)
failpoint.Continue()
})
case <-c.checkServiceModeCh:
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE, false)
failpoint.Continue()
})
}
if err := c.checkServiceModeChanged(); err != nil {
if err := c.updateServiceMode(); err != nil {
log.Error("[pd] failed to update service mode",
zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
c.ScheduleCheckMemberChanged() // check if the leader changed
Expand Down Expand Up @@ -777,6 +789,15 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// ScheduleCheckMemberChanged is used to check if there is any membership
// change among the leader and the followers.
func (c *pdServiceDiscovery) ScheduleCheckServiceModeChanged() {
select {
case c.checkServiceModeCh <- struct{}{}:
default:
}
}

// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a
// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
func (c *pdServiceDiscovery) CheckMemberChanged() error {
Expand Down Expand Up @@ -857,7 +878,7 @@ func (c *pdServiceDiscovery) initClusterID() error {
return nil
}

func (c *pdServiceDiscovery) checkServiceModeChanged() error {
func (c *pdServiceDiscovery) updateServiceMode() error {
leaderURL := c.getLeaderURL()
if len(leaderURL) == 0 {
return errors.New("no leader found")
Expand All @@ -870,7 +891,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE, true)
}
return nil
}
Expand All @@ -880,7 +901,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
return errors.WithStack(errNoServiceModeReturned)
}
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0], true)
}
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
defer cancel()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", defaultResourceGroupName)))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport"))
}()

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
Expand Down
4 changes: 4 additions & 0 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to error.
td.connectionCtxs.Delete(streamURL)
streamCancelFunc()
if errs.IsServiceModeChange(err) {
svcDiscovery.ScheduleCheckServiceModeChanged()
return false
}
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if errs.IsLeaderChange(err) {
if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error {
return nil
}

// ScheduleCheckServiceModeChanged is used to trigger a check to see if there is any change in service mode.
func (c *tsoServiceDiscovery) ScheduleCheckServiceModeChanged() {
c.apiSvcDiscovery.ScheduleCheckServiceModeChanged()
}

// AddServingURLSwitchedCallback adds callbacks which will be called when the primary in
// a primary/secondary configured cluster is switched.
func (*tsoServiceDiscovery) AddServingURLSwitchedCallback(...func()) {}
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
for _, rule := range h.microserviceRedirectRules {
// Now we only support checking the scheduling service whether it is independent
if rule.targetServiceName == constant.SchedulingServiceName {
if !h.s.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) {
rc := h.s.DirectlyGetRaftCluster()
if rc == nil || !rc.IsServiceIndependent(constant.SchedulingServiceName) {
continue
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler {
// @Router /config [get]
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
if h.svr.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
if h.svr.DirectlyGetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
schedulingServerConfig, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -336,7 +336,7 @@ func getConfigMap(cfg map[string]any, key []string, value any) map[string]any {
// @Success 200 {object} sc.ScheduleConfig
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
if h.svr.DirectlyGetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -409,7 +409,7 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request)
// @Success 200 {object} sc.ReplicationConfig
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
if h.svr.DirectlyGetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down
Loading
Loading