fix(kuma-cp) stop components on leader election lost #2318

Merged (1 commit) on Jul 6, 2021

6 changes: 3 additions & 3 deletions pkg/core/runtime/component/component.go
@@ -96,14 +96,14 @@ func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error)
closeLeaderCh := func() {
mutex.Lock()
defer mutex.Unlock()
- if channels.IsClosed(leaderStopCh) {
+ if !channels.IsClosed(leaderStopCh) {
close(leaderStopCh)
}
}

cm.leaderElector.AddCallbacks(LeaderCallbacks{
OnStartedLeading: func() {
log.Info("Leader acquired")
log.Info("leader acquired")
mutex.Lock()
defer mutex.Unlock()
leaderStopCh = make(chan struct{})
@@ -118,7 +118,7 @@ func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error)
}
},
OnStoppedLeading: func() {
log.Info("Leader lost")
log.Info("leader lost")
closeLeaderCh()
},
})
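Worth spelling out why the inverted guard matters: closeLeaderCh can be invoked from more than one path (leadership lost, and separately, control-plane shutdown), and closing an already-closed Go channel panics — while the old condition meant the channel was never closed at all, so leader-only components kept running after leadership was lost. A minimal self-contained sketch of the idempotent-close pattern, with a plain boolean standing in for Kuma's channels.IsClosed helper:

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser closes stop exactly once, no matter how many times
// Close is called. Closing an already-closed channel panics, which
// is what the corrected !channels.IsClosed guard protects against.
type onceCloser struct {
	mu     sync.Mutex
	stop   chan struct{}
	closed bool
}

func (o *onceCloser) Close() {
	o.mu.Lock()
	defer o.mu.Unlock()
	if !o.closed { // with the inverted check, this body never ran
		close(o.stop)
		o.closed = true
	}
}

func main() {
	o := &onceCloser{stop: make(chan struct{})}
	o.Close()
	o.Close() // safe: the guard makes the second call a no-op
	<-o.stop  // channel is closed, so this returns immediately
	fmt.Println("closed exactly once")
}
```
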
31 changes: 19 additions & 12 deletions pkg/plugins/leader/postgres/leader_elector.go
@@ -39,29 +39,22 @@ func NewPostgresLeaderElector(lockClient *pglock.Client) component.LeaderElector
}

func (p *postgresLeaderElector) Start(stop <-chan struct{}) {
log.Info("Starting leader election")
log.Info("starting Leader Elector")
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
<-stop
log.Info("Stopping Leader Elector")
log.Info("stopping Leader Elector")
cancelFn()
}()

for {
log.Info("Waiting for lock")
log.Info("waiting for lock")
err := p.lockClient.Do(ctx, kumaLockName, func(ctx context.Context, lock *pglock.Lock) error {
- p.setLeader(true)
- for _, callback := range p.callbacks {
- callback.OnStartedLeading()
- }
+ p.leaderAcquired()
<-ctx.Done()
- p.setLeader(false)
- for _, callback := range p.callbacks {
- callback.OnStoppedLeading()
- }
+ p.leaderLost()
return nil
})
+ p.setLeader(false)
// in case of error (ex. connection to postgres is dropped) we want to retry the lock with some backoff
// returning error here would shut down the CP
if err != nil {
@@ -76,6 +69,20 @@ func (p *postgresLeaderElector) Start(stop <-chan struct{}) {
log.Info("Leader Elector stopped")
}

+ func (p *postgresLeaderElector) leaderAcquired() {
+ p.setLeader(true)
+ for _, callback := range p.callbacks {
+ callback.OnStartedLeading()
+ }
+ }
+
+ func (p *postgresLeaderElector) leaderLost() {
+ p.setLeader(false)
+ for _, callback := range p.callbacks {
+ callback.OnStoppedLeading()
+ }
+ }

func (p *postgresLeaderElector) AddCallbacks(callbacks component.LeaderCallbacks) {
p.callbacks = append(p.callbacks, callbacks)
}
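The refactor funnels both transitions through leaderAcquired and leaderLost, and the added setLeader(false) after lockClient.Do returns covers the path where Do fails (for example, a dropped Postgres connection) before OnStoppedLeading could ever run. For context, this is roughly how a consumer wires into the elector — a sketch, not Kuma's actual wiring, assuming only the LeaderElector and LeaderCallbacks shapes visible in this diff (the real component.go above guards leaderStopCh with a mutex, omitted here):

```go
package sketch

import "github.com/kumahq/kuma/pkg/core/runtime/component"

// StartLeaderOnly illustrates the consumer side: leader-only
// components get a fresh stop channel each time leadership is
// acquired, and that channel is closed when leadership is lost.
func StartLeaderOnly(elector component.LeaderElector, stop <-chan struct{}) {
	var leaderStopCh chan struct{}
	elector.AddCallbacks(component.LeaderCallbacks{
		OnStartedLeading: func() {
			leaderStopCh = make(chan struct{})
			// start leader-only components here, passing leaderStopCh
		},
		OnStoppedLeading: func() {
			// after this PR, this also fires when the Postgres
			// connection drops, so leader-only components shut down
			// instead of running on a stale leadership claim
			close(leaderStopCh)
		},
	})
	go elector.Start(stop)
}
```
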
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/universal/outbound/outbound.go
@@ -18,7 +18,7 @@ import (
"github.com/kumahq/kuma/pkg/dns"
)

var log = core.Log.WithName("dns-vips-allocator")
var log = core.Log.WithName("vip-outbounds-reconciler")

type VIPOutboundsReconciler struct {
rorm manager.ReadOnlyResourceManager
93 changes: 93 additions & 0 deletions test/e2e/resilience/leader_election_postgres.go
@@ -0,0 +1,93 @@
package resilience

import (
"github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
"github.com/kumahq/kuma/test/framework/deployments/postgres"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func LeaderElectionPostgres() {
var standalone1, standalone2 Cluster
var standalone1Opts, standalone2Opts []DeployOptionsFunc

BeforeEach(func() {
clusters, err := NewUniversalClusters(
[]string{Kuma1, Kuma2},
Silent)
Expect(err).ToNot(HaveOccurred())
standalone1 = clusters.GetCluster(Kuma1)
standalone2 = clusters.GetCluster(Kuma2)

err = NewClusterSetup().
Install(postgres.Install(Kuma1)).
Setup(standalone1)
Expect(err).ToNot(HaveOccurred())
postgresInstance := postgres.From(standalone1, Kuma1)

// Standalone 1
standalone1Opts = KumaUniversalDeployOpts
standalone1Opts = append(standalone1Opts, WithPostgres(postgresInstance.GetEnvVars()))

err = NewClusterSetup().
Install(Kuma(core.Standalone, standalone1Opts...)).
Setup(standalone1)

Expect(err).ToNot(HaveOccurred())
Expect(standalone1.VerifyKuma()).To(Succeed())

// Standalone 2
standalone2Opts = KumaUniversalDeployOpts
standalone2Opts = append(standalone2Opts, WithPostgres(postgresInstance.GetEnvVars()))

err = NewClusterSetup().
Install(Kuma(core.Standalone, standalone2Opts...)).
Setup(standalone2)

Expect(err).ToNot(HaveOccurred())
Expect(standalone2.VerifyKuma()).To(Succeed())
})

E2EAfterEach(func() {
err := standalone1.DeleteKuma(standalone1Opts...)
Expect(err).ToNot(HaveOccurred())
err = standalone1.DismissCluster()
Expect(err).ToNot(HaveOccurred())

err = standalone2.DeleteKuma(standalone2Opts...)
Expect(err).ToNot(HaveOccurred())
err = standalone2.DismissCluster()
Expect(err).ToNot(HaveOccurred())
})

It("should elect only one leader and drop the leader on DB disconnect", func() {
// given two instances of the control plane connected to one postgres, only one is a leader
Eventually(func() (string, error) {
return standalone1.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 1`))

metrics, err := standalone2.GetKuma().GetMetrics()
Expect(err).ToNot(HaveOccurred())
Expect(metrics).To(ContainSubstring(`leader{zone="Standalone"} 0`))

// when CP 1 is killed
_, _, err = standalone1.Exec("", "", AppModeCP, "pkill", "-9", "kuma-cp")
Expect(err).ToNot(HaveOccurred())

// then CP 2 is leader
Eventually(func() (string, error) {
return standalone2.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 1`))

// when postgres is down
err = standalone1.DeleteDeployment(postgres.AppPostgres + Kuma1)
Expect(err).ToNot(HaveOccurred())

// then CP 2 is not a leader anymore
Eventually(func() (string, error) {
return standalone2.GetKuma().GetMetrics()
}, "30s", "1s").Should(ContainSubstring(`leader{zone="Standalone"} 0`))
})
}
9 changes: 9 additions & 0 deletions test/e2e/resilience/leader_election_postgres_test.go
@@ -0,0 +1,9 @@
package resilience_test

import (
"github.com/kumahq/kuma/test/e2e/resilience"

. "github.com/onsi/ginkgo"
)

var _ = Describe("Test Leader Election with Postgres", resilience.LeaderElectionPostgres)
2 changes: 2 additions & 0 deletions test/framework/interface.go
@@ -293,6 +293,7 @@ type Cluster interface {
GetKumactlOptions() *KumactlOptions
Deployment(name string) Deployment
Deploy(deployment Deployment) error
+ DeleteDeployment(name string) error
WithTimeout(timeout time.Duration) Cluster
WithRetries(retries int) Cluster

@@ -312,6 +313,7 @@ type Cluster interface {
type ControlPlane interface {
GetName() string
GetKumaCPLogs() (string, error)
+ GetMetrics() (string, error)
GetKDSServerAddress() string
GetGlobaStatusAPI() string
GenerateDpToken(mesh, appname string) (string, error)
15 changes: 14 additions & 1 deletion test/framework/k8s_cluster.go
@@ -799,10 +799,11 @@ func (c *K8sCluster) GetTesting() testing.TestingT {
}

func (c *K8sCluster) DismissCluster() (errs error) {
- for _, deployment := range c.deployments {
+ for name, deployment := range c.deployments {
if err := deployment.Delete(c); err != nil {
errs = multierr.Append(errs, err)
}
+ delete(c.deployments, name)
}
return nil
}
@@ -812,6 +813,18 @@ func (c *K8sCluster) Deploy(deployment Deployment) error {
return deployment.Deploy(c)
}

+ func (c *K8sCluster) DeleteDeployment(name string) error {
+ deployment, ok := c.deployments[name]
+ if !ok {
+ return errors.Errorf("deployment %s not found", name)
+ }
+ if err := deployment.Delete(c); err != nil {
+ return err
+ }
+ delete(c.deployments, name)
+ return nil
+ }

func (c *K8sCluster) WaitApp(name, namespace string, replicas int) error {
k8s.WaitUntilNumPodsCreated(c.t,
c.GetKubectlOptions(namespace),
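A side note on DismissCluster and DeleteDeployment both mutating c.deployments: deleting map entries while ranging over the same map is well defined in Go — the spec guarantees that an entry removed during iteration is simply not produced later. A tiny illustration:

```go
package main

import "fmt"

func main() {
	deployments := map[string]string{"postgres-kuma-1": "a", "kuma-cp": "b"}
	// Safe per the Go spec: entries deleted during range are not
	// visited again, so the loop drains the map without surprises.
	for name := range deployments {
		delete(deployments, name)
	}
	fmt.Println(len(deployments)) // 0
}
```
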
9 changes: 9 additions & 0 deletions test/framework/k8s_clusters.go
@@ -227,3 +227,12 @@ func (cs *K8sClusters) Deploy(deployment Deployment) error {
}
return nil
}

+ func (cs *K8sClusters) DeleteDeployment(deploymentName string) error {
+ for name, c := range cs.clusters {
+ if err := c.DeleteDeployment(deploymentName); err != nil {
+ return errors.Wrapf(err, "delete deployment %s failed on %s cluster", deploymentName, name)
+ }
+ }
+ return nil
+ }
4 changes: 4 additions & 0 deletions test/framework/k8s_controlplane.go
@@ -247,6 +247,10 @@ func (c *K8sControlPlane) GetKDSServerAddress() string {
pod.Status.HostIP, strconv.FormatUint(uint64(kdsPort), 10))
}

+ func (c *K8sControlPlane) GetMetrics() (string, error) {
+ panic("not implemented")
+ }

func (c *K8sControlPlane) GetGlobaStatusAPI() string {
return "http://localhost:" + strconv.FormatUint(uint64(c.portFwd.localAPIPort), 10) + "/status/zones"
}
15 changes: 14 additions & 1 deletion test/framework/universal_cluster.go
@@ -61,10 +61,11 @@ func (c *UniversalCluster) DismissCluster() (errs error) {
errs = multierr.Append(errs, err)
}
}
- for _, deployment := range c.deployments {
+ for name, deployment := range c.deployments {
if err := deployment.Delete(c); err != nil {
errs = multierr.Append(errs, err)
}
+ delete(c.deployments, name)
}
return
}
@@ -315,3 +316,15 @@ func (c *UniversalCluster) Deploy(deployment Deployment) error {
c.deployments[deployment.Name()] = deployment
return deployment.Deploy(c)
}

+ func (c *UniversalCluster) DeleteDeployment(name string) error {
+ deployment, ok := c.deployments[name]
+ if !ok {
+ return errors.Errorf("deployment %s not found", name)
+ }
+ if err := deployment.Delete(c); err != nil {
+ return err
+ }
+ delete(c.deployments, name)
+ return nil
+ }
9 changes: 9 additions & 0 deletions test/framework/universal_clusters.go
@@ -201,3 +201,12 @@ func (cs *UniversalClusters) Deploy(deployment Deployment) error {
}
return nil
}

+ func (cs *UniversalClusters) DeleteDeployment(deploymentName string) error {
+ for name, c := range cs.clusters {
+ if err := c.DeleteDeployment(deploymentName); err != nil {
+ return errors.Wrapf(err, "delete deployment %s failed on %s cluster", deploymentName, name)
+ }
+ }
+ return nil
+ }
15 changes: 15 additions & 0 deletions test/framework/universal_controlplane.go
@@ -53,6 +53,21 @@ func (c *UniversalControlPlane) GetGlobaStatusAPI() string {
panic("not implemented")
}

+ func (c *UniversalControlPlane) GetMetrics() (string, error) {
+ return retry.DoWithRetryE(c.t, "fetching CP metrics", DefaultRetries, DefaultTimeout, func() (string, error) {
+ sshApp := NewSshApp(c.verbose, c.cluster.apps[AppModeCP].ports["22"], []string{}, []string{"curl",
+ "--fail", "--show-error",
+ "http://localhost:5680/metrics"})
+ if err := sshApp.Run(); err != nil {
+ return "", err
+ }
+ if sshApp.Err() != "" {
+ return "", errors.New(sshApp.Err())
+ }
+ return sshApp.Out(), nil
+ })
+ }

func (c *UniversalControlPlane) GenerateDpToken(mesh, service string) (string, error) {
dpType := ""
if service == "ingress" {
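GetMetrics shells into the CP container over SSH and curls the Prometheus endpoint kuma-cp exposes on port 5680; the resilience test above then greps the scrape output for the leader gauge. Roughly the same check done directly over HTTP — a hypothetical helper, assuming a CP reachable on localhost rather than Kuma's SSH-based test harness:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// isLeader scrapes a kuma-cp metrics endpoint (port 5680 in this PR)
// and checks the gauge the resilience test asserts on.
func isLeader(baseURL string) (bool, error) {
	resp, err := http.Get(baseURL + "/metrics")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	return strings.Contains(string(body), `leader{zone="Standalone"} 1`), nil
}

func main() {
	leader, err := isLeader("http://localhost:5680")
	fmt.Println("leader:", leader, "err:", err)
}
```
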