Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RedshiftDeleteClusterOperator support #23563

Merged
merged 5 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -80,10 +81,18 @@
)
# [END howto_operator_redshift_resume_cluster]

# [START howto_operator_redshift_delete_cluster]
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
)
# [END howto_operator_redshift_delete_cluster]

chain(
task_create_cluster,
task_wait_cluster_available,
task_pause_cluster,
task_wait_cluster_paused,
task_resume_cluster,
task_delete_cluster,
)
64 changes: 64 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from airflow.models import BaseOperator
Expand Down Expand Up @@ -317,3 +318,66 @@ def execute(self, context: 'Context'):
self.log.warning(
"Unable to pause cluster since cluster is currently in status: %s", cluster_state
)


class RedshiftDeleteClusterOperator(BaseOperator):
"""
Delete an AWS Redshift cluster.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:RedshiftDeleteClusterOperator`

:param cluster_identifier: unique identifier of a cluster
:param skip_final_cluster_snapshot: determines cluster snapshot creation
:param final_cluster_snapshot_identifier: name of final cluster snapshot
:param wait_for_completion: Whether wait for cluster deletion or not
The default value is ``True``
:param aws_conn_id: aws connection to use
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
"""

template_fields: Sequence[str] = ("cluster_identifier",)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
cluster_identifier: str,
skip_final_cluster_snapshot: bool = True,
final_cluster_snapshot_identifier: Optional[str] = None,
wait_for_completion: bool = True,
aws_conn_id: str = "aws_default",
poll_interval: float = 30.0,
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
self.wait_for_completion = wait_for_completion
self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
self.poll_interval = poll_interval

def execute(self, context: 'Context'):
self.delete_cluster()

if self.wait_for_completion:
cluster_status: str = self.check_status()
while cluster_status != "cluster_not_found":
self.log.info(
"cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval
)
time.sleep(self.poll_interval)
cluster_status = self.check_status()

def delete_cluster(self) -> None:
self.redshift_hook.delete_cluster(
cluster_identifier=self.cluster_identifier,
skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
)

def check_status(self) -> str:
return self.redshift_hook.cluster_status(self.cluster_identifier)
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ To pause an 'available' Amazon Redshift Cluster you can use
:start-after: [START howto_operator_redshift_pause_cluster]
:end-before: [END howto_operator_redshift_pause_cluster]

.. _howto/operator:RedshiftDeleteClusterOperator:

Delete an Amazon Redshift Cluster
"""""""""""""""""""""""""""""""""

To delete an Amazon Redshift Cluster you can use
:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster]
:end-before: [END howto_operator_redshift_delete_cluster]

Reference
^^^^^^^^^

Expand Down
34 changes: 34 additions & 0 deletions tests/providers/amazon/aws/operators/test_redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -127,3 +128,36 @@ def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_c
)
redshift_operator.execute(None)
mock_get_conn.return_value.pause_cluster.assert_not_called()


class TestDeleteClusterOperator:
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status):
mock_cluster_status.return_value = 'cluster_not_found'
redshift_operator = RedshiftDeleteClusterOperator(
task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
)
redshift_operator.execute(None)
mock_get_conn.return_value.delete_cluster.assert_called_once_with(
ClusterIdentifier='test_cluster',
SkipFinalClusterSnapshot=True,
FinalClusterSnapshotIdentifier='',
)

@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
def test_delete_cluster_without_wait_for_completion(self, mock_get_conn):
redshift_operator = RedshiftDeleteClusterOperator(
task_id="task_test",
cluster_identifier="test_cluster",
aws_conn_id="aws_conn_test",
wait_for_completion=False,
)
redshift_operator.execute(None)
mock_get_conn.return_value.delete_cluster.assert_called_once_with(
ClusterIdentifier='test_cluster',
SkipFinalClusterSnapshot=True,
FinalClusterSnapshotIdentifier='',
)

mock_get_conn.return_value.cluster_status.assert_not_called()