From 49596ab18681917b7c46b00d3c873cea95572a24 Mon Sep 17 00:00:00 2001 From: harjeevanmaan Date: Thu, 26 Sep 2024 19:49:01 +0530 Subject: [PATCH 1/2] Added unit tests and restructred `await_xcom_sidecar_container_start` method. - The `await_xcom_sidecar_container_start` method in `PodManager` checks if the xcom sidecar container has started running before executing `do_xcom_push`. - The function logs the status periodically and raises an `AirflowException` if the container does not start within the specified timeout. - Added two unit tests: - `test_await_xcom_sidecar_container_timeout`: Verifies that an `AirflowException` is raised if the sidecar container fails to start within the timeout. - `test_await_xcom_sidecar_container_starts`: Confirms the method successfully exits when the sidecar container starts. --- .../cncf/kubernetes/utils/pod_manager.py | 26 ++++++++++++++----- .../cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7c283eaccc98..cd91dc09281f 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -19,7 +19,6 @@ from __future__ import annotations import enum -import itertools import json import math import time @@ -721,14 +720,29 @@ def read_pod(self, pod: V1Pod) -> V1Pod: except HTTPError as e: raise AirflowException(f"There was an error reading the kubernetes API: {e}") - def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None: + def await_xcom_sidecar_container_start( + self, pod: V1Pod, timeout: int = 900, log_interval: int = 30 + ) -> None: + """Check if the sidecar container has reached the 'Running' state before performing do_xcom_push.""" self.log.info("Checking if xcom sidecar container is started.") - for attempt in itertools.count(): + start_time = time.time() + last_log_time = start_time + + while True: + elapsed_time = time.time() - start_time if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME): - self.log.info("The xcom sidecar container is started.") + self.log.info("The xcom sidecar container has started.") break - if not attempt: - self.log.warning("The xcom sidecar container is not yet started.") + if (time.time() - last_log_time) >= log_interval: + self.log.warning( + "Still waiting for the xcom sidecar container to start. Elapsed time: %d seconds.", + int(elapsed_time), + ) + last_log_time = time.time() + if elapsed_time > timeout: + raise AirflowException( + f"Xcom sidecar container did not start within {timeout // 60} minutes." + ) time.sleep(1) def extract_xcom(self, pod: V1Pod) -> str: diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 3e4f2d086fc1..77c7e887dbc7 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -601,6 +601,21 @@ def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_exec_pod_command, moc self.pod_manager.extract_xcom(pod=mock_pod) assert mock_exec_xcom_kill.call_count == 1 + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_await_xcom_sidecar_container_timeout(self, mock_container_is_running): + mock_pod = MagicMock() + mock_container_is_running.return_value = False + with pytest.raises(AirflowException): + self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5) + mock_container_is_running.assert_called_once_with(pod=mock_pod, container_name="airflow-xcom-sidecar") + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): + mock_pod = MagicMock() + mock_container_is_running.return_value = True + self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod) + mock_container_is_running.assert_called_once_with(mock_pod, "airflow-xcom-sidecar") + def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects From 9e16110420a57daff3edd82e7989a5a54dc7207b Mon Sep 17 00:00:00 2001 From: harjeevanmaan Date: Fri, 4 Oct 2024 03:43:45 +0530 Subject: [PATCH 2/2] Fixed the assertion test failures --- tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 77c7e887dbc7..73dac5255d62 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -607,14 +607,14 @@ def test_await_xcom_sidecar_container_timeout(self, mock_container_is_running): mock_container_is_running.return_value = False with pytest.raises(AirflowException): self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5) - mock_container_is_running.assert_called_once_with(pod=mock_pod, container_name="airflow-xcom-sidecar") + mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): mock_pod = MagicMock() mock_container_is_running.return_value = True self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod) - mock_container_is_running.assert_called_once_with(mock_pod, "airflow-xcom-sidecar") + mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") def params_for_test_container_is_running():