diff --git a/internal/xds/env/env.go b/internal/xds/env/env.go index 05a017f2611b..e90c7ffc465c 100644 --- a/internal/xds/env/env.go +++ b/internal/xds/env/env.go @@ -39,6 +39,7 @@ const ( // When both bootstrap FileName and FileContent are set, FileName is used. BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG" + ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT" aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" @@ -59,7 +60,10 @@ var ( // // When both bootstrap FileName and FileContent are set, FileName is used. BootstrapFileContent = os.Getenv(BootstrapFileContentEnv) - + // RingHashSupport indicates whether ring hash support is enabled, which can + // be enabled by setting the environment variable + // "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "true". + RingHashSupport = strings.EqualFold(os.Getenv(ringHashSupportEnv), "true") // ClientSideSecuritySupport is used to control processing of security // configuration on the client-side. // diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index 13ef973807ce..cb0d93bd98ec 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -269,6 +269,28 @@ type VirtualHost struct { HTTPFilterConfigOverride map[string]httpfilter.FilterConfig } +// HashPolicyType specifies the type of HashPolicy from a received RDS Response. +type HashPolicyType int + +const ( + // HashPolicyTypeHeader specifies to hash a Header in the incoming request. + HashPolicyTypeHeader HashPolicyType = iota + // HashPolicyTypeChannelID specifies to hash a unique Identifier of the + // Channel. In grpc-go, this will be done using the ClientConn pointer. + HashPolicyTypeChannelID +) + +// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing +// load balancer. +type HashPolicy struct { + HashPolicyType HashPolicyType + Terminal bool + // Fields used for type HEADER. + HeaderName string + Regex *regexp.Regexp + RegexSubstitution string +} + // Route is both a specification of how to match a request as well as an // indication of the action to take upon match. type Route struct { @@ -281,6 +303,8 @@ type Route struct { Headers []*HeaderMatcher Fraction *uint32 + HashPolicies []*HashPolicy + // If the matchers above indicate a match, the below configuration is used. WeightedClusters map[string]WeightedCluster // If MaxStreamDuration is nil, it indicates neither of the route action's diff --git a/xds/internal/xdsclient/rds_test.go b/xds/internal/xdsclient/rds_test.go index adabe5f41671..b78a49a3e31a 100644 --- a/xds/internal/xdsclient/rds_test.go +++ b/xds/internal/xdsclient/rds_test.go @@ -29,6 +29,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/xds/env" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/version" "google.golang.org/protobuf/types/known/durationpb" @@ -1153,6 +1154,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { }}, wantErr: false, }, + { + name: "good-with-channel-id-hash-policy", + routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"}, + Headers: []*v3routepb.HeaderMatcher{ + { + Name: "th", + HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{ + PrefixMatch: "tv", + }, + InvertMatch: true, + }, + }, + RuntimeFraction: &v3corepb.RuntimeFractionalPercent{ + DefaultValue: &v3typepb.FractionalPercent{ + Numerator: 1, + Denominator: v3typepb.FractionalPercent_HUNDRED, + }, + }, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}}, + {Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}}, + }, + TotalWeight: &wrapperspb.UInt32Value{Value: 100}, + }}, + HashPolicy: []*v3routepb.RouteAction_HashPolicy{ + {PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}}, + }, + }}, + }, + }, + wantRoutes: []*Route{{ + Prefix: newStringP("/a/"), + Headers: []*HeaderMatcher{ + { + Name: "th", + InvertMatch: newBoolP(true), + PrefixMatch: newStringP("tv"), + }, + }, + Fraction: newUInt32P(10000), + WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}}, + HashPolicies: []*HashPolicy{ + {HashPolicyType: HashPolicyTypeChannelID}, + }, + }}, + wantErr: false, + }, { name: "with custom HTTP filter config", routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}), @@ -1197,7 +1253,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { return fmt.Sprint(fc) }), } - + oldRingHashSupport := env.RingHashSupport + env.RingHashSupport = true + defer func() { env.RingHashSupport = oldRingHashSupport }() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := routesProtoToSlice(tt.routes, nil, false) @@ -1211,6 +1269,119 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { } } +func (s) TestHashPoliciesProtoToSlice(t *testing.T) { + tests := []struct { + name string + hashPolicies []*v3routepb.RouteAction_HashPolicy + wantHashPolicies []*HashPolicy + wantErr bool + }{ + // header-hash-policy tests a basic hash policy that specifies to hash a + // certain header. + { + name: "header-hash-policy", + hashPolicies: []*v3routepb.RouteAction_HashPolicy{ + { + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: ":path", + RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{ + Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"}, + Substitution: "/products", + }, + }, + }, + }, + }, + wantHashPolicies: []*HashPolicy{ + { + HashPolicyType: HashPolicyTypeHeader, + HeaderName: ":path", + Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(), + RegexSubstitution: "/products", + }, + }, + }, + // channel-id-hash-policy tests a basic hash policy that specifies to + // hash a unique identifier of the channel. + { + name: "channel-id-hash-policy", + hashPolicies: []*v3routepb.RouteAction_HashPolicy{ + {PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}}, + }, + wantHashPolicies: []*HashPolicy{ + {HashPolicyType: HashPolicyTypeChannelID}, + }, + }, + // unsupported-filter-state-key tests that an unsupported key in the + // filter state hash policy are treated as a no-op. + { + name: "wrong-filter-state-key", + hashPolicies: []*v3routepb.RouteAction_HashPolicy{ + {PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "unsupported key"}}}, + }, + }, + // no-op-hash-policy tests that hash policies that are not supported by + // grpc are treated as a no-op. + { + name: "no-op-hash-policy", + hashPolicies: []*v3routepb.RouteAction_HashPolicy{ + {PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}}, + }, + }, + // header-and-channel-id-hash-policy test that a list of header and + // channel id hash policies are successfully converted to an internal + // struct. + { + name: "header-and-channel-id-hash-policy", + hashPolicies: []*v3routepb.RouteAction_HashPolicy{ + { + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: ":path", + RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{ + Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"}, + Substitution: "/products", + }, + }, + }, + }, + { + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}, + Terminal: true, + }, + }, + wantHashPolicies: []*HashPolicy{ + { + HashPolicyType: HashPolicyTypeHeader, + HeaderName: ":path", + Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(), + RegexSubstitution: "/products", + }, + { + HashPolicyType: HashPolicyTypeChannelID, + Terminal: true, + }, + }, + }, + } + + oldRingHashSupport := env.RingHashSupport + env.RingHashSupport = true + defer func() { env.RingHashSupport = oldRingHashSupport }() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := hashPoliciesProtoToSlice(tt.hashPolicies, nil) + if (err != nil) != tt.wantErr { + t.Fatalf("hashPoliciesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr) + } + if diff := cmp.Diff(got, tt.wantHashPolicies, cmp.AllowUnexported(regexp.Regexp{})); diff != "" { + t.Fatalf("hashPoliciesProtoToSlice() returned unexpected diff (-got +want):\n%s", diff) + } + }) + } +} + func newStringP(s string) *string { return &s } diff --git a/xds/internal/xdsclient/xds.go b/xds/internal/xdsclient/xds.go index dd68f6c68bc8..b4b0dd1a45c1 100644 --- a/xds/internal/xdsclient/xds.go +++ b/xds/internal/xdsclient/xds.go @@ -496,6 +496,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, route.WeightedClusters = make(map[string]WeightedCluster) action := r.GetRoute() + + // Hash Policies are only applicable for a Ring Hash LB. + if env.RingHashSupport { + hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger) + if err != nil { + return nil, err + } + route.HashPolicies = hp + } + switch a := action.GetClusterSpecifier().(type) { case *v3routepb.RouteAction_Cluster: route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1} @@ -557,6 +567,37 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, return routesRet, nil } +func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger *grpclog.PrefixLogger) ([]*HashPolicy, error) { + var hashPoliciesRet []*HashPolicy + for _, p := range policies { + policy := HashPolicy{Terminal: p.Terminal} + switch p.GetPolicySpecifier().(type) { + case *v3routepb.RouteAction_HashPolicy_Header_: + policy.HashPolicyType = HashPolicyTypeHeader + policy.HeaderName = p.GetHeader().GetHeaderName() + regex := p.GetHeader().GetRegexRewrite().GetPattern().GetRegex() + re, err := regexp.Compile(regex) + if err != nil { + return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex) + } + policy.Regex = re + policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().GetSubstitution() + case *v3routepb.RouteAction_HashPolicy_FilterState_: + if p.GetFilterState().GetKey() != "io.grpc.channel_id" { + logger.Infof("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey()) + continue + } + policy.HashPolicyType = HashPolicyTypeChannelID + default: + logger.Infof("hash policy %T is an unsupported hash policy", p.GetPolicySpecifier()) + continue + } + + hashPoliciesRet = append(hashPoliciesRet, &policy) + } + return hashPoliciesRet, nil +} + // UnmarshalCluster processes resources received in an CDS response, validates // them, and transforms them into a native struct which contains only fields we // are interested in.