Skip to content

Commit

Permalink
fix(kuma-cp) review
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lobkov <lobkovilya@yandex.ru>
  • Loading branch information
lobkovilya committed Aug 13, 2021
1 parent d2094a3 commit f0754db
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 37 deletions.
2 changes: 1 addition & 1 deletion api/generic/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Insight interface {
proto.Message
IsOnline() bool
GetLastSubscription() Subscription
UpdateSubscription(Subscription)
UpdateSubscription(Subscription) error
}

type Subscription interface {
Expand Down
10 changes: 7 additions & 3 deletions api/mesh/v1alpha1/dataplane_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &DataplaneInsight{}

func NewSubscriptionStatus() *DiscoverySubscriptionStatus {
return &DiscoverySubscriptionStatus{
Total: &DiscoveryServiceStats{},
Expand Down Expand Up @@ -70,13 +73,13 @@ func (x *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time
return nil
}

func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) {
func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return
return nil
}
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return
return errors.Errorf("invalid type %T for DataplaneInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
Expand All @@ -85,6 +88,7 @@ func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
Expand Down
33 changes: 29 additions & 4 deletions api/mesh/v1alpha1/dataplane_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
. "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("DataplaneHelpers", func() {
Expand Down Expand Up @@ -36,7 +37,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -75,7 +76,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -116,15 +117,39 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Expect(dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
dataplaneInsight := &DataplaneInsight{
Subscriptions: []*DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := dataplaneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for DataplaneInsight"))
})
})

Describe("GetLatestSubscription()", func() {
Expand Down
10 changes: 7 additions & 3 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package v1alpha1
import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &ZoneIngressInsight{}

func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
Expand All @@ -17,13 +20,13 @@ func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscrip
return -1, nil
}

func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) {
func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return
return nil
}
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return
return errors.Errorf("invalid type %T for ZoneIngressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
Expand All @@ -32,6 +35,7 @@ func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
Expand Down
29 changes: 27 additions & 2 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("Zone Ingress Insights", func() {
Expand All @@ -31,14 +32,38 @@ var _ = Describe("Zone Ingress Insights", func() {
}

// when
zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Expect(zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
zoneInsight := &mesh_proto.ZoneIngressInsight{
Subscriptions: []*mesh_proto.DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := zoneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for ZoneIngressInsight"))
})
})
})
10 changes: 7 additions & 3 deletions api/system/v1alpha1/zone_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package v1alpha1
import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &ZoneInsight{}

func NewSubscriptionStatus() *KDSSubscriptionStatus {
return &KDSSubscriptionStatus{
Total: &KDSServiceStats{},
Expand Down Expand Up @@ -72,13 +75,13 @@ func (x *ZoneInsight) Sum(v func(*KDSSubscription) uint64) uint64 {
return result
}

func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) {
func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return
return nil
}
kdsSubscription, ok := s.(*KDSSubscription)
if !ok {
return
return errors.Errorf("invalid type %T for ZoneInsight", s)
}
i, old := x.GetSubscription(kdsSubscription.Id)
if old != nil {
Expand All @@ -87,6 +90,7 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, kdsSubscription)
}
return nil
}

// If Global CP was killed ungracefully then we can get a subscription without a DisconnectTime.
Expand Down
29 changes: 27 additions & 2 deletions api/system/v1alpha1/zone_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
. "github.com/onsi/gomega"

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

Expand All @@ -31,14 +32,38 @@ var _ = Describe("Zone Insights", func() {
}

// when
zoneInsight.UpdateSubscription(&system_proto.KDSSubscription{
Expect(zoneInsight.UpdateSubscription(&system_proto.KDSSubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
zoneInsight := &system_proto.ZoneInsight{
Subscriptions: []*system_proto.KDSSubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.DiscoverySubscription for ZoneInsight"))
})
})
})
6 changes: 4 additions & 2 deletions pkg/core/resources/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewUpsertOpts(fs ...UpsertFunc) UpsertOpts {
return opts
}

