Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/mkl/uncordon autoscaler #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ A terraform example on how to deploy `eks-rolling-update` as a Kubernetes CronJo
| Environment Variable | Description | Default |
|----------------------------|----------------------------------------------------------------------------------------------------------------------------|------------------------------------------|
| K8S_AUTOSCALER_ENABLED | If True Kubernetes Autoscaler will be paused before running update | False |
| K8S_AUTOSCALER_RESUME_ON_ERROR | If True, Kubernetes Autoscaler will be automatically resumed in case of a previous failure. | True |
| K8S_AUTOSCALER_NAMESPACE | Namespace where Kubernetes Autoscaler is deployed | default |
| K8S_AUTOSCALER_DEPLOYMENT | Deployment name of Kubernetes Autoscaler | cluster-autoscaler |
| K8S_AUTOSCALER_REPLICAS | Number of replicas to scale back up to after Kubernentes Autoscaler paused | 2 |
Expand All @@ -185,6 +186,7 @@ A terraform example on how to deploy `eks-rolling-update` as a Kubernetes CronJo
| TAINT_NODES | Replace the default **cordon**-before-drain strategy with `NoSchedule` **taint**ing, as a workaround for K8S < `1.19` [prematurely removing cordoned nodes](https://github.com/kubernetes/kubernetes/issues/65013) from `Service`-managed `LoadBalancer`s | False |
| EXTRA_DRAIN_ARGS | Additional space-delimited args to supply to the `kubectl drain` function, e.g `--force=true`. See `kubectl drain -h` | "" |
| ENFORCED_DRAINING | If draining fails for a node due to corrupted `PodDisruptionBudget`s or failing pods, retry draining with `--disable-eviction=true` and `--force=true` for this node to prevent aborting the script. This is useful to get the rolling update done in development and testing environments and **should not be used in productive environments** since this will bypass checking `PodDisruptionBudget`s | False |
| UNCORDON_ON_DRAIN_ERROR | If true, and a node can't be drained, the rolling-update will afterwards mark the node as schedulable again to minimize workload interruption | True |

## Run Modes

Expand Down
45 changes: 29 additions & 16 deletions eksrollup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from .lib.aws import is_asg_scaled, is_asg_healthy, instance_terminated, get_asg_tag, modify_aws_autoscaling, \
count_all_cluster_instances, save_asg_tags, get_asgs, scale_asg, plan_asgs, terminate_instance_in_asg, delete_asg_tags, plan_asgs_older_nodes
from .lib.k8s import k8s_nodes_count, k8s_nodes_ready, get_k8s_nodes, modify_k8s_autoscaler, get_node_by_instance_id, \
drain_node, delete_node, cordon_node, taint_node
from .lib.exceptions import RollingUpdateException
drain_node, delete_node, cordon_node, taint_node, uncordon_node
from .lib.exceptions import RollingUpdateException, NodeNotDrainedException


def validate_cluster_health(asg_name, new_desired_asg_capacity, cluster_name, predictive, health_check_type="regular",):
Expand Down Expand Up @@ -220,25 +220,32 @@ def update_asgs(asgs, cluster_name):

# start draining and terminating
desired_asg_capacity = asg_state_dict[asg_name][0]
exceptions = []
for outdated in outdated_instances:
# catch any failures so we can resume aws autoscaling
try:
# get the k8s node name instead of instance id
node_name = get_node_by_instance_id(k8s_nodes, outdated['InstanceId'])
desired_asg_capacity -= 1
drain_node(node_name)
delete_node(node_name)
save_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"], desired_asg_capacity)
# terminate/detach outdated instances only if ASG termination policy is ignored
if not use_asg_termination_policy:
terminate_instance_in_asg(outdated['InstanceId'])
if not instance_terminated(outdated['InstanceId']):
raise Exception('Instance is failing to terminate. Cancelling out.')

between_nodes_wait = app_config['BETWEEN_NODES_WAIT']
if between_nodes_wait != 0:
logger.info(f'Waiting for {between_nodes_wait} seconds before continuing...')
time.sleep(between_nodes_wait)
try:
drain_node(node_name)
delete_node(node_name)
save_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"], desired_asg_capacity)
# terminate/detach outdated instances only if ASG termination policy is ignored
if not use_asg_termination_policy:
terminate_instance_in_asg(outdated['InstanceId'])
if not instance_terminated(outdated['InstanceId']):
raise Exception('Instance is failing to terminate. Cancelling out.')
except NodeNotDrainedException as e:
logger.warn(f"node {e.node_name} could not be drained force={e.forced}, log={e.message}")
if app_config['UNCORDON_ON_DRAIN_ERROR']:
cordon_node(node_name, False)
exceptions.append(e)

between_nodes_wait = app_config['BETWEEN_NODES_WAIT']
if between_nodes_wait != 0:
logger.info(f'Waiting for {between_nodes_wait} seconds before continuing...')
time.sleep(between_nodes_wait)
except Exception as drain_exception:
try:
node_name = get_node_by_instance_id(k8s_excluded_nodes, outdated['InstanceId'])
Expand All @@ -247,6 +254,9 @@ def update_asgs(asgs, cluster_name):
except:
raise RollingUpdateException("Rolling update on ASG failed", asg_name)

if len(exceptions > 0):
raise RollingUpdateException("Rolling update on ASG failed", asg_name)

# scaling cluster back down
logger.info("Scaling asg back down to original state")
asg_desired_capacity, asg_orig_desired_capacity, asg_orig_max_capacity = asg_state_dict[asg_name]
Expand Down Expand Up @@ -298,5 +308,8 @@ def main(args=None):
logger.error('*** Rolling update of ASG has failed. Exiting ***')
logger.error('AWS Auto Scaling Group processes will need resuming manually')
if app_config['K8S_AUTOSCALER_ENABLED']:
logger.error('Kubernetes Cluster Autoscaler will need resuming manually')
if app_config['K8S_AUTOSCALER_RESUME_ON_ERROR']:
modify_k8s_autoscaler("resume")
else:
logger.error('Kubernetes Cluster Autoscaler will need resuming manually')
sys.exit(1)
2 changes: 2 additions & 0 deletions eksrollup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def str_to_bool(val):
'K8S_AUTOSCALER_NAMESPACE': os.getenv('K8S_AUTOSCALER_NAMESPACE', 'default'),
'K8S_AUTOSCALER_DEPLOYMENT': os.getenv('K8S_AUTOSCALER_DEPLOYMENT', 'cluster-autoscaler'),
'K8S_AUTOSCALER_REPLICAS': int(os.getenv('K8S_AUTOSCALER_REPLICAS', 2)),
'K8S_AUTOSCALER_RESUME_ON_ERROR': str_to_bool(os.getenv('K8S_AUTOSCALER_RESUME_ON_ERROR', True)),
'K8S_CONTEXT': os.getenv('K8S_CONTEXT', None),
'K8S_PROXY_BYPASS': str_to_bool(os.getenv('K8S_PROXY_BYPASS', False)),
'ASG_DESIRED_STATE_TAG': os.getenv('ASG_DESIRED_STATE_TAG', 'eks-rolling-update:desired_capacity'),
Expand All @@ -33,5 +34,6 @@ def str_to_bool(val):
'TAINT_NODES': str_to_bool(os.getenv('TAINT_NODES', False)),
'BATCH_SIZE': int(os.getenv('BATCH_SIZE', 0)),
'ENFORCED_DRAINING': str_to_bool(os.getenv('ENFORCED_DRAINING', False)),
'UNCORDON_ON_DRAIN_ERROR': str_to_bool(os.getenv('UNCORDON_ON_DRAIN_ERROR', True)),
'ASG_NAMES': os.getenv('ASG_NAMES', '').split()
}
6 changes: 6 additions & 0 deletions eksrollup/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@ class RollingUpdateException(Exception):
def __init__(self, message, asg_name):
self.message = message
self.asg_name = asg_name

