Skip to content

Commit

Permalink
fix(kuma-cp): traffic split with internal and external service (#5904)
Browse files Browse the repository at this point in the history
Configure RateLimit for the "destination" in TrafficRoute even if one of the services in split doesn't have RateLimit.
Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
  • Loading branch information
lobkovilya authored Feb 6, 2023
1 parent eed566e commit 5118d98
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 34 deletions.
60 changes: 26 additions & 34 deletions pkg/xds/generator/outbound_proxy_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,10 @@ func (OutboundProxyGenerator) determineRoutes(
timeoutConf = timeout.Spec.GetConf()
}

// Return internal, external
clustersFromSplit := func(splits []*mesh_proto.TrafficRoute_Split) ([]envoy_common.Cluster, []envoy_common.Cluster) {
var clustersInternal []envoy_common.Cluster
var clustersExternal []envoy_common.Cluster
rateLimit := proxy.Policies.RateLimitsOutbound[oface]

clustersFromSplit := func(splits []*mesh_proto.TrafficRoute_Split) []envoy_common.Cluster {
var clusters []envoy_common.Cluster
for _, destination := range splits {
service := destination.Destination[mesh_proto.ServiceTag]
if destination.GetWeight().GetValue() == 0 {
Expand Down Expand Up @@ -410,52 +410,44 @@ func (OutboundProxyGenerator) determineRoutes(
clusterCache[allTags.String()] = cluster.Name()
}

if isExternalService {
clustersExternal = append(clustersExternal, cluster)
} else {
clustersInternal = append(clustersInternal, cluster)
}
clusters = append(clusters, cluster)
}
return clustersInternal, clustersExternal
return clusters
}

appendRoute := func(routes envoy_common.Routes, match *mesh_proto.TrafficRoute_Http_Match, modify *mesh_proto.TrafficRoute_Http_Modify,
clusters []envoy_common.Cluster, rateLimit *core_mesh.RateLimitResource) envoy_common.Routes {
clusters []envoy_common.Cluster) envoy_common.Routes {
if len(clusters) == 0 {
return routes
}

// backwards compatibility to support RateLimit for ExternalServices without ZoneEgress
if hasEgress {
return append(routes, envoy_common.Route{
Match: match,
Modify: modify,
Clusters: clusters,
})
} else {
var rlSpec *mesh_proto.RateLimit
if rateLimit != nil {
rlSpec = rateLimit.Spec
hasExternal := false
for _, cluster := range clusters {
if cluster.IsExternalService() {
hasExternal = true
break
}
return append(routes, envoy_common.Route{
Match: match,
Modify: modify,
RateLimit: rlSpec,
Clusters: clusters,
})
}

var rlSpec *mesh_proto.RateLimit
if hasExternal && !hasEgress && rateLimit != nil {
rlSpec = rateLimit.Spec
} // otherwise rate limit is applied on the inbound side

return append(routes, envoy_common.Route{
Match: match,
Modify: modify,
RateLimit: rlSpec,
Clusters: clusters,
})
}

for _, http := range route.Spec.GetConf().GetHttp() {
clustersInternal, clustersExternal := clustersFromSplit(http.GetSplitWithDestination())
routes = appendRoute(routes, http.Match, http.Modify, clustersInternal, nil)
routes = appendRoute(routes, http.Match, http.Modify, clustersExternal, proxy.Policies.RateLimitsOutbound[oface])
routes = appendRoute(routes, http.Match, http.Modify, clustersFromSplit(http.GetSplitWithDestination()))
}

if defaultDestination := route.Spec.GetConf().GetSplitWithDestination(); len(defaultDestination) != 0 {
clustersInternal, clustersExternal := clustersFromSplit(defaultDestination)
routes = appendRoute(routes, nil, nil, clustersInternal, nil)
routes = appendRoute(routes, nil, nil, clustersExternal, proxy.Policies.RateLimitsOutbound[oface])
routes = appendRoute(routes, nil, nil, clustersFromSplit(defaultDestination))
}

return routes
Expand Down
99 changes: 99 additions & 0 deletions test/e2e_env/universal/trafficroute/traffic_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package trafficroute

import (
"fmt"
"net"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
. "github.com/kumahq/kuma/test/framework"
. "github.com/kumahq/kuma/test/framework/client"
"github.com/kumahq/kuma/test/framework/envs/universal"
)

func TrafficRoute() {
meshName := "trafficroute"
var esHttpHostPort string

BeforeAll(func() {
Expect(NewClusterSetup().
Expand All @@ -39,8 +42,11 @@ func TrafficRoute() {
WithArgs([]string{"echo", "--instance", "another-test-server"}),
WithServiceName("another-test-server"),
)).
Install(TestServerExternalServiceUniversal("es-http", meshName, 80, false)).
Install(DemoClientUniversal(AppModeDemoClient, meshName, WithTransparentProxy(true))).
Setup(universal.Cluster)).To(Succeed())

esHttpHostPort = net.JoinHostPort(universal.Cluster.GetApp("es-http").GetContainerName(), "80")
})

E2EAfterAll(func() {
Expand All @@ -59,6 +65,7 @@ func TrafficRoute() {
err := universal.Cluster.GetKumactlOptions().KumactlDelete("traffic-route", item, meshName)
Expect(err).ToNot(HaveOccurred())
}
Expect(DeleteMeshResources(universal.Cluster, meshName, mesh.ExternalServiceResourceTypeDescriptor)).To(Succeed())
})

It("should access all instances of the service", func() {
Expand Down Expand Up @@ -175,6 +182,49 @@ conf:
)
})

It("should split traffic between internal and external services", func() {
Expect(universal.Cluster.Install(YamlUniversal(`
type: TrafficRoute
name: route-internal-external
mesh: trafficroute
sources:
- match:
kuma.io/service: demo-client
destinations:
- match:
kuma.io/service: test-server
conf:
split:
- weight: 50
destination:
kuma.io/service: test-server
version: v1
- weight: 50
destination:
kuma.io/service: es-http
`))).To(Succeed())
Expect(universal.Cluster.Install(YamlUniversal(fmt.Sprintf(`
type: ExternalService
name: es-http-1
mesh: trafficroute
networking:
address: %s
tags:
kuma.io/service: es-http
kuma.io/protocol: http
`, esHttpHostPort)))).To(Succeed())

Eventually(func() (map[string]int, error) {
return CollectResponsesByInstance(universal.Cluster, "demo-client", "test-server.mesh")
}, "30s", "500ms").Should(
And(
HaveLen(2),
HaveKey(Equal(`echo-v1`)),
HaveKey(Equal(`es-http`)),
),
)
})

Context("HTTP routing", func() {
HaveOnlyResponseFrom := func(response string) types.GomegaMatcher {
return And(
Expand Down Expand Up @@ -608,5 +658,54 @@ conf:
return nil
}, "30s", "500ms").Should(Succeed())
})

It("should split traffic between internal and external destinations", func() {
Expect(YamlUniversal(`
type: TrafficRoute
name: route-internal-external
mesh: trafficroute
sources:
- match:
kuma.io/service: demo-client
destinations:
- match:
kuma.io/service: test-server
conf:
http:
- match:
path:
prefix: /
split:
- weight: 50
destination:
kuma.io/service: test-server
version: v1
- weight: 50
destination:
kuma.io/service: es-http
destination:
kuma.io/service: test-server
`)(universal.Cluster)).To(Succeed())
Expect(universal.Cluster.Install(YamlUniversal(fmt.Sprintf(`
type: ExternalService
name: es-http-1
mesh: trafficroute
networking:
address: %s
tags:
kuma.io/service: es-http
kuma.io/protocol: http
`, esHttpHostPort)))).To(Succeed())

Eventually(func() (map[string]int, error) {
return CollectResponsesByInstance(universal.Cluster, "demo-client", "test-server.mesh")
}, "30s", "500ms").Should(
And(
HaveLen(2),
HaveKey(Equal(`echo-v1`)),
HaveKey(Equal(`es-http`)),
),
)
})
})
}

0 comments on commit 5118d98

Please sign in to comment.