Skip to content

Commit

Permalink
fix(kuma-cp) fault injection matching (#2757)
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
(cherry picked from commit 1f3b393)
  • Loading branch information
lobkovilya authored and mergify-bot committed Sep 20, 2021
1 parent b1395ec commit 591b89d
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 55 deletions.
8 changes: 5 additions & 3 deletions pkg/core/faultinjections/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func BuildFaultInjectionMap(dataplane *core_mesh.DataplaneResource, mesh *core_m
return nil, errors.Wrap(err, "could not fetch additional inbounds")
}
inbounds := append(dataplane.Spec.GetNetworking().GetInbound(), additionalInbounds...)
policyMap := policy.SelectInboundConnectionPolicies(dataplane, inbounds, policies)
policyMap := policy.SelectInboundConnectionMatchingPolicies(dataplane, inbounds, policies)

result := core_xds.FaultInjectionMap{}
for inbound, connectionPolicy := range policyMap {
result[inbound] = connectionPolicy.(*core_mesh.FaultInjectionResource).Spec
for inbound, connectionPolicies := range policyMap {
for _, connectionPolicy := range connectionPolicies {
result[inbound] = append(result[inbound], connectionPolicy.(*core_mesh.FaultInjectionResource).Spec)
}
}
return result, nil
}
110 changes: 99 additions & 11 deletions pkg/core/faultinjections/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ var _ = Describe("Match", func() {
Expect(err).ToNot(HaveOccurred())
Expect(len(bestMatched)).To(Equal(len(given.expected)))
for key := range bestMatched {
Expect(bestMatched[key]).To(MatchProto(given.expected[key]))
elements := []interface{}{}
for _, expected := range given.expected[key] {
elements = append(elements, MatchProto(expected))
}
Expect(bestMatched[key]).To(ConsistOf(elements...))
}
},
Entry("1 inbound dataplane, 2 policies", testCase{
Expand Down Expand Up @@ -123,14 +127,16 @@ var _ = Describe("Match", func() {
mesh_proto.InboundInterface{
WorkloadIP: "127.0.0.1",
WorkloadPort: 8080,
}: policyWithDestinationsFunc("fi2", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "*",
"kuma.io/protocol": "http",
}: {
policyWithDestinationsFunc("fi2", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "*",
"kuma.io/protocol": "http",
},
},
},
}).Spec,
}).Spec,
},
}}),
Entry("should apply policy only to the first inbound", testCase{
dataplane: dataplaneWithInboundsFunc([]*mesh_proto.Dataplane_Networking_Inbound{
Expand Down Expand Up @@ -167,14 +173,96 @@ var _ = Describe("Match", func() {
mesh_proto.InboundInterface{
WorkloadIP: "127.0.0.1",
WorkloadPort: 8081,
}: policyWithDestinationsFunc("fi1", time.Unix(1, 0), []*mesh_proto.Selector{
}: {
policyWithDestinationsFunc("fi1", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "web-api",
"kuma.io/protocol": "http",
},
},
}).Spec,
},
},
}),
Entry("should select all policies matched for the inbound", testCase{
dataplane: dataplaneWithInboundsFunc([]*mesh_proto.Dataplane_Networking_Inbound{
{
ServicePort: 8080,
Tags: map[string]string{
"service": "web",
"version": "0.1",
"region": "eu",
"kuma.io/protocol": "http",
},
},
}),
policies: []*mesh.FaultInjectionResource{
policyWithDestinationsFunc("fi1", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "web-api",
"service": "*",
"kuma.io/protocol": "http",
},
},
}).Spec,
}),
policyWithDestinationsFunc("fi2", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "web",
"kuma.io/protocol": "http",
},
},
}),
policyWithDestinationsFunc("fi3", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"version": "0.1",
"region": "eu",
"kuma.io/protocol": "http",
},
},
}),
policyWithDestinationsFunc("fi4", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"region": "us",
"kuma.io/protocol": "http",
},
},
}),
},
expected: core_xds.FaultInjectionMap{
mesh_proto.InboundInterface{
WorkloadIP: "127.0.0.1",
WorkloadPort: 8080,
}: {
policyWithDestinationsFunc("fi1", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "*",
"kuma.io/protocol": "http",
},
},
}).Spec,
policyWithDestinationsFunc("fi2", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"service": "web",
"kuma.io/protocol": "http",
},
},
}).Spec,
policyWithDestinationsFunc("fi3", time.Unix(1, 0), []*mesh_proto.Selector{
{
Match: map[string]string{
"version": "0.1",
"region": "eu",
"kuma.io/protocol": "http",
},
},
}).Spec,
},
},
}),
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/xds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ type CircuitBreakerMap map[ServiceName]*core_mesh.CircuitBreakerResource
// RetryMap holds the most specific Retry for each reachable service.
type RetryMap map[ServiceName]*core_mesh.RetryResource

