
Commit

Upgrade worker nodes in user configurable batches
mrajashree committed Jan 20, 2020
1 parent a7d3eb6 commit 8a69634
Showing 5 changed files with 228 additions and 11 deletions.
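
The batching mechanism introduced here (startWorkerPlane in services/workerplane.go below) feeds hosts through a buffered channel to a pool of maxUnavailable goroutines, so at most that many nodes are upgraded at a time. A minimal, self-contained sketch of that pattern, using a plain string in place of rke's *hosts.Host and a placeholder upgrade function:

// Simplified sketch (not RKE code): hosts are fed through a buffered channel and
// consumed by maxUnavailable goroutines, bounding how many nodes upgrade at once.
package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func upgradeInBatches(allHosts []string, maxUnavailable int, upgrade func(string) error) error {
    hostsQueue := make(chan string, maxUnavailable)
    go func() {
        // enqueue all hosts, then close so workers drain the queue and exit
        for _, h := range allHosts {
            hostsQueue <- h
        }
        close(hostsQueue)
    }()

    var errgrp errgroup.Group
    for w := 0; w < maxUnavailable; w++ {
        errgrp.Go(func() error {
            for h := range hostsQueue {
                if err := upgrade(h); err != nil {
                    return err
                }
            }
            return nil
        })
    }
    return errgrp.Wait()
}

func main() {
    hosts := []string{"node-1", "node-2", "node-3", "node-4"}
    err := upgradeInBatches(hosts, 2, func(h string) error {
        fmt.Println("upgrading", h)
        return nil
    })
    fmt.Println("done, err =", err)
}
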
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,3 +8,5 @@
kube_config*
/rke
.vscode
cluster.rkestate
cluster.yml
43 changes: 34 additions & 9 deletions cluster/cluster.go
@@ -133,22 +133,47 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
    return nil
}

func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, isUpgrade bool) error {
    var workerOnlyHosts, multipleRolesHosts []*hosts.Host

    // Deploy Worker plane
    workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
    // Build cp node plan map
    allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
    for _, workerHost := range allHosts {
        workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
        if !workerHost.IsControl && !workerHost.IsEtcd {
            workerOnlyHosts = append(workerOnlyHosts, workerHost)
        } else {
            multipleRolesHosts = append(multipleRolesHosts, workerHost)
        }
    }
    if err := services.RunWorkerPlane(ctx, allHosts,
        c.LocalConnDialerFactory,
        c.PrivateRegistriesMap,
        workerNodePlanMap,
        c.Certificates,
        c.UpdateWorkersOnly,
        c.SystemImages.Alpine); err != nil {
        return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)

    kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
    if err != nil {
        return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
    }

    if !isUpgrade {
        if err := services.RunWorkerPlane(ctx, allHosts,
            c.LocalConnDialerFactory,
            c.PrivateRegistriesMap,
            workerNodePlanMap,
            c.Certificates,
            c.UpdateWorkersOnly,
            c.SystemImages.Alpine); err != nil {
            return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
        }
    } else {
        if err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, len(c.InactiveHosts),
            c.LocalConnDialerFactory,
            c.PrivateRegistriesMap,
            workerNodePlanMap,
            c.Certificates,
            c.UpdateWorkersOnly,
            c.SystemImages.Alpine, c.UpgradeStrategy); err != nil {
            return fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
        }
    }
    return nil
}
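
DeployWorkerPlane now partitions hosts by role so that worker components on etcd/controlplane nodes are upgraded before worker-only nodes. A small sketch of that split, with a simplified Host struct standing in for rke's hosts.Host:

// Illustrative only: Host is a stand-in for rke's hosts.Host, not the real type.
package main

import "fmt"

type Host struct {
    Address   string
    IsControl bool
    IsEtcd    bool
}

// splitByRole mirrors the loop above: hosts that also run etcd or the control
// plane go into the multi-role group, which is upgraded first.
func splitByRole(allHosts []*Host) (workerOnly, multipleRoles []*Host) {
    for _, h := range allHosts {
        if !h.IsControl && !h.IsEtcd {
            workerOnly = append(workerOnly, h)
        } else {
            multipleRoles = append(multipleRoles, h)
        }
    }
    return workerOnly, multipleRoles
}

func main() {
    hosts := []*Host{
        {Address: "10.0.0.1", IsControl: true},
        {Address: "10.0.0.2", IsEtcd: true},
        {Address: "10.0.0.3"},
    }
    workerOnly, multipleRoles := splitByRole(hosts)
    fmt.Println(len(workerOnly), "worker-only,", len(multipleRoles), "multi-role hosts")
}
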
7 changes: 6 additions & 1 deletion cmd/up.go
@@ -79,6 +79,7 @@ func UpCommand() cli.Command {

func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags, data map[string]interface{}) (string, string, string, string, map[string]pki.CertificatePKI, error) {
    var APIURL, caCrt, clientCert, clientKey string
    var isUpgrade bool

    clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
    if err != nil {
@@ -118,6 +119,10 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
        return APIURL, caCrt, clientCert, clientKey, nil, err
    }

    if currentCluster != nil {
        isUpgrade = true
    }

    if !flags.DisablePortCheck {
        if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
            return APIURL, caCrt, clientCert, clientKey, nil, err
@@ -178,7 +183,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
        return APIURL, caCrt, clientCert, clientKey, nil, err
    }

    err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData)
    err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, isUpgrade)
    if err != nil {
        return APIURL, caCrt, clientCert, clientKey, nil, err
    }
2 changes: 1 addition & 1 deletion k8s/node.go
@@ -80,7 +80,7 @@ func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
func IsNodeReady(node v1.Node) bool {
    nodeConditions := node.Status.Conditions
    for _, condition := range nodeConditions {
        if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
        if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
            return true
        }
    }
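
The k8s/node.go change replaces the string literal "Ready" with the typed constant v1.NodeReady. A self-contained sketch of the same readiness check against a fixture node (it mirrors IsNodeReady, it is not the rke function itself):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// nodeIsReady mirrors the check in k8s/node.go: a node is ready when its Ready
// condition is True. Comparing against v1.NodeReady keeps the comparison on the
// typed NodeConditionType instead of a raw string.
func nodeIsReady(node v1.Node) bool {
    for _, condition := range node.Status.Conditions {
        if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}

func main() {
    node := v1.Node{
        Status: v1.NodeStatus{
            Conditions: []v1.NodeCondition{
                {Type: v1.NodeReady, Status: v1.ConditionTrue},
            },
        },
    }
    fmt.Println("ready:", nodeIsReady(node))
}
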
185 changes: 185 additions & 0 deletions services/workerplane.go
@@ -1,14 +1,23 @@
package services

import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
k8sutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/drain"
)

const (
@@ -42,6 +51,182 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
    return nil
}

func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, numInactiveHosts int, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy) error {
    log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
    var maxUnavailable int

    maxUnavailableVal := upgradeStrategy.MaxUnavailable
    if maxUnavailableVal == "" {
        // default to 50%
        maxUnavailableVal = "50%"
    }
    maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
    maxUnavailable, err := calculateMaxUnavailable(maxUnavailableParsed, len(multipleRolesHosts)+len(workerOnlyHosts))
    if err != nil {
        return err
    }
    // TunnelHosts finds out inactive hosts by trying to connect to them and removes them from the hosts list
    if numInactiveHosts >= maxUnavailable {
        // cannot upgrade since already hit maxUnavailable number of nodes
        return fmt.Errorf("cannot proceed with upgrade since %v (maxUnavailable) hosts are inactive", numInactiveHosts)
    }
    log.Infof(ctx, fmt.Sprintf("Upgrading %v hosts at a time", maxUnavailable))
    // proceed with remaining active hosts
    maxUnavailable -= numInactiveHosts
    log.Infof(ctx, "First upgrading worker components on nodes with etcd/controlplane roles")
    multipleRolesHostsFailedToUpgrade, err := startWorkerPlane(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
    if err != nil && len(multipleRolesHostsFailedToUpgrade) == maxUnavailable {
        logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","))
        return err
    }

    maxUnavailable -= len(multipleRolesHostsFailedToUpgrade)
    log.Infof(ctx, "Upgrading worker components on nodes with only worker role")
    workerOnlyHostsFailedToUpgrade, err := startWorkerPlane(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
    if err != nil {
        multipleRolesHostsFailedToUpgrade = append(multipleRolesHostsFailedToUpgrade, workerOnlyHostsFailedToUpgrade...)
        logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","))
        return err
    }

    log.Infof(ctx, "[%s] Successfully upgraded Worker Plane..", WorkerRole)
    return nil
}

func calculateMaxUnavailable(maxUnavailableParsed k8sutil.IntOrString, numHosts int) (int, error) {
    // if maxUnavailable is given in percent, round down
    maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false)
    if err != nil {
        return 0, err
    }
    if maxUnavailable == 0 {
        // In case there is only one node and rounding down maxUnavailable percentage led to 0
        maxUnavailable = 1
    }
    return maxUnavailable, nil
}

func enqueueHosts(hosts chan<- *hosts.Host, allHosts []*hosts.Host) {
    for _, host := range allHosts {
        hosts <- host
    }
    close(hosts)
}

func startWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
    maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy) ([]string, error) {
    var errgrp errgroup.Group
    var drainHelper drain.Helper
    var failedHosts []string
    hostsQueue := make(chan *hosts.Host, maxUnavailable)
    hostsFailedToUpgrade := make(chan string, maxUnavailable)

    go enqueueHosts(hostsQueue, allHosts)

    if upgradeStrategy.Drain {
        drainHelper = drain.Helper{
            Client: kubeClient,
            Force: upgradeStrategy.DrainInput.Force, // continue for pods not managed by deployment/replication controller/daemonset
            IgnoreAllDaemonSets: upgradeStrategy.DrainInput.IgnoreDaemonSets, // ignore daemonset pods
            DeleteLocalData: upgradeStrategy.DrainInput.DeleteLocalData,
            GracePeriodSeconds: upgradeStrategy.DrainInput.GracePeriod,
            Timeout: time.Duration(upgradeStrategy.DrainInput.Timeout),
            Out: bytes.NewBuffer([]byte{}),
            ErrOut: bytes.NewBuffer([]byte{}),
        }
    }

    workerThreads := maxUnavailable
    for w := 0; w < workerThreads; w++ {
        errgrp.Go(func() error {
            var errList []error
            for runHost := range hostsQueue {
                // when cluster is upgrading, for each node check if it's ready, cordon it and drain optionally
                if err := checkNodeStatus(kubeClient, runHost); err != nil {
                    errList = append(errList, err)
                    if len(hostsFailedToUpgrade) == maxUnavailable {
                        break
                    }
                    hostsFailedToUpgrade <- runHost.HostnameOverride
                }
                // cordon and drain
                if err := cordonAndDrainNode(kubeClient, runHost, drainHelper, upgradeStrategy.Drain); err != nil {
                    errList = append(errList, err)
                    if len(hostsFailedToUpgrade) == maxUnavailable {
                        break
                    }
                    hostsFailedToUpgrade <- runHost.HostnameOverride
                }

                // upgrade node
                logrus.Debugf("[workerplane] upgrading host %v", runHost.HostnameOverride)
                err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, workerNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage)
                if err != nil {
                    errList = append(errList, err)
                    if len(hostsFailedToUpgrade) == maxUnavailable {
                        break
                    }
                    hostsFailedToUpgrade <- runHost.HostnameOverride
                }

                // consider upgrade done when kubeclient lists node as ready
                if err := checkNodeStatus(kubeClient, runHost); err != nil {
                    errList = append(errList, err)
                    if len(hostsFailedToUpgrade) == maxUnavailable {
                        break
                    }
                    hostsFailedToUpgrade <- runHost.HostnameOverride
                }
                // uncordon node
                if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
                    errList = append(errList, err)
                    if len(hostsFailedToUpgrade) == maxUnavailable {
                        break
                    }
                    hostsFailedToUpgrade <- runHost.HostnameOverride
                }
            }
            return util.ErrList(errList)
        })
    }

    err := errgrp.Wait()
    close(hostsFailedToUpgrade)

    if err != nil {
        for host := range hostsFailedToUpgrade {
            failedHosts = append(failedHosts, host)
        }
    }
    return failedHosts, err
}

func checkNodeStatus(kubeClient *kubernetes.Clientset, runHost *hosts.Host) error {
    logrus.Debugf("[workerplane] Now checking status of node %v", runHost.HostnameOverride)
    k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
    if err != nil {
        logrus.Errorf("[workerplane] Error getting node %v: %v", runHost.HostnameOverride, err)
        return err
    }
    logrus.Debugf("[workerplane] Found node by name %s", runHost.HostnameOverride)
    if !k8s.IsNodeReady(*k8sNode) {
        return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
    }
    return nil
}

func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainHelper drain.Helper, drainNode bool) error {
    logrus.Debugf("[workerplane] Cordoning node %v", host.HostnameOverride)
    if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
        return err
    }
    if !drainNode {
        return nil
    }
    logrus.Debugf("[workerplane] Draining node %v", host.HostnameOverride)
    return drain.RunNodeDrain(&drainHelper, host.HostnameOverride)
}

func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
    if updateWorkersOnly {
        if !host.UpdateWorker {
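
The batch size comes from upgradeStrategy.MaxUnavailable via the intstr helpers in k8s.io/apimachinery: a percentage is rounded down against the total host count, and the result is floored at one node. A standalone sketch of that calculation with illustrative values (it mirrors calculateMaxUnavailable above, it is not the rke function):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/intstr"
)

// maxUnavailableFor parses either a count ("2") or a percentage ("50%"),
// rounds a percentage down against the host count, and floors the result at 1
// so a single-node cluster can still upgrade.
func maxUnavailableFor(value string, numHosts int) (int, error) {
    if value == "" {
        value = "50%" // same default as the upgrade strategy
    }
    parsed := intstr.Parse(value)
    maxUnavailable, err := intstr.GetValueFromIntOrPercent(&parsed, numHosts, false)
    if err != nil {
        return 0, err
    }
    if maxUnavailable == 0 {
        maxUnavailable = 1
    }
    return maxUnavailable, nil
}

func main() {
    for _, tc := range []struct {
        value    string
        numHosts int
    }{
        {"50%", 5}, // 2 hosts at a time (2.5 rounded down)
        {"50%", 1}, // floored to 1
        {"2", 4},   // plain count
    } {
        n, err := maxUnavailableFor(tc.value, tc.numHosts)
        fmt.Printf("maxUnavailable=%q hosts=%d -> %d (err=%v)\n", tc.value, tc.numHosts, n, err)
    }
}
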
