Skip to content

Commit

Permalink
refactor: simplify custom kubernetes pod operator (#730)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbchaos authored Feb 6, 2023
1 parent 9d64ab2 commit dd55572
Showing 1 changed file with 2 additions and 108 deletions.
110 changes: 2 additions & 108 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,16 @@
import logging
from datetime import datetime, timedelta

import time

from typing import Any, Dict, Optional

import pendulum
import requests
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.kubernetes import kube_client
from airflow.models import (XCOM_RETURN_KEY, Variable, XCom)
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils import pod_launcher
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import yaml
from airflow.utils.state import State
from croniter import croniter
from kubernetes.client import models as k8s

Expand Down Expand Up @@ -59,14 +52,6 @@ def lookup_non_standard_cron_expression(expr: str) -> str:


class SuperKubernetesPodOperator(KubernetesPodOperator):
"""
** SAME AS KubernetesPodOperator: Execute a task in a Kubernetes Pod **
Wrapper to push xcom as a return value key even if container completes with non success status
.. note: keep this up to date if there is any change in KubernetesPodOperator execute method
"""
template_fields = ('image', 'cmds', 'arguments', 'env_vars', 'config_file', 'pod_template_file')

def __init__(self,
optimus_hostname,
optimus_projectname,
Expand Down Expand Up @@ -108,102 +93,11 @@ def fetch_env_from_optimus(self, context):
k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["secrets"].items()
]

def _dry_run(self, pod):
def prune_dict(val: Any, mode='strict'):
"""
Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.
What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
will be removed if ``bool(x) is False``.
"""

def is_empty(x):
if mode == 'strict':
return x is None
elif mode == 'truthy':
return bool(x) is False
raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

if isinstance(val, dict):
new_dict = {}
for k, v in val.items():
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_dict[k] = new_val
else:
new_dict[k] = v
return new_dict
elif isinstance(val, list):
new_list = []
for v in val:
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_list.append(new_val)
else:
new_list.append(v)
return new_list
else:
return val

log.info(prune_dict(pod.to_dict(), mode='strict'))
log.info(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))

def execute(self, context):
self.env_vars += self.fetch_env_from_optimus(context)
# init-container is not considered for rendering in airflow
self.render_init_containers(context)

log.info('Task image version: %s', self.image)
try:
if self.in_cluster is not None:
client = kube_client.get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)
else:
client = kube_client.get_kube_client(cluster_context=self.cluster_context,
config_file=self.config_file)

self.pod = self.create_pod_request_obj()
# self._dry_run(self.pod) # logs the yaml file for the pod [not compatible for future verison of implementation]

self.namespace = self.pod.metadata.namespace
self.client = client

# Add combination of labels to uniquely identify a running pod
labels = self.create_labels_for_pod(context)

label_selector = self._get_pod_identifying_label_string(labels)

pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)

if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
'More than one pod running with labels: '
'{label_selector}'.format(label_selector=label_selector))

launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)

if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
final_state, _, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list.items[0])
else:
final_state, _, result = self.create_new_pod_for_operator(labels, launcher)

if final_state != State.SUCCESS:
# push xcom value even if pod fails
context.get('task_instance').xcom_push(key=XCOM_RETURN_KEY, value=result)
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state))
return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
# call KubernetesPodOperator to handle the pod
return super().execute(context)


class OptimusAPIClient:
Expand Down

0 comments on commit dd55572

Please sign in to comment.