From dd555728de976cb430ee5450598cbc6c9338f978 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Mon, 6 Feb 2023 15:26:37 +0530 Subject: [PATCH] refactor: simplify custom kubernetes pod operator (#730) --- ext/scheduler/airflow/__lib.py | 110 +-------------------------------- 1 file changed, 2 insertions(+), 108 deletions(-) diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index bc3511580e..bc0838cdb8 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -2,23 +2,16 @@ import logging from datetime import datetime, timedelta -import time - from typing import Any, Dict, Optional import pendulum import requests from airflow.configuration import conf -from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.kubernetes import kube_client from airflow.models import (XCOM_RETURN_KEY, Variable, XCom) from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.utils import pod_launcher from airflow.providers.slack.operators.slack import SlackAPIPostOperator from airflow.sensors.base import BaseSensorOperator -from airflow.utils import yaml -from airflow.utils.state import State from croniter import croniter from kubernetes.client import models as k8s @@ -59,14 +52,6 @@ def lookup_non_standard_cron_expression(expr: str) -> str: class SuperKubernetesPodOperator(KubernetesPodOperator): - """ - ** SAME AS KubernetesPodOperator: Execute a task in a Kubernetes Pod ** - Wrapper to push xcom as a return value key even if container completes with non success status - - .. note: keep this up to date if there is any change in KubernetesPodOperator execute method - """ - template_fields = ('image', 'cmds', 'arguments', 'env_vars', 'config_file', 'pod_template_file') - def __init__(self, optimus_hostname, optimus_projectname, @@ -108,102 +93,11 @@ def fetch_env_from_optimus(self, context): k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["secrets"].items() ] - def _dry_run(self, pod): - def prune_dict(val: Any, mode='strict'): - """ - Given dict ``val``, returns new dict based on ``val`` with all - empty elements removed. - What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' - then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` - will be removed if ``bool(x) is False``. - """ - - def is_empty(x): - if mode == 'strict': - return x is None - elif mode == 'truthy': - return bool(x) is False - raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") - - if isinstance(val, dict): - new_dict = {} - for k, v in val.items(): - if is_empty(v): - continue - elif isinstance(v, (list, dict)): - new_val = prune_dict(v, mode=mode) - if new_val: - new_dict[k] = new_val - else: - new_dict[k] = v - return new_dict - elif isinstance(val, list): - new_list = [] - for v in val: - if is_empty(v): - continue - elif isinstance(v, (list, dict)): - new_val = prune_dict(v, mode=mode) - if new_val: - new_list.append(new_val) - else: - new_list.append(v) - return new_list - else: - return val - - log.info(prune_dict(pod.to_dict(), mode='strict')) - log.info(yaml.dump(prune_dict(pod.to_dict(), mode='strict'))) - def execute(self, context): self.env_vars += self.fetch_env_from_optimus(context) - # init-container is not considered for rendering in airflow self.render_init_containers(context) - - log.info('Task image version: %s', self.image) - try: - if self.in_cluster is not None: - client = kube_client.get_kube_client(in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file) - else: - client = kube_client.get_kube_client(cluster_context=self.cluster_context, - config_file=self.config_file) - - self.pod = self.create_pod_request_obj() - # self._dry_run(self.pod) # logs the yaml file for the pod [not compatible for future verison of implementation] - - self.namespace = self.pod.metadata.namespace - self.client = client - - # Add combination of labels to uniquely identify a running pod - labels = self.create_labels_for_pod(context) - - label_selector = self._get_pod_identifying_label_string(labels) - - pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector) - - if len(pod_list.items) > 1 and self.reattach_on_restart: - raise AirflowException( - 'More than one pod running with labels: ' - '{label_selector}'.format(label_selector=label_selector)) - - launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push) - - if len(pod_list.items) == 1: - try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) - final_state, _, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list.items[0]) - else: - final_state, _, result = self.create_new_pod_for_operator(labels, launcher) - - if final_state != State.SUCCESS: - # push xcom value even if pod fails - context.get('task_instance').xcom_push(key=XCOM_RETURN_KEY, value=result) - raise AirflowException( - 'Pod returned a failure: {state}'.format(state=final_state)) - return result - except AirflowException as ex: - raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + # call KubernetesPodOperator to handle the pod + return super().execute(context) class OptimusAPIClient: