Commit 8993b8b

ch

mrajashree committed Jan 18, 2020
1 parent: 65665db

Showing 2 changed files with 30 additions and 46 deletions.
cluster/cluster.go: 3 changes (0 additions, 3 deletions)
@@ -370,9 +370,6 @@ func ParseConfig(clusterFile string) (*v3.RancherKubernetesEngineConfig, error)
		return nil, err
	}

-	fmt.Printf("rkeConfig: %#v", rkeConfig)
-	fmt.Printf("rkeConfig upgrade strategy!!: %#v", rkeConfig.UpgradeStrategy)
-
	if isEncryptionEnabled(&rkeConfig) && secretConfig != nil {
		rkeConfig.Services.KubeAPI.SecretsEncryptionConfig.CustomConfig = secretConfig
	}
services/workerplane.go: 73 changes (30 additions, 43 deletions)
@@ -1,24 +1,23 @@
package services

import (
+	"bytes"
	"context"
+	"fmt"
	"strings"
+	"time"

	"github.com/rancher/rke/hosts"
	"github.com/rancher/rke/k8s"
	"github.com/rancher/rke/log"
	"github.com/rancher/rke/pki"
	"github.com/rancher/rke/util"
	v3 "github.com/rancher/types/apis/management.cattle.io/v3"
+	"golang.org/x/sync/errgroup"

-	"bytes"
-	"fmt"
	"github.com/sirupsen/logrus"
-	"golang.org/x/sync/errgroup"
	k8sutil "k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubectl/pkg/drain"
-	"time"
)

const (
@@ -53,34 +52,36 @@ func RunWorkerPlane(ctx context.Context, allHosts []*hosts.Host, localConnDialer
}

func UpgradeWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, multipleRolesHosts []*hosts.Host, workerOnlyHosts []*hosts.Host, numInactiveHosts int, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string, upgradeStrategy *v3.NodeUpgradeStrategy) error {
-	log.Infof(ctx, "[%s] Upgrading up Worker Plane..", WorkerRole)
+	log.Infof(ctx, "[%s] Upgrading Worker Plane..", WorkerRole)
	var maxUnavailable int
-	var maxUnavailableParsed k8sutil.IntOrString

-	if upgradeStrategy.MaxUnavailable == "" {
-		maxUnavailableParsed = k8sutil.Parse("50%")
+	maxUnavailableVal := upgradeStrategy.MaxUnavailable
+	if maxUnavailableVal == "" {
+		maxUnavailableVal = "50%"
	}
-	maxUnavailableParsed = k8sutil.Parse(upgradeStrategy.MaxUnavailable)
+	maxUnavailableParsed := k8sutil.Parse(maxUnavailableVal)
	maxUnavailable, err := calculateMaxUnavailable(maxUnavailableParsed, len(multipleRolesHosts)+len(workerOnlyHosts))
	if err != nil {
		return err
	}
	if numInactiveHosts == maxUnavailable {
		// cannot upgrade since already hit maxUnavailable number of nodes
-		return fmt.Errorf("cannot proceed with upgrade since %v number of hosts are inactive", numInactiveHosts)
+		return fmt.Errorf("cannot proceed with upgrade since %v (maxUnavailable) hosts are inactive", numInactiveHosts)
	}
	maxUnavailable -= numInactiveHosts
	log.Infof(ctx, "First upgrading worker components on nodes with etcd/controlplane roles")
-	numUpgradeFailedHosts, err := startWorkerPlane(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
-	if err != nil {
-		if numUpgradeFailedHosts == maxUnavailable {
-			return err
-		}
+	upgradeFailedHosts, err := startWorkerPlane(ctx, kubeClient, multipleRolesHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
+	if err != nil && len(upgradeFailedHosts) == maxUnavailable {
+		logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(upgradeFailedHosts, ","))
+		return err
	}

-	maxUnavailable -= numUpgradeFailedHosts
-	numUpgradeFailedHosts, err = startWorkerPlane(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
+	maxUnavailable -= len(upgradeFailedHosts)
+	log.Infof(ctx, "Upgrading worker components on nodes with only worker roles")
+	upgradeFailedWorkerOnlyHosts, err := startWorkerPlane(ctx, kubeClient, workerOnlyHosts, localConnDialerFactory, prsMap, workerNodePlanMap, certMap, updateWorkersOnly, alpineImage, maxUnavailable, upgradeStrategy)
	if err != nil {
+		upgradeFailedHosts = append(upgradeFailedHosts, upgradeFailedWorkerOnlyHosts...)
+		logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(upgradeFailedHosts, ","))
		return err
	}
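calculateMaxUnavailable is an rke helper defined outside this hunk. As a rough, standalone sketch of the parse-and-scale step above (the function name, the round-down choice and the floor of one node are assumptions here, not rke code), a MaxUnavailable value such as "50%" can be resolved against the total worker count with the intstr package that is aliased as k8sutil in this file:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolveMaxUnavailable is a hypothetical stand-in for rke's calculateMaxUnavailable:
// it accepts either a plain integer ("2") or a percentage ("50%") and scales the
// percentage against the total number of worker nodes.
func resolveMaxUnavailable(value string, totalNodes int) (int, error) {
	parsed := intstr.Parse(value)
	// roundUp=false truncates, so "50%" of 5 nodes gives 2
	maxUnavailable, err := intstr.GetValueFromIntOrPercent(&parsed, totalNodes, false)
	if err != nil {
		return 0, err
	}
	if maxUnavailable < 1 {
		// keep upgrading at least one node at a time
		maxUnavailable = 1
	}
	return maxUnavailable, nil
}

func main() {
	n, err := resolveMaxUnavailable("50%", 5)
	fmt.Println(n, err) // 2 <nil>
}

The diff above then subtracts the number of already-inactive hosts from that budget before the first startWorkerPlane call.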

@@ -108,9 +109,10 @@ func enqueueHosts(hosts chan<- *hosts.Host, allHosts []*hosts.Host) {
}

func startWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, allHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, workerNodePlanMap map[string]v3.RKEConfigNodePlan, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string,
-	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy) (int, error) {
+	maxUnavailable int, upgradeStrategy *v3.NodeUpgradeStrategy) ([]string, error) {
	var errgrp errgroup.Group
	var drainHelper drain.Helper
+	var failedHosts []string
	hostsQueue := make(chan *hosts.Host, maxUnavailable)
	hostsFailedToUpgrade := make(chan string, maxUnavailable)
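The loop that feeds workers from hostsQueue sits in the collapsed portion of this file, so the following is only a generic sketch of the errgroup-plus-buffered-channel shape the declarations above suggest; upgradeHost, the string-typed queue, and the stop-on-first-failure behavior are illustrative assumptions, not rke's actual worker body:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// upgradeHost is a hypothetical stand-in for the per-host upgrade work.
func upgradeHost(name string) error {
	if name == "bad-node" {
		return fmt.Errorf("upgrade failed on %s", name)
	}
	return nil
}

func upgradeWithLimit(hostNames []string, maxUnavailable int) ([]string, error) {
	var errgrp errgroup.Group
	hostsQueue := make(chan string, len(hostNames))
	hostsFailedToUpgrade := make(chan string, maxUnavailable)

	// enqueue every host up front, then close so workers know when to stop
	for _, name := range hostNames {
		hostsQueue <- name
	}
	close(hostsQueue)

	// at most maxUnavailable hosts are being upgraded (and hence unavailable) at once
	for w := 0; w < maxUnavailable; w++ {
		errgrp.Go(func() error {
			for name := range hostsQueue {
				if err := upgradeHost(name); err != nil {
					// a failed host stays unavailable, so give up this worker slot
					hostsFailedToUpgrade <- name
					return err
				}
			}
			return nil
		})
	}
	err := errgrp.Wait()
	close(hostsFailedToUpgrade)

	var failedHosts []string
	for name := range hostsFailedToUpgrade {
		failedHosts = append(failedHosts, name)
	}
	return failedHosts, err
}

func main() {
	failed, err := upgradeWithLimit([]string{"node1", "bad-node", "node3"}, 2)
	fmt.Println(failed, err)
}

Capping the number of errgrp.Go workers at maxUnavailable is what bounds how many nodes can be out of service at the same time.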

@@ -188,55 +190,40 @@ func startWorkerPlane(ctx context.Context, kubeClient *kubernetes.Clientset, all

	close(hostsFailedToUpgrade)

-	if err != nil && len(hostsFailedToUpgrade) == maxUnavailable {
-		var failedHosts []string
+	if err != nil {
		for host := range hostsFailedToUpgrade {
			failedHosts = append(failedHosts, host)
		}
-		logrus.Errorf("Failed to upgrade hosts: %v", strings.Join(failedHosts, ","))
-		return len(failedHosts), err
	}
-
-	return len(hostsFailedToUpgrade), nil
+	return failedHosts, err
}

func getNodeAndCheckStatus(kubeClient *kubernetes.Clientset, runHost *hosts.Host) error {
	// if cluster is undergoing upgrade, for each node check if it's ready, cordon it and drain optionally
-	logrus.Debugf("[workerplane] Now checking for node %v", runHost.HostnameOverride)
+	logrus.Debugf("[workerplane] Now checking status of node %v", runHost.HostnameOverride)
	k8sNode, err := k8s.GetNode(kubeClient, runHost.HostnameOverride)
	if err != nil {
		logrus.Errorf("[workerplane] Error getting node %v: %v", runHost.HostnameOverride, err)
		return err
	}
-	logrus.Debugf("[workerplane] Found host by name %s", runHost.HostnameOverride)
+	logrus.Debugf("[workerplane] Found node by name %s", runHost.HostnameOverride)
	if !k8s.IsNodeReady(*k8sNode) {
-		return fmt.Errorf("Host %v not ready", runHost.HostnameOverride)
+		return fmt.Errorf("host %v not ready", runHost.HostnameOverride)
	}

	return nil
}
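k8s.GetNode and k8s.IsNodeReady above come from rke's own k8s package, which is not shown in this commit. A minimal client-go equivalent of the same readiness check (function names here are illustrative, and the call is written against a client-go version whose Get takes a context) would look roughly like:

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// isNodeReady reports whether the node's NodeReady condition is True.
func isNodeReady(node corev1.Node) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}

// checkNodeStatus fetches a node by name and returns an error if it is not ready.
func checkNodeStatus(kubeClient *kubernetes.Clientset, nodeName string) error {
	node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("error getting node %v: %v", nodeName, err)
	}
	if !isNodeReady(*node) {
		return fmt.Errorf("host %v not ready", nodeName)
	}
	return nil
}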

-func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainHelper drain.Helper, drain bool) error {
+func cordonAndDrainNode(kubeClient *kubernetes.Clientset, host *hosts.Host, drainHelper drain.Helper, drainNode bool) error {
	logrus.Debugf("[workerplane] Cordoning node %v", host.HostnameOverride)
	if err := k8s.CordonUncordon(kubeClient, host.HostnameOverride, true); err != nil {
		return err
	}
-	if !drain {
+	if !drainNode {
		return nil
	}
-	logrus.Debugf("[workerplane] Draining host %v", host.HostnameOverride)
-	podList, drainError := drainHelper.GetPodsForDeletion(host.HostnameOverride)
-	if drainError != nil {
-		fmt.Println("Error draining node ", host.HostnameOverride, drainError)
-		return drainError[0]
-	}
-	logrus.Debugf("[workerplane] Deleting/evicting pods %v from node %v", podList.Pods(), host.HostnameOverride)
-	podEvictionError := drainHelper.DeleteOrEvictPods(podList.Pods())
-	if podEvictionError != nil {
-		return podEvictionError
-	}
-	fmt.Println("Drained node ", host.HostnameOverride)
-	return nil
+	logrus.Debugf("[workerplane] Draining node %v", host.HostnameOverride)
+	return drain.RunNodeDrain(&drainHelper, host.HostnameOverride)
}

func doDeployWorkerPlaneHost(ctx context.Context, host *hosts.Host, localConnDialerFactory hosts.DialerFactory, prsMap map[string]v3.PrivateRegistry, processMap map[string]v3.Process, certMap map[string]pki.CertificatePKI, updateWorkersOnly bool, alpineImage string) error {
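The rewritten cordonAndDrainNode hands the eviction loop to drain.RunNodeDrain from k8s.io/kubectl/pkg/drain instead of calling GetPodsForDeletion and DeleteOrEvictPods by hand. How rke fills in the drain.Helper it passes around is outside this diff; a plausible configuration (the field values below are illustrative defaults, not the ones rke uses) is:

package example

import (
	"os"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/kubectl/pkg/drain"
)

// newDrainHelper builds a drain.Helper for evicting pods from a node.
func newDrainHelper(kubeClient *kubernetes.Clientset) drain.Helper {
	return drain.Helper{
		Client:              kubeClient,
		Force:               true,             // also remove pods not managed by a controller
		IgnoreAllDaemonSets: true,             // daemonset pods would just be rescheduled anyway
		GracePeriodSeconds:  -1,               // use each pod's own termination grace period
		Timeout:             90 * time.Second, // give up on the drain after this long
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,
	}
}

// drainWorkerNode evicts or deletes all drainable pods from nodeName.
func drainWorkerNode(kubeClient *kubernetes.Clientset, nodeName string) error {
	helper := newDrainHelper(kubeClient)
	return drain.RunNodeDrain(&helper, nodeName)
}

RunNodeDrain only evicts or deletes the pods on the named node; cordoning stays a separate step, which is why the code above still calls k8s.CordonUncordon first.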
