Skip to content

Commit

Permalink
[AIRFLOW-6258] Add CloudFormation operators to AWS providers (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aviemzur committed Feb 2, 2020
1 parent 13d419a commit 63aa3db
Show file tree
Hide file tree
Showing 7 changed files with 629 additions and 0 deletions.
87 changes: 87 additions & 0 deletions airflow/providers/amazon/aws/hooks/cloud_formation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This module contains AWS CloudFormation Hook
"""
from botocore.exceptions import ClientError

from airflow.contrib.hooks.aws_hook import AwsHook


class AWSCloudFormationHook(AwsHook):
"""
Interact with AWS CloudFormation.
"""

def __init__(self, region_name=None, *args, **kwargs):
self.region_name = region_name
self.conn = None
super().__init__(*args, **kwargs)

def get_conn(self):
if not self.conn:
self.conn = self.get_client_type('cloudformation', self.region_name)
return self.conn

def get_stack_status(self, stack_name):
"""
Get stack status from CloudFormation.
"""
cloudformation = self.get_conn()

self.log.info('Poking for stack %s', stack_name)

try:
stacks = cloudformation.describe_stacks(StackName=stack_name)['Stacks']
return stacks[0]['StackStatus']
except ClientError as e:
if 'does not exist' in str(e):
return None
else:
raise e

def create_stack(self, stack_name, params):
"""
Create stack in CloudFormation.
:param stack_name: stack_name.
:type stack_name: str
:param params: parameters to be passed to CloudFormation.
:type params: dict
"""

if 'StackName' not in params:
params['StackName'] = stack_name
self.get_conn().create_stack(**params)

def delete_stack(self, stack_name, params=None):
"""
Delete stack in CloudFormation.
:param stack_name: stack_name.
:type stack_name: str
:param params: parameters to be passed to CloudFormation (optional).
:type params: dict
"""

params = params or {}
if 'StackName' not in params:
params['StackName'] = stack_name
self.get_conn().delete_stack(**params)
102 changes: 102 additions & 0 deletions airflow/providers/amazon/aws/operators/cloud_formation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains CloudFormation create/delete stack operators.
"""
from typing import List

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.cloud_formation import AWSCloudFormationHook
from airflow.utils.decorators import apply_defaults


class CloudFormationCreateStackOperator(BaseOperator):
"""
An operator that creates a CloudFormation stack.
:param stack_name: stack name (templated)
:type stack_name: str
:param params: parameters to be passed to CloudFormation.
.. seealso::
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.create_stack
:type params: dict
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
"""
template_fields: List[str] = ['stack_name']
template_ext = ()
ui_color = '#6b9659'

@apply_defaults
def __init__(
self,
stack_name,
params,
aws_conn_id='aws_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self.stack_name = stack_name
self.params = params
self.aws_conn_id = aws_conn_id

def execute(self, context):
self.log.info('Parameters: %s', self.params)

cloudformation_hook = AWSCloudFormationHook(aws_conn_id=self.aws_conn_id)
cloudformation_hook.create_stack(self.stack_name, self.params)


class CloudFormationDeleteStackOperator(BaseOperator):
"""
An operator that deletes a CloudFormation stack.
:param stack_name: stack name (templated)
:type stack_name: str
:param params: parameters to be passed to CloudFormation.
.. seealso::
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.delete_stack
:type params: dict
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
"""
template_fields: List[str] = ['stack_name']
template_ext = ()
ui_color = '#1d472b'
ui_fgcolor = '#FFF'

@apply_defaults
def __init__(
self,
stack_name,
params=None,
aws_conn_id='aws_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self.params = params or {}
self.stack_name = stack_name
self.params = params
self.aws_conn_id = aws_conn_id

def execute(self, context):
self.log.info('Parameters: %s', self.params)

cloudformation_hook = AWSCloudFormationHook(aws_conn_id=self.aws_conn_id)
cloudformation_hook.delete_stack(self.stack_name, self.params)
96 changes: 96 additions & 0 deletions airflow/providers/amazon/aws/sensors/cloud_formation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains sensors for AWS CloudFormation.
"""
from airflow.providers.amazon.aws.hooks.cloud_formation import AWSCloudFormationHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class CloudFormationCreateStackSensor(BaseSensorOperator):
"""
Waits for a stack to be created successfully on AWS CloudFormation.
:param stack_name: The name of the stack to wait for (templated)
:type stack_name: str
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
:type aws_conn_id: str
:param poke_interval: Time in seconds that the job should wait between each try
:type poke_interval: int
"""

template_fields = ['stack_name']
ui_color = '#C5CAE9'

@apply_defaults
def __init__(self,
stack_name,
aws_conn_id='aws_default',
region_name=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.stack_name = stack_name
self.hook = AWSCloudFormationHook(aws_conn_id=aws_conn_id, region_name=region_name)

def poke(self, context):
stack_status = self.hook.get_stack_status(self.stack_name)
if stack_status == 'CREATE_COMPLETE':
return True
if stack_status in ('CREATE_IN_PROGRESS', None):
return False
raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')


class CloudFormationDeleteStackSensor(BaseSensorOperator):
"""
Waits for a stack to be deleted successfully on AWS CloudFormation.
:param stack_name: The name of the stack to wait for (templated)
:type stack_name: str
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
:type aws_conn_id: str
:param poke_interval: Time in seconds that the job should wait between each try
:type poke_interval: int
"""

template_fields = ['stack_name']
ui_color = '#C5CAE9'

@apply_defaults
def __init__(self,
stack_name,
aws_conn_id='aws_default',
region_name=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.stack_name = stack_name
self.hook = AWSCloudFormationHook(aws_conn_id=aws_conn_id, region_name=region_name)

def poke(self, context):
stack_status = self.hook.get_stack_status(self.stack_name)
if stack_status in ('DELETE_COMPLETE', None):
return True
if stack_status == 'DELETE_IN_PROGRESS':
return False
raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')
6 changes: 6 additions & 0 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ These integrations allow you to perform various operations within the Amazon Web
- :mod:`airflow.providers.amazon.aws.operators.athena`
- :mod:`airflow.providers.amazon.aws.sensors.athena`

* - `Amazon CloudFormation <https://aws.amazon.com/cloudformation/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.cloud_formation`
- :mod:`airflow.providers.amazon.aws.operators.cloud_formation`
- :mod:`airflow.providers.amazon.aws.sensors.cloud_formation`

* - `Amazon CloudWatch Logs <https://aws.amazon.com/cloudwatch/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.logs`
Expand Down
Loading

0 comments on commit 63aa3db

Please sign in to comment.