Skip to content

Commit

Permalink
Fix:Add functionality authorize-cluster-security-group-ingress (#3742)
Browse files Browse the repository at this point in the history
* Fix:Add functionality  authorize-cluster-security-group-ingress

* Added tests

* Added more test cases
  • Loading branch information
usmangani1 authored Mar 10, 2021
1 parent 5aefbb1 commit 433e4c0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 0 deletions.
8 changes: 8 additions & 0 deletions moto/redshift/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,11 @@ def __init__(self, message):
super(UnknownSnapshotCopyRegionFaultError, self).__init__(
"UnknownSnapshotCopyRegionFault", message
)


class ClusterSecurityGroupNotFoundFaultError(RedshiftClientError):
def __init__(self):
super(ClusterSecurityGroupNotFoundFaultError, self).__init__(
"ClusterSecurityGroupNotFoundFault",
"The cluster security group name does not refer to an existing cluster security group.",
)
12 changes: 12 additions & 0 deletions moto/redshift/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
SnapshotCopyGrantAlreadyExistsFaultError,
SnapshotCopyGrantNotFoundFaultError,
UnknownSnapshotCopyRegionFaultError,
ClusterSecurityGroupNotFoundFaultError,
)


Expand Down Expand Up @@ -423,6 +424,7 @@ def __init__(
super(SecurityGroup, self).__init__(region_name, tags)
self.cluster_security_group_name = cluster_security_group_name
self.description = description
self.ingress_rules = []

@property
def resource_id(self):
Expand Down Expand Up @@ -749,6 +751,16 @@ def delete_cluster_security_group(self, security_group_identifier):
return self.security_groups.pop(security_group_identifier)
raise ClusterSecurityGroupNotFoundError(security_group_identifier)

def authorize_cluster_security_group_ingress(self, security_group_name, cidr_ip):
security_group = self.security_groups.get(security_group_name)
if not security_group:
raise ClusterSecurityGroupNotFoundFaultError()

# just adding the cidr_ip as ingress rule for now as there is no security rule
security_group.ingress_rules.append(cidr_ip)

return security_group

def create_cluster_parameter_group(
self,
cluster_parameter_group_name,
Expand Down
28 changes: 28 additions & 0 deletions moto/redshift/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,34 @@ def delete_cluster_security_group(self):
}
)

def authorize_cluster_security_group_ingress(self):
cluster_security_group_name = self._get_param("ClusterSecurityGroupName")
cidr_ip = self._get_param("CIDRIP")

security_group = self.redshift_backend.authorize_cluster_security_group_ingress(
cluster_security_group_name, cidr_ip
)

return self.get_response(
{
"AuthorizeClusterSecurityGroupIngressResponse": {
"AuthorizeClusterSecurityGroupIngressResult": {
"ClusterSecurityGroup": {
"ClusterSecurityGroupName": cluster_security_group_name,
"Description": security_group.description,
"IPRanges": [
{
"Status": "authorized",
"CIDRIP": cidr_ip,
"Tags": security_group.tags,
},
],
}
}
}
}
)

def create_cluster_parameter_group(self):
cluster_parameter_group_name = self._get_param("ParameterGroupName")
group_family = self._get_param("ParameterGroupFamily")
Expand Down
52 changes: 52 additions & 0 deletions tests/test_redshift/test_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,58 @@ def test_create_cluster_subnet_group():
set(subnet_ids).should.equal(set([subnet1.id, subnet2.id]))


@mock_redshift
def test_authorize_security_group_ingress():
iam_roles_arn = ["arn:aws:iam:::role/my-iam-role"]
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"

client.create_cluster(
ClusterIdentifier=cluster_identifier,
NodeType="single-node",
MasterUsername="username",
MasterUserPassword="password",
IamRoles=iam_roles_arn,
)

client.create_cluster_security_group(
ClusterSecurityGroupName="security_group",
Description="security_group_description",
)

response = client.authorize_cluster_security_group_ingress(
ClusterSecurityGroupName="security_group", CIDRIP="192.168.10.0/28"
)

assert (
response.get("ClusterSecurityGroup").get("ClusterSecurityGroupName")
== "security_group"
)
assert (
response.get("ClusterSecurityGroup").get("Description")
== "security_group_description"
)
assert (
response.get("ClusterSecurityGroup").get("IPRanges")[0].get("Status")
== "authorized"
)
assert (
response.get("ClusterSecurityGroup").get("IPRanges")[0].get("CIDRIP")
== "192.168.10.0/28"
)

with pytest.raises(ClientError) as ex:
client.authorize_cluster_security_group_ingress(
ClusterSecurityGroupName="invalid_security_group", CIDRIP="192.168.10.0/28"
)
assert ex.value.response["Error"]["Code"] == "ClusterSecurityGroupNotFoundFault"

assert (
ex.value.response["Error"]["Message"]
== "The cluster security group name does not refer to an existing cluster security group."
)


@mock_redshift_deprecated
@mock_ec2_deprecated
def test_create_invalid_cluster_subnet_group():
Expand Down

0 comments on commit 433e4c0

Please sign in to comment.