Skip to content

Commit

Permalink
fix(kuma-cp) regenerate cert on DP change (#1988)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored May 20, 2021
1 parent c17ec8f commit ed84c9c
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 163 deletions.
149 changes: 87 additions & 62 deletions pkg/sds/server/v2/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,22 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/config/core/resources/store"
sds_metrics "github.com/kumahq/kuma/pkg/sds/metrics"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
envoy_secrets "github.com/kumahq/kuma/pkg/xds/envoy/secrets/v2"
"github.com/kumahq/kuma/pkg/xds/envoy/tls"

"github.com/pkg/errors"

envoy_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/ca/issuer"
mesh_core "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
mesh_helper "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
Expand All @@ -38,8 +35,6 @@ import (
// When Dataplane connects to the Control Plane, the Watchdog (separate goroutine) is started which on the defined interval
// execute DataplaneReconciler#Reconcile. It will then check if certs needs to be regenerated because Mesh CA was changed
// This follows the same pattern as XDS.
//
// Snapshot are versioned with UnixNano;NameOfTheCA pattern
type DataplaneReconciler struct {
resManager core_manager.ResourceManager
readOnlyResManager core_manager.ReadOnlyResourceManager
Expand All @@ -48,6 +43,20 @@ type DataplaneReconciler struct {
cache envoy_cache.SnapshotCache
upsertConfig store.UpsertConfig
sdsMetrics *sds_metrics.Metrics

sync.RWMutex
// proxySnapshotInfo contains information about snapshot for every proxy
// It is used to make a decision whether to regenerate certificate or not.
// This can be kept in memory and not synced between instances of CP because the state of the stream is local to the control plane
// When DP reconnects to the CP, snapshot will be regenerated anyways, because the stream is reinitialized.
proxySnapshotInfo map[string]snapshotInfo
}

type snapshotInfo struct {
tags mesh_proto.MultiValueTagSet
mtls *mesh_proto.Mesh_Mtls
expiration time.Time
generation time.Time
}

func (d *DataplaneReconciler) Reconcile(dataplaneId core_model.ResourceKey) error {
Expand All @@ -74,103 +83,119 @@ func (d *DataplaneReconciler) Reconcile(dataplaneId core_model.ResourceKey) erro
return nil
}

generateSnapshot, reason, err := d.shouldGenerateSnapshot(proxyID, mesh)
generateSnapshot, reason, err := d.shouldGenerateSnapshot(proxyID, mesh, dataplane)
if err != nil {
return err
}

if generateSnapshot {
sdsServerLog.Info("Generating the Snapshot.", "dataplaneId", dataplaneId, "reason", reason)
snapshot, err := d.generateSnapshot(dataplane, mesh)
snapshot, info, err := d.generateSnapshot(dataplane, mesh)
if err != nil {
return err
}
d.sdsMetrics.CertGenerations(envoy_common.APIV2).Inc()
if err := d.updateInsights(dataplaneId, snapshot); err != nil {
// do not stop updating Envoy even if insights update fails
sdsServerLog.Error(err, "Could not update Dataplane Insights", "dataplaneId", dataplaneId)
}

d.sdsMetrics.CertGenerations(envoy_common.APIV3).Inc()
if err := d.cache.SetSnapshot(proxyID, snapshot); err != nil {
return err
}
d.setSnapshotInfo(proxyID, info)

if err := d.updateInsights(dataplaneId, info); err != nil {
// do not stop updating Envoy even if insights update fails
sdsServerLog.Error(err, "Could not update Dataplane Insights", "dataplaneId", dataplaneId)
}
}
return nil
}

func (d *DataplaneReconciler) Cleanup(dataplaneId core_model.ResourceKey) error {
proxyID := core_xds.FromResourceKey(dataplaneId).String()
if err := d.cache.SetSnapshot(proxyID, envoy_cache.Snapshot{}); err != nil {
return err
}
d.Lock()
delete(d.proxySnapshotInfo, proxyID)
d.Unlock()
return nil
}

func (d *DataplaneReconciler) shouldGenerateSnapshot(proxyID string, mesh *mesh_core.MeshResource) (bool, string, error) {
currentSnapshot, err := d.cache.GetSnapshot(proxyID)
func (d *DataplaneReconciler) shouldGenerateSnapshot(proxyID string, mesh *mesh_helper.MeshResource, dataplane *mesh_helper.DataplaneResource) (bool, string, error) {
_, err := d.cache.GetSnapshot(proxyID)
if err != nil {
return true, "Snapshot does not exist", nil
}

parts := strings.Split(currentSnapshot.GetVersion(envoy_resource.SecretType), ";")
if len(parts) != 2 {
return false, "", errors.New(`invalid snapshot version format. Format should be "UnixNano-NameOfTheCA"`)
info := d.snapshotInfo(proxyID)
if !proto.Equal(info.mtls, mesh.Spec.Mtls) {
return true, "Mesh mTLS settings has changed", nil
}
// generate snapshot if CA changed
caName := parts[1]
if caName != mesh.GetEnabledCertificateAuthorityBackend().Name {
return true, fmt.Sprintf("Enabled CA changed from %s to %s", caName, mesh.GetEnabledCertificateAuthorityBackend().Name), nil
if dataplane.Spec.TagSet().String() != info.tags.String() {
return true, "Dataplane tags have changed", nil
}

// generate snapshot if cert expired
generationUnixNano, err := strconv.Atoi(parts[0])
if err != nil {
return false, "", errors.Wrap(err, `invalid snapshot version format. Format should be "UnixNano;NameOfTheCA"`)
}
expiration := issuer.DefaultWorkloadCertValidityPeriod
if mesh.GetEnabledCertificateAuthorityBackend().GetDpCert().GetRotation().GetExpiration() != "" {
expiration, err = mesh_helper.ParseDuration(mesh.GetEnabledCertificateAuthorityBackend().GetDpCert().GetRotation().GetExpiration())
if err != nil {
return false, "", nil
}
}
generationTime := time.Unix(0, int64(generationUnixNano))
expirationTime := generationTime.Add(expiration)
if core.Now().After(generationTime.Add(expiration / 5 * 4)) { // regenerate cert after 4/5 of its lifetime
reason := fmt.Sprintf("Certificate generated at %s will expire in %s", generationTime, expirationTime.Sub(core.Now()))
lifetime := info.expiration.Sub(info.generation)
if core.Now().After(info.generation.Add(lifetime / 5 * 4)) { // regenerate cert after 4/5 of its lifetime
reason := fmt.Sprintf("Certificate generated at %s will expire in %s", info.generation, info.expiration.Sub(core.Now()))
return true, reason, nil
}
return false, "", nil
}

func (d *DataplaneReconciler) generateSnapshot(dataplane *mesh_core.DataplaneResource, mesh *mesh_core.MeshResource) (envoy_cache.Snapshot, error) {
func (d *DataplaneReconciler) snapshotInfo(proxyID string) snapshotInfo {
d.RLock()
defer d.RUnlock()
return d.proxySnapshotInfo[proxyID]
}

func (d *DataplaneReconciler) setSnapshotInfo(proxyID string, info snapshotInfo) {
d.Lock()
defer d.Unlock()
d.proxySnapshotInfo[proxyID] = info
}

func (d *DataplaneReconciler) generateSnapshot(dataplane *mesh_core.DataplaneResource, mesh *mesh_core.MeshResource) (envoy_cache.Snapshot, snapshotInfo, error) {
requestor := sds_identity.Identity{
Services: dataplane.Spec.TagSet(),
Mesh: dataplane.GetMeta().GetMesh(),
}
identitySecret, err := d.identityProvider.Get(context.Background(), requestor)
if err != nil {
return envoy_cache.Snapshot{}, errors.Wrap(err, "could not get Dataplane cert pair")
return envoy_cache.Snapshot{}, snapshotInfo{}, errors.Wrap(err, "could not get Dataplane cert pair")
}

caSecret, err := d.meshCaProvider.Get(context.Background(), dataplane.GetMeta().GetMesh())
if err != nil {
return envoy_cache.Snapshot{}, errors.Wrap(err, "could not get mesh CA cert")
return envoy_cache.Snapshot{}, snapshotInfo{}, errors.Wrap(err, "could not get mesh CA cert")
}

version := fmt.Sprintf("%d;%s", core.Now().UTC().UnixNano(), mesh.GetEnabledCertificateAuthorityBackend().Name)
snap := envoy_cache.Snapshot{
Resources: [envoy_types.UnknownType]envoy_cache.Resources{},
}
snap.Resources[envoy_types.Secret] = envoy_cache.NewResources(version, []envoy_types.Resource{
envoy_secrets.CreateIdentitySecret(identitySecret),
envoy_secrets.CreateCaSecret(caSecret),
})
return snap, nil
}

func (d *DataplaneReconciler) updateInsights(dataplaneId core_model.ResourceKey, snapshot envoy_cache.Snapshot) error {
secret := snapshot.Resources[envoy_types.Secret].Items[tls.IdentityCertResource].Resource.(*envoy_auth.Secret)
certPEM := secret.GetTlsCertificate().CertificateChain.GetInlineBytes()
block, _ := pem.Decode(certPEM)
block, _ := pem.Decode(identitySecret.PemCerts[0])
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return err
return envoy_cache.Snapshot{}, snapshotInfo{}, err
}

info := snapshotInfo{
tags: dataplane.Spec.TagSet(),
mtls: mesh.Spec.Mtls,
expiration: cert.NotAfter,
generation: core.Now(),
}

resources := envoy_cache.SnapshotResources{
Secrets: []envoy_types.Resource{
envoy_secrets.CreateIdentitySecret(identitySecret),
envoy_secrets.CreateCaSecret(caSecret),
},
}
snaphot := envoy_cache.NewSnapshotWithResources(core.NewUUID(), resources)
return snaphot, info, nil
}

func (d *DataplaneReconciler) updateInsights(dataplaneId core_model.ResourceKey, info snapshotInfo) error {
return core_manager.Upsert(d.resManager, dataplaneId, mesh_core.NewDataplaneInsightResource(), func(resource core_model.Resource) {
insight := resource.(*mesh_core.DataplaneInsightResource)
if err := insight.Spec.UpdateCert(core.Now(), cert.NotAfter); err != nil {
if err := insight.Spec.UpdateCert(core.Now(), info.expiration); err != nil {
sdsServerLog.Error(err, "could not update the certificate", "dataplaneId", dataplaneId)
}
}, core_manager.WithConflictRetry(d.upsertConfig.ConflictRetryBaseBackoff, d.upsertConfig.ConflictRetryMaxTimes)) // retry because DataplaneInsight could be updated from other parts of the code
Expand Down
6 changes: 6 additions & 0 deletions pkg/sds/server/v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterSDS(rt core_runtime.Runtime, sdsMetrics *sds_metrics.Metrics) error
cache: cache,
upsertConfig: rt.Config().Store.Upsert,
sdsMetrics: sdsMetrics,
proxySnapshotInfo: map[string]snapshotInfo{},
}

syncTracker, err := syncTracker(&reconciler, rt.Config().SdsServer.DataplaneConfigurationRefreshInterval, sdsMetrics)
Expand Down Expand Up @@ -88,6 +89,11 @@ func syncTracker(reconciler *DataplaneReconciler, refresh time.Duration, sdsMetr
sdsMetrics.SdsGenerationsErrors(envoy_common.APIV2).Inc()
sdsServerLog.Error(err, "OnTick() failed")
},
OnStop: func() {
if err := reconciler.Cleanup(dataplaneId); err != nil {
sdsServerLog.Error(err, "could not cleanup sync", "dataplane", dataplaneId)
}
},
}
}), nil
}
Expand Down
87 changes: 67 additions & 20 deletions pkg/sds/server/v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/pkg/errors"
prometheus_client "github.com/prometheus/client_model/go"

sds_server "github.com/kumahq/kuma/pkg/sds/server"
Expand Down Expand Up @@ -167,6 +168,13 @@ var _ = Describe("SDS Server", func() {
close(stop)
})

BeforeEach(func() {
err := resManager.Delete(context.Background(), mesh_core.NewDataplaneInsightResource(), core_store.DeleteByKey("backend-01", "default"))
if !core_store.IsResourceNotFound(err) {
Expect(err).ToNot(HaveOccurred())
}
})

newRequestForSecrets := func() envoy_api.DiscoveryRequest {
return envoy_api.DiscoveryRequest{
Node: &envoy_api_core.Node{
Expand Down Expand Up @@ -199,13 +207,22 @@ var _ = Describe("SDS Server", func() {
Expect(resp).ToNot(BeNil())
Expect(resp.Resources).To(HaveLen(2))

// and insight is generated
dpInsight := mesh_core.NewDataplaneInsightResource()
err = resManager.Get(context.Background(), dpInsight, core_store.GetByKey("backend-01", "default"))
Expect(err).ToNot(HaveOccurred())
Expect(dpInsight.Spec.MTLS.CertificateRegenerations).To(Equal(uint32(1)))
expirationSeconds := now.Load().(time.Time).Add(60 * time.Second).Unix()
Expect(dpInsight.Spec.MTLS.CertificateExpirationTime.Seconds).To(Equal(expirationSeconds))
// and insight is generated (insight is updated async, to does not have to be done before response is sent)
Eventually(func() error {
dpInsight := mesh_core.NewDataplaneInsightResource()
err := resManager.Get(context.Background(), dpInsight, core_store.GetByKey("backend-01", "default"))
if err != nil {
return err
}
if dpInsight.Spec.MTLS.CertificateRegenerations != 1 {
return errors.Errorf("Certs were generated %d times. Expected 1", dpInsight.Spec.MTLS.CertificateRegenerations)
}
expirationSeconds := now.Load().(time.Time).Add(60 * time.Second).Unix()
if dpInsight.Spec.MTLS.CertificateExpirationTime.Seconds != expirationSeconds {
return errors.Errorf("Expiration time is not correct. Got %d, expected %d", dpInsight.Spec.MTLS.CertificateExpirationTime.Seconds, expirationSeconds)
}
return nil
}, "30s", "1s").ShouldNot(HaveOccurred())

// and metrics are published (metrics are published async, it does not have to be done before response is sent)
Eventually(func() float64 {
Expand All @@ -216,7 +233,7 @@ var _ = Describe("SDS Server", func() {
}, "5s").ShouldNot(BeNil())

close(done)
}, 10)
}, 60)

Context("should return new pair of + key", func() { // we cannot use DescribeTable because it does not support timeouts

Expand All @@ -242,7 +259,7 @@ var _ = Describe("SDS Server", func() {
Expect(resp.Resources).To(HaveLen(2))
firstExchangeResponse = resp
close(done)
}, 10)
}, 60)

AfterEach(func() {
Expect(stream.CloseSend()).To(Succeed())
Expand Down Expand Up @@ -270,18 +287,23 @@ var _ = Describe("SDS Server", func() {
Expect(resp).ToNot(BeNil())
Expect(firstExchangeResponse.Resources).ToNot(Equal(resp.Resources))

// and insight is updated
dpInsight := mesh_core.NewDataplaneInsightResource()
err = resManager.Get(context.Background(), dpInsight, core_store.GetByKey("backend-01", "default"))
Expect(err).ToNot(HaveOccurred())
Expect(dpInsight.Spec.MTLS.CertificateRegenerations).To(Equal(uint32(2)))
expirationSeconds := now.Load().(time.Time).Add(60 * time.Second).Unix()
Expect(dpInsight.Spec.MTLS.CertificateExpirationTime.Seconds).To(Equal(expirationSeconds))
// and insight is generated (insight is updated async, to does not have to be done before response is sent)
Eventually(func() error {
dpInsight := mesh_core.NewDataplaneInsightResource()
err := resManager.Get(context.Background(), dpInsight, core_store.GetByKey("backend-01", "default"))
if err != nil {
return err
}
if dpInsight.Spec.MTLS.CertificateRegenerations != 2 {
return errors.Errorf("Certs were generated %d times. Expected 2", dpInsight.Spec.MTLS.CertificateRegenerations)
}
return nil
}, "30s", "1s").ShouldNot(HaveOccurred())

close(done)
}, 10)
}, 60)

It("should return pair when cert expired", func(done Done) {
It("should return a new pair when cert expired", func(done Done) {
// when time is moved 1s after 4/5 of 60s cert expiration
shiftedTime := now.Load().(time.Time).Add(49 * time.Second)
now.Store(shiftedTime)
Expand All @@ -300,7 +322,32 @@ var _ = Describe("SDS Server", func() {
Expect(firstExchangeResponse.Resources).ToNot(Equal(resp.Resources))

close(done)
}, 10)
}, 60)

It("should return a new pair when dataplane has changed", func(done Done) {
// when
dpRes := mesh_core.NewDataplaneResource()
Expect(resManager.Get(context.Background(), dpRes, core_store.GetByKey("backend-01", "default"))).To(Succeed())
dpRes.Spec.Networking.Inbound[0].Tags["version"] = "xyz"

// when new tag is added
Expect(resManager.Update(context.Background(), dpRes)).To(Succeed())

// and when send a request with version previously fetched
req := newRequestForSecrets()
req.VersionInfo = firstExchangeResponse.VersionInfo
req.ResponseNonce = firstExchangeResponse.Nonce
err := stream.Send(&req)
Expect(err).ToNot(HaveOccurred())
resp, err := stream.Recv()
Expect(err).ToNot(HaveOccurred())

// then certs are different
Expect(resp).ToNot(BeNil())
Expect(firstExchangeResponse.Resources).ToNot(Equal(resp.Resources))

close(done)
}, 60)
})

It("should not return certs when DP is not authorized", func(done Done) {
Expand All @@ -323,5 +370,5 @@ var _ = Describe("SDS Server", func() {
Expect(err).To(MatchError("rpc error: code = Unknown desc = authentication failed: could not parse token: token contains an invalid number of segments"))

close(done)
}, 10)
}, 60)
})
Loading

0 comments on commit ed84c9c

Please sign in to comment.