Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
dreamtalen committed Jun 3, 2022
1 parent 5eea1b9 commit eb2a155
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 61 deletions.
14 changes: 7 additions & 7 deletions plugins/policy-recommendation/antrea_crd.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ def to_dict(self):
class NetworkPolicyPeer(object):
attribute_types = {
"ip_block": "IPBlock",
"pod_selector": "kubernete.client.V1LabelSelector",
"namespace_selector": "kubernete.client.V1LabelSelector",
"pod_selector": "kubernetes.client.V1LabelSelector",
"namespace_selector": "kubernetes.client.V1LabelSelector",
"namespaces": "PeerNamespaces",
"external_entity_selector": "kubernete.client.V1LabelSelector",
"external_entity_selector": "kubernetes.client.V1LabelSelector",
"group": "string",
"FQDN": "string"
}
Expand Down Expand Up @@ -217,7 +217,7 @@ class Rule(object):
"to_services": "list[NamespacedName]",
"name": "string",
"enable_logging": "bool",
"applied_to": "ist[NetworkPolicyPeer]"
"applied_to": "list[NetworkPolicyPeer]"
}

def __init__(self, action=None, ports=None, _from=None, to=None, to_services=None, name=None, enable_logging=None, applied_to=None):
Expand Down Expand Up @@ -332,11 +332,11 @@ def to_dict(self):

class GroupSpec(object):
attribute_types = {
"pod_selector": "kubernete.client.V1LabelSelector",
"namespace_selector": "kubernete.client.V1LabelSelector",
"pod_selector": "kubernetes.client.V1LabelSelector",
"namespace_selector": "kubernetes.client.V1LabelSelector",
"ip_blocks": "list[IPBlock]",
"service_reference": "ServiceReference",
"external_entity_selector": "kubernete.client.V1LabelSelector",
"external_entity_selector": "kubernetes.client.V1LabelSelector",
"child_groups": "list[string]"
}

Expand Down
121 changes: 67 additions & 54 deletions plugins/policy-recommendation/policy_recommendation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datetime
import getopt
import json
import logging
import os
import random
import string
Expand Down Expand Up @@ -60,6 +61,14 @@
"pod-template-generation"
]

logger = logging.getLogger('policy_recommendation')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

def get_flow_type(destinationServicePortName, destinationPodLabels):
if destinationServicePortName != "":
return "pod_to_svc"
Expand All @@ -68,15 +77,13 @@ def get_flow_type(destinationServicePortName, destinationPodLabels):
else:
return "pod_to_external"

def remove_meaningless_labels(podLables):
def remove_meaningless_labels(podLabels):
try:
labels_dict = json.loads(podLables)
labels_dict = json.loads(podLabels)
except Exception as e:
print("Error {}: labels {} is not in json format".format(e, podLables))
logger.error("Error {}: labels {} are not in json format".format(e, podLabels))
return ""
for key in list(labels_dict):
if key in MEANINGLESS_LABELS:
labels_dict.pop(key)
labels_dict = {key:value for key,value in labels_dict.items() if key not in MEANINGLESS_LABELS}
return json.dumps(labels_dict, sort_keys=True)

def get_protocol_string(protocolIdentifier):
Expand Down Expand Up @@ -109,6 +116,7 @@ def map_flow_to_ingress(flow):
dst = ROW_DELIMITER.join([flow.destinationPodNamespace, flow.destinationPodLabels])
return (dst, (src, ""))

