diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py b/airflow/providers/celery/executors/celery_kubernetes_executor.py index bc2ed7904f5a..acd1afcba995 100644 --- a/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Sequence from airflow.configuration import conf +from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.executors.celery_executor import CeleryExecutor try: @@ -30,18 +31,21 @@ raise AirflowOptionalProviderFeatureException(e) -from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded if TYPE_CHECKING: from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest - from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType + from airflow.executors.base_executor import ( + CommandType, + EventBufferValueType, + QueuedTaskInstanceType, + ) from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.models.taskinstancekey import TaskInstanceKey -class CeleryKubernetesExecutor(LoggingMixin): +class CeleryKubernetesExecutor(BaseExecutor): """ CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor. @@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str: def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor): super().__init__() - self._job_id: int | None = None + self._job_id: int | str | None = None self.celery_executor = celery_executor self.kubernetes_executor = kubernetes_executor self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue + @property + def _task_event_logs(self): + self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs + self.kubernetes_executor._task_event_logs.clear() + return self.celery_executor._task_event_logs + + @_task_event_logs.setter + def _task_event_logs(self, value): + """Not implemented for hybrid executors.""" + @property def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]: """Return queued tasks from celery and kubernetes executor.""" @@ -84,13 +98,21 @@ def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]: return queued_tasks + @queued_tasks.setter + def queued_tasks(self, value) -> None: + """Not implemented for hybrid executors.""" + @property def running(self) -> set[TaskInstanceKey]: """Return running tasks from celery and kubernetes executor.""" return self.celery_executor.running.union(self.kubernetes_executor.running) + @running.setter + def running(self, value) -> None: + """Not implemented for hybrid executors.""" + @property - def job_id(self) -> int | None: + def job_id(self) -> int | str | None: """ Inherited attribute from BaseExecutor. @@ -100,7 +122,7 @@ def job_id(self) -> int | None: return self._job_id @job_id.setter - def job_id(self, value: int | None) -> None: + def job_id(self, value: int | str | None) -> None: """Expose job ID for SchedulerJob.""" self._job_id = value self.kubernetes_executor.job_id = value diff --git a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 75de1101c59b..63755d3d11a1 100644 --- a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -20,18 +20,22 @@ from typing import TYPE_CHECKING, Sequence from airflow.configuration import conf +from airflow.executors.base_executor import BaseExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor -from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest - from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType + from airflow.executors.base_executor import ( + CommandType, + EventBufferValueType, + QueuedTaskInstanceType, + ) from airflow.executors.local_executor import LocalExecutor from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey -class LocalKubernetesExecutor(LoggingMixin): +class LocalKubernetesExecutor(BaseExecutor): """ Chooses between LocalExecutor and KubernetesExecutor based on the queue defined on the task. @@ -57,11 +61,21 @@ class LocalKubernetesExecutor(LoggingMixin): def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor): super().__init__() - self._job_id: str | None = None + self._job_id: int | str | None = None self.local_executor = local_executor self.kubernetes_executor = kubernetes_executor self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE + @property + def _task_event_logs(self): + self.local_executor._task_event_logs += self.kubernetes_executor._task_event_logs + self.kubernetes_executor._task_event_logs.clear() + return self.local_executor._task_event_logs + + @_task_event_logs.setter + def _task_event_logs(self, value): + """Not implemented for hybrid executors.""" + @property def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]: """Return queued tasks from local and kubernetes executor.""" @@ -70,13 +84,21 @@ def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]: return queued_tasks + @queued_tasks.setter + def queued_tasks(self, value) -> None: + """Not implemented for hybrid executors.""" + @property def running(self) -> set[TaskInstanceKey]: """Return running tasks from local and kubernetes executor.""" return self.local_executor.running.union(self.kubernetes_executor.running) + @running.setter + def running(self, value) -> None: + """Not implemented for hybrid executors.""" + @property - def job_id(self) -> str | None: + def job_id(self) -> int | str | None: """ Inherited attribute from BaseExecutor. @@ -86,7 +108,7 @@ def job_id(self) -> str | None: return self._job_id @job_id.setter - def job_id(self, value: str | None) -> None: + def job_id(self, value: int | str | None) -> None: """Expose job ID for SchedulerJob.""" self._job_id = value self.kubernetes_executor.job_id = value