From 8a69634448dea1fb61178ef66c960349b2f339e4 Mon Sep 17 00:00:00 2001
From: rajashree
Date: Sun, 19 Jan 2020 22:45:22 -0800
Subject: [PATCH] Upgrade worker nodes in user-configurable batches

Upgrade worker components in batches instead of across all hosts at once.
The batch size comes from the cluster's upgrade strategy
(v3.NodeUpgradeStrategy.MaxUnavailable, defaulting to 50% of the hosts),
and inactive hosts are counted against that budget. During an upgrade each
node is checked for readiness, cordoned, optionally drained, upgraded, and
uncordoned once it reports Ready again. Nodes that also hold the
etcd/controlplane role are upgraded before worker-only nodes.
---
 .gitignore              |   2 +
 cluster/cluster.go      |  43 ++++++++--
 cmd/up.go               |   7 +-
 k8s/node.go             |   2 +-
 services/workerplane.go | 185 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 228 insertions(+), 11 deletions(-)

diff --git a/.gitignore b/.gitignore
index e91e035d6..f2723dc6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,5 @@
 kube_config*
 /rke
 .vscode
+cluster.rkestate
+cluster.yml
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 5cd7f6be1..6ceb86dbc 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -133,22 +133,47 @@ func (c *Cluster) DeployControlPlane(ctx context.Context, svcOptionData map[stri
 	return nil
 }
 
-func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions) error {
+func (c *Cluster) DeployWorkerPlane(ctx context.Context, svcOptionData map[string]*v3.KubernetesServicesOptions, isUpgrade bool) error {
+	var workerOnlyHosts, multipleRolesHosts []*hosts.Host
+
 	// Deploy Worker plane
 	workerNodePlanMap := make(map[string]v3.RKEConfigNodePlan)
 	// Build cp node plan map
 	allHosts := hosts.GetUniqueHostList(c.EtcdHosts, c.ControlPlaneHosts, c.WorkerHosts)
 	for _, workerHost := range allHosts {
 		workerNodePlanMap[workerHost.Address] = BuildRKEConfigNodePlan(ctx, c, workerHost, workerHost.DockerInfo, svcOptionData)
+		if !workerHost.IsControl && !workerHost.IsEtcd {
+			workerOnlyHosts = append(workerOnlyHosts, workerHost)
+		} else {
+			multipleRolesHosts = append(multipleRolesHosts, workerHost)
+		}
 	}
-	if err := services.RunWorkerPlane(ctx, allHosts,
-		c.LocalConnDialerFactory,
-		c.PrivateRegistriesMap,
-		workerNodePlanMap,
-		c.Certificates,
-		c.UpdateWorkersOnly,
-		c.SystemImages.Alpine); err != nil {
-		return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
+
+	kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
+	if err != nil {
+		return fmt.Errorf("failed to initialize new kubernetes client: %v", err)
+	}
+
+	if !isUpgrade {
+		if err := services.RunWorkerPlane(ctx, allHosts,
+			c.LocalConnDialerFactory,
+			c.PrivateRegistriesMap,
+			workerNodePlanMap,
+			c.Certificates,
+			c.UpdateWorkersOnly,
+			c.SystemImages.Alpine); err != nil {
+			return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
+		}
+	} else {
+		if err := services.UpgradeWorkerPlane(ctx, kubeClient, multipleRolesHosts, workerOnlyHosts, len(c.InactiveHosts),
+			c.LocalConnDialerFactory,
+			c.PrivateRegistriesMap,
+			workerNodePlanMap,
+			c.Certificates,
+			c.UpdateWorkersOnly,
+			c.SystemImages.Alpine, c.UpgradeStrategy); err != nil {
+			return fmt.Errorf("[workerPlane] Failed to upgrade Worker Plane: %v", err)
+		}
 	}
 	return nil
 }
diff --git a/cmd/up.go b/cmd/up.go
index 6b5e2f7e3..00109a8ba 100644
--- a/cmd/up.go
+++ b/cmd/up.go
@@ -79,6 +79,7 @@ func UpCommand() cli.Command {
 
 func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags cluster.ExternalFlags, data map[string]interface{}) (string, string, string, string, map[string]pki.CertificatePKI, error) {
 	var APIURL, caCrt, clientCert, clientKey string
+	var isUpgrade bool
 
 	clusterState, err := cluster.ReadStateFile(ctx, cluster.GetStateFilePath(flags.ClusterFilePath, flags.ConfigDir))
 	if err != nil {
@@ -118,6 +119,10 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
+	if currentCluster != nil {
+		isUpgrade = true
+	}
+
 	if !flags.DisablePortCheck {
 		if err = kubeCluster.CheckClusterPorts(ctx, currentCluster); err != nil {
 			return APIURL, caCrt, clientCert, clientKey, nil, err
 		}
@@ -178,7 +183,7 @@ func ClusterUp(ctx context.Context, dialersOptions hosts.DialersOptions, flags c
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
 
-	err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData)
+	err = kubeCluster.DeployWorkerPlane(ctx, svcOptionsData, isUpgrade)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, nil, err
 	}
diff --git a/k8s/node.go b/k8s/node.go
index 66a666e00..738afd339 100644
--- a/k8s/node.go
+++ b/k8s/node.go
@@ -80,7 +80,7 @@ func CordonUncordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
 func IsNodeReady(node v1.Node) bool {
 	nodeConditions := node.Status.Conditions
 	for _, condition := range nodeConditions {
-		if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
+		if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
 			return true
 		}
 	}
diff --git a/services/workerplane.go b/services/workerplane.go
index 41affce42..78357ec7e 100644
--- a/services/workerplane.go
+++ b/services/workerplane.go
@@ -1,14 +1,23 @@
 package services
 
 import (
+	"bytes"
 	"context"
+	"fmt"
+	"strings"
+	"time"
 
 	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/log"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/util"
 	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
+	"github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
+	k8sutil "k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/kubectl/pkg/drain"
 )
 
 const (
@@ -42,6 +51,182 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
 	return nil
 }
 
+// UpgradeWorkerPlane upgrades worker components in batches of at most maxUnavailable hosts,
+// starting with the nodes that also hold the etcd/controlplane role, followed by worker-only nodes.
+func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, numInactiveHosts int, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy) error {
+	log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
+
+	maxUnavailableVal := upgradeStrategy.MaxUnavailable
+	if maxUnavailableVal == "" {
+		// default to 50%
+		maxUnavailableVal = "50%"
+	}
+	maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
+	maxUnavailable, err := calculateMaxUnavailable(maxUnavailableParsed, len(multipleRolesHosts)+len(workerOnlyHosts))
+	if err != nil {
+		return err
+	}
+	// TunnelHosts detects inactive hosts by attempting to connect to them and removes them from the hosts list
+	if numInactiveHosts >= maxUnavailable {
+		// cannot proceed: the number of inactive nodes already meets or exceeds maxUnavailable
+		return fmt.Errorf("cannot proceed with upgrade: %v host(s) are inactive, which already meets or exceeds maxUnavailable (%v)", numInactiveHosts, maxUnavailable)
+	}
+	log.Infof(ctx, "Upgrading %v hosts at a time", maxUnavailable)
+	// reduce the batch size by the hosts that are already unavailable
+	maxUnavailable -= numInactiveHosts
+	log.Infof(ctx, "First upgrading worker components on nodes with etcd/controlplane roles")
+	multipleRolesHostsFailedToUpgrade, err := startWorkerPlane(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
+	if err != nil && len(multipleRolesHostsFailedToUpgrade) == maxUnavailable {
+		logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","))
+		return err
+	}
+
+	maxUnavailable -= len(multipleRolesHostsFailedToUpgrade)
+	log.Infof(ctx, "Upgrading worker components on nodes with only worker role")
+	workerOnlyHostsFailedToUpgrade, err := startWorkerPlane(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
+	if err != nil {
+		multipleRolesHostsFailedToUpgrade = append(multipleRolesHostsFailedToUpgrade, workerOnlyHostsFailedToUpgrade...)
+		logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(multipleRolesHostsFailedToUpgrade, ","))
+		return err
+	}
+
+	log.Infof(ctx, "[%s] Successfully upgraded Worker Plane..", WorkerRole)
+	return nil
+}
+
+func calculateMaxUnavailable(maxUnavailableParsed k8sutil.IntOrString, numHosts int) (int, error) {
+	// if maxUnavailable is given in percent, round down
+	maxUnavailable, err := k8sutil.GetValueFromIntOrPercent(&maxUnavailableParsed, numHosts, false)
+	if err != nil {
+		return 0, err
+	}
+	if maxUnavailable == 0 {
+		// in case there is only one node and rounding down the maxUnavailable percentage led to 0
+		maxUnavailable = 1
+	}
+	return maxUnavailable, nil
+}
+
+func enqueueHosts(hostsQueue chan<- *hosts.Host, allHosts []*hosts.Host) {
+	for _, host := range allHosts {
+		hostsQueue <- host
+	}
+	close(hostsQueue)
+}
+
+func startWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
+	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy) ([]string, error) {
+	var errgrp errgroup.Group
+	var drainHelper drain.Helper
+	var failedHosts []string
+	hostsQueue := make(chan *hosts.Host, maxUnavailable)
+	hostsFailedToUpgrade := make(chan string, maxUnavailable)
+
+	go enqueueHosts(hostsQueue, allHosts)
+
+	if upgradeStrategy.Drain {
+		drainHelper = drain.Helper{
+			Client:              kubeClient,
+			Force:               upgradeStrategy.DrainInput.Force,            // continue even for pods not managed by a deployment/replication controller/daemonset
+			IgnoreAllDaemonSets: upgradeStrategy.DrainInput.IgnoreDaemonSets, // ignore daemonset pods
+			DeleteLocalData:     upgradeStrategy.DrainInput.DeleteLocalData,
+			GracePeriodSeconds:  upgradeStrategy.DrainInput.GracePeriod,
+			Timeout:             time.Duration(upgradeStrategy.DrainInput.Timeout) * time.Second, // DrainInput.Timeout is given in seconds
+			Out:                 bytes.NewBuffer([]byte{}),
+			ErrOut:              bytes.NewBuffer([]byte{}),
+		}
+	}
+
+	workerThreads := maxUnavailable
+	for w := 0; w < workerThreads; w++ {
+		errgrp.Go(func() error {
+			var errList []error
+			for runHost := range hostsQueue {
+				// while the cluster is upgrading, check each node's readiness, cordon it and optionally drain it
+				if err := checkNodeStatus(kubeClient, runHost); err != nil {
+					errList = append(errList, err)
+					if len(hostsFailedToUpgrade) == maxUnavailable {
+						break
+					}
+					hostsFailedToUpgrade <- runHost.HostnameOverride
+					continue
+				}
+				// cordon and drain
+				if err := cordonAndDrainNode(kubeClient, runHost, drainHelper, upgradeStrategy.Drain); err != nil {
+					errList = append(errList, err)
+					if len(hostsFailedToUpgrade) == maxUnavailable {
+						break
+					}
+					hostsFailedToUpgrade <- runHost.HostnameOverride
+					continue
+				}
+
+				// upgrade node
+				logrus.Debugf("[workerplane] upgrading host %v", runHost.HostnameOverride)
+				err := doDeployWorkerPlaneHost(ctx, runHost, localConnDialerFactory, prsMap, workerNodePlanMap[runHost.Address].Processes, certMap, updateWorkersOnly, alpineImage)
+				if err != nil {
+					errList = append(errList, err)
+					if len(hostsFailedToUpgrade) == maxUnavailable {
+						break
+					}
+					hostsFailedToUpgrade <- runHost.HostnameOverride
+					continue
+				}
+
+				// consider the upgrade done once the node is reported Ready again
+				if err := checkNodeStatus(kubeClient, runHost); err != nil {
+					errList = append(errList, err)
+					if len(hostsFailedToUpgrade) == maxUnavailable {
+						break
+					}
+					hostsFailedToUpgrade <- runHost.HostnameOverride
+					continue
+				}
+				// uncordon node
+				if err := k8s.CordonUncordon(kubeClient, runHost.HostnameOverride, false); err != nil {
+					errList = append(errList, err)
+					if len(hostsFailedToUpgrade) == maxUnavailable {
+						break
+					}
+					hostsFailedToUpgrade <- runHost.HostnameOverride
+				}
+			}
+			return util.ErrList(errList)
+		})
+	}
+
+	err := errgrp.Wait()
+	close(hostsFailedToUpgrade)
+
+	if err != nil {
+		for host := range hostsFailedToUpgrade {
+			failedHosts = append(failedHosts, host)
+		}
+	}
+	return failedHosts, err
+}
+
+func checkNodeStatus(kubeClient *kubernetes.Clientset, runHost *hosts.Host) error {
+	logrus.Debugf("[workerplane] Now checking status of node %v", runHost.HostnameOverride)
+	k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
+	if err != nil {
+		logrus.Errorf("[workerplane] Error getting node %v: %v", runHost.HostnameOverride, err)
+		return err
+	}
+	logrus.Debugf("[workerplane] Found node by name %s", runHost.HostnameOverride)
+	if !k8s.IsNodeReady(*k8sNode) {
+		return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
+	}
+	return nil
+}
+
+func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainHelper drain.Helper, drainNode bool) error {
+	logrus.Debugf("[workerplane] Cordoning node %v", host.HostnameOverride)
+	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
+		return err
+	}
+	if !drainNode {
+		return nil
+	}
+	logrus.Debugf("[workerplane] Draining node %v", host.HostnameOverride)
+	return drain.RunNodeDrain(&drainHelper, host.HostnameOverride)
+}
+
 func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
 	if updateWorkersOnly {
 		if !host.UpdateWorker {
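
For reference, a minimal standalone sketch of the batch-size calculation performed by calculateMaxUnavailable above, using the same k8s.io/apimachinery/pkg/util/intstr helpers this patch imports. The package layout, the batchSize helper name, and the sample values are illustrative only and are not part of the patch; percentages round down and the result is floored at one host, matching the patch's behavior.

// batch-size sketch: parse an int or percentage, round percentages down, never below one host
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func batchSize(configured string, numHosts int) (int, error) {
	if configured == "" {
		configured = "50%" // same default the patch applies when MaxUnavailable is unset
	}
	parsed := intstr.Parse(configured)
	// round down when a percentage is given
	n, err := intstr.GetValueFromIntOrPercent(&parsed, numHosts, false)
	if err != nil {
		return 0, err
	}
	if n == 0 {
		// a small cluster with a small percentage would otherwise round to zero
		n = 1
	}
	return n, nil
}

func main() {
	for _, tc := range []struct {
		val   string
		hosts int
	}{{"50%", 5}, {"10%", 3}, {"2", 10}} {
		n, err := batchSize(tc.val, tc.hosts)
		fmt.Println(tc.val, tc.hosts, "->", n, err)
	}
}

With the default of "50%", a three-node cluster is upgraded one node at a time, while a ten-node cluster upgrades five nodes per batch.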
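
Similarly, a condensed sketch of the concurrency pattern startWorkerPlane relies on: hosts are pushed onto a buffered channel and at most maxUnavailable goroutines, coordinated through an errgroup, drain that queue, which is what bounds how many nodes are touched at once. processHost and the sample host names below are placeholders for the real check/cordon/drain/upgrade steps, not part of the patch.

// fan-out sketch: a buffered queue drained by maxUnavailable workers via errgroup
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func processHost(host string) error {
	fmt.Println("upgrading", host)
	return nil
}

func main() {
	hosts := []string{"node-1", "node-2", "node-3", "node-4"}
	maxUnavailable := 2

	// enqueue all hosts, then close the channel so workers exit when it drains
	queue := make(chan string, maxUnavailable)
	go func() {
		for _, h := range hosts {
			queue <- h
		}
		close(queue)
	}()

	// at most maxUnavailable hosts are processed concurrently
	var errgrp errgroup.Group
	for w := 0; w < maxUnavailable; w++ {
		errgrp.Go(func() error {
			for h := range queue {
				if err := processHost(h); err != nil {
					return err
				}
			}
			return nil
		})
	}
	if err := errgrp.Wait(); err != nil {
		fmt.Println("upgrade failed:", err)
	}
}

In the patch, the buffered hostsFailedToUpgrade channel plays a similar bounding role for failures: once it holds maxUnavailable entries, the workers stop picking up new hosts.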