Skip to content

Commit

Permalink
Add eksctl drain nodegroup
Browse files Browse the repository at this point in the history
  • Loading branch information
errordeveloper committed Feb 28, 2019
1 parent 3c84af4 commit adac1b6
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ You can also enable SSH, ASG access and other feature for each particular nodegr
eksctl create nodegroup --cluster=cluster-1 --node-labels="autoscaling=enabled,purpose=ci-worker" --asg-access --full-ecr-access --ssh-access
```

To cordon a nodegroup and evict all of the pods, run:
```
eksctl drain nodegroup --cluster=<clusterName> --name=<nodegroupName>
```

To uncordon a nodegroup, run:
```
eksctl drain nodegroup --cluster=<clusterName> --name=<nodegroupName> --undo
```

To delete a nodegroup, run:

```
Expand Down
3 changes: 3 additions & 0 deletions cmd/eksctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

"github.com/kris-nova/logger"
"github.com/spf13/cobra"

"github.com/weaveworks/eksctl/pkg/ctl/cmdutils"
"github.com/weaveworks/eksctl/pkg/ctl/completion"
"github.com/weaveworks/eksctl/pkg/ctl/create"
"github.com/weaveworks/eksctl/pkg/ctl/delete"
"github.com/weaveworks/eksctl/pkg/ctl/drain"
"github.com/weaveworks/eksctl/pkg/ctl/get"
"github.com/weaveworks/eksctl/pkg/ctl/scale"
"github.com/weaveworks/eksctl/pkg/ctl/utils"
Expand Down Expand Up @@ -74,6 +76,7 @@ func addCommands(g *cmdutils.Grouping) {
rootCmd.AddCommand(delete.Command(g))
rootCmd.AddCommand(get.Command(g))
rootCmd.AddCommand(scale.Command(g))
rootCmd.AddCommand(drain.Command(g))
rootCmd.AddCommand(utils.Command(g))
rootCmd.AddCommand(completion.Command(rootCmd))
}
24 changes: 24 additions & 0 deletions pkg/ctl/drain/drain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package drain

import (
"github.com/kris-nova/logger"
"github.com/spf13/cobra"
"github.com/weaveworks/eksctl/pkg/ctl/cmdutils"
)

// Command will create the `drain` commands
func Command(g *cmdutils.Grouping) *cobra.Command {
cmd := &cobra.Command{
Use: "drain",
Short: "drain resources(s)",
Run: func(c *cobra.Command, _ []string) {
if err := c.Help(); err != nil {
logger.Debug("ignoring error %q", err.Error())
}
},
}

cmd.AddCommand(drainNodeGroupCmd(g))

return cmd
}
236 changes: 236 additions & 0 deletions pkg/ctl/drain/nodegroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package drain

