Skip to content

Commit

Permalink
feat(kuma-cp): trim service insights and introduce ingress tag filters
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed Jun 23, 2023
1 parent aa525a4 commit 4366273
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 67 deletions.
5 changes: 5 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,11 @@ experimental:
useTagFirstVirtualOutboundModel: false # ENV: KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL
# If true, KDS will sync using incremental xDS updates
kdsDeltaEnabled: false # ENV: KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED
# List of prefixes that will be used to filter out tags by keys from ingress' available services section.
# This can trim the size of the ZoneIngress object significantly.
# The drawback is that you cannot use filtered out tags for traffic routing.
# If empty, no filter is applied.
ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS

proxy:
gateway:
Expand Down
8 changes: 7 additions & 1 deletion pkg/api-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -264,7 +265,12 @@ func addResourcesEndpoints(ws *restful.WebService, defs []model.ResourceTypeDesc
switch defType {
case mesh.ServiceInsightType:
// ServiceInsight is a bit different
ep := serviceInsightEndpoints{endpoints}
ep := serviceInsightEndpoints{
resourceEndpoints: endpoints,
addressPortGenerator: func(svc string) string {
return fmt.Sprintf("%s.%s:%d", svc, cfg.DNSServer.Domain, cfg.DNSServer.ServiceVipPort)
},
}
ep.addCreateOrUpdateEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
ep.addDeleteEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
ep.addFindEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
Expand Down
11 changes: 11 additions & 0 deletions pkg/api-server/service_insight_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type serviceInsightEndpoints struct {
resourceEndpoints
addressPortGenerator func(string) string
}

func (s *serviceInsightEndpoints) addFindEndpoint(ws *restful.WebService, pathPrefix string) {
Expand All @@ -44,6 +45,7 @@ func (s *serviceInsightEndpoints) findResource(request *restful.Request, respons
Dataplanes: &v1alpha1.ServiceInsight_Service_DataplaneStat{},
}
}
s.fillStaticInfo(service, stat)
out := rest.From.Resource(serviceInsight)
res := out.(*rest_unversioned.Resource)
res.Meta.Name = service
Expand Down Expand Up @@ -88,6 +90,14 @@ func (s *serviceInsightEndpoints) listResources(request *restful.Request, respon
}
}

// fillStaticInfo fills static information, so we won't have to store this in the DB
func (s *serviceInsightEndpoints) fillStaticInfo(name string, stat *v1alpha1.ServiceInsight_Service) {
stat.Dataplanes.Total = stat.Dataplanes.Online + stat.Dataplanes.Offline
if stat.ServiceType == v1alpha1.ServiceInsight_Service_internal {
stat.AddressPort = s.addressPortGenerator(name)
}
}

// ServiceInsight is a resource that tracks insights about a Services (kuma.io/service tag in Dataplane)
// All of those statistics are put into a single ServiceInsight for a mesh for two reasons
// 1) It simpler and more efficient to manage 1 object because 1 Service != 1 Dataplane
Expand All @@ -98,6 +108,7 @@ func (s *serviceInsightEndpoints) expandInsights(serviceInsightList *mesh.Servic
restItems := []rest.Resource{} // Needs to be set to avoid returning nil and have the api return []
for _, insight := range serviceInsightList.Items {
for serviceName, stat := range insight.Spec.Services {
s.fillStaticInfo(serviceName, stat)
out := rest.From.Resource(insight)
res := out.(*rest_unversioned.Resource)
res.Meta.Name = serviceName
Expand Down
37 changes: 22 additions & 15 deletions pkg/api-server/service_insight_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,13 @@ var _ = Describe("Service Insight Endpoints", func() {
"backend": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 100,
Online: 70,
Offline: 30,
},
},
"frontend": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 20,
Online: 19,
Offline: 1,
},
Expand All @@ -77,15 +75,13 @@ var _ = Describe("Service Insight Endpoints", func() {
"db": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 10,
Online: 9,
Offline: 1,
},
},
"redis": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 22,
Online: 19,
Offline: 3,
},
Expand All @@ -111,7 +107,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -124,7 +121,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
}
],
"next": null
Expand Down Expand Up @@ -154,7 +152,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
}`

// when
Expand Down Expand Up @@ -223,7 +222,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -236,7 +236,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -249,7 +250,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 10,
"online": 9,
"offline": 1
}
},
"addressPort": "db.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -262,7 +264,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 22,
"online": 19,
"offline": 3
}
},
"addressPort": "redis.mesh:80"
}
],
"next": null
Expand All @@ -286,7 +289,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -299,7 +303,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
}
],
"next": "http://{{address}}/service-insights?offset=2&size=2"
Expand All @@ -323,7 +328,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 10,
"online": 9,
"offline": 1
}
},
"addressPort": "db.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -336,7 +342,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 22,
"online": 19,
"offline": 3
}
},
"addressPort": "redis.mesh:80"
}
],
"next": null
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ var DefaultConfig = func() Config {
KubeOutboundsAsVIPs: true,
KDSDeltaEnabled: false,
UseTagFirstVirtualOutboundModel: false,
IngressTagFilters: []string{},
},
Proxy: xds.DefaultProxyConfig(),
InterCp: intercp.DefaultInterCpConfig(),
Expand Down Expand Up @@ -356,6 +357,11 @@ type ExperimentalConfig struct {
// you need to first disable this flag and redeploy cp, after config is rewritten to default
// format you can downgrade your cp
UseTagFirstVirtualOutboundModel bool `json:"useTagFirstVirtualOutboundModel" envconfig:"KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL"`
// List of prefixes that will be used to filter out tags by keys from ingress' available services section.
// This can trim the size of the ZoneIngress object significantly.
// The drawback is that you cannot use filtered out tags for traffic routing.
// If empty, no filter is applied.
IngressTagFilters []string `json:"ingressTagFilters" envconfig:"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS"`
}

func (e ExperimentalConfig) Validate() error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,11 @@ experimental:
useTagFirstVirtualOutboundModel: false # ENV: KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL
# If true, KDS will sync using incremental xDS updates
kdsDeltaEnabled: false # ENV: KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED
# List of prefixes that will be used to filter out tags by keys from ingress' available services section.
# This can trim the size of the ZoneIngress object significantly.
# The drawback is that you cannot use filtered out tags for traffic routing.
# If empty, no filter is applied.
ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS

proxy:
gateway:
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.KubeOutboundsAsVIPs).To(BeTrue())
Expect(cfg.Experimental.KDSDeltaEnabled).To(BeTrue())
Expect(cfg.Experimental.UseTagFirstVirtualOutboundModel).To(BeFalse())
Expect(cfg.Experimental.IngressTagFilters).To(ContainElements("kuma.io/service"))

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
},
Expand Down Expand Up @@ -650,6 +651,7 @@ experimental:
cniApp: "kuma-cni"
kdsDeltaEnabled: true
useTagFirstVirtualOutboundModel: false
ingressTagFilters: ["kuma.io/service"]
proxy:
gateway:
globalDownstreamMaxConnections: 1
Expand Down Expand Up @@ -884,6 +886,7 @@ proxy:
"KUMA_EXPERIMENTAL_GATEWAY_API": "true",
"KUMA_EXPERIMENTAL_KUBE_OUTBOUNDS_AS_VIPS": "true",
"KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL": "false",
"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS": "kuma.io/service",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
},
yamlFileConfig: "",
Expand Down
5 changes: 0 additions & 5 deletions pkg/insights/components.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package insights

import (
"fmt"

"golang.org/x/time/rate"

config_core "github.com/kumahq/kuma/pkg/config/core"
Expand All @@ -24,9 +22,6 @@ func Setup(rt runtime.Runtime) error {
return rate.NewLimiter(rate.Every(rt.Config().Metrics.Mesh.MinResyncTimeout.Duration), 1)
},
Registry: registry.Global(),
AddressPortGenerator: func(svc string) string {
return fmt.Sprintf("%s.%s:%d", svc, rt.Config().DNSServer.Domain, rt.Config().DNSServer.ServiceVipPort)
},
}, rt.Tenants())
return rt.Add(component.NewResilientComponent(log, resyncer))
}
47 changes: 22 additions & 25 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ func MeshInsightKey(mesh string) model.ResourceKey {
}

type Config struct {
Registry registry.TypeRegistry
ResourceManager manager.ResourceManager
EventReaderFactory events.ListenerFactory
MinResyncTimeout time.Duration
MaxResyncTimeout time.Duration
Tick func(d time.Duration) <-chan time.Time
RateLimiterFactory func() *rate.Limiter
AddressPortGenerator func(string) string
Registry registry.TypeRegistry
ResourceManager manager.ResourceManager
EventReaderFactory events.ListenerFactory
MinResyncTimeout time.Duration
MaxResyncTimeout time.Duration
Tick func(d time.Duration) <-chan time.Time
RateLimiterFactory func() *rate.Limiter
}

type resyncer struct {
Expand All @@ -68,9 +67,8 @@ type resyncer struct {
// info provides an information about the last sync for both ServiceInsight and MeshInsight
// Previously this data was stored in the Resource itself, but since resyncer runs only on leader
// we can just save this in memory and save requests to the DB.
infos map[string]syncInfo
infosMux sync.RWMutex
addressPortGenerator func(string) string
infos map[string]syncInfo
infosMux sync.RWMutex

registry registry.TypeRegistry
tenantFn multitenant.Tenants
Expand All @@ -91,16 +89,15 @@ type syncInfo struct {
// resync every t = MaxResyncTimeout - MinResyncTimeout.
func NewResyncer(config *Config, tenantFn multitenant.Tenants) component.Component {
r := &resyncer{
minResyncTimeout: config.MinResyncTimeout,
maxResyncTimeout: config.MaxResyncTimeout,
eventFactory: config.EventReaderFactory,
rm: config.ResourceManager,
rateLimiterFactory: config.RateLimiterFactory,
rateLimiters: map[string]*rate.Limiter{},
infos: map[string]syncInfo{},
registry: config.Registry,
addressPortGenerator: config.AddressPortGenerator,
tenantFn: tenantFn,
minResyncTimeout: config.MinResyncTimeout,
maxResyncTimeout: config.MaxResyncTimeout,
eventFactory: config.EventReaderFactory,
rm: config.ResourceManager,
rateLimiterFactory: config.RateLimiterFactory,
rateLimiters: map[string]*rate.Limiter{},
infos: map[string]syncInfo{},
registry: config.Registry,
tenantFn: tenantFn,
}

r.tick = config.Tick
Expand Down Expand Up @@ -237,8 +234,6 @@ func populateInsight(serviceType mesh_proto.ServiceInsight_Service_Type, insight

dataplanes := insight.Services[svcName].Dataplanes

dataplanes.Total++

switch status {
case core_mesh.Online:
dataplanes.Online++
Expand Down Expand Up @@ -283,7 +278,8 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
}

for _, inbound := range networking.GetInbound() {
populateInsight(mesh_proto.ServiceInsight_Service_internal, insight, inbound.GetService(), status, backend, r.addressPortGenerator(inbound.GetService()))
// address port is empty to save space in the resource. It will be filled by the server on API response
populateInsight(mesh_proto.ServiceInsight_Service_internal, insight, inbound.GetService(), status, backend, "")
}
}

Expand All @@ -296,7 +292,8 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
}

for _, svc := range insight.Services {
online, total := svc.Dataplanes.Online, svc.Dataplanes.Total
online := svc.Dataplanes.Online
total := svc.Dataplanes.Online + svc.Dataplanes.Offline

switch {
case svc.ServiceType == mesh_proto.ServiceInsight_Service_external:
Expand Down
Loading

0 comments on commit 4366273

Please sign in to comment.