Skip to content

Commit

Permalink
Disconnect GKE operators from deprecated hooks (apache#39434)
Browse files Browse the repository at this point in the history
* Disconnect GKE operators from deprecated hooks

* Remove GKE unit tests from deprecation ignore list
  • Loading branch information
moiseenkov authored and pateash committed May 13, 2024
1 parent b359c72 commit b71bc89
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 52 deletions.
47 changes: 21 additions & 26 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKECustomResourceHook,
GKEDeploymentHook,
GKEHook,
GKEJobHook,
GKEKubernetesHook,
GKEPodHook,
)
from airflow.providers.google.cloud.links.kubernetes_engine import (
KubernetesEngineClusterLink,
Expand Down Expand Up @@ -533,27 +529,28 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def deployment_hook(self) -> GKEDeploymentHook:
def deployment_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Cluster url and ssl_ca_cert should be defined before using self.deployment_hook method. "
"Try to use self.get_kube_creds method",
)
return GKEDeploymentHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)

@cached_property
def pod_hook(self) -> GKEPodHook:
def pod_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Cluster url and ssl_ca_cert should be defined before using self.pod_hook method. "
"Try to use self.get_kube_creds method",
)
return GKEPodHook(

return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
Expand Down Expand Up @@ -742,21 +739,20 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEPodHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEPodHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
impersonation_chain=self.impersonation_chain,
enable_tcp_keepalive=True,
)
return hook

def execute(self, context: Context):
"""Execute process of creating pod and executing provided command inside it."""
Expand Down Expand Up @@ -901,19 +897,18 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)
return hook

def execute(self, context: Context):
"""Execute process of creating Job."""
Expand Down Expand Up @@ -1027,15 +1022,15 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1128,15 +1123,15 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1234,13 +1229,13 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKECustomResourceHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
return GKECustomResourceHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1336,13 +1331,13 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKECustomResourceHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
return GKECustomResourceHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down Expand Up @@ -1475,14 +1470,14 @@ def cluster_hook(self) -> GKEHook:
)

@cached_property
def hook(self) -> GKEJobHook:
def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

return GKEJobHook(
return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
Expand Down
5 changes: 2 additions & 3 deletions airflow/providers/google/cloud/triggers/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKEAsyncHook,
GKEKubernetesAsyncHook,
GKEPodAsyncHook,
)
from airflow.triggers.base import BaseTrigger, TriggerEvent

Expand Down Expand Up @@ -147,8 +146,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

@cached_property
def hook(self) -> GKEPodAsyncHook: # type: ignore[override]
return GKEPodAsyncHook(
def hook(self) -> GKEKubernetesAsyncHook: # type: ignore[override]
return GKEKubernetesAsyncHook(
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
gcp_conn_id=self.gcp_conn_id,
Expand Down
11 changes: 0 additions & 11 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -517,25 +517,14 @@
- tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
- tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
- tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_account
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_chain_one_element
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueInsideClusterOperator::test_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
Expand Down
22 changes: 10 additions & 12 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@
KUB_OP_PATH = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
GKE_HOOK_MODULE_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine"
GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook"
GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook"
GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook"
GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook"
GKE_KUBERNETES_HOOK = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
KUB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute"
KUB_JOB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute"
Expand Down Expand Up @@ -502,8 +500,8 @@ def setup_test(self):
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(f"{GKE_DEPLOYMENT_HOOK_PATH}.check_kueue_deployment_running")
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(f"{GKE_KUBERNETES_HOOK}.check_kueue_deployment_running")
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_info_mock, file_mock):
mock_pod_hook.return_value.apply_from_yaml_file.side_effect = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
Expand All @@ -515,9 +513,9 @@ def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_
@mock.patch.dict(os.environ, {})
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_DEPLOYMENT_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster(
self, mock_pod_hook, mock_hook, mock_depl_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand All @@ -534,7 +532,7 @@ def test_execute_autoscaled_cluster(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand All @@ -550,7 +548,7 @@ def test_execute_autoscaled_cluster_check_error(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_POD_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_non_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
Expand Down Expand Up @@ -916,7 +914,7 @@ def setup_method(self):
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock):
mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
Expand All @@ -931,7 +929,7 @@ def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_m
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_account(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):
Expand All @@ -949,7 +947,7 @@ def test_execute_with_impersonation_service_account(
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
@mock.patch(GKE_JOB_HOOK_PATH)
@mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_chain_one_element(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):
Expand Down

0 comments on commit b71bc89

Please sign in to comment.