Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support regexp and prefix match #21

Merged
merged 7 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions core/manager/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package manager
import (
"fmt"
"os"
"time"

v3core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
structpb "github.com/golang/protobuf/ptypes/struct"
Expand All @@ -40,10 +41,19 @@ type BootstrapConfig struct {
}

type XDSServerConfig struct {
SvrName string // The name of the xDS server
SvrAddr string // The address of the xDS server
XDSAuth bool // If this xDS enable the authentication of xDS stream
NDSNotRequired bool // required by default for Istio
SvrName string // The name of the xDS server
SvrAddr string // The address of the xDS server
XDSAuth bool // If this xDS enable the authentication of xDS stream
NDSNotRequired bool // required by default for Istio
FetchXDSTimeout time.Duration // timeout for fecth xds, default to 1s
}

// GetFetchXDSTimeout get timeout.
func (xsc XDSServerConfig) GetFetchXDSTimeout() time.Duration {
if xsc.FetchXDSTimeout == 0 {
return defaultXDSFetchTimeout
}
return xsc.FetchXDSTimeout
}

// newBootstrapConfig constructs the bootstrapConfig
Expand All @@ -62,6 +72,9 @@ func newBootstrapConfig(config *XDSServerConfig) (*BootstrapConfig, error) {
if podIP == "" {
return nil, fmt.Errorf("[XDS] Bootstrap, INSTANCE_IP is not set in env")
}
if config.FetchXDSTimeout == 0 {
config.FetchXDSTimeout = defaultXDSFetchTimeout
}
// specify the version of istio in case of the canary deployment of istiod
istioVersion := os.Getenv(IstioVersion)

Expand Down
2 changes: 2 additions & 0 deletions core/manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error {
res, err := xdsresource.UnmarshalLDS(resp.GetResources())
c.updateAndACK(xdsresource.ListenerType, resp.GetNonce(), resp.GetVersionInfo(), err)
if err != nil {
klog.Warnf("KITEX: [XDS] unmarshal lds response error:%v", err)
return err
}

Expand All @@ -418,6 +419,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error {
if c.ndsRequired() {
ln, err := c.getListenerName(n)
if err != nil || ln == "" {
klog.Warnf("KITEX: [XDS] get listener name %s failed, err: %v", n, err)
continue
}
if lis, ok := res[ln]; ok {
Expand Down
3 changes: 1 addition & 2 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ func (m *xdsResourceManager) Get(ctx context.Context, rType xdsresource.Resource
}
m.mu.Unlock()
// Set fetch timeout
// TODO: timeout should be specified in the config of xdsResourceManager
ctx, cancel := context.WithTimeout(ctx, defaultXDSFetchTimeout)
ctx, cancel := context.WithTimeout(ctx, m.opts.XDSSvrConfig.GetFetchXDSTimeout())
defer cancel()

select {
Expand Down
5 changes: 3 additions & 2 deletions core/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ var (
Id: "sidecar~kitex-test-node",
}
XdsServerConfig = &XDSServerConfig{
SvrAddr: XdsServerAddress,
SvrName: IstiodSvrName,
SvrAddr: XdsServerAddress,
SvrName: IstiodSvrName,
FetchXDSTimeout: defaultXDSFetchTimeout,
}
XdsBootstrapConfig = &BootstrapConfig{
node: NodeProto,
Expand Down
20 changes: 1 addition & 19 deletions core/xdsresource/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3thrift_proxy "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/thrift_proxy/v3"
v3matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -166,24 +165,7 @@ func unmarshalThriftProxy(rawResources *any.Any) (*RouteConfigResource, error) {
case *v3thrift_proxy.RouteMatch_ServiceName:
routeMatch.ServiceName = t.ServiceName
}
// header match
tags := make(map[string]string)
if hs := match.GetHeaders(); hs != nil {
for _, h := range hs {
var v string
switch hm := h.GetHeaderMatchSpecifier().(type) {
case *v3routepb.HeaderMatcher_StringMatch:
switch p := hm.StringMatch.GetMatchPattern().(type) {
case *v3matcher.StringMatcher_Exact:
v = p.Exact
}
}
if v != "" {
tags[h.Name] = v
}
}
}
routeMatch.Tags = tags
routeMatch.Tags = BuildMatchers(match.GetHeaders())
route.Match = routeMatch
// action
action := r.Route
Expand Down
25 changes: 22 additions & 3 deletions core/xdsresource/lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,28 @@ func TestUnmarshallLDSThriftProxy(t *testing.T) {
r := tp.Routes[0]
assert.NotNil(t, r.Match)
assert.True(t, r.Match.MatchPath("method"))
for k, v := range map[string]string{"k1": "v1", "k2": "v2"} {
assert.Equal(t, v, r.Match.GetTags()[k])
}
assert.True(t, r.Match.MatchMeta(map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "prehello",
"k4": "a4",
}))
assert.True(t, r.Match.MatchMeta(map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "pre",
"k4": "a4",
}))
assert.False(t, r.Match.MatchMeta(map[string]string{
"k3": "prehello",
"k4": "a4",
}))
assert.True(t, r.Match.MatchMeta(map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "prehello",
"k4": "z9",
}))
assert.NotNil(t, r.WeightedClusters)
}
f(lis.NetworkFilters[0])
Expand Down
94 changes: 94 additions & 0 deletions core/xdsresource/matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2022 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdsresource

import (
"regexp"
"strings"

"github.com/cloudwego/kitex/pkg/klog"
v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
)

type PrefixMatcher string

func (pm PrefixMatcher) Match(other string) bool {
return strings.HasPrefix(other, string(pm))
}

type ExactMatcher string

func (em ExactMatcher) Match(other string) bool {
return string(em) == other
}

type RegexMatcher struct {
re *regexp.Regexp
}

func (rm *RegexMatcher) Match(other string) bool {
return rm.re.MatchString(other)
}

type Matcher interface {
Match(string) bool
}

type Matchers map[string]Matcher

func (ms Matchers) Match(other map[string]string) bool {
for key, m := range ms {
if val, ok := other[key]; !ok || !m.Match(val) {
return false
}
}
return true
}

// BuildMatchers build matcher set from headers
func BuildMatchers(headers []*v3.HeaderMatcher) Matchers {
ms := map[string]Matcher{}
for _, header := range headers {
switch hm := header.GetHeaderMatchSpecifier().(type) {
case *v3.HeaderMatcher_StringMatch:
switch p := hm.StringMatch.GetMatchPattern().(type) {
case *v3matcher.StringMatcher_Exact:
if p.Exact != "" {
ms[header.Name] = ExactMatcher(p.Exact)
}
case *v3matcher.StringMatcher_Prefix:
if p.Prefix != "" {
ms[header.Name] = PrefixMatcher(p.Prefix)
}
case *v3matcher.StringMatcher_SafeRegex:
// only support google re2
if p.SafeRegex != nil && p.SafeRegex.Regex != "" {
re2, err := regexp.Compile(p.SafeRegex.Regex)
if err != nil {
klog.Warnf("KITEX: [XDS] compile regexp %s failed when BuildMatchers, err:", p.SafeRegex.Regex, err)
continue
}
ms[header.Name] = &RegexMatcher{
re: re2,
}
}
}
}
}
return ms
}
39 changes: 11 additions & 28 deletions core/xdsresource/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -62,19 +61,20 @@ type Route struct {

type RouteMatch interface {
MatchPath(path string) bool
GetTags() map[string]string
// default use headers to match the meta.
MatchMeta(map[string]string) bool
}

type HTTPRouteMatch struct {
Path string
Prefix string
Tags map[string]string
Path string
Prefix string
Headers Matchers
ppzqh marked this conversation as resolved.
Show resolved Hide resolved
}

type ThriftRouteMatch struct {
Method string
ServiceName string
Tags map[string]string
Tags Matchers
}

func (tm *ThriftRouteMatch) MatchPath(path string) bool {
Expand All @@ -84,8 +84,8 @@ func (tm *ThriftRouteMatch) MatchPath(path string) bool {
return true
}

func (tm *ThriftRouteMatch) GetTags() map[string]string {
return tm.Tags
func (tm *ThriftRouteMatch) MatchMeta(md map[string]string) bool {
return tm.Tags.Match(md)
}

func (rm *HTTPRouteMatch) MatchPath(path string) bool {
Expand All @@ -96,8 +96,8 @@ func (rm *HTTPRouteMatch) MatchPath(path string) bool {
return rm.Prefix == "/"
}

func (rm *HTTPRouteMatch) GetTags() map[string]string {
return rm.Tags
func (rm *HTTPRouteMatch) MatchMeta(md map[string]string) bool {
return rm.Headers.Match(md)
}

func (r *Route) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -133,24 +133,7 @@ func unmarshalRoutes(rs []*v3routepb.Route) ([]*Route, error) {
// default:
// return nil, fmt.Errorf("only support path match")
}
// header match
tags := make(map[string]string)
if hs := match.GetHeaders(); hs != nil {
for _, h := range hs {
var v string
switch hm := h.GetHeaderMatchSpecifier().(type) {
case *v3routepb.HeaderMatcher_StringMatch:
switch p := hm.StringMatch.GetMatchPattern().(type) {
case *matcherv3.StringMatcher_Exact:
v = p.Exact
}
}
if v != "" {
tags[h.Name] = v
}
}
}
routeMatch.Tags = tags
routeMatch.Headers = BuildMatchers(match.GetHeaders())
route.Match = routeMatch
// action
action := r.GetAction()
Expand Down
8 changes: 4 additions & 4 deletions core/xdsresource/rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestHTTPRouteMatch_MatchPath(t *testing.T) {
type fields struct {
Path string
Prefix string
Tags map[string]string
Tags Matchers
}
type args struct {
path string
Expand Down Expand Up @@ -188,9 +188,9 @@ func TestHTTPRouteMatch_MatchPath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rm := &HTTPRouteMatch{
Path: tt.fields.Path,
Prefix: tt.fields.Prefix,
Tags: tt.fields.Tags,
Path: tt.fields.Path,
Prefix: tt.fields.Prefix,
Headers: tt.fields.Tags,
}
if got := rm.MatchPath(tt.args.path); got != tt.want {
t.Errorf("MatchPath() = %v, want %v", got, tt.want)
Expand Down
22 changes: 22 additions & 0 deletions core/xdsresource/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ var (
},
},
},
{
Name: "k3",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_StringMatch{
StringMatch: &v3matcher.StringMatcher{
MatchPattern: &v3matcher.StringMatcher_Prefix{
Prefix: "pre",
},
},
},
},
{
Name: "k4",
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_StringMatch{
StringMatch: &v3matcher.StringMatcher{
MatchPattern: &v3matcher.StringMatcher_SafeRegex{
SafeRegex: &v3matcher.RegexMatcher{
Regex: "[a-z][1-9]",
},
},
},
},
},
},
},
Route: &v3thrift_proxy.RouteAction{
Expand Down
14 changes: 1 addition & 13 deletions xdssuite/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,7 @@ func matchThriftRoute(md map[string]string, ri rpcinfo.RPCInfo, routeConfig *xds

// routeMatched checks if the route matches the info provided in the RPCInfo
func routeMatched(path string, md map[string]string, r *xdsresource.Route) bool {
if r.Match != nil && r.Match.MatchPath(path) {
tagMatched := true
for mk, mv := range r.Match.GetTags() {
if v, ok := md[mk]; !ok || v != mv {
tagMatched = false
break
}
}
if tagMatched {
return true
}
}
return false
return r.Match != nil && r.Match.MatchPath(path) && r.Match.MatchMeta(md)
}

// pickCluster selects cluster based on the weight
Expand Down