Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add HashPolicy fields to RDS update #4521

Merged
merged 17 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/xds/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
Expand Down Expand Up @@ -67,6 +68,10 @@ var (
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
CircuitBreakingSupport = !strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "false")
// RingHashSupport indicates whether ring hash support is enabled, which can
// be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
RingHashSupport = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want the default of this to be "false". See how AggregateAndDNSSupportEnv is handled

(Note that the comment says it can be enabled by setting env var to true, and also EqualFold(..., "true"))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, this isn't something the user should have to disable, but rather something the user should have to enable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be disabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "false".
Expand Down
24 changes: 24 additions & 0 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,28 @@ type VirtualHost struct {
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
type HashPolicyType int

const (
// HashPolicyTypeHeader specifies to hash a Header in the incoming request.
HashPolicyTypeHeader HashPolicyType = iota
// HashPolicyTypeChannelID specifies to hash a unique Identifier of the
// Channel. In grpc-go, this will be done using the ClientConn pointer.
HashPolicyTypeChannelID
)

// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing
// load balancer.
type HashPolicy struct {
HashPolicyType HashPolicyType
Terminal bool
// Fields used for type HEADER.
HeaderName string
Regex *regexp.Regexp
RegexSubstitution string
}

// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
Expand All @@ -281,6 +303,8 @@ type Route struct {
Headers []*HeaderMatcher
Fraction *uint32

HashPolicies []*HashPolicy

// If the matchers above indicate a match, the below configuration is used.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
Expand Down
174 changes: 173 additions & 1 deletion xds/internal/xdsclient/rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}},
wantErr: false,
},
{
name: "good-with-channel-id-hash-policy",
routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"},
Headers: []*v3routepb.HeaderMatcher{
{
Name: "th",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{
PrefixMatch: "tv",
},
InvertMatch: true,
},
},
RuntimeFraction: &v3corepb.RuntimeFractionalPercent{
DefaultValue: &v3typepb.FractionalPercent{
Numerator: 1,
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
},
},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}},
{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}},
},
TotalWeight: &wrapperspb.UInt32Value{Value: 100},
}},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
}},
},
},
wantRoutes: []*Route{{
Prefix: newStringP("/a/"),
Headers: []*HeaderMatcher{
{
Name: "th",
InvertMatch: newBoolP(true),
PrefixMatch: newStringP("tv"),
},
},
Fraction: newUInt32P(10000),
WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}},
HashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
}},
wantErr: false,
},
{
name: "with custom HTTP filter config",
routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}),
Expand Down Expand Up @@ -1223,7 +1278,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
return fmt.Sprint(fc)
}),
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oldFI := env.FaultInjectionSupport
Expand All @@ -1241,6 +1298,121 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
}
}

func (s) TestHashPoliciesProtoToSlice(t *testing.T) {
tests := []struct {
name string
hashPolicies []*v3routepb.RouteAction_HashPolicy
wantHashPolicies []*HashPolicy
wantErr bool
}{
// header-hash-policy tests a basic hash policy that specifies to hash a
// certain header.
{
name: "header-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
},
},
// channel-id-hash-policy tests a basic hash policy that specifies to
// hash a unique identifier of the channel.
{
name: "channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
},
wantHashPolicies: []*HashPolicy{
{HashPolicyType: HashPolicyTypeChannelID},
},
},
// unsupported-filter-state-key tests that an unsupported key in the
// filter state hash policy raises an error.
{
name: "wrong-filter-state-key",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "unsupported key"}}},
},
wantErr: true,
},
// no-op-hash-policy tests that hash policies that are not supported by
// grpc raise an error.
{
name: "no-op-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}},
},
wantErr: true,
},
// header-and-channel-id-hash-policy test that a list of header and
// channel id hash policies are successfully converted to an internal
// struct.
{
name: "header-and-channel-id-hash-policy",
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
Substitution: "/products",
},
},
},
},
{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}},
Terminal: true,
},
},
wantHashPolicies: []*HashPolicy{
{
HashPolicyType: HashPolicyTypeHeader,
HeaderName: ":path",
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
RegexSubstitution: "/products",
},
{
HashPolicyType: HashPolicyTypeChannelID,
Terminal: true,
},
},
},
}

oldRingHashSupport := env.RingHashSupport
env.RingHashSupport = true
defer func() { env.RingHashSupport = oldRingHashSupport }()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := hashPoliciesProtoToSlice(tt.hashPolicies)
if err != nil && !tt.wantErr {
t.Fatalf("Error returned when no error desired.")
}
if diff := cmp.Diff(got, tt.wantHashPolicies, cmp.AllowUnexported(regexp.Regexp{})); diff != "" {
t.Fatalf("hashPoliciesProtoToSlice returned returned unexpected diff (-got +want):\n%s", diff)
}
})
}
}

func newStringP(s string) *string {
return &s
}
Expand Down
41 changes: 41 additions & 0 deletions xds/internal/xdsclient/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,14 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
}

route.WeightedClusters = make(map[string]WeightedCluster)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this blank line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

action := r.GetRoute()
hp, err := hashPoliciesProtoToSlice(action.HashPolicy)
if err != nil {
return nil, err
}
route.HashPolicies = hp

switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
Expand Down Expand Up @@ -561,6 +568,40 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
return routesRet, nil
}

func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy) ([]*HashPolicy, error) {
var hashPoliciesRet []*HashPolicy
for _, p := range policies {
// Hash Policies are only applicable for a Ring Hash LB.
if !env.RingHashSupport {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do this check before calling hashPoliciesProtoToSlice.

And also, no need to return an error (error results in NACKs). It's safe to just ignore it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapped the hashPoliciesProtoToSlice call inside a RingHashSupport conditional.

return nil, errors.New("hash policies are only applicable for ring hash lb policy")
}
var policy HashPolicy
policy.Terminal = p.Terminal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this can be oneline
policy := HashPolicy{Terminal: p.Terminal}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

switch p.GetPolicySpecifier().(type) {
case *v3routepb.RouteAction_HashPolicy_Header_:
policy.HashPolicyType = HashPolicyTypeHeader
policy.HeaderName = p.GetHeader().HeaderName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call GetHeaderName(). Otherwise this will panic if GetHeader() returns nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I've actually been wondering about this for a while.

regex := p.GetHeader().GetRegexRewrite().GetPattern().Regex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call GetRegex()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

re, err := regexp.Compile(regex)
if err != nil {
return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
}
policy.Regex = re
policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().Substitution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call GetSubstitution()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

case *v3routepb.RouteAction_HashPolicy_FilterState_:
if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
return nil, fmt.Errorf("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also not NACK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to logger.

}
policy.HashPolicyType = HashPolicyTypeChannelID
default:
return nil, fmt.Errorf("hash policy %v is an unsupported hash policy", p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Print the type (with %T). %v would print only the pointer of the field, and then we don't know which policy was in the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to p.GetPolicySpecifier(). Crossing my fingers that that's the right thing.

}

hashPoliciesRet = append(hashPoliciesRet, &policy)
}
return hashPoliciesRet, nil
}

// UnmarshalCluster processes resources received in an CDS response, validates
// them, and transforms them into a native struct which contains only fields we
// are interested in.
Expand Down