# Combine the ingress row (src, "") with the egress row ("", dst) to a network peer row (src, dst)
def combine_network_peers(a, b):
if a[0] != "":
new_src = a[0]
Expand Down Expand Up @@ -145,8 +153,8 @@ def generate_k8s_egress_rule(egress):
)
)
else:
print("Warning: egress tuple {} has wrong format".format(egress))
return ""
logger.fatal("Egress tuple {} has wrong format".format(egress))
sys.exit(1)
ports = kubernetes.client.V1NetworkPolicyPort(
port = int(port),
protocol = protocolIdentifier
Expand All @@ -159,8 +167,8 @@ def generate_k8s_egress_rule(egress):

def generate_k8s_ingress_rule(ingress):
if len(ingress.split(ROW_DELIMITER)) != 4:
print("Warning: ingress tuple {} has wrong format".format(ingress))
return ""
logger.fatal("Ingress tuple {} has wrong format".format(ingress))
sys.exit(1)
ns, labels, port, protocolIdentifier = ingress.split(ROW_DELIMITER)
ingress_peer = kubernetes.client.V1NetworkPolicyPeer(
namespace_selector = kubernetes.client.V1LabelSelector(
Expand Down Expand Up @@ -223,7 +231,9 @@ def generate_k8s_np(x):
policy_types = policy_types
)
)
return [dict_to_yaml(np.to_dict())]
return [dict_to_yaml(np.to_dict())]
else:
return []

def generate_anp_egress_rule(egress):
if len(egress.split(ROW_DELIMITER)) == 4:
Expand All @@ -232,7 +242,7 @@ def generate_anp_egress_rule(egress):
try:
labels_dict = json.loads(labels)
except Exception as e:
print("Error {}: labels {} in egress {} is not in json format".format(e, labels, egress))
logger.error("Error {}: labels {} in egress {} are not in json format".format(e, labels, egress))
return ""
egress_peer = antrea_crd.NetworkPolicyPeer(
namespace_selector = kubernetes.client.V1LabelSelector(
Expand Down Expand Up @@ -287,15 +297,16 @@ def generate_anp_egress_rule(egress):
]
)
else:
print("Warning: egress tuple {} has wrong format".format(egress))
logger.fatal("Egress tuple {} has wrong format".format(egress))
sys.exit(1)
return egress_rule

def generate_anp_ingress_rule(ingress):
ns, labels, port, protocolIdentifier = ingress.split(ROW_DELIMITER)
try:
labels_dict = json.loads(labels)
except Exception as e:
print("Error {}: labels {} in ingress {} is not in json format".format(e, labels, ingress))
logger.error("Error {}: labels {} in ingress {} are not in json format".format(e, labels, ingress))
return ""
ingress_peer = antrea_crd.NetworkPolicyPeer(
namespace_selector = kubernetes.client.V1LabelSelector(
Expand Down Expand Up @@ -326,7 +337,7 @@ def generate_anp(network_peers):
try:
labels_dict = json.loads(labels)
except Exception as e:
print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to))
logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to))
return []
ingress_list = list(set(ingresses.split(PEER_DELIMITER)))
egress_list = list(set(egresses.split(PEER_DELIMITER)))
Expand Down Expand Up @@ -413,7 +424,7 @@ def generate_svc_acnp(x):
try:
labels_dict = json.loads(labels)
except Exception as e:
print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to))
logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to))
return []
egress_list = egresses.split(PEER_DELIMITER)
egressRules = []
Expand Down Expand Up @@ -443,7 +454,9 @@ def generate_svc_acnp(x):
egress = egressRules,
)
)
return [dict_to_yaml(np.to_dict())]
return [dict_to_yaml(np.to_dict())]
else:
return []

