Skip to content

Commit

Permalink
fix(kuma-cp): make zone insight context independent from parent (#6909)
Browse files Browse the repository at this point in the history
When we are using parent context and the stream is canceled function responsible for flushing stats to the database is also canceled and fails. We need a separate context that still can flush data after the parent context is closed but the stream is not fully destroyed. That happens when using pgx and postgres.

Signed-off-by: Lukasz Dziedziak <lukidzi@gmail.com>
  • Loading branch information
lukidzi authored Jun 2, 2023
1 parent b178614 commit d80be85
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/user"
"github.com/kumahq/kuma/pkg/multitenant"
)

type ZoneInsightSink interface {
Expand Down Expand Up @@ -55,6 +56,13 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) {
var lastStoredState *system_proto.KDSSubscription
var generation uint32

gracefulCtx, cancel := context.WithCancel(context.Background())
defer cancel()
tenantId, ok := multitenant.TenantFromCtx(ctx)
if ok {
gracefulCtx = multitenant.WithTenant(gracefulCtx, tenantId)
}

flush := func() {
zone, currentState := s.accessor.GetStatus()
select {
Expand All @@ -67,7 +75,7 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) {
return
}

if err := s.store.Upsert(ctx, zone, currentState); err != nil {
if err := s.store.Upsert(gracefulCtx, zone, currentState); err != nil {
if store.IsResourceConflict(err) {
s.log.V(1).Info("failed to flush ZoneInsight because it was updated in other place. Will retry in the next tick", "zone", zone)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,20 @@ func ResilienceMultizoneUniversalPostgres() {
return zoneUniversal.GetKumactlOptions().RunKumactlAndGetOutput("inspect", "zone-ingresses")
}, "40s", "1s").Should(ContainSubstring("Offline"))
})

It("should mark zone as offline when zone control-plane is down", func() {
// given zone connected to global
Eventually(func() (string, error) {
return global.GetKumactlOptions().RunKumactlAndGetOutput("inspect", "zones")
}, "30s", "1s").Should(ContainSubstring("Online"))

// when Zone CP is killed
_, _, err := zoneUniversal.Exec("", "", AppModeCP, "pkill", "-9", "kuma-cp")
Expect(err).ToNot(HaveOccurred())

// then zone is offline immediately
Eventually(func() (string, error) {
return global.GetKumactlOptions().RunKumactlAndGetOutput("inspect", "zones")
}, "10s", "1s").Should(ContainSubstring("Offline"))
})
}

0 comments on commit d80be85

Please sign in to comment.