From eb2a15506ac6f3160c6cd58c48642721942ba26c Mon Sep 17 00:00:00 2001 From: Yongming Ding Date: Tue, 31 May 2022 17:12:23 -0700 Subject: [PATCH] Address comments Signed-off-by: Yongming Ding --- plugins/policy-recommendation/antrea_crd.py | 14 +- .../policy_recommendation_job.py | 121 ++++++++++-------- 2 files changed, 74 insertions(+), 61 deletions(-) diff --git a/plugins/policy-recommendation/antrea_crd.py b/plugins/policy-recommendation/antrea_crd.py index 293eaf21..80729c70 100644 --- a/plugins/policy-recommendation/antrea_crd.py +++ b/plugins/policy-recommendation/antrea_crd.py @@ -102,10 +102,10 @@ def to_dict(self): class NetworkPolicyPeer(object): attribute_types = { "ip_block": "IPBlock", - "pod_selector": "kubernete.client.V1LabelSelector", - "namespace_selector": "kubernete.client.V1LabelSelector", + "pod_selector": "kubernetes.client.V1LabelSelector", + "namespace_selector": "kubernetes.client.V1LabelSelector", "namespaces": "PeerNamespaces", - "external_entity_selector": "kubernete.client.V1LabelSelector", + "external_entity_selector": "kubernetes.client.V1LabelSelector", "group": "string", "FQDN": "string" } @@ -217,7 +217,7 @@ class Rule(object): "to_services": "list[NamespacedName]", "name": "string", "enable_logging": "bool", - "applied_to": "ist[NetworkPolicyPeer]" + "applied_to": "list[NetworkPolicyPeer]" } def __init__(self, action=None, ports=None, _from=None, to=None, to_services=None, name=None, enable_logging=None, applied_to=None): @@ -332,11 +332,11 @@ def to_dict(self): class GroupSpec(object): attribute_types = { - "pod_selector": "kubernete.client.V1LabelSelector", - "namespace_selector": "kubernete.client.V1LabelSelector", + "pod_selector": "kubernetes.client.V1LabelSelector", + "namespace_selector": "kubernetes.client.V1LabelSelector", "ip_blocks": "list[IPBlock]", "service_reference": "ServiceReference", - "external_entity_selector": "kubernete.client.V1LabelSelector", + "external_entity_selector": "kubernetes.client.V1LabelSelector", "child_groups": "list[string]" } diff --git a/plugins/policy-recommendation/policy_recommendation_job.py b/plugins/policy-recommendation/policy_recommendation_job.py index c7cc4f38..9f6858f3 100644 --- a/plugins/policy-recommendation/policy_recommendation_job.py +++ b/plugins/policy-recommendation/policy_recommendation_job.py @@ -17,6 +17,7 @@ import datetime import getopt import json +import logging import os import random import string @@ -60,6 +61,14 @@ "pod-template-generation" ] +logger = logging.getLogger('policy_recommendation') +logger.setLevel(logging.INFO) +ch = logging.StreamHandler() +ch.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +logger.addHandler(ch) + def get_flow_type(destinationServicePortName, destinationPodLabels): if destinationServicePortName != "": return "pod_to_svc" @@ -68,15 +77,13 @@ def get_flow_type(destinationServicePortName, destinationPodLabels): else: return "pod_to_external" -def remove_meaningless_labels(podLables): +def remove_meaningless_labels(podLabels): try: - labels_dict = json.loads(podLables) + labels_dict = json.loads(podLabels) except Exception as e: - print("Error {}: labels {} is not in json format".format(e, podLables)) + logger.error("Error {}: labels {} are not in json format".format(e, podLabels)) return "" - for key in list(labels_dict): - if key in MEANINGLESS_LABELS: - labels_dict.pop(key) + labels_dict = {key:value for key,value in labels_dict.items() if key not in MEANINGLESS_LABELS} return json.dumps(labels_dict, sort_keys=True) def get_protocol_string(protocolIdentifier): @@ -109,6 +116,7 @@ def map_flow_to_ingress(flow): dst = ROW_DELIMITER.join([flow.destinationPodNamespace, flow.destinationPodLabels]) return (dst, (src, "")) +# Combine the ingress row (src, "") with the egress row ("", dst) to a network peer row (src, dst) def combine_network_peers(a, b): if a[0] != "": new_src = a[0] @@ -145,8 +153,8 @@ def generate_k8s_egress_rule(egress): ) ) else: - print("Warning: egress tuple {} has wrong format".format(egress)) - return "" + logger.fatal("Egress tuple {} has wrong format".format(egress)) + sys.exit(1) ports = kubernetes.client.V1NetworkPolicyPort( port = int(port), protocol = protocolIdentifier @@ -159,8 +167,8 @@ def generate_k8s_egress_rule(egress): def generate_k8s_ingress_rule(ingress): if len(ingress.split(ROW_DELIMITER)) != 4: - print("Warning: ingress tuple {} has wrong format".format(ingress)) - return "" + logger.fatal("Ingress tuple {} has wrong format".format(ingress)) + sys.exit(1) ns, labels, port, protocolIdentifier = ingress.split(ROW_DELIMITER) ingress_peer = kubernetes.client.V1NetworkPolicyPeer( namespace_selector = kubernetes.client.V1LabelSelector( @@ -223,7 +231,9 @@ def generate_k8s_np(x): policy_types = policy_types ) ) - return [dict_to_yaml(np.to_dict())] + return [dict_to_yaml(np.to_dict())] + else: + return [] def generate_anp_egress_rule(egress): if len(egress.split(ROW_DELIMITER)) == 4: @@ -232,7 +242,7 @@ def generate_anp_egress_rule(egress): try: labels_dict = json.loads(labels) except Exception as e: - print("Error {}: labels {} in egress {} is not in json format".format(e, labels, egress)) + logger.error("Error {}: labels {} in egress {} are not in json format".format(e, labels, egress)) return "" egress_peer = antrea_crd.NetworkPolicyPeer( namespace_selector = kubernetes.client.V1LabelSelector( @@ -287,7 +297,8 @@ def generate_anp_egress_rule(egress): ] ) else: - print("Warning: egress tuple {} has wrong format".format(egress)) + logger.fatal("Egress tuple {} has wrong format".format(egress)) + sys.exit(1) return egress_rule def generate_anp_ingress_rule(ingress): @@ -295,7 +306,7 @@ def generate_anp_ingress_rule(ingress): try: labels_dict = json.loads(labels) except Exception as e: - print("Error {}: labels {} in ingress {} is not in json format".format(e, labels, ingress)) + logger.error("Error {}: labels {} in ingress {} are not in json format".format(e, labels, ingress)) return "" ingress_peer = antrea_crd.NetworkPolicyPeer( namespace_selector = kubernetes.client.V1LabelSelector( @@ -326,7 +337,7 @@ def generate_anp(network_peers): try: labels_dict = json.loads(labels) except Exception as e: - print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to)) + logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to)) return [] ingress_list = list(set(ingresses.split(PEER_DELIMITER))) egress_list = list(set(egresses.split(PEER_DELIMITER))) @@ -413,7 +424,7 @@ def generate_svc_acnp(x): try: labels_dict = json.loads(labels) except Exception as e: - print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to)) + logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to)) return [] egress_list = egresses.split(PEER_DELIMITER) egressRules = [] @@ -443,7 +454,9 @@ def generate_svc_acnp(x): egress = egressRules, ) ) - return [dict_to_yaml(np.to_dict())] + return [dict_to_yaml(np.to_dict())] + else: + return [] def generate_reject_acnp(applied_to): if not applied_to: @@ -460,7 +473,7 @@ def generate_reject_acnp(applied_to): try: labels_dict = json.loads(labels) except Exception as e: - print("Error {}: labels {} in applied_to {} is not in json format".format(e, labels, applied_to)) + logger.error("Error {}: labels {} in applied_to {} are not in json format".format(e, labels, applied_to)) return [] applied_to = antrea_crd.NetworkPolicyPeer( pod_selector = kubernetes.client.V1LabelSelector( @@ -538,7 +551,7 @@ def recommend_antrea_policies(flows_df, option=1, deny_rules=True, to_services=T svc_acnp_list = svc_acnp_rdd.collect() if deny_rules: if option == 1: - # Recommend deny ANPs for the applied to groups of allow policies + # Recommend deny ANPs for the appliedTo groups of allow policies if not to_services: applied_groups_rdd = network_peers_rdd.map(lambda x: x[0])\ .union(egress_svc_rdd.map(lambda x: x[0]))\ @@ -557,10 +570,10 @@ def recommend_antrea_policies(flows_df, option=1, deny_rules=True, to_services=T def recommend_policies_for_unprotected_flows(unprotected_flows_df, option=1, to_services=True): if option not in [1, 2, 3]: - print("Error: option {} is not valid".format(option)) + logger.error("Error: option {} is not valid".format(option)) return [] if option == 3: - # Recommend k8s native network policies for unprotected flows + # Recommend K8s native NetworkPolicies for unprotected flows return recommend_k8s_policies(unprotected_flows_df) else: return recommend_antrea_policies(unprotected_flows_df, option, True, to_services) @@ -611,7 +624,7 @@ def generate_sql_query(table_name, limit, start_time, end_time, unprotected): # Select user trusted denied flows when unprotected equals False sql_query += " WHERE trusted == 1" if start_time: - sql_query += " AND flowEndSeconds >= '{}'".format(start_time) + sql_query += " AND flowStartSeconds >= '{}'".format(start_time) if end_time: sql_query += " AND flowEndSeconds < '{}'".format(end_time) sql_query += " GROUP BY {}".format(", ".join(FLOW_TABLE_COLUMNS)) @@ -635,11 +648,11 @@ def read_flow_df(spark, db_jdbc_address, sql_query, rm_labels): flow_df = flow_df.withColumn('flowType', udf(get_flow_type, StringType())("destinationServicePortName", "destinationPodLabels")) return flow_df -def write_recommendation_result(spark, result, recommendation_type, db_jdbc_address, table_name, id): - if not id: +def write_recommendation_result(spark, result, recommendation_type, db_jdbc_address, table_name, recommendation_id_input): + if not recommendation_id_input: recommendation_id = str(uuid.uuid4()) else: - recommendation_id = id + recommendation_id = recommendation_id_input result_dict = { 'id': recommendation_id, 'type': recommendation_type, @@ -668,9 +681,9 @@ def initial_recommendation_job(spark, db_jdbc_address, table_name, limit=0, opti table_name: Name of the table storing flow records in database. limit: Limit on the number of flow records fetched in database. Default value is 100, setting to 0 means unlimited. option: Option of network isolation preference in policy recommendation. Currently we have 3 options and default value is 1: - 1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended. + 1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended. 2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster. - 3: Recommending allow K8s network policies, with no deny rules at all. + 3: Recommending allow K8s NetworkPolicies, with no deny rules at all. start_time: The start time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the start time of flow records. end_time: The end time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the end time of flow records. ns_allow_list: List of default traffic allow namespaces. Default value is Antrea CNI related namespaces. @@ -694,9 +707,9 @@ def subsequent_recommendation_job(spark, db_jdbc_address, table_name, limit=0, o table_name: Name of the table storing flow records in database. limit: Limit on the number of flow records fetched in database. Default value is 100, setting to 0 means unlimited. option: Option of network isolation preference in policy recommendation. Currently we have 3 options and default value is 1: - 1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended. + 1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended. 2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster. - 3: Recommending allow K8s network policies, with no deny rules at all. + 3: Recommending allow K8s NetworkPolicies, with no deny rules at all. start_time: The start time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the start time of flow records. end_time: The end time of the flow records considered for the policy recommendation. Default value is None, which means no limit of the end time of flow records. rm_labels: Remove automatically generated Pod labels including 'pod-template-hash', 'controller-revision-hash', 'pod-template-generation'. @@ -725,7 +738,7 @@ def main(argv): start_time = "" end_time = "" ns_allow_list = NAMESPACE_ALLOW_LIST - id = "" + recommendation_id_input = "" rm_labels = True to_services = True help_message = """ @@ -739,9 +752,9 @@ def main(argv): -l, --limit=0: The limit on the number of flow records read from the database. 0 means no limit. -o, --option=1: Option of network isolation preference in policy recommendation. Currently we have 3 options: - 1: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended. + 1: Recommending allow ANP/ACNP policies, with default deny rules only on appliedTo Pod labels which have allow rules recommended. 2: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster. - 3: Recommending allow K8s network policies, with no deny rules at all. + 3: Recommending allow K8s NetworkPolicies, with no deny rules at all. -s, --start_time=None: The start time of the flow records considered for the policy recommendation. Format is YYYY-MM-DD hh:mm:ss in UTC timezone. Default value is None, which means no limit of the start time of flow records. -e, --end_time=None: The end time of the flow records considered for the policy recommendation. @@ -762,65 +775,65 @@ def main(argv): try: opts, _ = getopt.getopt(argv, "ht:d:l:o:s:e:n:i:", ["help", "type=", "db_jdbc_url=", "limit=", "option=", "start_time=", "end_time=", "ns_allow_list=", "id=", "rm_labels=", "to_services="]) except getopt.GetoptError as e: - print("getopt.getopt ERROR: {}".format(e)) - print(help_message) + logger.error("ERROR of getopt.getopt: {}".format(e)) + logger.info(help_message) sys.exit(2) option_keys = [option[0] for option in opts] if "-h" in option_keys or "--help" in option_keys: - print(help_message) + logger.info(help_message) sys.exit() for opt, arg in opts: if opt in ("-t", "--type"): valid_types = ["initial", "subsequent"] if arg not in valid_types: - print("Recommendation type should be in {}".format("or".join(valid_types))) - print(help_message) + logger.error("Recommendation type should be in {}".format("or".join(valid_types))) + logger.info(help_message) sys.exit(2) recommendation_type = arg elif opt in ("-d", "--db_jdbc_url"): parse_url = urlparse("arg") if parse_url.scheme != "jdbc": - print("Please provide a valid JDBC url for ClickHouse database") - print(help_message) + logger.error("Please provide a valid JDBC url for ClickHouse database") + logger.info(help_message) sys.exit(2) db_jdbc_address = arg elif opt in ("-l", "--limit"): if not is_intstring(arg) or int(arg) < 0: - print("Limit should be an integer >= 0.") - print(help_message) + logger.error("Limit should be an integer >= 0.") + logger.info(help_message) sys.exit(2) limit = int(arg) elif opt in ("-o", "--option"): if not is_intstring(arg) or int(arg) not in [1, 2, 3]: - print("Option of network isolation preference should be 1 or 2 or 3.") - print(help_message) + logger.error("Option of network isolation preference should be 1 or 2 or 3.") + logger.info(help_message) sys.exit(2) option = int(arg) elif opt in ("-s", "--start_time"): try: datetime.datetime.strptime(arg, "%Y-%m-%d %H:%M:%S") except ValueError: - print("start_time should be in 'YYYY-MM-DD hh:mm:ss' format.") - print(help_message) + logger.error("start_time should be in 'YYYY-MM-DD hh:mm:ss' format.") + logger.info(help_message) sys.exit(2) start_time = arg elif opt in ("-e", "--end_time"): try: datetime.datetime.strptime(arg, "%Y-%m-%d %H:%M:%S") except ValueError: - print("end_time should be in 'YYYY-MM-DD hh:mm:ss' format.") - print(help_message) + logger.error("end_time should be in 'YYYY-MM-DD hh:mm:ss' format.") + logger.info(help_message) sys.exit(2) end_time = arg elif opt in ("-n", "--ns_allow_list"): arg_list = json.loads(arg) if not isinstance(arg_list, list): - print("ns_allow_list should be a list.") - print(help_message) + logger.error("ns_allow_list should be a list.") + logger.info(help_message) sys.exit(2) ns_allow_list = arg_list elif opt in ("-i", "--id"): - id = arg + recommendation_id_input = arg elif opt in ("--rm_labels"): if arg == "false": rm_labels = False @@ -831,12 +844,12 @@ def main(argv): spark = SparkSession.builder.getOrCreate() if recommendation_type == 'initial': result = initial_recommendation_job(spark, db_jdbc_address, flow_table_name, limit, option, start_time, end_time, ns_allow_list, rm_labels, to_services) - recommendation_id = write_recommendation_result(spark, result, 'initial', db_jdbc_address, result_table_name, id) - print("Initial policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result))) + recommendation_id = write_recommendation_result(spark, result, 'initial', db_jdbc_address, result_table_name, recommendation_id_input) + logger.info("Initial policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result))) else: result = subsequent_recommendation_job(spark, db_jdbc_address, flow_table_name, limit, option, start_time, end_time, rm_labels, to_services) - recommendation_id = write_recommendation_result(spark, result, 'subsequent', db_jdbc_address, result_table_name, id) - print("Subsequent policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result))) + recommendation_id = write_recommendation_result(spark, result, 'subsequent', db_jdbc_address, result_table_name, recommendation_id_input) + logger.info("Subsequent policy recommendation completed, id: {}, policy number: {}".format(recommendation_id, len(result))) spark.stop() if __name__ == '__main__':