Skip to content

Commit

Permalink
Add convenience method to simply add statements to a Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Brett Swift authored and Brett Swift committed Oct 25, 2018
1 parent 32377c8 commit 75082b3
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
31 changes: 31 additions & 0 deletions cumulus/util/policy_mutator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import awacs
import troposphere
from awacs.aws import PolicyDocument # noqa
from troposphere.iam import Policy # noqa


class PolicyMutator:
def __init__(self):
pass

@staticmethod
def add_statement_to_policy(policy, statement):
"""
:type policy: troposphere.iam.Policy
:type statement: awacs.aws.Statement
"""
if type(policy) is not troposphere.iam.Policy:
raise AssertionError("Expected to find troposphere.iam.Policy but found: %s" % type(policy))

if not isinstance(policy.PolicyDocument, awacs.aws.PolicyDocument):
msg = "Expected policy.PolicyDocument to be awacs.aws.PolicyDocument but found: %s" \
% type(policy.PolicyDocument)
raise AssertionError(msg)

if not isinstance(statement, awacs.aws.Statement):
msg = "Expected statement to be awacs.aws.Statement but found: %s " % type(statement)
raise AssertionError(msg)

policy.PolicyDocument.Statement.append(statement)
return policy
97 changes: 97 additions & 0 deletions tests/unit/util/test_policy_mutator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# try:
# #python 3
# from unittest.mock import patch
# except:
# #python 2
# from mock import patch

import unittest

import awacs
from awacs import aws # noqa
import troposphere
from troposphere import iam

from cumulus.util.policy_mutator import PolicyMutator

DEFAULT_STATEMENT_NAME = "DefaultStatement"


class TestPolicyMutator(unittest.TestCase):

def setUp(self):
pass
self.simple_policy = iam.Policy(
PolicyName="TestPolicy",
PolicyDocument=awacs.aws.PolicyDocument(
Version="2012-10-17",
Id="PipelinePolicy",
Statement=[
awacs.aws.Statement(
Sid=("%s" % DEFAULT_STATEMENT_NAME),
Effect=awacs.aws.Allow,
Action=[awacs.aws.Action("s3", "*")],
Resource=['*'],
),
],
)
)

self.dummy_statement = awacs.aws.Statement()

def tearDown(self):
pass
del self.simple_policy

def test_should_raise_assertion_error_on_wrong_policy_type(self):
policy = "not what you want"
self.assertRaises(
AssertionError,
PolicyMutator.add_statement_to_policy,
policy,
self.dummy_statement,
)

def test_should_raise_assertion_error_if_policydocument_is_not_awacs(self):
policy = troposphere.iam.Policy(
PolicyDocument={}
)
self.assertRaises(
AssertionError,
PolicyMutator.add_statement_to_policy,
policy,
self.dummy_statement,
)

def test_should_raise_assertion_error_on_wrong_statement_type(self):
policy = self.simple_policy
self.assertRaises(
AssertionError,
PolicyMutator.add_statement_to_policy,
policy,
{"statment": "is wrong"},
)

def test_should_add_statement_to_existing_policy(self):
pass
policy = self.simple_policy
lambda_policy_name = 'LambdaPolicy'
statement = awacs.aws.Statement(
Sid=('%s' % lambda_policy_name),
Effect=awacs.aws.Allow,
Action=[
awacs.aws.Action("lambda", "*")
],
Resource=["*"]
)

found_default = filter(lambda x: x.Sid == DEFAULT_STATEMENT_NAME, policy.PolicyDocument.Statement)
self.assertTrue(found_default, "Did not find the statement I was looking for")
self.assertIsInstance(found_default[0], awacs.aws.Statement)

policy = PolicyMutator.add_statement_to_policy(policy, statement)

# self.assertFalse(policy.PolicyDocument.Statement)
found_sut = filter(lambda x: x.Sid == lambda_policy_name, policy.PolicyDocument.Statement)
self.assertTrue(found_sut, "Did not find the statement I was looking for")
self.assertIsInstance(found_default[0], awacs.aws.Statement)

0 comments on commit 75082b3

Please sign in to comment.