Skip to content

Commit

Permalink
xds: add HashPolicy fields to RDS update (#4521)
Browse files Browse the repository at this point in the history
* Add HashPolicy fields to RDS update
  • Loading branch information
zasweq authored Jun 14, 2021
1 parent 4554924 commit 22c5358
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 2 deletions.
6 changes: 5 additions & 1 deletion internal/xds/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"

Expand All @@ -59,7 +60,10 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)

// RingHashSupport indicates whether ring hash support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "true".
RingHashSupport = strings.EqualFold(os.Getenv(ringHashSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
Expand Down
24 changes: 24 additions & 0 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,28 @@ type VirtualHost struct {
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
type HashPolicyType int

const (
// HashPolicyTypeHeader specifies to hash a Header in the incoming request.
HashPolicyTypeHeader HashPolicyType = iota
// HashPolicyTypeChannelID specifies to hash a unique Identifier of the
// Channel. In grpc-go, this will be done using the ClientConn pointer.
HashPolicyTypeChannelID
)

// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing
// load balancer.
type HashPolicy struct {
HashPolicyType HashPolicyType
Terminal bool
// Fields used for type HEADER.
HeaderName string
Regex *regexp.Regexp
RegexSubstitution string
}

// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
Expand All @@ -281,6 +303,8 @@ type Route struct {
Headers []*HeaderMatcher
Fraction *uint32

HashPolicies []*HashPolicy

// If the matchers above indicate a match, the below configuration is used.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
Expand Down
173 changes: 172 additions & 1 deletion xds/internal/xdsclient/rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -1153,6 +1154,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}},
wantErr: false,
},
{
name: "good-with-channel-id-hash-policy",
routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"},
Headers: []*v3routepb.HeaderMatcher{
{
Name: "th",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{
PrefixMatch: "tv",
},
InvertMatch: true,
},
},
RuntimeFraction: &v3corepb.RuntimeFractionalPercent{
DefaultValue: &v3typepb.FractionalPercent{
Numerator: 1,
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}},
{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}},
},
TotalWeight: &wrapperspb.UInt32Value{Value: 100},
}},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
}},
},
},
wantRoutes: []*Route{{
Prefix: newStringP("/a/"),
Headers: []*HeaderMatcher{
{
Name: "th",
InvertMatch: newBoolP(true),
PrefixMatch: newStringP("tv"),
},
},
Fraction: newUInt32P(10000),
WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}},
HashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
}},
wantErr: false,
},
{
name: "with custom HTTP filter config",
routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}),
Expand Down Expand Up @@ -1197,7 +1253,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
return fmt.Sprint(fc)
}),
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := routesProtoToSlice(tt.routes, nil, false)
Expand All @@ -1211,6 +1269,119 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}
}

func (s) TestHashPoliciesProtoToSlice(t *testing.T) {
tests := []struct {
name string
hashPolicies []*v3routepb.RouteAction_HashPolicy
wantHashPolicies []*HashPolicy
wantErr bool
}{
// header-hash-policy tests a basic hash policy that specifies to hash a
// certain header.
{
name: "header-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
},
},
// channel-id-hash-policy tests a basic hash policy that specifies to
// hash a unique identifier of the channel.
{
name: "channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
wantHashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
},
// unsupported-filter-state-key tests that an unsupported key in the
// filter state hash policy are treated as a no-op.
{
name: "wrong-filter-state-key",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "unsupported key"}}},
},
},
// no-op-hash-policy tests that hash policies that are not supported by
// grpc are treated as a no-op.
{
name: "no-op-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}},
},
},
// header-and-channel-id-hash-policy test that a list of header and
// channel id hash policies are successfully converted to an internal
// struct.
{
name: "header-and-channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}},
Terminal: true,
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
{
HashPolicyType: HashPolicyTypeChannelID,
Terminal: true,
},
},
},
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := hashPoliciesProtoToSlice(tt.hashPolicies, nil)
if (err != nil) != tt.wantErr {
t.Fatalf("hashPoliciesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr)
}
if diff := cmp.Diff(got, tt.wantHashPolicies, cmp.AllowUnexported(regexp.Regexp{})); diff != "" {
t.Fatalf("hashPoliciesProtoToSlice() returned unexpected diff (-got +want):\n%s", diff)
}
})
}
}

func newStringP(s string) *string {
return &s
}
Expand Down
41 changes: 41 additions & 0 deletions xds/internal/xdsclient/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,

route.WeightedClusters = make(map[string]WeightedCluster)
action := r.GetRoute()

// Hash Policies are only applicable for a Ring Hash LB.
if env.RingHashSupport {
hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger)
if err != nil {
return nil, err
}
route.HashPolicies = hp
}

switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
Expand Down Expand Up @@ -557,6 +567,37 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
return routesRet, nil
}

func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger *grpclog.PrefixLogger) ([]*HashPolicy, error) {
var hashPoliciesRet []*HashPolicy
for _, p := range policies {
policy := HashPolicy{Terminal: p.Terminal}
switch p.GetPolicySpecifier().(type) {
case *v3routepb.RouteAction_HashPolicy_Header_:
policy.HashPolicyType = HashPolicyTypeHeader
policy.HeaderName = p.GetHeader().GetHeaderName()
regex := p.GetHeader().GetRegexRewrite().GetPattern().GetRegex()
re, err := regexp.Compile(regex)
if err != nil {
return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
}
policy.Regex = re
policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().GetSubstitution()
case *v3routepb.RouteAction_HashPolicy_FilterState_:
if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
logger.Infof("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
continue
}
policy.HashPolicyType = HashPolicyTypeChannelID
default:
logger.Infof("hash policy %T is an unsupported hash policy", p.GetPolicySpecifier())
continue
}

hashPoliciesRet = append(hashPoliciesRet, &policy)
}
return hashPoliciesRet, nil
}

// UnmarshalCluster processes resources received in an CDS response, validates
// them, and transforms them into a native struct which contains only fields we
// are interested in.
Expand Down

0 comments on commit 22c5358

Please sign in to comment.