Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSO microservice discovery fallback path shouldn't call FindGroupByKeyspaceID #6473

Merged
merged 5 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,43 @@ func (c *tsoServiceDiscovery) updateMember() error {
log.Error("[tso] failed to get tso server", errs.ZapError(err))
return err
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))

var keyspaceGroup *tsopb.KeyspaceGroup
if len(tsoServerAddr) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
} else {
// There is no error but no tso server address found, which means
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. In this case,
// we fall back to the old way of discovering the tso primary addresses
// from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err := c.discoverWithLegacyPath()
if err != nil {
return err
}
if len(addrs) == 0 {
return errors.New("no tso server address found")
}
members := make([]*tsopb.KeyspaceGroupMember, 0, len(addrs))
for _, addr := range addrs {
members = append(members, &tsopb.KeyspaceGroupMember{Address: addr})
}
members[0].IsPrimary = true
keyspaceGroup = &tsopb.KeyspaceGroup{
Id: c.keyspaceID,
Members: members,
}
return err
}
c.tsoServerDiscovery.resetFailure()

log.Info("[tso] update keyspace group", zap.String("keyspace-group", keyspaceGroup.String()))

Expand Down Expand Up @@ -470,6 +499,9 @@ func (c *tsoServiceDiscovery) updateMember() error {
func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
keyspaceID uint32, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.KeyspaceGroup, error) {
failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func() {
panic("findGroupByKeyspaceID is called unexpectedly")
})
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()

Expand Down Expand Up @@ -526,21 +558,10 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error)
})
if len(addrs) == 0 {
// There is no error but no tso server address found, which means
// either the server side is experiencing some problems to get the
// tso primary addresses or the server hasn't been upgraded to the
// version which processes and returns GetClusterInfoResponse.TsoUrls.
// In this case, we fall back to the old way of discovering the tso
// primary addresses from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err = c.discoverWithLegacyPath()
if err != nil {
return "", err
}
if len(addrs) == 0 {
return "", errors.New("no tso server address found")
}
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. Return here
// and handle the fallback logic outside of this function.
return "", nil
}

log.Info("update tso server addresses", zap.Strings("addrs", addrs))
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test: failpoint-enable
$(MAKE) failpoint-disable

ci-test-job:
CGO_ENABLED=1 go test -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso
CGO_ENABLED=1 go test -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved

install-tools:
cd $(ROOT_PATH) && $(MAKE) install-tools
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,12 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
// Simulate the case that the server has lower version than the client and returns no tso addrs
// in the GetClusterInfo RPC.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID", `return(true)`))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID"))
}()

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
Expand Down