Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSO microservice discovery fallback path shouldn't call FindGroupByKeyspaceID #6473

Merged
merged 5 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 58 additions & 27 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (k *keyspaceGroupSvcDiscovery) update(
keyspaceGroup *tsopb.KeyspaceGroup,
newPrimaryAddr string,
secondaryAddrs, addrs []string,
) (oldPrimaryAddr string, primarySwitched bool) {
) (oldPrimaryAddr string, primarySwitched, secondaryChanged bool) {
k.Lock()
defer k.Unlock()

Expand All @@ -79,10 +80,13 @@ func (k *keyspaceGroupSvcDiscovery) update(
k.primaryAddr = newPrimaryAddr
}

if !reflect.DeepEqual(k.secondaryAddrs, secondaryAddrs) {
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
k.secondaryAddrs = secondaryAddrs
secondaryChanged = true
}

k.group = keyspaceGroup
k.secondaryAddrs = secondaryAddrs
k.addrs = addrs

return
}

Expand Down Expand Up @@ -413,16 +417,43 @@ func (c *tsoServiceDiscovery) updateMember() error {
log.Error("[tso] failed to get tso server", errs.ZapError(err))
return err
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))

var keyspaceGroup *tsopb.KeyspaceGroup
if len(tsoServerAddr) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
} else {
// There is no error but no tso server address found, which means
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. In this case,
// we fall back to the old way of discovering the tso primary addresses
// from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err := c.discoverWithLegacyPath()
if err != nil {
return err
}
if len(addrs) == 0 {
return errors.New("no tso server address found")
}
members := make([]*tsopb.KeyspaceGroupMember, 0, len(addrs))
for _, addr := range addrs {
members = append(members, &tsopb.KeyspaceGroupMember{Address: addr})
}
members[0].IsPrimary = true
keyspaceGroup = &tsopb.KeyspaceGroup{
Id: defaultKeySpaceGroupID,
Members: members,
}
return err
}
c.tsoServerDiscovery.resetFailure()

log.Info("[tso] update keyspace group", zap.String("keyspace-group", keyspaceGroup.String()))

// Initialize the serving addresses from the returned keyspace group info.
primaryAddr := ""
Expand All @@ -449,12 +480,17 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldPrimary, primarySwitched := c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs)
oldPrimary, primarySwitched, secondaryChanged :=
c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs)
if primarySwitched {
if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil {
return err
}
}
if primarySwitched || secondaryChanged {
log.Info("[tso] updated keyspace group service discovery info",
zap.String("keyspace-group-service", keyspaceGroup.String()))
}

// Even if the primary address is empty, the other returned info has already been updated above,
// including the keyspace group info and the secondary addresses.
Expand All @@ -470,6 +506,12 @@ func (c *tsoServiceDiscovery) updateMember() error {
func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
keyspaceID uint32, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.KeyspaceGroup, error) {
failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) {
keyspaceToCheck, ok := val.(int)
if ok && keyspaceID == uint32(keyspaceToCheck) {
panic("findGroupByKeyspaceID is called unexpectedly")
}
})
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()

Expand Down Expand Up @@ -526,21 +568,10 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error)
})
if len(addrs) == 0 {
// There is no error but no tso server address found, which means
// either the server side is experiencing some problems to get the
// tso primary addresses or the server hasn't been upgraded to the
// version which processes and returns GetClusterInfoResponse.TsoUrls.
// In this case, we fall back to the old way of discovering the tso
// primary addresses from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err = c.discoverWithLegacyPath()
if err != nil {
return "", err
}
if len(addrs) == 0 {
return "", errors.New("no tso server address found")
}
// the server side hasn't been upgraded to the version that
// processes and returns GetClusterInfoResponse.TsoUrls. Return here
// and handle the fallback logic outside of this function.
return "", nil
}

log.Info("update tso server addresses", zap.Strings("addrs", addrs))
Expand Down
37 changes: 20 additions & 17 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tso

import (
"context"
"fmt"
"math"
"math/rand"
"strings"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/tikv/pd/client/testutil"
bs "github.com/tikv/pd/pkg/basicserver"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
Expand Down Expand Up @@ -227,30 +229,31 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() {

func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
re := suite.Require()
keyspaceID := uint32(1000000)
// Make sure this keyspace ID is not already in use elsewhere in the suite.
re.False(slice.Contains(suite.keyspaceIDs, keyspaceID))
failpointValue := fmt.Sprintf(`return(%d)`, keyspaceID)
// Simulate the case where the server runs a lower version than the client and returns no TSO
// addresses in the GetClusterInfo RPC.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID", failpointValue))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID"))
}()
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
client := mcs.SetupClientWithDefaultKeyspaceName(
suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(suite.ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}()

ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
client := mcs.SetupClientWithKeyspaceID(
ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ","))
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
wg.Wait()
}

// TestGetMinTS tests the correctness of GetMinTS.
Expand Down