Skip to content

Commit

Permalink
feat(kuma-cp): better xds metrics (#7208)
Browse files Browse the repository at this point in the history
Add label "result" to "xds_generation" metric, allowed values are:
* skip - whenever we decide that we don't need to generate config (mesh hash did not change)
* generated - whenever we generated config, but it was the same as the previously generated one
* changed - whenever we generated config and the config was actually different

Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Jul 11, 2023
1 parent b23774a commit 8c09d6f
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("Gateway Route", func() {

Expect(proxy.Dataplane.Spec.IsBuiltinGateway()).To(BeTrue())

if err := reconciler.Reconcile(*ctx, proxy); err != nil {
if _, err := reconciler.Reconcile(*ctx, proxy); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/gateway/listener_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ = Describe("Gateway Listener", func() {

Expect(proxy.Dataplane.Spec.IsBuiltinGateway()).To(BeTrue())

if err := reconciler.Reconcile(*ctx, proxy); err != nil {
if _, err := reconciler.Reconcile(*ctx, proxy); err != nil {
return nil, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/xds/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
)

type Metrics struct {
XdsGenerations prometheus.Summary
XdsGenerations *prometheus.SummaryVec
XdsGenerationsErrors prometheus.Counter
KubeAuthCache *prometheus.CounterVec
}

func NewMetrics(metrics core_metrics.Metrics) (*Metrics, error) {
xdsGenerations := prometheus.NewSummary(prometheus.SummaryOpts{
xdsGenerations := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "xds_generation",
Help: "Summary of XDS Snapshot generation",
Objectives: core_metrics.DefaultObjectives,
})
}, []string{"proxy_type", "result"})
xdsGenerationsErrors := prometheus.NewCounter(prometheus.CounterOpts{
Name: "xds_generation_errors",
Help: "Counter of errors during XDS generation",
Expand Down
36 changes: 18 additions & 18 deletions pkg/xds/server/v3/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (r *reconciler) clearUndeliveredConfigStats(nodeId *envoy_core.Node) {
}
}

func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) error {
func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) (bool, error) {
node := &envoy_core.Node{Id: proxy.Id.String()}
snapshot, err := r.generator.GenerateSnapshot(ctx, proxy)
if err != nil {
return errors.Wrapf(err, "failed to generate a snapshot")
return false, errors.Wrapf(err, "failed to generate a snapshot")
}

// To avoid assigning a new version every time, compare with
Expand All @@ -76,33 +76,33 @@ func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) erro
// to Envoy. This ensures that we have as much in-band error
// information as possible, which is especially useful for tests
// that don't actually program an Envoy instance.
if len(changed) > 0 {
for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return errors.Wrapf(err, "invalid resource %q", name)
}
if len(changed) == 0 {
log.V(1).Info("config is the same")
return false, nil
}

for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return false, errors.Wrapf(err, "invalid resource %q", name)
}
}
}

if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return errors.Wrap(err, "inconsistent snapshot")
}
log.Info("config has changed", "versions", changed)
} else {
log.V(1).Info("config is the same")
if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return false, errors.Wrap(err, "inconsistent snapshot")
}
log.Info("config has changed", "versions", changed)

if err := r.cacher.Cache(node, snapshot); err != nil {
return errors.Wrap(err, "failed to store snapshot")
return false, errors.Wrap(err, "failed to store snapshot")
}

for _, version := range changed {
r.statsCallbacks.ConfigReadyForDelivery(version)
}

return nil
return true, nil
}