def generate_reject_acnp(applied_to):
if not applied_to:
Expand All @@ -460,7 +473,7 @@ def generate_reject_acnp(applied_to):
try:
labels_dict = json.loads(labels)
except Exception as e:
print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to))
logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to))
return []
applied_to = antrea_crd.NetworkPolicyPeer(
pod_selector = kubernetes.client.V1LabelSelector(
Expand Down Expand Up @@ -538,7 +551,7 @@ def recommend_antrea_policies(flows_df, option=1, deny_rules=True, to_services=T
svc_acnp_list = svc_acnp_rdd.collect()
if deny_rules:
if option == 1:
# Recommend deny ANPs for the applied to groups of allow policies
# Recommend deny ANPs for the appliedTo groups of allow policies
if not to_services:
applied_groups_rdd = network_peers_rdd.map(lambda x: x[0])\
.union(egress_svc_rdd.map(lambda x: x[0]))\
Expand All @@ -557,10 +570,10 @@ def recommend_antrea_policies(flows_df, option=1, deny_rules=True, to_services=T

def recommend_policies_for_unprotected_flows(unprotected_flows_df, option=1, to_services=True):
if option not in [1, 2, 3]:
print("Error: option {} is not valid".format(option))
logger.error("Error: option {} is not valid".format(option))
return []
if option == 3:
# Recommend k8s native network policies for unprotected flows
# Recommend K8s native NetworkPolicies for unprotected flows
return recommend_k8s_policies(unprotected_flows_df)
else:
return recommend_antrea_policies(unprotected_flows_df, option, True, to_services)
Expand Down Expand Up @@ -611,7 +624,7 @@ def generate_sql_query(table_name, limit, start_time, end_time, unprotected):
# Select user trusted denied flows when unprotected equals False
sql_query += " WHERE trusted == 1"
if start_time:
sql_query += " AND flowEndSeconds >= '{}'".format(start_time)
sql_query += " AND flowStartSeconds >= '{}'".format(start_time)
if end_time:
sql_query += " AND flowEndSeconds < '{}'".format(end_time)
sql_query += " GROUP BY {}".format(", ".join(FLOW_TABLE_COLUMNS))
Expand All @@ -635,11 +648,11 @@ def read_flow_df(spark, db_jdbc_address, sql_query, rm_labels):
flow_df = flow_df.withColumn('flowType', udf(get_flow_type, StringType())("destinationServicePortName", "destinationPodLabels"))
return flow_df

def write_recommendation_result(spark, result, recommendation_type, db_jdbc_address, table_name, id):
if not id:
def write_recommendation_result(spark, result, recommendation_type, db_jdbc_address, table_name, recommendation_id_input):
if not recommendation_id_input:
recommendation_id = str(uuid.uuid4())
else:
recommendation_id = id
recommendation_id = recommendation_id_input
result_dict = {
'id': recommendation_id,
'type': recommendation_type,
Expand Down Expand Up @@ -668,9 +681,9 @@ def initial_recommendation_job(spark, db_jdbc_address, table_name, limit=0, opti
table_name: Name of the table storing flow records in database.
limit: Limit on the number of flow records fetched in database. Default value is 100, setting to 0 means unlimited.
option: Option of network isolation preference in policy recommendation. Currently we have 3 options and default value is 1:
1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended.
1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended.
2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster.
3: Recommending allow K8s network policies, with no deny rules at all.
3: Recommending allow K8s NetworkPolicies, with no deny rules at all.
start_time: The start time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the start time of flow records.
end_time: The end time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the end time of flow records.
ns_allow_list: List of default traffic allow namespaces. Default value is Antrea CNI related namespaces.
Expand All @@ -694,9 +707,9 @@ def subsequent_recommendation_job(spark, db_jdbc_address, table_name, limit=0, o
table_name: Name of the table storing flow records in database.
limit: Limit on the number of flow records fetched in database. Default value is 100, setting to 0 means unlimited.
option: Option of network isolation preference in policy recommendation. Currently we have 3 options and default value is 1:
1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended.
1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended.
2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster.
3: Recommending allow K8s network policies, with no deny rules at all.
3: Recommending allow K8s NetworkPolicies, with no deny rules at all.
start_time: The start time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the start time of flow records.
end_time: The end time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the end time of flow records.
rm_labels: Remove automatically generated Pod labels including 'pod-template-hash', 'controller-revision-hash', 'pod-template-generation'.
Expand Down Expand Up @@ -725,7 +738,7 @@ def main(argv):
start_time = ""
end_time = ""
ns_allow_list = NAMESPACE_ALLOW_LIST
id = ""
recommendation_id_input = ""
rm_labels = True
to_services = True
help_message = """
Expand All @@ -739,9 +752,9 @@ def main(argv):
-l, --limit=0: The limit on the number of flow records read from the database. 0 means no limit.
-o, --option=1: Option of network isolation preference in policy recommendation.
Currently we have 3 options:
1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended.
1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended.
2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster.
3: Recommending allow K8s network policies, with no deny rules at all.
3: Recommending allow K8s NetworkPolicies, with no deny rules at all.
-s, --start_time=None: The start time of the flow records considered for the policy recommendation.
Format is YYYY-MM-DD hh:mm:ss in UTC timezone. Default value is None, which means no limit of the start time of flow records.
-e, --end_time=None: The end time of the flow records considered for the policy recommendation.
Expand All @@ -762,65 +775,65 @@ def main(argv):
try:
opts, _ = getopt.getopt(argv, "ht:d:l:o:s:e:n:i:", ["help", "type=", "db_jdbc_url=", "limit=", "option=", "start_time=", "end_time=", "ns_allow_list=", "id=", "rm_labels=", "to_services="])
except getopt.GetoptError as e:
print("getopt.getopt ERROR: {}".format(e))
print(help_message)
logger.error("ERROR of getopt.getopt: {}".format(e))
logger.info(help_message)
sys.exit(2)
option_keys = [option[0] for option in opts]
if "-h" in option_keys or "--help" in option_keys:
print(help_message)
logger.info(help_message)
sys.exit()
for opt, arg in opts:
if opt in ("-t", "--type"):
valid_types = ["initial", "subsequent"]
if arg not in valid_types:
print("Recommendation type should be in {}".format("or".join(valid_types)))
print(help_message)
logger.error("Recommendation type should be in {}".format("or".join(valid_types)))
logger.info(help_message)
sys.exit(2)
recommendation_type = arg
elif opt in ("-d", "--db_jdbc_url"):
parse_url = urlparse("arg")
if parse_url.scheme != "jdbc":
print("Please provide a valid JDBC url for ClickHouse database")
print(help_message)
logger.error("Please provide a valid JDBC url for ClickHouse database")
logger.info(help_message)
sys.exit(2)
db_jdbc_address = arg
elif opt in ("-l", "--limit"):
if not is_intstring(arg) or int(arg) < 0:
print("Limit should be an integer >= 0.")
print(help_message)
logger.error("Limit should be an integer >= 0.")
logger.info(help_message)
sys.exit(2)
limit = int(arg)
elif opt in ("-o", "--option"):
if not is_intstring(arg) or int(arg) not in [1, 2, 3]:
print("Option of network isolation preference should be 1 or 2 or 3.")
print(help_message)
logger.error("Option of network isolation preference should be 1 or 2 or 3.")
logger.info(help_message)
sys.exit(2)
option = int(arg)
elif opt in ("-s", "--start_time"):
try:
datetime.datetime.strptime(arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
print("start_time should be in 'YYYY-MM-DD hh:mm:ss' format.")
print(help_message)
logger.error("start_time should be in 'YYYY-MM-DD hh:mm:ss' format.")
logger.info(help_message)
sys.exit(2)
start_time = arg
elif opt in ("-e", "--end_time"):
try:
datetime.datetime.strptime(arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
print("end_time should be in 'YYYY-MM-DD hh:mm:ss' format.")
print(help_message)
logger.error("end_time should be in 'YYYY-MM-DD hh:mm:ss' format.")
logger.info(help_message)
sys.exit(2)
end_time = arg
elif opt in ("-n", "--ns_allow_list"):
arg_list = json.loads(arg)
if not isinstance(arg_list, list):
print("ns_allow_list should be a list.")
print(help_message)
logger.error("ns_allow_list should be a list.")
logger.info(help_message)
sys.exit(2)
ns_allow_list = arg_list
elif opt in ("-i", "--id"):
id = arg
recommendation_id_input = arg
elif opt in ("--rm_labels"):
if arg == "false":
rm_labels = False
Expand All @@ -831,12 +844,12 @@ def main(argv):
spark = SparkSession.builder.getOrCreate()
if recommendation_type == 'initial':
result = initial_recommendation_job(spark, db_jdbc_address, flow_table_name, limit, option, start_time, end_time, ns_allow_list, rm_labels, to_services)
recommendation_id = write_recommendation_result(spark, result, 'initial', db_jdbc_address, result_table_name, id)
print("Initial policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result)))
recommendation_id = write_recommendation_result(spark, result, 'initial', db_jdbc_address, result_table_name, recommendation_id_input)
logger.info("Initial policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result)))
else:
result = subsequent_recommendation_job(spark, db_jdbc_address, flow_table_name, limit, option, start_time, end_time, rm_labels, to_services)
recommendation_id = write_recommendation_result(spark, result, 'subsequent', db_jdbc_address, result_table_name, id)
print("Subsequent policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result)))
recommendation_id = write_recommendation_result(spark, result, 'subsequent', db_jdbc_address, result_table_name, recommendation_id_input)
logger.info("Subsequent policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result)))
spark.stop()

if __name__ == '__main__':
Expand Down

0 comments on commit eb2a155

Please sign in to comment.