Skip to content

Commit

Permalink
All executors should inherit from BaseExecutor (apache#41904)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored and joaopamaral committed Oct 21, 2024
1 parent 830f35b commit 5600388
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
34 changes: 28 additions & 6 deletions airflow/providers/celery/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import TYPE_CHECKING, Sequence

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor

try:
Expand All @@ -30,18 +31,21 @@

raise AirflowOptionalProviderFeatureException(e)

from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

if TYPE_CHECKING:
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.base_executor import (
CommandType,
EventBufferValueType,
QueuedTaskInstanceType,
)
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey


class CeleryKubernetesExecutor(LoggingMixin):
class CeleryKubernetesExecutor(BaseExecutor):
"""
CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.
Expand Down Expand Up @@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str:

def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
super().__init__()
self._job_id: int | None = None
self._job_id: int | str | None = None
self.celery_executor = celery_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue

@property
def _task_event_logs(self):
self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
self.kubernetes_executor._task_event_logs.clear()
return self.celery_executor._task_event_logs

@_task_event_logs.setter
def _task_event_logs(self, value):
"""Not implemented for hybrid executors."""

@property
def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
"""Return queued tasks from celery and kubernetes executor."""
Expand All @@ -84,13 +98,21 @@ def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:

return queued_tasks

@queued_tasks.setter
def queued_tasks(self, value) -> None:
"""Not implemented for hybrid executors."""

@property
def running(self) -> set[TaskInstanceKey]:
"""Return running tasks from celery and kubernetes executor."""
return self.celery_executor.running.union(self.kubernetes_executor.running)

@running.setter
def running(self, value) -> None:
"""Not implemented for hybrid executors."""

@property
def job_id(self) -> int | None:
def job_id(self) -> int | str | None:
"""
Inherited attribute from BaseExecutor.
Expand All @@ -100,7 +122,7 @@ def job_id(self) -> int | None:
return self._job_id

@job_id.setter
def job_id(self, value: int | None) -> None:
def job_id(self, value: int | str | None) -> None:
"""Expose job ID for SchedulerJob."""
self._job_id = value
self.kubernetes_executor.job_id = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
from typing import TYPE_CHECKING, Sequence

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.base_executor import (
CommandType,
EventBufferValueType,
QueuedTaskInstanceType,
)
from airflow.executors.local_executor import LocalExecutor
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey


class LocalKubernetesExecutor(LoggingMixin):
class LocalKubernetesExecutor(BaseExecutor):
"""
Chooses between LocalExecutor and KubernetesExecutor based on the queue defined on the task.
Expand All @@ -57,11 +61,21 @@ class LocalKubernetesExecutor(LoggingMixin):

def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor):
super().__init__()
self._job_id: str | None = None
self._job_id: int | str | None = None
self.local_executor = local_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

@property
def _task_event_logs(self):
self.local_executor._task_event_logs += self.kubernetes_executor._task_event_logs
self.kubernetes_executor._task_event_logs.clear()
return self.local_executor._task_event_logs

@_task_event_logs.setter
def _task_event_logs(self, value):
"""Not implemented for hybrid executors."""

@property
def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
"""Return queued tasks from local and kubernetes executor."""
Expand All @@ -70,13 +84,21 @@ def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:

return queued_tasks

@queued_tasks.setter
def queued_tasks(self, value) -> None:
"""Not implemented for hybrid executors."""

@property
def running(self) -> set[TaskInstanceKey]:
"""Return running tasks from local and kubernetes executor."""
return self.local_executor.running.union(self.kubernetes_executor.running)

@running.setter
def running(self, value) -> None:
"""Not implemented for hybrid executors."""

@property
def job_id(self) -> str | None:
def job_id(self) -> int | str | None:
"""
Inherited attribute from BaseExecutor.
Expand All @@ -86,7 +108,7 @@ def job_id(self) -> str | None:
return self._job_id

@job_id.setter
def job_id(self, value: str | None) -> None:
def job_id(self, value: int | str | None) -> None:
"""Expose job ID for SchedulerJob."""
self._job_id = value
self.kubernetes_executor.job_id = value
Expand Down

0 comments on commit 5600388

Please sign in to comment.