Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(kuma-cp): trim zone ingress and service insights #7098

Merged
merged 4 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,11 @@ experimental:
useTagFirstVirtualOutboundModel: false # ENV: KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL
# If true, KDS will sync using incremental xDS updates
kdsDeltaEnabled: false # ENV: KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED
# List of prefixes that will be used to filter out tags by keys from ingress' available services section.
# This can trim the size of the ZoneIngress object significantly.
# The drawback is that you cannot use filtered out tags for traffic routing.
# If empty, no filter is applied.
ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS

proxy:
gateway:
Expand Down
8 changes: 7 additions & 1 deletion pkg/api-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -265,7 +266,12 @@ func addResourcesEndpoints(ws *restful.WebService, defs []model.ResourceTypeDesc
switch defType {
case mesh.ServiceInsightType:
// ServiceInsight is a bit different
ep := serviceInsightEndpoints{endpoints}
ep := serviceInsightEndpoints{
resourceEndpoints: endpoints,
addressPortGenerator: func(svc string) string {
return fmt.Sprintf("%s.%s:%d", svc, cfg.DNSServer.Domain, cfg.DNSServer.ServiceVipPort)
},
}
ep.addCreateOrUpdateEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
ep.addDeleteEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
ep.addFindEndpoint(ws, "/meshes/{mesh}/"+definition.WsPath)
Expand Down
11 changes: 11 additions & 0 deletions pkg/api-server/service_insight_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type serviceInsightEndpoints struct {
resourceEndpoints
addressPortGenerator func(string) string
}

func (s *serviceInsightEndpoints) addFindEndpoint(ws *restful.WebService, pathPrefix string) {
Expand All @@ -44,6 +45,7 @@ func (s *serviceInsightEndpoints) findResource(request *restful.Request, respons
Dataplanes: &v1alpha1.ServiceInsight_Service_DataplaneStat{},
}
}
s.fillStaticInfo(service, stat)
out := rest.From.Resource(serviceInsight)
res := out.(*rest_unversioned.Resource)
res.Meta.Name = service
Expand Down Expand Up @@ -88,6 +90,14 @@ func (s *serviceInsightEndpoints) listResources(request *restful.Request, respon
}
}

// fillStaticInfo fills static information, so we won't have to store this in the DB
func (s *serviceInsightEndpoints) fillStaticInfo(name string, stat *v1alpha1.ServiceInsight_Service) {
stat.Dataplanes.Total = stat.Dataplanes.Online + stat.Dataplanes.Offline
if stat.ServiceType == v1alpha1.ServiceInsight_Service_internal {
stat.AddressPort = s.addressPortGenerator(name)
}
}

// ServiceInsight is a resource that tracks insights about a Services (kuma.io/service tag in Dataplane)
// All of those statistics are put into a single ServiceInsight for a mesh for two reasons
// 1) It simpler and more efficient to manage 1 object because 1 Service != 1 Dataplane
Expand All @@ -98,6 +108,7 @@ func (s *serviceInsightEndpoints) expandInsights(serviceInsightList *mesh.Servic
restItems := []rest.Resource{} // Needs to be set to avoid returning nil and have the api return []
for _, insight := range serviceInsightList.Items {
for serviceName, stat := range insight.Spec.Services {
s.fillStaticInfo(serviceName, stat)
out := rest.From.Resource(insight)
res := out.(*rest_unversioned.Resource)
res.Meta.Name = serviceName
Expand Down
37 changes: 22 additions & 15 deletions pkg/api-server/service_insight_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,13 @@ var _ = Describe("Service Insight Endpoints", func() {
"backend": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 100,
Online: 70,
Offline: 30,
},
},
"frontend": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 20,
Online: 19,
Offline: 1,
},
Expand All @@ -77,15 +75,13 @@ var _ = Describe("Service Insight Endpoints", func() {
"db": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 10,
Online: 9,
Offline: 1,
},
},
"redis": {
Status: mesh_proto.ServiceInsight_Service_partially_degraded,
Dataplanes: &mesh_proto.ServiceInsight_Service_DataplaneStat{
Total: 22,
Online: 19,
Offline: 3,
},
Expand All @@ -111,7 +107,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -124,7 +121,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
}
],
"next": null
Expand Down Expand Up @@ -154,7 +152,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
}`

// when
Expand Down Expand Up @@ -223,7 +222,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -236,7 +236,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -249,7 +250,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 10,
"online": 9,
"offline": 1
}
},
"addressPort": "db.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -262,7 +264,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 22,
"online": 19,
"offline": 3
}
},
"addressPort": "redis.mesh:80"
}
],
"next": null
Expand All @@ -286,7 +289,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 100,
"online": 70,
"offline": 30
}
},
"addressPort": "backend.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -299,7 +303,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 20,
"online": 19,
"offline": 1
}
},
"addressPort": "frontend.mesh:80"
}
],
"next": "http://{{address}}/service-insights?offset=2&size=2"
Expand All @@ -323,7 +328,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 10,
"online": 9,
"offline": 1
}
},
"addressPort": "db.mesh:80"
},
{
"type": "ServiceInsight",
Expand All @@ -336,7 +342,8 @@ var _ = Describe("Service Insight Endpoints", func() {
"total": 22,
"online": 19,
"offline": 3
}
},
"addressPort": "redis.mesh:80"
}
],
"next": null
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ var DefaultConfig = func() Config {
KubeOutboundsAsVIPs: true,
KDSDeltaEnabled: false,
UseTagFirstVirtualOutboundModel: false,
IngressTagFilters: []string{},
},
Proxy: xds.DefaultProxyConfig(),
InterCp: intercp.DefaultInterCpConfig(),
Expand Down Expand Up @@ -356,6 +357,11 @@ type ExperimentalConfig struct {
// you need to first disable this flag and redeploy cp, after config is rewritten to default
// format you can downgrade your cp
UseTagFirstVirtualOutboundModel bool `json:"useTagFirstVirtualOutboundModel" envconfig:"KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL"`
// List of prefixes that will be used to filter out tags by keys from ingress' available services section.
// This can trim the size of the ZoneIngress object significantly.
// The drawback is that you cannot use filtered out tags for traffic routing.
// If empty, no filter is applied.
IngressTagFilters []string `json:"ingressTagFilters" envconfig:"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS"`
}

func (e ExperimentalConfig) Validate() error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,11 @@ experimental:
useTagFirstVirtualOutboundModel: false # ENV: KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL
# If true, KDS will sync using incremental xDS updates
kdsDeltaEnabled: false # ENV: KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED
# List of prefixes that will be used to filter out tags by keys from ingress' available services section.
# This can trim the size of the ZoneIngress object significantly.
# The drawback is that you cannot use filtered out tags for traffic routing.
# If empty, no filter is applied.
ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS

proxy:
gateway:
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.KubeOutboundsAsVIPs).To(BeTrue())
Expect(cfg.Experimental.KDSDeltaEnabled).To(BeTrue())
Expect(cfg.Experimental.UseTagFirstVirtualOutboundModel).To(BeFalse())
Expect(cfg.Experimental.IngressTagFilters).To(ContainElements("kuma.io/service"))

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
},
Expand Down Expand Up @@ -650,6 +651,7 @@ experimental:
cniApp: "kuma-cni"
kdsDeltaEnabled: true
useTagFirstVirtualOutboundModel: false
ingressTagFilters: ["kuma.io/service"]
proxy:
gateway:
globalDownstreamMaxConnections: 1
Expand Down Expand Up @@ -884,6 +886,7 @@ proxy:
"KUMA_EXPERIMENTAL_GATEWAY_API": "true",
"KUMA_EXPERIMENTAL_KUBE_OUTBOUNDS_AS_VIPS": "true",
"KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL": "false",
"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS": "kuma.io/service",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
},
yamlFileConfig: "",
Expand Down
5 changes: 0 additions & 5 deletions pkg/insights/components.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package insights

import (
"fmt"

"golang.org/x/time/rate"

config_core "github.com/kumahq/kuma/pkg/config/core"
Expand All @@ -24,9 +22,6 @@ func Setup(rt runtime.Runtime) error {
return rate.NewLimiter(rate.Every(rt.Config().Metrics.Mesh.MinResyncTimeout.Duration), 1)
},
Registry: registry.Global(),
AddressPortGenerator: func(svc string) string {
return fmt.Sprintf("%s.%s:%d", svc, rt.Config().DNSServer.Domain, rt.Config().DNSServer.ServiceVipPort)
},
}, rt.Tenants())
return rt.Add(component.NewResilientComponent(log, resyncer))
}
47 changes: 22 additions & 25 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ func MeshInsightKey(mesh string) model.ResourceKey {
}

type Config struct {
Registry registry.TypeRegistry
ResourceManager manager.ResourceManager
EventReaderFactory events.ListenerFactory
MinResyncTimeout time.Duration
MaxResyncTimeout time.Duration
Tick func(d time.Duration) <-chan time.Time
RateLimiterFactory func() *rate.Limiter
AddressPortGenerator func(string) string
Registry registry.TypeRegistry
ResourceManager manager.ResourceManager
EventReaderFactory events.ListenerFactory
MinResyncTimeout time.Duration
MaxResyncTimeout time.Duration
Tick func(d time.Duration) <-chan time.Time
RateLimiterFactory func() *rate.Limiter
}

type resyncer struct {
Expand All @@ -68,9 +67,8 @@ type resyncer struct {
// info provides an information about the last sync for both ServiceInsight and MeshInsight
// Previously this data was stored in the Resource itself, but since resyncer runs only on leader
// we can just save this in memory and save requests to the DB.
infos map[string]syncInfo
infosMux sync.RWMutex
addressPortGenerator func(string) string
infos map[string]syncInfo
infosMux sync.RWMutex

registry registry.TypeRegistry
tenantFn multitenant.Tenants
Expand All @@ -91,16 +89,15 @@ type syncInfo struct {
// resync every t = MaxResyncTimeout - MinResyncTimeout.
func NewResyncer(config *Config, tenantFn multitenant.Tenants) component.Component {
r := &resyncer{
minResyncTimeout: config.MinResyncTimeout,
maxResyncTimeout: config.MaxResyncTimeout,
eventFactory: config.EventReaderFactory,
rm: config.ResourceManager,
rateLimiterFactory: config.RateLimiterFactory,
rateLimiters: map[string]*rate.Limiter{},
infos: map[string]syncInfo{},
registry: config.Registry,
addressPortGenerator: config.AddressPortGenerator,
tenantFn: tenantFn,
minResyncTimeout: config.MinResyncTimeout,
maxResyncTimeout: config.MaxResyncTimeout,
eventFactory: config.EventReaderFactory,
rm: config.ResourceManager,
rateLimiterFactory: config.RateLimiterFactory,
rateLimiters: map[string]*rate.Limiter{},
infos: map[string]syncInfo{},
registry: config.Registry,
tenantFn: tenantFn,
}

r.tick = config.Tick
Expand Down Expand Up @@ -237,8 +234,6 @@ func populateInsight(serviceType mesh_proto.ServiceInsight_Service_Type, insight

dataplanes := insight.Services[svcName].Dataplanes

dataplanes.Total++

switch status {
case core_mesh.Online:
dataplanes.Online++
Expand Down Expand Up @@ -283,7 +278,8 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
}

for _, inbound := range networking.GetInbound() {
populateInsight(mesh_proto.ServiceInsight_Service_internal, insight, inbound.GetService(), status, backend, r.addressPortGenerator(inbound.GetService()))
// address port is empty to save space in the resource. It will be filled by the server on API response
populateInsight(mesh_proto.ServiceInsight_Service_internal, insight, inbound.GetService(), status, backend, "")
}
}

Expand All @@ -296,7 +292,8 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
}

for _, svc := range insight.Services {
online, total := svc.Dataplanes.Online, svc.Dataplanes.Total
online := svc.Dataplanes.Online
total := svc.Dataplanes.Online + svc.Dataplanes.Offline

switch {
case svc.ServiceType == mesh_proto.ServiceInsight_Service_external:
Expand Down
Loading