-
Notifications
You must be signed in to change notification settings - Fork 4
/
SecurityGroupCorrector.py
107 lines (84 loc) · 3.42 KB
/
SecurityGroupCorrector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# SecurityGroupCorrector.py || part of ZocSec.SecurityAsCode.AWS
#
# An AWS Lambda for removing security groups that expose sensitive ports to the entire Internet
#
# Owner: Copyright © 2018 Zocdoc Inc. www.zocdoc.com
# Author: Jay Ball @veggiespam
#
import boto3
from botocore.exceptions import ClientError
import json
# Ports that must never be reachable from the whole Internet (your sensitive ports).
_SENSITIVE_PORTS = (22, 3389, 54321)

# CIDR blocks that mean "any address".
_WORLD_IPV4 = "0.0.0.0/0"
_WORLD_IPV6 = "::/0"


def _exposes_sensitive_port(perm, sensitive_ports=_SENSITIVE_PORTS):
    """Return True when the ingress permission *perm* covers a sensitive port.

    A permission whose protocol is "-1" (all protocols) is always sensitive.
    Otherwise the permission is sensitive when any of *sensitive_ports* lies
    inside its inclusive FromPort..ToPort range.  Permissions that lack either
    bound are never matched by the range test.
    """
    if perm.get('IpProtocol') == "-1":
        return True
    if 'FromPort' not in perm or 'ToPort' not in perm:
        return False
    # Compare the real bounds: a range starting at port 0 (e.g. 0-65535)
    # still exposes every sensitive port and must not be skipped.
    return any(perm['FromPort'] <= port <= perm['ToPort']
               for port in sensitive_ports)


def _world_open_permission(perm):
    """Build the revocable part of *perm* that is open to every address.

    Returns an IpPermissions entry holding only the world-open IPv4/IPv6
    ranges of *perm*, or None when *perm* has no such range.
    """
    open_v4 = [{'CidrIp': r['CidrIp']}
               for r in perm.get('IpRanges', [])
               if r.get('CidrIp') == _WORLD_IPV4]
    open_v6 = [{'CidrIpv6': r['CidrIpv6']}
               for r in perm.get('Ipv6Ranges', [])
               if r.get('CidrIpv6') == _WORLD_IPV6]
    if not open_v4 and not open_v6:
        return None

    target = {'IpProtocol': perm['IpProtocol']}
    # Echo the port bounds only when the rule has them; "-1" rules carry none.
    for key in ('FromPort', 'ToPort'):
        if key in perm:
            target[key] = perm[key]
    if open_v4:
        target['IpRanges'] = open_v4
    if open_v6:
        target['Ipv6Ranges'] = open_v6
    return target


def lambda_handler(event, context):
    """Revoke world-open ingress rules on sensitive ports of a security group.

    The handler reads event['detail'] and acts only when eventSource is
    "ec2.amazonaws.com", eventName is CreateSecurityGroup or
    AuthorizeSecurityGroupIngress, and responseElements reports
    ``_return == True``.  Every other event is logged and ignored.

    For a qualifying event it loads the security group, and for each ingress
    permission that exposes a sensitive port to 0.0.0.0/0 or ::/0 it revokes
    exactly that world-open part of the rule.

    Args:
        event: the triggering event; must contain detail.eventSource and
            detail.eventName.
        context: Lambda context object (unused).

    Returns:
        None in all cases.

    Raises:
        botocore.exceptions.ClientError: when a revoke call fails for any
            reason other than a dry-run response.
    """
    print("Event is: ", json.dumps(event))

    detail = event['detail']
    source = detail['eventSource']
    if source != "ec2.amazonaws.com":
        # wrong caller, just silently return
        print("Wrong Filter on CT Events, source=", source)
        return

    allowed_event_list = (
        'CreateSecurityGroup',
        'AuthorizeSecurityGroupIngress',
    )
    event_name = detail['eventName']
    if event_name not in allowed_event_list:
        # wrong event, just silently return
        print("Wrong Filter on CT Events, source=", source, " / event_name=", event_name)
        return

    # responseElements can be null/absent (e.g. when the API call failed);
    # treat that the same as an unsuccessful update instead of crashing.
    resp = detail.get('responseElements')
    if not resp or resp.get("_return") is not True:
        print("event was not a successful update, so we can ignore it")
        return

    if event_name == 'CreateSecurityGroup':
        sg_id = resp.get("groupId")
    else:
        sg_id = (detail.get('requestParameters') or {}).get('groupId')
    if not sg_id:
        print("No groupId found in event, nothing to check")
        return

    print("groupID = ", sg_id)

    ec2 = boto3.resource('ec2')
    security_group = ec2.SecurityGroup(sg_id)
    ingress_list = security_group.ip_permissions

    for perm in ingress_list:
        if not _exposes_sensitive_port(perm):
            continue
        offending = _world_open_permission(perm)
        if offending is None:
            continue

        print("Ingress Rule violates policy, removed: ", json.dumps(perm))
        try:
            # Add DryRun=True here to test without changing the group.
            security_group.revoke_ingress(IpPermissions=[offending])
        except ClientError as e:
            if 'DryRunOperation' not in str(e):
                print("Error: ", e)
                raise
            print('DryRun: ', e)

    print(json.dumps(ingress_list))
    return