func validateResource(r envoy_types.Resource) error {
Expand Down
8 changes: 5 additions & 3 deletions pkg/xds/server/v3/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ var _ = Describe("Reconcile", func() {
Id: *xds_model.BuildProxyId("demo", "example"),
Dataplane: dataplane,
}
err = r.Reconcile(xds_context.Context{}, proxy)
changed, err := r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
Expect(snapshot.Resources[envoy_types.Listener].Version).To(BeEmpty())
Expect(snapshot.Resources[envoy_types.Route].Version).To(BeEmpty())
Expect(snapshot.Resources[envoy_types.Cluster].Version).To(BeEmpty())
Expand All @@ -185,9 +186,10 @@ var _ = Describe("Reconcile", func() {

By("simulating discovery event (Dataplane watchdog triggers refresh)")
// when
err = r.Reconcile(xds_context.Context{}, proxy)
changed, err = r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeFalse())

By("verifying that snapshot versions remain the same")
// when
Expand All @@ -204,7 +206,7 @@ var _ = Describe("Reconcile", func() {

By("simulating discovery event (Dataplane gets changed)")
// when
err = r.Reconcile(xds_context.Context{}, proxy)
_, err = r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())

Expand Down
84 changes: 65 additions & 19 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type DataplaneWatchdogDependencies struct {
ResManager core_manager.ReadOnlyResourceManager
}

// Status describes the outcome of a single watchdog sync. Its string value is
// used as the "result" label of the xds_generation metric.
type Status string

// The possible sync outcomes. They are constants rather than variables so the
// metric label values cannot be reassigned at runtime.
const (
	// SkipStatus means that config generation was skipped because nothing
	// required a new config (the mesh hash did not change).
	SkipStatus Status = "skip"
	// GeneratedStatus means that config was generated, but the reconciler
	// reported that it did not differ from the previous one.
	GeneratedStatus Status = "generated"
	// ChangedStatus means that config was generated and the reconciler
	// reported that it differs from the previous one.
	ChangedStatus Status = "changed"
)

// SyncResult describes the outcome of a single DataplaneWatchdog sync.
// The zero value is what Sync returns alongside an error and when the proxy
// type has not been inferred yet.
type SyncResult struct {
// ProxyType is the type of the proxy that was synced (dataplane, ingress or egress).
ProxyType mesh_proto.ProxyType
// Status tells whether config generation was skipped, produced the same
// config, or produced a changed config.
Status Status
}

type DataplaneWatchdog struct {
DataplaneWatchdogDependencies
key core_model.ResourceKey
Expand All @@ -52,10 +65,10 @@ func NewDataplaneWatchdog(deps DataplaneWatchdogDependencies, dpKey core_model.R
}
}

func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
func (d *DataplaneWatchdog) Sync(ctx context.Context) (SyncResult, error) {
metadata := d.MetadataTracker.Metadata(d.key)
if metadata == nil {
return errors.New("metadata cannot be nil")
return SyncResult{}, errors.New("metadata cannot be nil")
}

if d.dpType == "" {
Expand All @@ -70,7 +83,7 @@ func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
return d.syncEgress(ctx, metadata)
default:
// It might be a case that dp type is not yet inferred because there is no Dataplane definition yet.
return nil
return SyncResult{}, nil
}
}

Expand All @@ -91,17 +104,21 @@ func (d *DataplaneWatchdog) Cleanup() error {

// syncDataplane syncs state of the Dataplane.
// It uses Mesh Hash to decide if we need to regenerate configuration or not.
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
meshCtx, err := d.MeshCache.GetMeshContext(ctx, d.key.Mesh)
if err != nil {
return err
return SyncResult{}, err
}

certInfo := d.EnvoyCpCtx.Secrets.Info(d.key)
syncForCert := certInfo != nil && certInfo.ExpiringSoon() // check if we need to regenerate config because identity cert is expiring soon.
syncForConfig := meshCtx.Hash != d.lastHash // check if we need to regenerate config because Kuma policies has changed.
result := SyncResult{
ProxyType: mesh_proto.DataplaneProxyType,
}
if !syncForCert && !syncForConfig {
return nil
result.Status = SkipStatus
return result, nil
}
if syncForConfig {
d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", meshCtx.Hash)
Expand All @@ -116,65 +133,94 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
}
proxy, err := d.DataplaneProxyBuilder.Build(ctx, d.key, meshCtx)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.Dataplane.Spec.Networking
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.Address, networking.AdvertisedAddress)
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
if err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy); err != nil {
return err
changed, err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
d.lastHash = meshCtx.Hash
return nil

if changed {
result.Status = ChangedStatus
} else {
result.Status = GeneratedStatus
}
return result, nil
}

// syncIngress synces state of Ingress Dataplane. Notice that it does not use Mesh Hash yet because Ingress supports many Meshes.
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
}
proxy, err := d.IngressProxyBuilder.Build(ctx, d.key)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking()
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.GetAddress(), networking.GetAdvertisedAddress())
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.IngressReconciler.Reconcile(*envoyCtx, proxy)
changed, err := d.IngressReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
result := SyncResult{
ProxyType: mesh_proto.IngressProxyType,
Status: GeneratedStatus,
}
if changed {
result.Status = ChangedStatus
}
return result, nil
}

