Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): better xds metrics #7208

Merged
merged 3 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("Gateway Route", func() {

Expect(proxy.Dataplane.Spec.IsBuiltinGateway()).To(BeTrue())

if err := reconciler.Reconcile(*ctx, proxy); err != nil {
if _, err := reconciler.Reconcile(*ctx, proxy); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/gateway/listener_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ = Describe("Gateway Listener", func() {

Expect(proxy.Dataplane.Spec.IsBuiltinGateway()).To(BeTrue())

if err := reconciler.Reconcile(*ctx, proxy); err != nil {
if _, err := reconciler.Reconcile(*ctx, proxy); err != nil {
return nil, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/xds/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
)

// Metrics groups the Prometheus collectors created by NewMetrics.
type Metrics struct {
// XdsGenerations observes XDS snapshot generation. This hunk lists the
// field before and after the change: the plain Summary is replaced by a
// SummaryVec that NewMetrics creates with the "proxy_type" and "result"
// labels.
XdsGenerations prometheus.Summary
XdsGenerations *prometheus.SummaryVec
// XdsGenerationsErrors counts errors during XDS generation.
XdsGenerationsErrors prometheus.Counter
// KubeAuthCache is a labeled counter; its label names are not visible in
// this hunk.
KubeAuthCache *prometheus.CounterVec
}

func NewMetrics(metrics core_metrics.Metrics) (*Metrics, error) {
xdsGenerations := prometheus.NewSummary(prometheus.SummaryOpts{
xdsGenerations := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "xds_generation",
Help: "Summary of XDS Snapshot generation",
Objectives: core_metrics.DefaultObjectives,
})
}, []string{"proxy_type", "result"})
xdsGenerationsErrors := prometheus.NewCounter(prometheus.CounterOpts{
Name: "xds_generation_errors",
Help: "Counter of errors during XDS generation",
Expand Down
36 changes: 18 additions & 18 deletions pkg/xds/server/v3/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (r *reconciler) clearUndeliveredConfigStats(nodeId *envoy_core.Node) {
}
}

func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) error {
func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) (bool, error) {
node := &envoy_core.Node{Id: proxy.Id.String()}
snapshot, err := r.generator.GenerateSnapshot(ctx, proxy)
if err != nil {
return errors.Wrapf(err, "failed to generate a snapshot")
return false, errors.Wrapf(err, "failed to generate a snapshot")
}

// To avoid assigning a new version every time, compare with
Expand All @@ -76,33 +76,33 @@ func (r *reconciler) Reconcile(ctx xds_context.Context, proxy *model.Proxy) erro
// to Envoy. This ensures that we have as much in-band error
// information as possible, which is especially useful for tests
// that don't actually program an Envoy instance.
if len(changed) > 0 {
for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return errors.Wrapf(err, "invalid resource %q", name)
}
if len(changed) == 0 {
log.V(1).Info("config is the same")
return false, nil
}

for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return false, errors.Wrapf(err, "invalid resource %q", name)
}
}
}

if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return errors.Wrap(err, "inconsistent snapshot")
}
log.Info("config has changed", "versions", changed)
} else {
log.V(1).Info("config is the same")
if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return false, errors.Wrap(err, "inconsistent snapshot")
}
log.Info("config has changed", "versions", changed)

if err := r.cacher.Cache(node, snapshot); err != nil {
return errors.Wrap(err, "failed to store snapshot")
return false, errors.Wrap(err, "failed to store snapshot")
}

for _, version := range changed {
r.statsCallbacks.ConfigReadyForDelivery(version)
}

return nil
return true, nil
}

