Change RKE upgrade logic for zero downtime #1800

Merged
merged 2 commits on Feb 6, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@
kube_config*
/rke
.vscode
cluster.rkestate
cluster.yml
119 changes: 109 additions & 10 deletions cluster/cluster.go
@@ -65,6 +65,7 @@ type Cluster struct {
v3.RancherKubernetesEngineConfig `yaml:",inline"`
WorkerHosts []*hosts.Host
EncryptionConfig encryptionConfig
NewHosts map[string]bool
}

type encryptionConfig struct {
@@ -98,7 +99,12 @@ const (
SystemNamespace = "kube-system"
)

func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) error {
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
}

// Deploy Etcd Plane
etcdNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build etcd node plan map
@@ -120,37 +126,73 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
for _, cpHost := range c.ControlPlaneHosts {
cpNodePlanMap[cpHost.Address] = BuildRKEConfigNodePlan(ctx, c, cpHost, cpHost.DockerInfo, svcOptionData)
}
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,

if !reconcileCluster {
if err := services.RunControlPlane(ctx, c.ControlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
return nil
}
if err := services.UpgradeControlPlane(ctx, kubeClient, c.ControlPlaneHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
cpNodePlanMap,
c.UpdateWorkersOnly,
c.SystemImages.Alpine,
c.Certificates); err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
c.Certificates, c.UpgradeStrategy, c.NewHosts); err != nil {
return fmt.Errorf("[controlPlane] Failed to upgrade Control Plane: %v", err)
}

return nil
}

func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, reconcileCluster bool) (string, error) {
var workerOnlyHosts, multipleRolesHosts []*hosts.Host
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
if err != nil {
return "", fmt.Errorf("failed to initialize new kubernetes client: %v", err)
}
// Deploy Worker plane
workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
// Build worker node plan map
allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
for _, workerHost := range allHosts {
workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
if !workerHost.IsControl && !workerHost.IsEtcd {
workerOnlyHosts = append(workerOnlyHosts, workerHost)
} else {
multipleRolesHosts = append(multipleRolesHosts, workerHost)
}
}
if err := services.RunWorkerPlane(ctx, allHosts,

if !reconcileCluster {
if err := services.RunWorkerPlane(ctx, allHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine); err != nil {
return "", fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
}
return "", nil
}
errMsgMaxUnavailableNotFailed, err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, c.InactiveHosts,
c.LocalConnDialerFactory,
c.PrivateRegistriesMap,
workerNodePlanMap,
c.Certificates,
c.UpdateWorkersOnly,
c.SystemImages.Alpine); err != nil {
return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
c.SystemImages.Alpine, c.UpgradeStrategy, c.NewHosts)
if err != nil {
return "", fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
}
return nil
return errMsgMaxUnavailableNotFailed, nil
}

func parseAuditLogConfig(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
@@ -328,6 +370,60 @@ func parseIngressExtraVolumeMounts(ingressMap map[string]interface{}, rkeConfig
return nil
}

func parseNodeDrainInput(clusterFile string, rkeConfig *v3.RancherKubernetesEngineConfig) error {
// setting some defaults here because for these fields there's no way of differentiating between user provided null value vs golang setting it to null during unmarshal
if rkeConfig.UpgradeStrategy == nil || rkeConfig.UpgradeStrategy.DrainInput == nil {
return nil
}
var config map[string]interface{}
err := ghodssyaml.Unmarshal([]byte(clusterFile), &config)
if err != nil {
return fmt.Errorf("[parseNodeDrainInput] error unmarshalling: %v", err)
}
upgradeStrategy, err := convert.EncodeToMap(config["upgrade_strategy"])
if err != nil {
return err
}
nodeDrainInputMap, err := convert.EncodeToMap(upgradeStrategy["node_drain_input"])
if err != nil {
return err
}
nodeDrainInputBytes, err := ghodssyaml.Marshal(nodeDrainInputMap)
if err != nil {
return err
}
// this will only have fields that user set and none of the default empty values
var nodeDrainInput v3.NodeDrainInput
if err := ghodssyaml.Unmarshal(nodeDrainInputBytes, &nodeDrainInput); err != nil {
return err
}
var update bool
if _, ok := nodeDrainInputMap["ignore_daemonsets"]; !ok {
// user hasn't provided any input, default to true
nodeDrainInput.IgnoreDaemonSets = DefaultNodeDrainIgnoreDaemonsets
update = true
}
if _, ok := nodeDrainInputMap["timeout"]; !ok {
// user hasn't provided any input, default to 120
nodeDrainInput.Timeout = DefaultNodeDrainTimeout
update = true
}
if providedGracePeriod, ok := nodeDrainInputMap["grace_period"].(float64); !ok {
// user hasn't provided any input, default to -1
nodeDrainInput.GracePeriod = DefaultNodeDrainGracePeriod
update = true
} else {
// TODO: ghodssyaml.Marshal is losing the user provided value for GracePeriod, investigate why, till then assign the provided value explicitly
nodeDrainInput.GracePeriod = int(providedGracePeriod)
}

if update {
rkeConfig.UpgradeStrategy.DrainInput = &nodeDrainInput
}

return nil
}

func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error) {
logrus.Debugf("Parsing cluster file [%v]", clusterFile)
var rkeConfig v3.RancherKubernetesEngineConfig
@@ -356,6 +452,9 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
if err := parseIngressConfig(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing ingress config: %v", err)
}
if err := parseNodeDrainInput(clusterFile, &rkeConfig); err != nil {
return &rkeConfig, fmt.Errorf("error parsing upgrade strategy and node drain input: %v", err)
}
return &rkeConfig, nil
}

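To make the new parsing path concrete, the sketch below feeds an upgrade_strategy block through ParseConfig. The YAML keys mirror the map lookups in parseNodeDrainInput above (upgrade_strategy, node_drain_input, grace_period, and so on); the max_unavailable tag and the node fields are assumptions, and the program is an illustrative sketch, not code from this PR.

package main

import (
	"fmt"

	"github.com/rancher/rke/cluster"
)

// Hypothetical cluster.yml fragment exercising the new upgrade_strategy fields.
const clusterYML = `
nodes:
- address: 192.168.1.10
  user: ubuntu
  role: [controlplane, etcd, worker]
upgrade_strategy:
  max_unavailable: 20%
  drain: true
  node_drain_input:
    grace_period: 60
`

func main() {
	rkeConfig, err := cluster.ParseConfig(clusterYML)
	if err != nil {
		panic(err)
	}
	// ignore_daemonsets and timeout were omitted above, so parseNodeDrainInput
	// fills in the defaults (true and 120); grace_period keeps the user's 60.
	fmt.Printf("%+v\n", rkeConfig.UpgradeStrategy.DrainInput)
}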
30 changes: 30 additions & 0 deletions cluster/defaults.go
@@ -76,6 +76,11 @@ const (
KubeAPIArgAuditPolicyFile = "audit-policy-file"
DefaultKubeAPIArgAuditLogPathValue = "/var/log/kube-audit/audit-log.json"
DefaultKubeAPIArgAuditPolicyFileValue = "/etc/kubernetes/audit-policy.yaml"

DefaultMaxUnavailable = "10%"
DefaultNodeDrainTimeout = 120
DefaultNodeDrainGracePeriod = -1
DefaultNodeDrainIgnoreDaemonsets = true
)

type ExternalFlags struct {
@@ -188,10 +193,35 @@ func (c *Cluster) setClusterDefaults(ctx context.Context, flags ExternalFlags) e
c.setClusterServicesDefaults()
c.setClusterNetworkDefaults()
c.setClusterAuthnDefaults()
c.setNodeUpgradeStrategy()

return nil
}

func (c *Cluster) setNodeUpgradeStrategy() {
if c.UpgradeStrategy == nil {
logrus.Info("No input provided for maxUnavailable, setting it to default value of 10%")
c.UpgradeStrategy = &v3.NodeUpgradeStrategy{
MaxUnavailable: DefaultMaxUnavailable,
}
return
}
setDefaultIfEmpty(&c.UpgradeStrategy.MaxUnavailable, DefaultMaxUnavailable)
if !c.UpgradeStrategy.Drain {
return
}
if c.UpgradeStrategy.DrainInput == nil {
c.UpgradeStrategy.DrainInput = &v3.NodeDrainInput{
IgnoreDaemonSets: DefaultNodeDrainIgnoreDaemonsets,
// default to 120 seems to work better for controlplane nodes
Timeout: DefaultNodeDrainTimeout,
//Period of time in seconds given to each pod to terminate gracefully.
// If negative, the default value specified in the pod will be used
GracePeriod: DefaultNodeDrainGracePeriod,
}
}
}

func (c *Cluster) setClusterServicesDefaults() {
// We don't accept per service images anymore.
c.Services.KubeAPI.Image = c.SystemImages.Kubernetes
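Taken together, the new constants and setNodeUpgradeStrategy amount to the fully defaulted strategy below when cluster.yml enables drain but leaves node_drain_input empty. This is a reference sketch rather than code from the PR; the v3 import path is assumed from how RKE consumes rancher/types.

package example

import (
	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
)

// defaultUpgradeStrategy mirrors what setNodeUpgradeStrategy produces for
// a cluster.yml that sets drain: true and nothing else under upgrade_strategy.
var defaultUpgradeStrategy = &v3.NodeUpgradeStrategy{
	MaxUnavailable: "10%", // DefaultMaxUnavailable
	Drain:          true,
	DrainInput: &v3.NodeDrainInput{
		IgnoreDaemonSets: true, // DefaultNodeDrainIgnoreDaemonsets
		Timeout:          120,  // seconds; DefaultNodeDrainTimeout
		GracePeriod:      -1,   // keep each pod's own termination grace period
	},
}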
37 changes: 35 additions & 2 deletions cluster/validation.go
@@ -3,14 +3,14 @@ package cluster
import (
"context"
"fmt"
"github.com/rancher/rke/metadata"
"k8s.io/api/core/v1"
"strings"

"github.com/rancher/rke/log"
"github.com/rancher/rke/metadata"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/rancher/rke/util"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation"
)

@@ -196,6 +196,39 @@ func ValidateHostCount(c *Cluster) error {
return nil
}

func (c *Cluster) ValidateHostCountForUpgrade() error {
var inactiveControlPlaneHosts, inactiveWorkerOnlyHosts []string
var workerOnlyHosts int
for _, host := range c.InactiveHosts {
if host.IsControl {
inactiveControlPlaneHosts = append(inactiveControlPlaneHosts, host.HostnameOverride)
}
if !host.IsEtcd && !host.IsControl {
inactiveWorkerOnlyHosts = append(inactiveWorkerOnlyHosts, host.HostnameOverride)
}
// not breaking out of the loop so we can log all of the inactive hosts
}
if len(inactiveControlPlaneHosts) >= 1 {
return fmt.Errorf("cannot proceed with upgrade of controlplane if one or more controlplane hosts are inactive; found inactive hosts: %v", strings.Join(inactiveControlPlaneHosts, ","))
}

for _, host := range c.WorkerHosts {
if host.IsControl || host.IsEtcd {
continue
}
workerOnlyHosts++
}

maxUnavailable, err := services.CalculateMaxUnavailable(c.UpgradeStrategy.MaxUnavailable, workerOnlyHosts)
if err != nil {
return err
}
if len(inactiveWorkerOnlyHosts) >= maxUnavailable {
return fmt.Errorf("cannot proceed with upgrade of worker components since %v (>=maxUnavailable) hosts are inactive; found inactive hosts: %v", len(inactiveWorkerOnlyHosts), strings.Join(inactiveWorkerOnlyHosts, ","))
}
return nil
}

func validateDuplicateNodes(c *Cluster) error {
for i := range c.Nodes {
for j := range c.Nodes {
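ValidateHostCountForUpgrade leans on services.CalculateMaxUnavailable to turn the configured budget into a node count, but that helper's body is not part of this diff. The stand-in below assumes it resolves a count or percentage the way the Kubernetes intstr helper does and never returns less than one; treat it as a sketch of the idea, not the actual implementation.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// calculateMaxUnavailable is an assumed stand-in for services.CalculateMaxUnavailable:
// resolve maxUnavailable ("3" or "10%") against the worker-only host count,
// rounding percentages down but always allowing at least one node.
func calculateMaxUnavailable(maxUnavailable string, workerHosts int) (int, error) {
	parsed := intstr.Parse(maxUnavailable)
	budget, err := intstr.GetValueFromIntOrPercent(&parsed, workerHosts, false)
	if err != nil {
		return 0, err
	}
	if budget == 0 {
		budget = 1
	}
	return budget, nil
}

func main() {
	budget, _ := calculateMaxUnavailable("10%", 25)
	fmt.Println(budget) // 2
}

Under that assumption, the default 10% with 25 worker-only nodes yields a budget of 2, so the check above would refuse to start the upgrade once 2 or more worker-only hosts are already inactive.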
2 changes: 1 addition & 1 deletion cmd/cert.go
@@ -185,7 +185,7 @@ func rebuildClusterWithRotatedCertificates(ctx context.Context,
}
if isLegacyKubeAPI {
log.Infof(ctx, "[controlplane] Redeploying controlplane to update kubeapi parameters")
if err := kubeCluster.DeployControlPlane(ctx, svcOptionData); err != nil {
if err := kubeCluster.DeployControlPlane(ctx, svcOptionData, true); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
}
31 changes: 29 additions & 2 deletions cmd/up.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ func UpCommand() cli.Command {

func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags, data map[string]interface{}) (string, string, string, string, map[string]pki.CertificatePKI, error) {
var APIURL, caCrt, clientCert, clientKey string
var reconcileCluster bool

clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
if err != nil {
@@ -113,11 +114,34 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}

err = kubeCluster.ValidateHostCountForUpgrade()
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
currentCluster, err := kubeCluster.GetClusterState(ctx, clusterState)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}

if currentCluster != nil {
// reconcile this cluster, to check if upgrade is needed, or new nodes are getting added/removed
/*This is to separate newly added nodes, so we don't try to check their status/cordon them before upgrade.
This will also cover nodes that were considered inactive first time cluster was provisioned, but are now active during upgrade*/
currentClusterNodes := make(map[string]bool)
for _, node := range clusterState.CurrentState.RancherKubernetesEngineConfig.Nodes {
currentClusterNodes[node.HostnameOverride] = true
}

newNodes := make(map[string]bool)
for _, node := range clusterState.DesiredState.RancherKubernetesEngineConfig.Nodes {
if !currentClusterNodes[node.HostnameOverride] {
newNodes[node.HostnameOverride] = true
}
}
kubeCluster.NewHosts = newNodes
reconcileCluster = true
}

if !flags.DisablePortCheck {
if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
Expand Down Expand Up @@ -158,7 +182,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}

err = kubeCluster.DeployControlPlane(ctx, svcOptionsData)
err = kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -178,7 +202,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}

err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData)
errMsgMaxUnavailableNotFailed, err := kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster)
if err != nil {
return APIURL, caCrt, clientCert, clientKey, nil, err
}
@@ -206,6 +230,9 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
return APIURL, caCrt, clientCert, clientKey, nil, err
}

if errMsgMaxUnavailableNotFailed != "" {
return APIURL, caCrt, clientCert, clientKey, nil, fmt.Errorf(errMsgMaxUnavailableNotFailed)
}
log.Infof(ctx, "Finished building Kubernetes cluster successfully")
return APIURL, caCrt, clientCert, clientKey, kubeCluster.Certificates, nil
}
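Stripped of error plumbing, the flow ClusterUp now follows is easier to see. The sketch below reuses the DeployControlPlane and DeployWorkerPlane signatures from cluster.go in this PR, but the wrapper function itself and the v3 import path are illustrative assumptions, not part of the change.

package example

import (
	"context"

	"github.com/rancher/rke/cluster"
	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
)

// deployPlanes condenses the decision ClusterUp makes between a fresh
// provision and a zero-downtime upgrade of an existing cluster.
func deployPlanes(ctx context.Context, kubeCluster, currentCluster *cluster.Cluster,
	svcOptionsData map[string]*v3.KubernetesServicesOptions) (string, error) {
	// A previously provisioned cluster means this run is a reconcile/upgrade, so the
	// Upgrade* paths (cordon, optional drain, max_unavailable batching) are used
	// instead of the plain Run* paths taken on first provisioning.
	reconcileCluster := currentCluster != nil

	if err := kubeCluster.DeployControlPlane(ctx, svcOptionsData, reconcileCluster); err != nil {
		return "", err
	}
	// A non-empty message means some worker hosts failed during the upgrade but
	// their number stayed within the max_unavailable budget, so the run continues.
	return kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, reconcileCluster)
}

In the diff above, that message is only converted into an error near the end of ClusterUp, so a worker-plane upgrade that fails on a few hosts within the budget does not abort the remaining steps of the run.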
4 changes: 2 additions & 2 deletions go.mod
@@ -25,12 +25,11 @@ require (
github.com/mattn/go-colorable v0.1.0
github.com/mcuadros/go-version v0.0.0-20180611085657-6d5863ca60fa
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pkg/errors v0.8.1
github.com/rancher/kontainer-driver-metadata v0.0.0-20200123225253-a5b9f3e0b2df
github.com/rancher/norman v0.0.0-20200123223841-6d86f4e37a69
github.com/rancher/types v0.0.0-20200123224322-9adcafc483ee
github.com/rancher/types v0.0.0-20200128160249-56c60bfefb76
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/stretchr/testify v1.4.0
@@ -44,5 +43,6 @@ require (
k8s.io/apimachinery v0.17.2
k8s.io/apiserver v0.17.2
k8s.io/client-go v2.0.0-alpha.0.0.20181121191925-a47917edff34+incompatible
k8s.io/kubectl v0.17.2
sigs.k8s.io/yaml v1.1.0
)