class NodeNotDrainedException(Exception):
def __init__(self, message, node_name, forced):
self.message = message
self.node_name = node_name
self.forced = forced
17 changes: 9 additions & 8 deletions eksrollup/lib/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import sys
from .logger import logger
from .exceptions import NodeNotDrainedException
from eksrollup.config import app_config


Expand Down Expand Up @@ -122,7 +123,7 @@ def delete_node(node_name):
logger.info("Exception when calling CoreV1Api->delete_node: {}".format(e))


def cordon_node(node_name):
def cordon_node(node_name, cordon=True):
"""
Cordon a kubernetes node to avoid new pods being scheduled on it
"""
Expand All @@ -131,14 +132,14 @@ def cordon_node(node_name):

# create an instance of the API class
k8s_api = client.CoreV1Api()
logger.info("Cordoning k8s node {}...".format(node_name))
logger.info("Cordoning k8s node {}={}...".format(node_name, cordon))
try:
api_call_body = client.V1Node(spec=client.V1NodeSpec(unschedulable=True))
api_call_body = client.V1Node(spec=client.V1NodeSpec(unschedulable=cordon))
if not app_config['DRY_RUN']:
k8s_api.patch_node(node_name, api_call_body)
else:
k8s_api.patch_node(node_name, api_call_body, dry_run=True)
logger.info("Node cordoned")
logger.info("Node (un)cordoned")
except ApiException as e:
logger.info("Exception when calling CoreV1Api->patch_node: {}".format(e))

Expand Down Expand Up @@ -181,7 +182,7 @@ def drain_node(node_name):
kubectl_args += ['--dry-run']

logger.info('Draining worker node with {}...'.format(' '.join(kubectl_args)))
result = subprocess.run(kubectl_args)
result = subprocess.run(kubectl_args, capture_output=True)

# If returncode is non-zero run enforced draining of the node or raise a CalledProcessError.
if result.returncode != 0:
Expand All @@ -191,11 +192,11 @@ def drain_node(node_name):
'--force=true'
]
logger.info('There was an error draining the worker node, proceed with enforced draining ({})...'.format(' '.join(kubectl_args)))
enforced_result = subprocess.run(kubectl_args)
enforced_result = subprocess.run(kubectl_args, capture_output=True)
if enforced_result.returncode != 0:
raise Exception("Node not drained properly with enforced draining enabled. Exiting")
raise NodeNotDrainedException(enforced_result.stdout, node_name, True)
else:
raise Exception("Node not drained properly. Exiting")
raise NodeNotDrainedException(result.stdout, node_name, False)


def k8s_nodes_ready(max_retry=app_config['GLOBAL_MAX_RETRY'], wait=app_config['GLOBAL_HEALTH_WAIT']):
Expand Down
Loading