diff --git a/core/manager/bootstrap.go b/core/manager/bootstrap.go index daab89d..84b4029 100644 --- a/core/manager/bootstrap.go +++ b/core/manager/bootstrap.go @@ -19,6 +19,7 @@ package manager import ( "fmt" "os" + "time" v3core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" structpb "github.com/golang/protobuf/ptypes/struct" @@ -40,10 +41,19 @@ type BootstrapConfig struct { } type XDSServerConfig struct { - SvrName string // The name of the xDS server - SvrAddr string // The address of the xDS server - XDSAuth bool // If this xDS enable the authentication of xDS stream - NDSNotRequired bool // required by default for Istio + SvrName string // The name of the xDS server + SvrAddr string // The address of the xDS server + XDSAuth bool // If this xDS enable the authentication of xDS stream + NDSNotRequired bool // required by default for Istio + FetchXDSTimeout time.Duration // timeout for fecth xds, default to 1s +} + +// GetFetchXDSTimeout get timeout. +func (xsc XDSServerConfig) GetFetchXDSTimeout() time.Duration { + if xsc.FetchXDSTimeout == 0 { + return defaultXDSFetchTimeout + } + return xsc.FetchXDSTimeout } // newBootstrapConfig constructs the bootstrapConfig @@ -62,6 +72,9 @@ func newBootstrapConfig(config *XDSServerConfig) (*BootstrapConfig, error) { if podIP == "" { return nil, fmt.Errorf("[XDS] Bootstrap, INSTANCE_IP is not set in env") } + if config.FetchXDSTimeout == 0 { + config.FetchXDSTimeout = defaultXDSFetchTimeout + } // specify the version of istio in case of the canary deployment of istiod istioVersion := os.Getenv(IstioVersion) diff --git a/core/manager/client.go b/core/manager/client.go index f412319..2f65c92 100644 --- a/core/manager/client.go +++ b/core/manager/client.go @@ -406,6 +406,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error { res, err := xdsresource.UnmarshalLDS(resp.GetResources()) c.updateAndACK(xdsresource.ListenerType, resp.GetNonce(), resp.GetVersionInfo(), err) if err != nil { + klog.Warnf("KITEX: [XDS] unmarshal lds response error:%v", err) return err } @@ -418,6 +419,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error { if c.ndsRequired() { ln, err := c.getListenerName(n) if err != nil || ln == "" { + klog.Warnf("KITEX: [XDS] get listener name %s failed, err: %v", n, err) continue } if lis, ok := res[ln]; ok { diff --git a/core/manager/manager.go b/core/manager/manager.go index 704b2ec..ed6619d 100644 --- a/core/manager/manager.go +++ b/core/manager/manager.go @@ -145,8 +145,7 @@ func (m *xdsResourceManager) Get(ctx context.Context, rType xdsresource.Resource } m.mu.Unlock() // Set fetch timeout - // TODO: timeout should be specified in the config of xdsResourceManager - ctx, cancel := context.WithTimeout(ctx, defaultXDSFetchTimeout) + ctx, cancel := context.WithTimeout(ctx, m.opts.XDSSvrConfig.GetFetchXDSTimeout()) defer cancel() select { diff --git a/core/manager/manager_test.go b/core/manager/manager_test.go index 9d924f1..5e496a4 100644 --- a/core/manager/manager_test.go +++ b/core/manager/manager_test.go @@ -37,8 +37,9 @@ var ( Id: "sidecar~kitex-test-node", } XdsServerConfig = &XDSServerConfig{ - SvrAddr: XdsServerAddress, - SvrName: IstiodSvrName, + SvrAddr: XdsServerAddress, + SvrName: IstiodSvrName, + FetchXDSTimeout: defaultXDSFetchTimeout, } XdsBootstrapConfig = &BootstrapConfig{ node: NodeProto, diff --git a/core/xdsresource/lds.go b/core/xdsresource/lds.go index f0dd884..fd753a4 100644 --- a/core/xdsresource/lds.go +++ b/core/xdsresource/lds.go @@ -23,7 +23,6 @@ import ( v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3thrift_proxy "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/thrift_proxy/v3" - v3matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" "github.com/golang/protobuf/ptypes/any" "google.golang.org/protobuf/proto" ) @@ -166,24 +165,7 @@ func unmarshalThriftProxy(rawResources *any.Any) (*RouteConfigResource, error) { case *v3thrift_proxy.RouteMatch_ServiceName: routeMatch.ServiceName = t.ServiceName } - // header match - tags := make(map[string]string) - if hs := match.GetHeaders(); hs != nil { - for _, h := range hs { - var v string - switch hm := h.GetHeaderMatchSpecifier().(type) { - case *v3routepb.HeaderMatcher_StringMatch: - switch p := hm.StringMatch.GetMatchPattern().(type) { - case *v3matcher.StringMatcher_Exact: - v = p.Exact - } - } - if v != "" { - tags[h.Name] = v - } - } - } - routeMatch.Tags = tags + routeMatch.Tags = BuildMatchers(match.GetHeaders()) route.Match = routeMatch // action action := r.Route diff --git a/core/xdsresource/lds_test.go b/core/xdsresource/lds_test.go index 166c5e0..cdcc1d9 100644 --- a/core/xdsresource/lds_test.go +++ b/core/xdsresource/lds_test.go @@ -104,9 +104,28 @@ func TestUnmarshallLDSThriftProxy(t *testing.T) { r := tp.Routes[0] assert.NotNil(t, r.Match) assert.True(t, r.Match.MatchPath("method")) - for k, v := range map[string]string{"k1": "v1", "k2": "v2"} { - assert.Equal(t, v, r.Match.GetTags()[k]) - } + assert.True(t, r.Match.MatchMeta(map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "prehello", + "k4": "a4", + })) + assert.True(t, r.Match.MatchMeta(map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "pre", + "k4": "a4", + })) + assert.False(t, r.Match.MatchMeta(map[string]string{ + "k3": "prehello", + "k4": "a4", + })) + assert.True(t, r.Match.MatchMeta(map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "prehello", + "k4": "z9", + })) assert.NotNil(t, r.WeightedClusters) } f(lis.NetworkFilters[0]) diff --git a/core/xdsresource/matcher.go b/core/xdsresource/matcher.go new file mode 100644 index 0000000..cbd56ef --- /dev/null +++ b/core/xdsresource/matcher.go @@ -0,0 +1,94 @@ +/* + * Copyright 2022 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdsresource + +import ( + "regexp" + "strings" + + "github.com/cloudwego/kitex/pkg/klog" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" +) + +type PrefixMatcher string + +func (pm PrefixMatcher) Match(other string) bool { + return strings.HasPrefix(other, string(pm)) +} + +type ExactMatcher string + +func (em ExactMatcher) Match(other string) bool { + return string(em) == other +} + +type RegexMatcher struct { + re *regexp.Regexp +} + +func (rm *RegexMatcher) Match(other string) bool { + return rm.re.MatchString(other) +} + +type Matcher interface { + Match(string) bool +} + +type Matchers map[string]Matcher + +func (ms Matchers) Match(other map[string]string) bool { + for key, m := range ms { + if val, ok := other[key]; !ok || !m.Match(val) { + return false + } + } + return true +} + +// BuildMatchers build matcher set from headers +func BuildMatchers(headers []*v3.HeaderMatcher) Matchers { + ms := map[string]Matcher{} + for _, header := range headers { + switch hm := header.GetHeaderMatchSpecifier().(type) { + case *v3.HeaderMatcher_StringMatch: + switch p := hm.StringMatch.GetMatchPattern().(type) { + case *v3matcher.StringMatcher_Exact: + if p.Exact != "" { + ms[header.Name] = ExactMatcher(p.Exact) + } + case *v3matcher.StringMatcher_Prefix: + if p.Prefix != "" { + ms[header.Name] = PrefixMatcher(p.Prefix) + } + case *v3matcher.StringMatcher_SafeRegex: + // only support google re2 + if p.SafeRegex != nil && p.SafeRegex.Regex != "" { + re2, err := regexp.Compile(p.SafeRegex.Regex) + if err != nil { + klog.Warnf("KITEX: [XDS] compile regexp %s failed when BuildMatchers, err:", p.SafeRegex.Regex, err) + continue + } + ms[header.Name] = &RegexMatcher{ + re: re2, + } + } + } + } + } + return ms +} diff --git a/core/xdsresource/rds.go b/core/xdsresource/rds.go index 6739203..64c18f2 100644 --- a/core/xdsresource/rds.go +++ b/core/xdsresource/rds.go @@ -22,7 +22,6 @@ import ( "time" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" "github.com/golang/protobuf/ptypes/any" "google.golang.org/protobuf/proto" ) @@ -62,19 +61,20 @@ type Route struct { type RouteMatch interface { MatchPath(path string) bool - GetTags() map[string]string + // default use headers to match the meta. + MatchMeta(map[string]string) bool } type HTTPRouteMatch struct { - Path string - Prefix string - Tags map[string]string + Path string + Prefix string + Headers Matchers } type ThriftRouteMatch struct { Method string ServiceName string - Tags map[string]string + Tags Matchers } func (tm *ThriftRouteMatch) MatchPath(path string) bool { @@ -84,8 +84,8 @@ func (tm *ThriftRouteMatch) MatchPath(path string) bool { return true } -func (tm *ThriftRouteMatch) GetTags() map[string]string { - return tm.Tags +func (tm *ThriftRouteMatch) MatchMeta(md map[string]string) bool { + return tm.Tags.Match(md) } func (rm *HTTPRouteMatch) MatchPath(path string) bool { @@ -96,8 +96,8 @@ func (rm *HTTPRouteMatch) MatchPath(path string) bool { return rm.Prefix == "/" } -func (rm *HTTPRouteMatch) GetTags() map[string]string { - return rm.Tags +func (rm *HTTPRouteMatch) MatchMeta(md map[string]string) bool { + return rm.Headers.Match(md) } func (r *Route) MarshalJSON() ([]byte, error) { @@ -133,24 +133,7 @@ func unmarshalRoutes(rs []*v3routepb.Route) ([]*Route, error) { // default: // return nil, fmt.Errorf("only support path match") } - // header match - tags := make(map[string]string) - if hs := match.GetHeaders(); hs != nil { - for _, h := range hs { - var v string - switch hm := h.GetHeaderMatchSpecifier().(type) { - case *v3routepb.HeaderMatcher_StringMatch: - switch p := hm.StringMatch.GetMatchPattern().(type) { - case *matcherv3.StringMatcher_Exact: - v = p.Exact - } - } - if v != "" { - tags[h.Name] = v - } - } - } - routeMatch.Tags = tags + routeMatch.Headers = BuildMatchers(match.GetHeaders()) route.Match = routeMatch // action action := r.GetAction() diff --git a/core/xdsresource/rds_test.go b/core/xdsresource/rds_test.go index db63961..75f6c7c 100644 --- a/core/xdsresource/rds_test.go +++ b/core/xdsresource/rds_test.go @@ -131,7 +131,7 @@ func TestHTTPRouteMatch_MatchPath(t *testing.T) { type fields struct { Path string Prefix string - Tags map[string]string + Tags Matchers } type args struct { path string @@ -188,9 +188,9 @@ func TestHTTPRouteMatch_MatchPath(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { rm := &HTTPRouteMatch{ - Path: tt.fields.Path, - Prefix: tt.fields.Prefix, - Tags: tt.fields.Tags, + Path: tt.fields.Path, + Prefix: tt.fields.Prefix, + Headers: tt.fields.Tags, } if got := rm.MatchPath(tt.args.path); got != tt.want { t.Errorf("MatchPath() = %v, want %v", got, tt.want) diff --git a/core/xdsresource/testutil.go b/core/xdsresource/testutil.go index 544c628..b5954de 100644 --- a/core/xdsresource/testutil.go +++ b/core/xdsresource/testutil.go @@ -68,6 +68,28 @@ var ( }, }, }, + { + Name: "k3", + HeaderMatchSpecifier: &v3routepb.HeaderMatcher_StringMatch{ + StringMatch: &v3matcher.StringMatcher{ + MatchPattern: &v3matcher.StringMatcher_Prefix{ + Prefix: "pre", + }, + }, + }, + }, + { + Name: "k4", + HeaderMatchSpecifier: &v3routepb.HeaderMatcher_StringMatch{ + StringMatch: &v3matcher.StringMatcher{ + MatchPattern: &v3matcher.StringMatcher_SafeRegex{ + SafeRegex: &v3matcher.RegexMatcher{ + Regex: "[a-z][1-9]", + }, + }, + }, + }, + }, }, }, Route: &v3thrift_proxy.RouteAction{ diff --git a/xdssuite/router.go b/xdssuite/router.go index 482691f..dab037f 100644 --- a/xdssuite/router.go +++ b/xdssuite/router.go @@ -201,19 +201,7 @@ func matchThriftRoute(md map[string]string, ri rpcinfo.RPCInfo, routeConfig *xds // routeMatched checks if the route matches the info provided in the RPCInfo func routeMatched(path string, md map[string]string, r *xdsresource.Route) bool { - if r.Match != nil && r.Match.MatchPath(path) { - tagMatched := true - for mk, mv := range r.Match.GetTags() { - if v, ok := md[mk]; !ok || v != mv { - tagMatched = false - break - } - } - if tagMatched { - return true - } - } - return false + return r.Match != nil && r.Match.MatchPath(path) && r.Match.MatchMeta(md) } // pickCluster selects cluster based on the weight