Skip to content

Commit

Permalink
CloudFormation sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
aviemzur committed Jan 2, 2020
1 parent 0814c9c commit 51d2028
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 23 deletions.
61 changes: 41 additions & 20 deletions airflow/contrib/operators/cloudformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@
from airflow.utils.decorators import apply_defaults


class CloudFormationCreateStackOperator(BaseOperator):
class BaseCloudFormationOperator(BaseOperator):
"""
An operator that creates a CloudFormation stack.
Base operator for CloudFormation operations.
:param params: parameters to be passed to CloudFormation. For possible arguments see:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.create_stack
:param params: parameters to be passed to CloudFormation.
:type dict
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
"""
template_fields: List[str] = []
template_ext = ()
ui_color = '#6b9659'
ui_color = '#1d472b'
ui_fgcolor = '#FFF'

@apply_defaults
def __init__(
Expand All @@ -51,19 +51,49 @@ def __init__(
self.aws_conn_id = aws_conn_id

def execute(self, context):
self.log.info('Creating CloudFormation stack: %s', self.params)
self.log.info('Parameters: %s', self.params)

cloudformation = CloudFormationHook(aws_conn_id=self.aws_conn_id).get_conn()
self.cloudformation_op(CloudFormationHook(aws_conn_id=self.aws_conn_id).get_conn())

def cloudformation_op(self, cloudformation):
"""
This is the main method to run CloudFormation operation.
"""
raise NotImplementedError()


class CloudFormationCreateStackOperator(BaseCloudFormationOperator):
"""
An operator that creates a CloudFormation stack.
:param params: parameters to be passed to CloudFormation. For possible arguments see:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.create_stack
:type dict
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
"""
template_fields: List[str] = []
template_ext = ()
ui_color = '#6b9659'

@apply_defaults
def __init__(
self,
params,
aws_conn_id='aws_default',
*args, **kwargs):
super().__init__(params=params, aws_conn_id=aws_conn_id, *args, **kwargs)

def cloudformation_op(self, cloudformation):
cloudformation.create_stack(**self.params)


class CloudFormationDeleteStackOperator(BaseOperator):
class CloudFormationDeleteStackOperator(BaseCloudFormationOperator):
"""
An operator that deletes a CloudFormation stack.
:param params: parameters to be passed to CloudFormation. For possible arguments see:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.delete_stack
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.delete_stack
:type dict
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
Expand All @@ -79,16 +109,7 @@ def __init__(
params,
aws_conn_id='aws_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self.params = params
self.aws_conn_id = aws_conn_id

def execute(self, context):
self.log.info('Deleting CloudFormation stack: %s', self.params)

cloudformation = CloudFormationHook(aws_conn_id=self.aws_conn_id).get_conn()
super().__init__(params=params, aws_conn_id=aws_conn_id, *args, **kwargs)

def cloudformation_op(self, cloudformation):
cloudformation.delete_stack(**self.params)

waiter = cloudformation.get_waiter('stack_delete_complete')
waiter.wait(StackName=self.params['StackName'])
162 changes: 162 additions & 0 deletions airflow/contrib/sensors/cloudformation_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains sensors for AWS CloudFormation.
"""
from botocore.exceptions import ClientError

from airflow.contrib.hooks.aws_cloudformation_hook import CloudFormationHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class BaseCloudFormationSensor(BaseSensorOperator):
"""
Waits for a stack operation to complete on AWS CloudFormation.
:param stack_name: The name of the stack to wait for (templated)
:type stack_name: str
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
:type aws_conn_id: str
:param poke_interval: Time in seconds that the job should wait between each try
:type poke_interval: int
"""

@apply_defaults
def __init__(self,
stack_name,
complete_status,
in_progress_status,
aws_conn_id='aws_default',
poke_interval=60 * 3,
*args,
**kwargs):
super().__init__(poke_interval=poke_interval, *args, **kwargs)
self.aws_conn_id = aws_conn_id
self.stack_name = stack_name
self.complete_status = complete_status
self.in_progress_status = in_progress_status
self.hook = None

def poke(self, context):
"""
Checks for existence of the stack in AWS CloudFormation.
"""
cloudformation = self.get_hook().get_conn()

self.log.info('Poking for stack %s', self.stack_name)

try:
stacks = cloudformation.describe_stacks(StackName=self.stack_name)['Stacks']
stack_status = stacks[0]['StackStatus']
if stack_status == self.complete_status:
return True
elif stack_status == self.in_progress_status:
return False
else:
raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')
except ClientError as e:
if 'does not exist' in str(e):
if not self.allow_non_existing_stack_status():
raise ValueError(f'Stack {self.stack_name} does not exist')
else:
return True
else:
raise e

def get_hook(self):
"""
Gets the AwsGlueCatalogHook
"""
if not self.hook:
self.hook = CloudFormationHook(aws_conn_id=self.aws_conn_id)

return self.hook

def allow_non_existing_stack_status(self):
"""
Boolean value whether or not sensor should allow non existing stack responses.
"""
return False


class CloudFormationCreateStackSensor(BaseCloudFormationSensor):
"""
Waits for a stack to be created successfully on AWS CloudFormation.
:param stack_name: The name of the stack to wait for (templated)
:type stack_name: str
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
:type aws_conn_id: str
:param poke_interval: Time in seconds that the job should wait between each try
:type poke_interval: int
"""

template_fields = ['stack_name']
ui_color = '#C5CAE9'

@apply_defaults
def __init__(self,
stack_name,
aws_conn_id='aws_default',
poke_interval=60 * 3,
*args,
**kwargs):
super().__init__(stack_name=stack_name,
complete_status='CREATE_COMPLETE',
in_progress_status='CREATE_IN_PROGRESS',
aws_conn_id=aws_conn_id,
poke_interval=poke_interval,
*args,
**kwargs)


class CloudFormationDeleteStackSensor(BaseCloudFormationSensor):
"""
Waits for a stack to be deleted successfully on AWS CloudFormation.
:param stack_name: The name of the stack to wait for (templated)
:type stack_name: str
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
:type aws_conn_id: str
:param poke_interval: Time in seconds that the job should wait between each try
:type poke_interval: int
"""

template_fields = ['stack_name']
ui_color = '#C5CAE9'

@apply_defaults
def __init__(self,
stack_name,
aws_conn_id='aws_default',
poke_interval=60 * 3,
*args,
**kwargs):
super().__init__(stack_name=stack_name,
complete_status='DELETE_COMPLETE',
in_progress_status='DELETE_IN_PROGRESS',
aws_conn_id=aws_conn_id,
poke_interval=poke_interval, *args, **kwargs)

def allow_non_existing_stack_status(self):
return True
6 changes: 6 additions & 0 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ These integrations allow you to perform various operations within the Amazon Web
- :mod:`airflow.providers.amazon.aws.operators.athena`
- :mod:`airflow.providers.amazon.aws.sensors.athena`

* - `Amazon CloudFormation <https://aws.amazon.com/cloudformation/>`__
-
- :mod:`airflow.contrib.hooks.aws_cloudformation_hook`
- :mod:`airflow.contrib.operators.cloudformation`
- :mod:`airflow.contrib.sensors.cloudformation_sensor`

* - `Amazon CloudWatch Logs <https://aws.amazon.com/cloudwatch/>`__
-
- :mod:`airflow.contrib.hooks.aws_logs_hook`
Expand Down
6 changes: 3 additions & 3 deletions tests/contrib/operators/test_cloudformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import MagicMock, patch

from airflow import DAG
from airflow.contrib.operators.cloudformation import CloudFormationCreateStackOperator, \
CloudFormationDeleteStackOperator
from airflow.contrib.operators.cloudformation import (
CloudFormationCreateStackOperator, CloudFormationDeleteStackOperator,
)
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2019, 1, 1)
Expand Down
Loading

0 comments on commit 51d2028

Please sign in to comment.