Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp) subscription finalizer, rev 2 #2526

Merged
merged 18 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions api/generic/insights.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package generic

import (
"time"

"github.com/golang/protobuf/proto"
)

type Insight interface {
proto.Message
IsOnline() bool
GetLastSubscription() Subscription
UpdateSubscription(Subscription) error
}

type Subscription interface {
proto.Message
GetId() string
GetGeneration() uint32
SetDisconnectTime(time time.Time)
}
16 changes: 14 additions & 2 deletions api/mesh/v1alpha1/dataplane_insight.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/mesh/v1alpha1/dataplane_insight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ message DiscoverySubscription {

// Version of Envoy and Kuma dataplane
Version version = 6;

// Generation is an integer number which is periodically increased by the
// status sink
uint32 generation = 7;
}

// DiscoverySubscriptionStatus defines status of an ADS subscription.
Expand Down
74 changes: 48 additions & 26 deletions api/mesh/v1alpha1/dataplane_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &DataplaneInsight{}

func NewSubscriptionStatus() *DiscoverySubscriptionStatus {
return &DiscoverySubscriptionStatus{
Total: &DiscoveryServiceStats{},
Expand All @@ -32,74 +37,80 @@ func NewVersion() *Version {
}
}

func (ds *DataplaneInsight) IsOnline() bool {
for _, s := range ds.GetSubscriptions() {
if s.ConnectTime != nil && s.DisconnectTime == nil {
func (x *DataplaneInsight) IsOnline() bool {
for _, s := range x.GetSubscriptions() {
if s.GetConnectTime() != nil && s.GetDisconnectTime() == nil {
return true
}
}
return false
}

func (ds *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range ds.GetSubscriptions() {
func (x *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
}

func (ds *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time) error {
if ds.MTLS == nil {
ds.MTLS = &DataplaneInsight_MTLS{}
func (x *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time) error {
if x.MTLS == nil {
x.MTLS = &DataplaneInsight_MTLS{}
}
ts := timestamppb.New(expiration)
if err := ts.CheckValid(); err != nil {
return err
}
ds.MTLS.CertificateExpirationTime = ts
ds.MTLS.CertificateRegenerations++
x.MTLS.CertificateExpirationTime = ts
x.MTLS.CertificateRegenerations++
ts = timestamppb.New(generation)
if err := ts.CheckValid(); err != nil {
return err
}
ds.MTLS.LastCertificateRegeneration = ts
x.MTLS.LastCertificateRegeneration = ts
return nil
}

func (ds *DataplaneInsight) UpdateSubscription(s *DiscoverySubscription) {
if ds == nil {
return
func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return nil
}
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return errors.Errorf("invalid type %T for DataplaneInsight", s)
}
i, old := ds.GetSubscription(s.Id)
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
ds.Subscriptions[i] = s
x.Subscriptions[i] = discoverySubscription
} else {
ds.finalizeSubscriptions()
ds.Subscriptions = append(ds.Subscriptions, s)
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
// Because of the way we process subscriptions the lack of DisconnectTime on old subscription
// will cause wrong status.
func (ds *DataplaneInsight) finalizeSubscriptions() {
func (x *DataplaneInsight) finalizeSubscriptions() {
now := timestamppb.Now()
for _, subscription := range ds.GetSubscriptions() {
for _, subscription := range x.GetSubscriptions() {
if subscription.DisconnectTime == nil {
subscription.DisconnectTime = now
}
}
}

func (ds *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(ds.GetSubscriptions()) == 0 {
// todo(lobkovilya): delete GetLatestSubscription, use GetLastSubscription instead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be done in this PR or separate? If separate, do we have a issue / card for it?

func (x *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(x.GetSubscriptions()) == 0 {
return nil, nil
}
var idx int = 0
var latest *time.Time
for i, s := range ds.GetSubscriptions() {
for i, s := range x.GetSubscriptions() {
if err := s.ConnectTime.CheckValid(); err != nil {
continue
}
Expand All @@ -109,12 +120,23 @@ func (ds *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *ti
latest = &t
}
}
return ds.Subscriptions[idx], latest
return x.Subscriptions[idx], latest
}

func (x *DataplaneInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return nil
}
return x.GetSubscriptions()[len(x.GetSubscriptions())-1]
}

func (x *DiscoverySubscription) SetDisconnectTime(t time.Time) {
x.DisconnectTime = timestamppb.New(t)
}

func (ds *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
func (x *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range ds.GetSubscriptions() {
for _, s := range x.GetSubscriptions() {
result += v(s)
}
return result
Expand Down
33 changes: 29 additions & 4 deletions api/mesh/v1alpha1/dataplane_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
. "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("DataplaneHelpers", func() {
Expand Down Expand Up @@ -36,7 +37,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -75,7 +76,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -116,15 +117,39 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Expect(dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
dataplaneInsight := &DataplaneInsight{
Subscriptions: []*DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := dataplaneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for DataplaneInsight"))
})
})

Describe("GetLatestSubscription()", func() {
Expand Down
28 changes: 23 additions & 5 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package v1alpha1
import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &ZoneIngressInsight{}

func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
Expand All @@ -15,17 +20,22 @@ func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscrip
return -1, nil
}

func (x *ZoneIngressInsight) UpdateSubscription(s *DiscoverySubscription) {
func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return
return nil
}
i, old := x.GetSubscription(s.Id)
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return errors.Errorf("invalid type %T for ZoneIngressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = s
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, s)
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
Expand All @@ -49,6 +59,14 @@ func (x *ZoneIngressInsight) IsOnline() bool {
return false
}

func (x *ZoneIngressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return nil
}
return x.GetSubscriptions()[len(x.GetSubscriptions())-1]
}

// todo(lobkovilya): delete GetLatestSubscription, use GetLastSubscription instead
func (x *ZoneIngressInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(x.GetSubscriptions()) == 0 {
return nil, nil
Expand Down
29 changes: 27 additions & 2 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("Zone Ingress Insights", func() {
Expand All @@ -31,14 +32,38 @@ var _ = Describe("Zone Ingress Insights", func() {
}

// when
zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Expect(zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
zoneInsight := &mesh_proto.ZoneIngressInsight{
Subscriptions: []*mesh_proto.DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := zoneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for ZoneIngressInsight"))
})
})
})
Loading