Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

Commit

Permalink
Make leader-election configurable: default endpoints object namespace…
Browse files Browse the repository at this point in the history
… to controller's instead of kube-system
  • Loading branch information
wongma7 committed Aug 23, 2018
1 parent 11d0812 commit 5e4c602
Showing 1 changed file with 80 additions and 24 deletions.
104 changes: 80 additions & 24 deletions lib/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -139,6 +140,11 @@ type ProvisionController struct {
// The path of metrics endpoint path.
metricsPath string

// Whether to do kubernetes leader election at all. It should basically
// always be done when possible. It would be impossible e.g. when running
// out of cluster.
leaderElection bool
leaderElectionNamespace string
// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration

Expand All @@ -161,6 +167,8 @@ const (
DefaultFailedProvisionThreshold = 15
// DefaultFailedDeleteThreshold is used when option function FailedDeleteThreshold is omitted
DefaultFailedDeleteThreshold = 15
// DefaultLeaderElection is used when option function LeaderElection is omitted
DefaultLeaderElection = true
// DefaultLeaseDuration is used when option function LeaseDuration is omitted
DefaultLeaseDuration = 15 * time.Second
// DefaultRenewDeadline is used when option function RenewDeadline is omitted
Expand Down Expand Up @@ -263,6 +271,31 @@ func FailedDeleteThreshold(failedDeleteThreshold int) func(*ProvisionController)
}
}

// LeaderElection determines whether to enable leader election or not. Defaults
// to true.
func LeaderElection(leaderElection bool) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.leaderElection = leaderElection
return nil
}
}

// LeaderElectionNamespace is the kubernetes namespace in which to create the
// leader election object. Defaults to the same namespace in which the
// the controller runs.
func LeaderElectionNamespace(leaderElectionNamespace string) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.leaderElectionNamespace = leaderElectionNamespace
return nil
}
}

// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
// last observed ack. Defaults to 15 seconds.
Expand Down Expand Up @@ -416,6 +449,8 @@ func NewProvisionController(
createProvisionedPVInterval: DefaultCreateProvisionedPVInterval,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
Expand Down Expand Up @@ -648,31 +683,36 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
<-stopCh
}

rl, err := resourcelock.New("endpoints",
"kube-system",
strings.Replace(ctrl.provisionerName, "/", "-", -1),
ctrl.client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: ctrl.id,
EventRecorder: ctrl.eventRecorder,
})
if err != nil {
glog.Fatalf("Error creating lock: %v", err)
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: ctrl.leaseDuration,
RenewDeadline: ctrl.renewDeadline,
RetryPeriod: ctrl.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
if ctrl.leaderElection {
// TODO: passed in stopCh is ignored.
rl, err := resourcelock.New("endpoints",
ctrl.leaderElectionNamespace,
strings.Replace(ctrl.provisionerName, "/", "-", -1),
ctrl.client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: ctrl.id,
EventRecorder: ctrl.eventRecorder,
})
if err != nil {
glog.Fatalf("Error creating lock: %v", err)
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: ctrl.leaseDuration,
RenewDeadline: ctrl.renewDeadline,
RetryPeriod: ctrl.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
},
})
panic("unreachable")
})
panic("unreachable")
} else {
run(stopCh)
}
}

func (ctrl *ProvisionController) runClaimWorker() {
Expand Down Expand Up @@ -1164,6 +1204,22 @@ func logOperation(operation, format string, a ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", operation, format), a...)
}

// getInClusterNamespace returns the namespace in which the controller runs.
func getInClusterNamespace() string {
if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
return ns
}

// Fall back to the namespace associated with the service account token, if available
if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
return ns
}
}

return "default"
}

// getProvisionedVolumeNameForClaim returns PV.Name for the provisioned volume.
// The name must be unique.
func (ctrl *ProvisionController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
Expand Down

0 comments on commit 5e4c602

Please sign in to comment.