func Upsert(manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource), fs ...UpsertFunc) error {
func Upsert(manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
upsert := func() error {
create := false
err := manager.Get(context.Background(), resource, store.GetBy(key))
Expand All @@ -134,7 +134,9 @@ func Upsert(manager ResourceManager, key model.ResourceKey, resource model.Resou
return err
}
}
fn(resource)
if err := fn(resource); err != nil {
return err
}
if create {
return manager.Create(context.Background(), resource, store.CreateBy(key))
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/gc/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (f *subscriptionFinalizer) checkResourceVersion(typ core_model.ResourceType
insight.GetLastSubscription().SetDisconnectTime(core.Now())

upsertInsight, _ := registry.Global().NewObject(typ)
err := manager.Upsert(f.rm, core_model.MetaToResourceKey(item.GetMeta()), upsertInsight, func(_ core_model.Resource) {
upsertInsight.GetSpec().(generic.Insight).UpdateSubscription(insight.GetLastSubscription())
err := manager.Upsert(f.rm, key, upsertInsight, func(r core_model.Resource) error {
return upsertInsight.GetSpec().(generic.Insight).UpdateSubscription(insight.GetLastSubscription())
})
if err != nil {
log.Error(err, "unable to finalize subscription")
Expand Down
8 changes: 4 additions & 4 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func (r *resyncer) createOrUpdateServiceInsight(mesh string) error {
}
}

err := manager.Upsert(r.rm, model.ResourceKey{Mesh: mesh, Name: ServiceInsightName(mesh)}, core_mesh.NewServiceInsightResource(), func(resource model.Resource) {
err := manager.Upsert(r.rm, model.ResourceKey{Mesh: mesh, Name: ServiceInsightName(mesh)}, core_mesh.NewServiceInsightResource(), func(resource model.Resource) error {
insight.LastSync = proto.MustTimestampProto(core.Now())
_ = resource.SetSpec(insight)
return resource.SetSpec(insight)
})
if err != nil {
if manager.IsMeshNotFound(err) {
Expand Down Expand Up @@ -349,9 +349,9 @@ func (r *resyncer) createOrUpdateMeshInsight(mesh string) error {
}
}

err := manager.Upsert(r.rm, model.ResourceKey{Mesh: model.NoMesh, Name: mesh}, core_mesh.NewMeshInsightResource(), func(resource model.Resource) {
err := manager.Upsert(r.rm, model.ResourceKey{Mesh: model.NoMesh, Name: mesh}, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
insight.LastSync = proto.MustTimestampProto(core.Now())
_ = resource.SetSpec(insight)
return resource.SetSpec(insight)
})
if err != nil {
if manager.IsMeshNotFound(err) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *zoneInsightStore) Upsert(zone string, subscription *system_proto.KDSSub
Name: zone,
}
zoneInsight := system.NewZoneInsightResource()
return manager.Upsert(s.resManager, key, zoneInsight, func(resource core_model.Resource) {
zoneInsight.Spec.UpdateSubscription(subscription)
return manager.Upsert(s.resManager, key, zoneInsight, func(resource core_model.Resource) error {
return zoneInsight.Spec.UpdateSubscription(subscription)
})
}
4 changes: 3 additions & 1 deletion pkg/sds/server/v3/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,12 @@ func (d *DataplaneReconciler) generateSnapshot(dataplane *core_mesh.DataplaneRes
}

func (d *DataplaneReconciler) updateInsights(dataplaneId core_model.ResourceKey, info snapshotInfo) error {
return core_manager.Upsert(d.resManager, dataplaneId, core_mesh.NewDataplaneInsightResource(), func(resource core_model.Resource) {
return core_manager.Upsert(d.resManager, dataplaneId, core_mesh.NewDataplaneInsightResource(), func(resource core_model.Resource) error {
insight := resource.(*core_mesh.DataplaneInsightResource)
if err := insight.Spec.UpdateCert(core.Now(), info.expiration); err != nil {
sdsServerLog.Error(err, "could not update the certificate", "dataplaneId", dataplaneId)
return err
}
return nil
}, core_manager.WithConflictRetry(d.upsertConfig.ConflictRetryBaseBackoff, d.upsertConfig.ConflictRetryMaxTimes)) // retry because DataplaneInsight could be updated from other parts of the code
}
8 changes: 4 additions & 4 deletions pkg/xds/server/callbacks/dataplane_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,16 @@ func NewDataplaneLifecycle(resManager manager.ResourceManager, shutdownCh <-chan
func (d *DataplaneLifecycle) registerDataplane(dp *core_mesh.DataplaneResource) error {
key := model.MetaToResourceKey(dp.GetMeta())
existing := core_mesh.NewDataplaneResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) {
_ = existing.SetSpec(dp.GetSpec()) // ignore error because the spec type is the same
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
return existing.SetSpec(dp.GetSpec())
})
}

func (d *DataplaneLifecycle) registerZoneIngress(zi *core_mesh.ZoneIngressResource) error {
key := model.MetaToResourceKey(zi.GetMeta())
existing := core_mesh.NewZoneIngressResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) {
_ = existing.SetSpec(zi.GetSpec()) // ignore error because the spec type is the same
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
return existing.SetSpec(zi.GetSpec())
})
}

Expand Down
Loading

0 comments on commit f0754db

Please sign in to comment.