Skip to content

Commit

Permalink
fix(kuma-cp) add owner when create ZoneIngressInsight (#2456)
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
(cherry picked from commit cc10d19)
  • Loading branch information
lobkovilya authored and jpeach committed Aug 11, 2021
1 parent 491f5c1 commit 77970d9
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 2 deletions.
2 changes: 1 addition & 1 deletion app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newRunCmd(rootCtx *RootContext) *cobra.Command {
}

if err != nil {
runLog.Error(err, "unable to read provided %s policy", cfg.Dataplane.ProxyType)
runLog.Error(err, "failed to read policy", "proxyType", cfg.Dataplane.ProxyType)
return err
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kumahq/kuma/pkg/core/managers/apis/dataplaneinsight"
mesh_managers "github.com/kumahq/kuma/pkg/core/managers/apis/mesh"
"github.com/kumahq/kuma/pkg/core/managers/apis/zone"
"github.com/kumahq/kuma/pkg/core/managers/apis/zoneingressinsight"
"github.com/kumahq/kuma/pkg/core/managers/apis/zoneinsight"
core_plugins "github.com/kumahq/kuma/pkg/core/plugins"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
Expand Down Expand Up @@ -313,6 +314,11 @@ func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder
zoneinsight.NewZoneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Zone),
)

customizableManager.Customize(
mesh.ZoneIngressInsightType,
zoneingressinsight.NewZoneIngressInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane),
)