// FaultInjectionMap holds the most specific FaultInjectionResource for each InboundInterface
type FaultInjectionMap map[mesh_proto.InboundInterface]*mesh_proto.FaultInjection
// FaultInjectionMap holds all matched FaultInjectionResources for each InboundInterface
type FaultInjectionMap map[mesh_proto.InboundInterface][]*mesh_proto.FaultInjection

// TrafficPermissionMap holds the most specific TrafficPermissionResource for each InboundInterface
type TrafficPermissionMap map[mesh_proto.InboundInterface]*core_mesh.TrafficPermissionResource
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/envoy/listeners/filter_chain_configurers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TcpProxyWithMetadata(statsName string, clusters ...envoy_common.Cluster) Fi
})
}

func FaultInjection(faultInjection *mesh_proto.FaultInjection) FilterChainBuilderOpt {
func FaultInjection(faultInjections ...*mesh_proto.FaultInjection) FilterChainBuilderOpt {
return AddFilterChainConfigurer(&v3.FaultInjectionConfigurer{
FaultInjection: faultInjection,
FaultInjections: faultInjections,
})
}

Expand Down
58 changes: 32 additions & 26 deletions pkg/xds/envoy/listeners/v3/fault_injection_configurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,48 @@ import (
)

type FaultInjectionConfigurer struct {
FaultInjection *mesh_proto.FaultInjection
FaultInjections []*mesh_proto.FaultInjection
}

func (f *FaultInjectionConfigurer) Configure(filterChain *envoy_listener.FilterChain) error {
if f.FaultInjection == nil {
return nil
}
var httpFilters []*envoy_hcm.HttpFilter

config := &envoy_http_fault.HTTPFault{
Delay: convertDelay(f.FaultInjection.Conf.GetDelay()),
Abort: convertAbort(f.FaultInjection.Conf.GetAbort()),
Headers: []*envoy_route.HeaderMatcher{
createHeaders(f.FaultInjection.SourceTags()),
},
}
// Iterate over FaultInjections and generate the relevant HTTP filters.
// We do assume that the FaultInjections resource is sorted, so the most
// specific source matches come first.
for _, fi := range f.FaultInjections {
config := &envoy_http_fault.HTTPFault{
Delay: convertDelay(fi.Conf.GetDelay()),
Abort: convertAbort(fi.Conf.GetAbort()),
Headers: []*envoy_route.HeaderMatcher{
createHeaders(fi.SourceTags()),
},
}

rrl, err := convertResponseRateLimit(f.FaultInjection.Conf.GetResponseBandwidth())
if err != nil {
return err
rrl, err := convertResponseRateLimit(fi.Conf.GetResponseBandwidth())
if err != nil {
return err
}
config.ResponseRateLimit = rrl

pbst, err := proto.MarshalAnyDeterministic(config)
if err != nil {
return err
}
httpFilters = append(httpFilters, &envoy_hcm.HttpFilter{
Name: "envoy.filters.http.fault",
ConfigType: &envoy_hcm.HttpFilter_TypedConfig{
TypedConfig: pbst,
},
})
}
config.ResponseRateLimit = rrl

pbst, err := proto.MarshalAnyDeterministic(config)
if err != nil {
return err
if len(httpFilters) == 0 {
return nil
}

return UpdateHTTPConnectionManager(filterChain, func(manager *envoy_hcm.HttpConnectionManager) error {
manager.HttpFilters = append([]*envoy_hcm.HttpFilter{
{
Name: "envoy.filters.http.fault",
ConfigType: &envoy_hcm.HttpFilter_TypedConfig{
TypedConfig: pbst,
},
},
}, manager.HttpFilters...)
manager.HttpFilters = append(httpFilters, manager.HttpFilters...)
return nil
})
}
Expand Down
Loading

0 comments on commit 591b89d

Please sign in to comment.