import (
"fmt"
"os"
"time"

"github.com/kris-nova/logger"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

api "github.com/weaveworks/eksctl/pkg/apis/eksctl.io/v1alpha4"
"github.com/weaveworks/eksctl/pkg/ctl/cmdutils"
"github.com/weaveworks/eksctl/pkg/eks"

"github.com/weaveworks/eksctl/pkg/drain"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

var (
drainNodeGroupUndo bool
)

func drainNodeGroupCmd(g *cmdutils.Grouping) *cobra.Command {
p := &api.ProviderConfig{}
cfg := api.NewClusterConfig()
ng := cfg.NewNodeGroup()

cmd := &cobra.Command{
Use: "nodegroup",
Short: "Cordon and drain a nodegroup",
Run: func(_ *cobra.Command, args []string) {
if err := doDrainNodeGroup(p, cfg, ng, cmdutils.GetNameArg(args)); err != nil {
logger.Critical("%s\n", err.Error())
os.Exit(1)
}
},
}

group := g.New(cmd)

group.InFlagSet("General", func(fs *pflag.FlagSet) {
fs.StringVar(&cfg.Metadata.Name, "cluster", "", "EKS cluster name (required)")
cmdutils.AddRegionFlag(fs, p)
fs.StringVarP(&ng.Name, "name", "n", "", "Name of the nodegroup to delete (required)")
fs.BoolVar(&drainNodeGroupUndo, "undo", false, "Uncordone the nodegroup")
})

cmdutils.AddCommonFlagsForAWS(group, p, true)

group.AddTo(cmd)

return cmd
}

func doDrainNodeGroup(p *api.ProviderConfig, cfg *api.ClusterConfig, ng *api.NodeGroup, nameArg string) error {
ctl := eks.New(p, cfg)

if err := api.Register(); err != nil {
return err
}

if err := ctl.CheckAuth(); err != nil {
return err
}

if cfg.Metadata.Name == "" {
return errors.New("--cluster must be set")
}

if ng.Name != "" && nameArg != "" {
return cmdutils.ErrNameFlagAndArg(ng.Name, nameArg)
}

if nameArg != "" {
ng.Name = nameArg
}

if ng.Name == "" {
return fmt.Errorf("--name must be set")
}

if err := ctl.GetCredentials(cfg); err != nil {
return errors.Wrapf(err, "getting credentials for cluster %q", cfg.Metadata.Name)
}

clientSet, err := ctl.NewStdClientSet(cfg)
if err != nil {
return err
}

drainer := &drain.Helper{
Client: clientSet,

// TODO: Force, DeleteLocalData & IgnoreAllDaemonSets shouldn't
// be enabled by default, we need flags to control thes, but that
// requires more improvements in the underlying drain package,
// as it currently produces errors and warnings with references
// to kubectl flags
Force: true,
DeleteLocalData: true,
IgnoreAllDaemonSets: true,

// TODO: ideally only the list of well-known DaemonSets should
// be set by default
IgnoreDaemonSets: []metav1.ObjectMeta{
{
Namespace: "kube-system",
Name: "aws-node",
},
{
Namespace: "kube-system",
Name: "kube-proxy",
},
{
Name: "node-exporter",
},
{
Name: "prom-node-exporter",
},
{
Name: "weave-scope",
},
{
Name: "weave-scope-agent",
},
{
Name: "weave-net",
},
},
}

if err := drainer.CanUseEvictions(); err != nil {
return errors.Wrapf(err, "checking if cluster implements policy API")
}

drainedNodes := sets.NewString()
// loop until all nodes are drained to handle accidential scale-up
// or any other changes in the ASG
timer := time.After(ctl.Provider.WaitTimeout())
timeout := false
for !timeout {
select {
case <-timer:
timeout = true
default:
nodes, err := clientSet.CoreV1().Nodes().List(ng.ListOptions())
if err != nil {
return err
}

if len(nodes.Items) == 0 {
logger.Warning("no nodes found in nodegroup %q (label selector: %q)", ng.Name, ng.ListOptions().LabelSelector)
return nil
}

newPendingNodes := sets.NewString()

for _, node := range nodes.Items {
if drainedNodes.Has(node.Name) {
continue // already drained, get next one
}
newPendingNodes.Insert(node.Name)
desired := drain.CordonNode
if drainNodeGroupUndo {
desired = drain.UncordonNode
}
c := drain.NewCordonHelper(&node, desired)
if c.IsUpdateRequired() {
err, patchErr := c.PatchOrReplace(clientSet)
if patchErr != nil {
logger.Warning(patchErr.Error())
}
if err != nil {
logger.Critical(err.Error())
}
logger.Info("%s node %q", desired, node.Name)
} else {
logger.Debug("no need to %s node %q", desired, node.Name)
}
}

if drainNodeGroupUndo {
return nil // no need to kill any pods
}

if drainedNodes.HasAll(newPendingNodes.List()...) {
logger.Success("drained nodes: %v", drainedNodes.List())
return nil // no new nodes were seen
}

logger.Debug("already drained: %v", drainedNodes.List())
logger.Debug("will drain: %v", newPendingNodes.List())

for _, node := range nodes.Items {
if newPendingNodes.Has(node.Name) {
pending, err := evictPods(drainer, &node)
if err != nil {
return err
}
logger.Debug("%d pods to be evicted from %s", pending, node.Name)
if pending == 0 {
drainedNodes.Insert(node.Name)
}
}
}
}
}
if timeout {
return fmt.Errorf("timed out (after %s) waiting for nodedroup %q to be drain", ctl.Provider.WaitTimeout(), ng.Name)
}

return nil
}

func evictPods(drainer *drain.Helper, node *corev1.Node) (int, error) {
list, errs := drainer.GetPodsForDeletion(node.Name)
if len(errs) > 0 {
return 0, fmt.Errorf("errs: %v", errs) // TODO: improve formatting
}
if w := list.Warnings(); w != "" {
logger.Warning(w)
}
pods := list.Pods()
pending := len(pods)
for _, pod := range pods {
// TODO: handle API rate limitter error
if err := drainer.EvictOrDeletePod(pod); err != nil {
return pending, err
}
}
return pending, nil
}

0 comments on commit adac1b6

Please sign in to comment.