func validateResource(r envoy_types.Resource) error {
Expand Down
8 changes: 5 additions & 3 deletions pkg/xds/server/v3/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ var _ = Describe("Reconcile", func() {
Id: *xds_model.BuildProxyId("demo", "example"),
Dataplane: dataplane,
}
err = r.Reconcile(xds_context.Context{}, proxy)
changed, err := r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
Expect(snapshot.Resources[envoy_types.Listener].Version).To(BeEmpty())
Expect(snapshot.Resources[envoy_types.Route].Version).To(BeEmpty())
Expect(snapshot.Resources[envoy_types.Cluster].Version).To(BeEmpty())
Expand All @@ -185,9 +186,10 @@ var _ = Describe("Reconcile", func() {

By("simulating discovery event (Dataplane watchdog triggers refresh)")
// when
err = r.Reconcile(xds_context.Context{}, proxy)
changed, err = r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeFalse())

By("verifying that snapshot versions remain the same")
// when
Expand All @@ -204,7 +206,7 @@ var _ = Describe("Reconcile", func() {

By("simulating discovery event (Dataplane gets changed)")
// when
err = r.Reconcile(xds_context.Context{}, proxy)
_, err = r.Reconcile(xds_context.Context{}, proxy)
// then
Expect(err).ToNot(HaveOccurred())

Expand Down
84 changes: 65 additions & 19 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type DataplaneWatchdogDependencies struct {
ResManager core_manager.ReadOnlyResourceManager
}

// Status names the outcome of a single DataplaneWatchdog sync. Its string
// value is used as the "result" label of the xds_generation summary.
type Status string

// Possible sync outcomes. They are constants rather than variables so that
// no other code can reassign the label values at runtime.
const (
	// SkipStatus means neither the mesh hash nor an expiring certificate
	// required a new configuration, so nothing was generated.
	SkipStatus Status = "skip"
	// GeneratedStatus means the configuration was generated but the
	// reconciler reported no change.
	GeneratedStatus Status = "generated"
	// ChangedStatus means the configuration was generated and the
	// reconciler reported a change.
	ChangedStatus Status = "changed"
)

// SyncResult describes the outcome of one DataplaneWatchdog.Sync call.
// Sync returns the zero value when it fails or when the proxy type has not
// been inferred yet.
type SyncResult struct {
// ProxyType is the kind of proxy that was synced; the watchdog factory
// uses it as the "proxy_type" label value of the xds_generation summary.
ProxyType mesh_proto.ProxyType
// Status tells whether the sync was skipped, generated a configuration
// without changes, or generated a changed configuration; it is used as
// the "result" label value.
Status Status
}

type DataplaneWatchdog struct {
DataplaneWatchdogDependencies
key core_model.ResourceKey
Expand All @@ -52,10 +65,10 @@ func NewDataplaneWatchdog(deps DataplaneWatchdogDependencies, dpKey core_model.R
}
}

func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
func (d *DataplaneWatchdog) Sync(ctx context.Context) (SyncResult, error) {
metadata := d.MetadataTracker.Metadata(d.key)
if metadata == nil {
return errors.New("metadata cannot be nil")
return SyncResult{}, errors.New("metadata cannot be nil")
}

if d.dpType == "" {
Expand All @@ -70,7 +83,7 @@ func (d *DataplaneWatchdog) Sync(ctx context.Context) error {
return d.syncEgress(ctx, metadata)
default:
// It might be a case that dp type is not yet inferred because there is no Dataplane definition yet.
return nil
return SyncResult{}, nil
}
}

Expand All @@ -91,17 +104,21 @@ func (d *DataplaneWatchdog) Cleanup() error {

// syncDataplane syncs state of the Dataplane.
// It uses Mesh Hash to decide if we need to regenerate configuration or not.
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
meshCtx, err := d.MeshCache.GetMeshContext(ctx, d.key.Mesh)
if err != nil {
return err
return SyncResult{}, err
}

certInfo := d.EnvoyCpCtx.Secrets.Info(d.key)
syncForCert := certInfo != nil && certInfo.ExpiringSoon() // check if we need to regenerate config because identity cert is expiring soon.
syncForConfig := meshCtx.Hash != d.lastHash // check if we need to regenerate config because Kuma policies has changed.
result := SyncResult{
ProxyType: mesh_proto.DataplaneProxyType,
}
if !syncForCert && !syncForConfig {
return nil
result.Status = SkipStatus
return result, nil
}
if syncForConfig {
d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", meshCtx.Hash)
Expand All @@ -116,65 +133,94 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
}
proxy, err := d.DataplaneProxyBuilder.Build(ctx, d.key, meshCtx)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.Dataplane.Spec.Networking
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.Address, networking.AdvertisedAddress)
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
if err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy); err != nil {
return err
changed, err := d.DataplaneReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
d.lastHash = meshCtx.Hash
return nil

if changed {
result.Status = ChangedStatus
} else {
result.Status = GeneratedStatus
}
return result, nil
}

// syncIngress syncs state of Ingress Dataplane. Notice that it does not use Mesh Hash yet because Ingress supports many Meshes.
// It builds the ingress proxy, attaches the Envoy Admin mTLS certs and the
// metadata, and reconciles on every call. In the post-change form it returns a
// SyncResult with ProxyType IngressProxyType whose Status is ChangedStatus
// when the reconciler reports a change and GeneratedStatus otherwise; on any
// error it returns the zero SyncResult together with the error.
// This hunk interleaves the pre-change and post-change lines of the function.
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
}
proxy, err := d.IngressProxyBuilder.Build(ctx, d.key)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking()
// The admin certs are requested for both the address and the advertised address.
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.GetAddress(), networking.GetAdvertisedAddress())
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.IngressReconciler.Reconcile(*envoyCtx, proxy)
changed, err := d.IngressReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
// Start from "generated" and upgrade to "changed" only when the reconciler
// says the snapshot differs.
result := SyncResult{
ProxyType: mesh_proto.IngressProxyType,
Status: GeneratedStatus,
}
if changed {
result.Status = ChangedStatus
}
return result, nil
}

// syncEgress syncs state of Egress Dataplane. Notice that it does not use
// Mesh Hash yet because Egress supports many Meshes.
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) error {
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
envoyCtx := &xds_context.Context{
ControlPlane: d.EnvoyCpCtx,
Mesh: xds_context.MeshContext{}, // ZoneEgress does not have a mesh!
}

proxy, err := d.EgressProxyBuilder.Build(ctx, d.key)
if err != nil {
return err
return SyncResult{}, err
}
networking := proxy.ZoneEgressProxy.ZoneEgressResource.Spec.Networking
envoyAdminMTLS, err := d.getEnvoyAdminMTLS(ctx, networking.Address, "")
if err != nil {
return errors.Wrap(err, "could not get Envoy Admin mTLS certs")
return SyncResult{}, errors.Wrap(err, "could not get Envoy Admin mTLS certs")
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
proxy.Metadata = metadata
return d.EgressReconciler.Reconcile(*envoyCtx, proxy)
changed, err := d.EgressReconciler.Reconcile(*envoyCtx, proxy)
if err != nil {
return SyncResult{}, err
}
result := SyncResult{
ProxyType: mesh_proto.IngressProxyType,
Status: GeneratedStatus,
}
if changed {
result.Status = ChangedStatus
}
return result, nil
}

func (d *DataplaneWatchdog) getEnvoyAdminMTLS(ctx context.Context, address string, advertisedAddress string) (core_xds.ServerSideMTLSCerts, error) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/xds/sync/dataplane_watchdog_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_watchdog.Wa
},
OnTick: func() error {
start := core.Now()
defer func() {
d.xdsMetrics.XdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
}()
return dataplaneWatchdog.Sync(ctx)
result, err := dataplaneWatchdog.Sync(ctx)
if err != nil {
return err
}
d.xdsMetrics.XdsGenerations.
WithLabelValues(string(result.ProxyType), string(result.Status)).
Observe(float64(core.Now().Sub(start).Milliseconds()))
return nil
},
OnError: func(err error) {
d.xdsMetrics.XdsGenerationsErrors.Inc()
Expand Down
12 changes: 6 additions & 6 deletions pkg/xds/sync/dataplane_watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ type staticSnapshotReconciler struct {
proxy *core_xds.Proxy
}

// Reconcile is the test double's implementation: it only records the proxy it
// was called with, so a test can reset s.proxy to nil and later check whether
// a reconciliation happened. It never fails; in the post-change form it always
// reports that the configuration changed.
// This hunk interleaves the pre-change and post-change lines of the method.
func (s *staticSnapshotReconciler) Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) error {
func (s *staticSnapshotReconciler) Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) (bool, error) {
s.proxy = proxy
return nil
return true, nil
}

func (s *staticSnapshotReconciler) Clear(proxyId *core_xds.ProxyId) error {
Expand Down Expand Up @@ -124,7 +124,7 @@ var _ = Describe("Dataplane Watchdog", func() {

It("should reissue admin tls certificate when address has changed", func() {
// when
err := watchdog.Sync(ctx)
_, err := watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand All @@ -144,7 +144,7 @@ var _ = Describe("Dataplane Watchdog", func() {

// and
time.Sleep(cacheExpirationTime)
err = watchdog.Sync(ctx)
_, err = watchdog.Sync(ctx)

// then cert is reissued with a new address
Expect(err).ToNot(HaveOccurred())
Expand All @@ -157,7 +157,7 @@ var _ = Describe("Dataplane Watchdog", func() {

It("should not reconcile if mesh hash is the same", func() {
// when
err := watchdog.Sync(ctx)
_, err := watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand All @@ -166,7 +166,7 @@ var _ = Describe("Dataplane Watchdog", func() {
// when
snapshotReconciler.proxy = nil // set to nil so we can check if it was not called again
time.Sleep(cacheExpirationTime)
err = watchdog.Sync(ctx)
_, err = watchdog.Sync(ctx)

// then
Expect(err).ToNot(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/sync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ConnectionInfoTracker interface {

// SnapshotReconciler reconciles Envoy XDS configuration (Snapshot) by executing all generators (pkg/xds/generator)
type SnapshotReconciler interface {
// Reconcile generates the configuration for the given proxy. This hunk
// lists the method before and after the change: the new form additionally
// returns a boolean. In the v3 reconciler shown in this change it is true
// only when a new snapshot was stored, and false when the config is the
// same or an error occurred.
Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) error
Reconcile(ctx xds_context.Context, proxy *core_xds.Proxy) (bool, error)
// Clear takes the ID of a proxy.
// NOTE(review): presumably it drops the reconciler's state for that proxy;
// no implementation is visible here, so confirm.
Clear(proxyId *core_xds.ProxyId) error
}

Expand Down