// syncEgress syncs state of Egress Dataplane. Notice that it does not use
// Mesh Hash yet because Egress supports many Meshes.
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneEgress does not have a mesh!
}

proxy, err := d.EgressProxyBuilder.Build(ctx, d.key)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.ZoneEgressProxy.ZoneEgressResource.Spec.Networking
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.Address, "")
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.EgressReconciler.Reconcile(*envoyCtx, proxy)
changed, err := d.EgressReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
result := SyncResult{
ProxyType: mesh_proto.IngressProxyType,
Status: GeneratedStatus,
}
if changed {
result.Status = ChangedStatus
}
return result, nil
}

func (d *DataplaneWatchdog) getEnvoyAdminMTLS(ctx context.Context, address string, advertisedAddress string) (core_xds.ServerSideMTLSCerts, error) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/xds/sync/dataplane_watchdog_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_watchdog.Wa
},
OnTick: func() error {
start := core.Now()
defer func() {
d.xdsMetrics.XdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
}()
return dataplaneWatchdog.Sync(ctx)
result, err := dataplaneWatchdog.Sync(ctx)
if err != nil {
return err
}
d.xdsMetrics.XdsGenerations.
WithLabelValues(string(result.ProxyType), string(result.Status)).
Observe(float64(core.Now().Sub(start).Milliseconds()))
return nil
},
OnError: func(err error) {
d.xdsMetrics.XdsGenerationsErrors.Inc()
Expand Down
12 changes: 6 additions & 6 deletions pkg/xds/sync/dataplane_watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ type staticSnapshotReconciler struct {
proxy *core_xds.Proxy
}

func (s *staticSnapshotReconciler) Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) error {
func (s *staticSnapshotReconciler) Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) (bool, error) {
s.proxy = proxy
return nil
return true, nil
}

func (s *staticSnapshotReconciler) Clear(proxyId *core_xds.ProxyId) error {
Expand Down Expand Up @@ -124,7 +124,7 @@ var _ = Describe("Dataplane Watchdog", func() {

It("should reissue admin tls certificate when address has changed", func() {
// when
err := watchdog.Sync(ctx)
_, err := watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand All @@ -144,7 +144,7 @@ var _ = Describe("Dataplane Watchdog", func() {

// and
time.Sleep(cacheExpirationTime)
err = watchdog.Sync(ctx)
_, err = watchdog.Sync(ctx)

// then cert is reissued with a new address
Expect(err).ToNot(HaveOccurred())
Expand All @@ -157,7 +157,7 @@ var _ = Describe("Dataplane Watchdog", func() {

It("should not reconcile if mesh hash is the same", func() {
// when
err := watchdog.Sync(ctx)
_, err := watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand All @@ -166,7 +166,7 @@ var _ = Describe("Dataplane Watchdog", func() {
// when
snapshotReconciler.proxy = nil // set to nil so we can check if it was not called again
time.Sleep(cacheExpirationTime)
err = watchdog.Sync(ctx)
_, err = watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/sync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ConnectionInfoTracker interface {

// SnapshotReconciler reconciles Envoy XDS configuration (Snapshot) by executing all generators (pkg/xds/generator)
type SnapshotReconciler interface {
Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) error
Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) (bool, error)
Clear(proxyId *core_xds.ProxyId) error
}

Expand Down

0 comments on commit 8c09d6f

Please sign in to comment.