Skip to content

Commit

Permalink
client: support specifying target member (#7909)
Browse files Browse the repository at this point in the history
ref #7905

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Apr 2, 2024
1 parent f559940 commit 6fe44d7
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ coverage.xml
coverage
*.txt
go.work*
embedded_assets_handler.go
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
10 changes: 5 additions & 5 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,25 @@ func getValueFromMetadata(ctx context.Context, key string, f func(context.Contex

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
// Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, url string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(url)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsCfg, opt...)
cc, err := GetClientConn(dCtx, url, tlsCfg, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
if val, ok := val.(string); ok && val == url {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
conn, loaded := clientConns.LoadOrStore(addr, cc)
conn, loaded := clientConns.LoadOrStore(url, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
Expand Down
19 changes: 18 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,23 @@ func (ci *clientInner) requestWithRetry(
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
}
return err
}
if reqInfo.bo == nil {
Expand Down Expand Up @@ -244,6 +252,7 @@ type client struct {
callerID string
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -343,6 +352,13 @@ func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
return &newClient
}

// WithTargetURL sets and returns a new client with the given target URL.
func (c *client) WithTargetURL(targetURL string) Client {
newClient := *c
newClient.targetURL = targetURL
return &newClient
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -363,7 +379,8 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
WithBackoffer(c.bo).
WithTargetURL(c.targetURL),
headerOpts...)
}

Expand Down
16 changes: 15 additions & 1 deletion client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
re.Equal(2, checked)
}

func TestCallerID(t *testing.T) {
func TestWithCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
Expand Down Expand Up @@ -96,3 +97,16 @@ func TestWithBackoffer(t *testing.T) {
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
}

func TestWithTargetURL(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newClientWithMockServiceDiscovery("test-with-target-url", []string{"http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.3"})
defer c.Close()

_, err := c.WithTargetURL("http://127.0.0.4").GetStatus(ctx)
re.ErrorIs(err, errs.ErrClientNoTargetMember)
_, err = c.WithTargetURL("http://127.0.0.2").GetStatus(ctx)
re.ErrorContains(err, "connect: connection refused")
}
5 changes: 4 additions & 1 deletion client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type Client interface {
WithRespHandler(func(resp *http.Response, res any) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// WithTargetURL sets and returns a new client with the given target URL.
WithTargetURL(string) Client
// Close gracefully closes the HTTP client.
Close()
}
Expand Down Expand Up @@ -472,7 +474,8 @@ func (c *client) GetStatus(ctx context.Context) (*State, error) {
WithName(getStatusName).
WithURI(Status).
WithMethod(http.MethodGet).
WithResp(&status))
WithResp(&status),
WithAllowFollowerHandle())
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type requestInfo struct {
res any
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -146,6 +147,12 @@ func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
return ri
}

// WithTargetURL sets the target URL of the request.
func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
ri.targetURL = targetURL
return ri
}

func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
2 changes: 1 addition & 1 deletion client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDService
func (m *mockPDServiceDiscovery) Init() error {
m.clients = make([]ServiceClient, 0, len(m.urls))
for _, url := range m.urls {
m.clients = append(m.clients, newPDServiceClient(url, url, nil, false))
m.clients = append(m.clients, newPDServiceClient(url, m.urls[0], nil, false))
}
return nil
}
Expand Down

0 comments on commit 6fe44d7

Please sign in to comment.