var cipher secret_cipher.Cipher
switch cfg.Store.Type {
case store.KubernetesStore:
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/managers/apis/zone/zone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type zoneManager struct {
}

func (z *zoneManager) Delete(ctx context.Context, r model.Resource, opts ...core_store.DeleteOptionsFunc) error {
if err := z.validator.ValidateDelete(ctx, r.GetMeta().GetName()); err != nil {
options := core_store.NewDeleteOptions(opts...)
if err := z.validator.ValidateDelete(ctx, options.Name); err != nil {
return err
}
return z.ResourceManager.Delete(ctx, r, opts...)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package zoneingressinsight

import (
"context"

kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
)

func NewZoneIngressInsightManager(store core_store.ResourceStore, config *kuma_cp.DataplaneMetrics) core_manager.ResourceManager {
return &zoneIngressInsightManager{
ResourceManager: core_manager.NewResourceManager(store),
store: store,
config: config,
}
}

type zoneIngressInsightManager struct {
core_manager.ResourceManager
store core_store.ResourceStore
config *kuma_cp.DataplaneMetrics
}

func (m *zoneIngressInsightManager) Create(ctx context.Context, resource core_model.Resource, fs ...core_store.CreateOptionsFunc) error {
if err := resource.Validate(); err != nil {
return err
}
opts := core_store.NewCreateOptions(fs...)

m.limitSubscription(resource.(*mesh.ZoneIngressInsightResource))

zoneIngress := mesh.NewZoneIngressResource()
if err := m.store.Get(ctx, zoneIngress, core_store.GetByKey(opts.Name, core_model.NoMesh)); err != nil {
return err
}
return m.store.Create(ctx, resource, append(fs, core_store.CreatedAt(core.Now()), core_store.CreateWithOwner(zoneIngress))...)
}

func (m *zoneIngressInsightManager) Update(ctx context.Context, resource core_model.Resource, fs ...core_store.UpdateOptionsFunc) error {
m.limitSubscription(resource.(*mesh.ZoneIngressInsightResource))
return m.ResourceManager.Update(ctx, resource, fs...)
}

func (m *zoneIngressInsightManager) limitSubscription(zoneIngressInsight *mesh.ZoneIngressInsightResource) {
if !m.config.Enabled {
zoneIngressInsight.Spec.Subscriptions = nil
return
}
if m.config.SubscriptionLimit == 0 {
return
}
if len(zoneIngressInsight.Spec.Subscriptions) <= m.config.SubscriptionLimit {
return
}
s := zoneIngressInsight.Spec.Subscriptions
zoneIngressInsight.Spec.Subscriptions = s[len(s)-m.config.SubscriptionLimit:]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package zoneingressinsight_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestZoneIngressInsightManager(t *testing.T) {
test.RunSpecs(t, "Zone Ingress Insights Manager Suite")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package zoneingressinsight_test

import (
"context"
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
"github.com/kumahq/kuma/pkg/core/managers/apis/zoneingressinsight"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
)

var _ = Describe("ZoneIngressInsight Manager", func() {

It("should limit the number of subscription", func() {
// setup
s := memory.NewStore()
cfg := &kuma_cp.DataplaneMetrics{
Enabled: true,
SubscriptionLimit: 3,
}
manager := zoneingressinsight.NewZoneIngressInsightManager(s, cfg)

err := s.Create(context.Background(), mesh.NewZoneIngressResource(), store.CreateByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

input := mesh.NewZoneIngressInsightResource()
for i := 0; i < 10; i++ {
input.Spec.Subscriptions = append(input.Spec.Subscriptions, &mesh_proto.DiscoverySubscription{
Id: fmt.Sprintf("%d", i),
})
}

// when
err = manager.Create(context.Background(), input, store.CreateByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

actual := mesh.NewZoneIngressInsightResource()
err = s.Get(context.Background(), actual, store.GetByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

// then
Expect(actual.Spec.Subscriptions).To(HaveLen(3))
Expect(actual.Spec.Subscriptions[0].Id).To(Equal("7"))
Expect(actual.Spec.Subscriptions[1].Id).To(Equal("8"))
Expect(actual.Spec.Subscriptions[2].Id).To(Equal("9"))
})

It("should cleanup subscriptions if disabled", func() {
// setup
s := memory.NewStore()
cfg := &kuma_cp.DataplaneMetrics{
Enabled: false,
}
manager := zoneingressinsight.NewZoneIngressInsightManager(s, cfg)

err := s.Create(context.Background(), mesh.NewZoneIngressResource(), store.CreateByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

input := mesh.NewZoneIngressInsightResource()
for i := 0; i < 10; i++ {
input.Spec.Subscriptions = append(input.Spec.Subscriptions, &mesh_proto.DiscoverySubscription{
Id: fmt.Sprintf("%d", i),
})
}

// when
err = manager.Create(context.Background(), input, store.CreateByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

actual := mesh.NewZoneIngressInsightResource()
err = s.Get(context.Background(), actual, store.GetByKey("di1", model.NoMesh))
Expect(err).ToNot(HaveOccurred())

// then
Expect(actual.Spec.Subscriptions).To(HaveLen(0))
Expect(actual.Spec.Subscriptions).To(BeNil())
})
})
109 changes: 109 additions & 0 deletions test/e2e/ownership/multizone_universal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package ownership

import (
"strings"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
)

func MultizoneUniversal() {
var global, zoneUniversal Cluster
var optsGlobal, optsZone1 = KumaUniversalDeployOpts, KumaUniversalDeployOpts

BeforeEach(func() {
clusters, err := NewUniversalClusters([]string{Kuma1, Kuma2}, Silent)
Expect(err).ToNot(HaveOccurred())

// Global
global = clusters.GetCluster(Kuma1)
Expect(Kuma(core.Global, optsGlobal...)(global)).To(Succeed())
Expect(global.VerifyKuma()).To(Succeed())

// Cluster 1
optsZone1 = append(optsZone1, WithGlobalAddress(global.GetKuma().GetKDSServerAddress()))
zoneUniversal = clusters.GetCluster(Kuma2)
Expect(Kuma(core.Zone, optsZone1...)(zoneUniversal)).To(Succeed())
Expect(zoneUniversal.VerifyKuma()).To(Succeed())
})

E2EAfterEach(func() {
Expect(zoneUniversal.DeleteKuma(optsZone1...)).To(Succeed())
Expect(zoneUniversal.DismissCluster()).To(Succeed())

Expect(global.DeleteKuma(optsGlobal...)).To(Succeed())
Expect(global.DismissCluster()).To(Succeed())
})

installZoneIngress := func() {
ingressToken, err := global.GetKuma().GenerateZoneIngressToken(Kuma2)
Expect(err).ToNot(HaveOccurred())
Expect(IngressUniversal(ingressToken)(zoneUniversal)).To(Succeed())
}

installDataplane := func() {
token, err := global.GetKuma().GenerateDpToken("default", AppModeDemoClient)
Expect(err).ToNot(HaveOccurred())
Expect(DemoClientUniversal(AppModeDemoClient, "default", token)(zoneUniversal)).To(Succeed())
}

has := func(resourceURI string) func() bool {
return func() bool {
cmd := []string{"curl", "-v", "-m", "3", "--fail", "localhost:5681/" + resourceURI}
stdout, _, err := global.ExecWithRetries("", "", AppModeCP, cmd...)
Expect(err).ToNot(HaveOccurred())
return strings.Contains(stdout, `"total": 1`)
}
}

killKumaDP := func(appname string) {
_, _, err := zoneUniversal.Exec("", "", appname, "pkill", "-9", "envoy")
Expect(err).ToNot(HaveOccurred())
}

killZone := func() {
_, _, err := zoneUniversal.Exec("", "", AppModeCP, "pkill", "-9", "kuma-cp")
Expect(err).ToNot(HaveOccurred())
Eventually(func() (string, error) {
return global.GetKumactlOptions().RunKumactlAndGetOutput("inspect", "zones")
}, "30s", "1s").Should(ContainSubstring("Offline"))
Expect(global.GetKumactlOptions().RunKumactl("delete", "zone", Kuma2)).To(Succeed())
}

It("should delete ZoneInsight when Zone is deleted", func() {
Eventually(has("zones"), "30s", "1s").Should(BeTrue())
Eventually(has("zone-insights"), "30s", "1s").Should(BeTrue())

killZone()

Eventually(has("zones"), "30s", "1s").Should(BeFalse())
Eventually(has("zone-insights"), "30s", "1s").Should(BeFalse())
})

It("should delete ZoneIngressInsights when ZoneIngress is deleted", func() {
installZoneIngress()

Eventually(has("zone-ingresses"), "30s", "1s").Should(BeTrue())
Eventually(has("zone-ingress-insights"), "30s", "1s").Should(BeTrue())

killKumaDP(AppIngress)

Eventually(has("zone-ingresses"), "30s", "1s").Should(BeFalse())
Eventually(has("zone-ingress-insights"), "30s", "1s").Should(BeFalse())
})

It("should delete DataplaneInsight when Dataplane is deleted", func() {
installDataplane()

Eventually(has("dataplanes"), "30s", "1s").Should(BeTrue())
Eventually(has("dataplane-insights"), "30s", "1s").Should(BeTrue())

killKumaDP(AppModeDemoClient)

Eventually(has("dataplanes"), "30s", "1s").Should(BeFalse())
Eventually(has("dataplane-insights"), "30s", "1s").Should(BeFalse())
})
}
9 changes: 9 additions & 0 deletions test/e2e/ownership/mutlizone_universal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ownership_test

import (
. "github.com/onsi/ginkgo"

"github.com/kumahq/kuma/test/e2e/ownership"
)

var _ = Describe("Test Multizone Ownership for Universal", ownership.MultizoneUniversal)
16 changes: 16 additions & 0 deletions test/e2e/ownership/ownership_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ownership_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
"github.com/kumahq/kuma/test/framework"
)

func TestE2EOwnership(t *testing.T) {
if framework.IsK8sClustersStarted() {
test.RunSpecs(t, "Ownership tests")
} else {
t.SkipNow()
}
}

0 comments on commit 77970d9

Please sign in to comment.