Skip to content

Commit

Permalink
fix(kuma-cp) stop components on leader election lost (#2318)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Jul 6, 2021
1 parent 048b961 commit 1c6a7c2
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 18 deletions.
6 changes: 3 additions & 3 deletions pkg/core/runtime/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error)
closeLeaderCh := func() {
mutex.Lock()
defer mutex.Unlock()
if channels.IsClosed(leaderStopCh) {
if !channels.IsClosed(leaderStopCh) {
close(leaderStopCh)
}
}

cm.leaderElector.AddCallbacks(LeaderCallbacks{
OnStartedLeading: func() {
log.Info("Leader acquired")
log.Info("leader acquired")
mutex.Lock()
defer mutex.Unlock()
leaderStopCh = make(chan struct{})
Expand All @@ -118,7 +118,7 @@ func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error)
}
},
OnStoppedLeading: func() {
log.Info("Leader lost")
log.Info("leader lost")
closeLeaderCh()
},
})
Expand Down
31 changes: 19 additions & 12 deletions pkg/plugins/leader/postgres/leader_elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,22 @@ func NewPostgresLeaderElector(lockClient *pglock.Client) component.LeaderElector
}

func (p *postgresLeaderElector) Start(stop <-chan struct{}) {
log.Info("Starting leader election")
log.Info("starting Leader Elector")
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
<-stop
log.Info("Stopping Leader Elector")
log.Info("stopping Leader Elector")
cancelFn()
}()

