diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 08a5d904ca577..49c65186961a4 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -398,7 +398,7 @@ def consume_logs( Returns the last timestamp observed in logs. """ - timestamp = None + last_captured_timestamp = None try: logs = self.read_pod_logs( pod=pod, @@ -412,7 +412,9 @@ def consume_logs( ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") - timestamp, message = self.parse_log_line(line) + line_timestamp, message = self.parse_log_line(line) + if line_timestamp is not None: + last_captured_timestamp = line_timestamp self.log.info("[%s] %s", container_name, message) except BaseHTTPError as e: self.log.warning( @@ -426,7 +428,7 @@ def consume_logs( pod.metadata.name, exc_info=True, ) - return timestamp or since_time + return last_captured_timestamp or since_time # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to # loop as we do here. But in a long-running process we might temporarily lose connectivity. diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index c6c42105b367b..4868f9ca228d3 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -20,6 +20,7 @@ from datetime import datetime from json.decoder import JSONDecodeError from types import SimpleNamespace +from typing import cast from unittest import mock from unittest.mock import MagicMock @@ -254,6 +255,19 @@ def test_parse_log_line(self): assert timestamp == pendulum.parse(real_timestamp) assert line == log_message + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") + def test_fetch_container_logs_returning_last_timestamp( + self, mock_read_pod_logs, mock_container_is_running + ): + timestamp_string = "2020-10-08T14:16:17.793417674Z" + mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] + mock_container_is_running.side_effect = [True, False] + + status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) + + assert status.last_log_time == cast(DateTime, pendulum.parse(timestamp_string)) + def test_parse_invalid_log_line(self, caplog): with caplog.at_level(logging.INFO): self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")