for {
log.Info("Waiting for lock")
log.Info("waiting for lock")
err := p.lockClient.Do(ctx, kumaLockName, func(ctx context.Context, lock *pglock.Lock) error {
p.setLeader(true)
for _, callback := range p.callbacks {
callback.OnStartedLeading()
}
p.leaderAcquired()
<-ctx.Done()
p.setLeader(false)
for _, callback := range p.callbacks {
callback.OnStoppedLeading()
}
p.leaderLost()
return nil
})
p.setLeader(false)
// in case of error (ex. connection to postgres is dropped) we want to retry the lock with some backoff
// returning error here would shut down the CP
if err != nil {
Expand All @@ -76,6 +69,20 @@ func (p *postgresLeaderElector) Start(stop <-chan struct{}) {
log.Info("Leader Elector stopped")
}

func (p *postgresLeaderElector) leaderAcquired() {
p.setLeader(true)
for _, callback := range p.callbacks {
callback.OnStartedLeading()
}
}

func (p *postgresLeaderElector) leaderLost() {
p.setLeader(false)
for _, callback := range p.callbacks {
callback.OnStoppedLeading()
}
}

func (p *postgresLeaderElector) AddCallbacks(callbacks component.LeaderCallbacks) {
p.callbacks = append(p.callbacks, callbacks)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/universal/outbound/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/kumahq/kuma/pkg/dns"
)

var log = core.Log.WithName("dns-vips-allocator")
var log = core.Log.WithName("vip-outbounds-reconciler")

type VIPOutboundsReconciler struct {
rorm manager.ReadOnlyResourceManager
Expand Down
93 changes: 93 additions & 0 deletions test/e2e/resilience/leader_election_postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package resilience

import (
"github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
"github.com/kumahq/kuma/test/framework/deployments/postgres"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func LeaderElectionPostgres() {
var standalone1, standalone2 Cluster
var standalone1Opts, standalone2Opts []DeployOptionsFunc

BeforeEach(func() {
clusters, err := NewUniversalClusters(
[]string{Kuma1, Kuma2},
Silent)
Expect(err).ToNot(HaveOccurred())
standalone1 = clusters.GetCluster(Kuma1)
standalone2 = clusters.GetCluster(Kuma2)

err = NewClusterSetup().
Install(postgres.Install(Kuma1)).
Setup(standalone1)
Expect(err).ToNot(HaveOccurred())
postgresInstance := postgres.From(standalone1, Kuma1)

// Standalone 1
standalone1Opts = KumaUniversalDeployOpts
standalone1Opts = append(standalone1Opts, WithPostgres(postgresInstance.GetEnvVars()))

err = NewClusterSetup().
Install(Kuma(core.Standalone, standalone1Opts...)).
Setup(standalone1)

Expect(err).ToNot(HaveOccurred())
Expect(standalone1.VerifyKuma()).To(Succeed())

// Standalone 2
standalone2Opts = KumaUniversalDeployOpts
standalone2Opts = append(standalone2Opts, WithPostgres(postgresInstance.GetEnvVars()))

err = NewClusterSetup().
Install(Kuma(core.Standalone, standalone2Opts...)).
Setup(standalone2)

Expect(err).ToNot(HaveOccurred())
Expect(standalone2.VerifyKuma()).To(Succeed())
})

E2EAfterEach(func() {
err := standalone1.DeleteKuma(standalone1Opts...)
Expect(err).ToNot(HaveOccurred())
err = standalone1.DismissCluster()
Expect(err).ToNot(HaveOccurred())

err = standalone2.DeleteKuma(standalone2Opts...)
Expect(err).ToNot(HaveOccurred())
err = standalone2.DismissCluster()
Expect(err).ToNot(HaveOccurred())
})

It("should elect only one leader and drop the leader on DB disconnect", func() {
// given two instances of the control plane connected to one postgres, only one is a leader
Eventually(func() (string, error) {
return standalone1.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 1`))

metrics, err := standalone2.GetKuma().GetMetrics()
Expect(err).ToNot(HaveOccurred())
Expect(metrics).To(ContainSubstring(`leader{zone="Standalone"} 0`))

// when CP 1 is killed
_, _, err = standalone1.Exec("", "", AppModeCP, "pkill", "-9", "kuma-cp")
Expect(err).ToNot(HaveOccurred())

// then CP 2 is leader
Eventually(func() (string, error) {
return standalone2.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 1`))

// when postgres is down
err = standalone1.DeleteDeployment(postgres.AppPostgres + Kuma1)
Expect(err).ToNot(HaveOccurred())

// then CP 2 is not a leader anymore
Eventually(func() (string, error) {
return standalone2.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 0`))
})
}
9 changes: 9 additions & 0 deletions test/e2e/resilience/leader_election_postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package resilience_test

import (
"github.com/kumahq/kuma/test/e2e/resilience"

. "github.com/onsi/ginkgo"
)

var _ = Describe("Test Leader Election with Postgres", resilience.LeaderElectionPostgres)
2 changes: 2 additions & 0 deletions test/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ type Cluster interface {
GetKumactlOptions() *KumactlOptions
Deployment(name string) Deployment
Deploy(deployment Deployment) error
DeleteDeployment(name string) error
WithTimeout(timeout time.Duration) Cluster
WithRetries(retries int) Cluster

Expand All @@ -312,6 +313,7 @@ type Cluster interface {
type ControlPlane interface {
GetName() string
GetKumaCPLogs() (string, error)
GetMetrics() (string, error)
GetKDSServerAddress() string
GetGlobaStatusAPI() string
GenerateDpToken(mesh, appname string) (string, error)
Expand Down
15 changes: 14 additions & 1 deletion test/framework/k8s_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,10 +799,11 @@ func (c *K8sCluster) GetTesting() testing.TestingT {
}

func (c *K8sCluster) DismissCluster() (errs error) {
for _, deployment := range c.deployments {
for name, deployment := range c.deployments {
if err := deployment.Delete(c); err != nil {
errs = multierr.Append(errs, err)
}
delete(c.deployments, name)
}
return nil
}
Expand All @@ -812,6 +813,18 @@ func (c *K8sCluster) Deploy(deployment Deployment) error {
return deployment.Deploy(c)
}

func (c *K8sCluster) DeleteDeployment(name string) error {
deployment, ok := c.deployments[name]
if !ok {
return errors.Errorf("deployment %s not found", name)
}
if err := deployment.Delete(c); err != nil {
return err
}
delete(c.deployments, name)
return nil
}

func (c *K8sCluster) WaitApp(name, namespace string, replicas int) error {
k8s.WaitUntilNumPodsCreated(c.t,
c.GetKubectlOptions(namespace),
Expand Down
9 changes: 9 additions & 0 deletions test/framework/k8s_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,12 @@ func (cs *K8sClusters) Deploy(deployment Deployment) error {
}
return nil
}

func (cs *K8sClusters) DeleteDeployment(deploymentName string) error {
for name, c := range cs.clusters {
if err := c.DeleteDeployment(deploymentName); err != nil {
return errors.Wrapf(err, "delete deployment %s failed on %s cluster", deploymentName, name)
}
}
return nil
}
4 changes: 4 additions & 0 deletions test/framework/k8s_controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ func (c *K8sControlPlane) GetKDSServerAddress() string {
pod.Status.HostIP, strconv.FormatUint(uint64(kdsPort), 10))
}

func (c *K8sControlPlane) GetMetrics() (string, error) {
panic("not implemented")
}

func (c *K8sControlPlane) GetGlobaStatusAPI() string {
return "http://localhost:" + strconv.FormatUint(uint64(c.portFwd.localAPIPort), 10) + "/status/zones"
}
Expand Down
15 changes: 14 additions & 1 deletion test/framework/universal_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func (c *UniversalCluster) DismissCluster() (errs error) {
errs = multierr.Append(errs, err)
}
}
for _, deployment := range c.deployments {
for name, deployment := range c.deployments {
if err := deployment.Delete(c); err != nil {
errs = multierr.Append(errs, err)
}
delete(c.deployments, name)
}
return
}
Expand Down Expand Up @@ -315,3 +316,15 @@ func (c *UniversalCluster) Deploy(deployment Deployment) error {
c.deployments[deployment.Name()] = deployment
return deployment.Deploy(c)
}

func (c *UniversalCluster) DeleteDeployment(name string) error {
deployment, ok := c.deployments[name]
if !ok {
return errors.Errorf("deployment %s not found", name)
}
if err := deployment.Delete(c); err != nil {
return err
}
delete(c.deployments, name)
return nil
}
9 changes: 9 additions & 0 deletions test/framework/universal_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,12 @@ func (cs *UniversalClusters) Deploy(deployment Deployment) error {
}
return nil
}

func (cs *UniversalClusters) DeleteDeployment(deploymentName string) error {
for name, c := range cs.clusters {
if err := c.DeleteDeployment(deploymentName); err != nil {
return errors.Wrapf(err, "delete deployment %s failed on %s cluster", deploymentName, name)
}
}
return nil
}
15 changes: 15 additions & 0 deletions test/framework/universal_controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func (c *UniversalControlPlane) GetGlobaStatusAPI() string {
panic("not implemented")
}

func (c *UniversalControlPlane) GetMetrics() (string, error) {
return retry.DoWithRetryE(c.t, "fetching CP metrics", DefaultRetries, DefaultTimeout, func() (string, error) {
sshApp := NewSshApp(c.verbose, c.cluster.apps[AppModeCP].ports["22"], []string{}, []string{"curl",
"--fail", "--show-error",
"http://localhost:5680/metrics"})
if err := sshApp.Run(); err != nil {
return "", err
}
if sshApp.Err() != "" {
return "", errors.New(sshApp.Err())
}
return sshApp.Out(), nil
})
}

func (c *UniversalControlPlane) GenerateDpToken(mesh, service string) (string, error) {
dpType := ""
if service == "ingress" {
Expand Down

0 comments on commit 1c6a7c2

Please sign in to comment.