diff --git a/tests/test_acl.py b/tests/test_acl.py index ab9b1b8ac9..794162ede6 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -864,7 +864,7 @@ def check_rule_existence(self, entry, rules, verifs): return True #did not find the rule return False - + def test_InsertAclRuleBetweenPriorities(self, dvs): db = swsscommon.DBConnector(4, dvs.redis_sock, 0) adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) @@ -963,6 +963,91 @@ def test_InsertAclRuleBetweenPriorities(self, dvs): # only the default table was left assert len(keys) == 1 + def test_RulesWithDiffMaskLengths(self, dvs): + db = swsscommon.DBConnector(4, dvs.redis_sock, 0) + adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + + bind_ports = ["Ethernet0", "Ethernet4"] + # create ACL_TABLE in config db + tbl = swsscommon.Table(db, "ACL_TABLE") + fvs = swsscommon.FieldValuePairs([("policy_desc", "test"), ("type", "L3"), ("ports", ",".join(bind_ports))]) + tbl.set("test_subnet", fvs) + + time.sleep(2) + + subnet_mask_rules = 0 + #create ACL rules + tbl = swsscommon.Table(db, "ACL_RULE") + rules = [ [("PRIORITY", "10"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "23.103.0.0/18")], + [("PRIORITY", "20"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "104.44.94.0/23")], + [("PRIORITY", "30"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "172.16.0.0/12")], + [("PRIORITY", "40"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "100.64.0.0/10")], + [("PRIORITY", "50"), ("PACKET_ACTION", "FORWARD"), ("DST_IP", "104.146.32.0/19")], + [("PRIORITY", "60"), ("PACKET_ACTION", "FORWARD"), ("SRC_IP", "21.0.0.0/8")] ] + #used to verify how ACL rules are programmed in ASICDB + #order must match the list of rules + verifs = [ {'SAI_ACL_ENTRY_ATTR_PRIORITY': '10', + 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '23.103.0.0&mask:255.255.192.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '20', + 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '104.44.94.0&mask:255.255.254.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '30', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '172.16.0.0&mask:255.240.0.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '40', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '100.64.0.0&mask:255.192.0.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '50', + 'SAI_ACL_ENTRY_ATTR_FIELD_DST_IP': '104.146.32.0&mask:255.255.224.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'}, + {'SAI_ACL_ENTRY_ATTR_PRIORITY': '60', + 'SAI_ACL_ENTRY_ATTR_FIELD_SRC_IP': '21.0.0.0&mask:255.0.0.0', + 'SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION': 'SAI_PACKET_ACTION_FORWARD'} ] + #insert rules + for rule in rules: + fvs = swsscommon.FieldValuePairs(rule) + subnet_mask_rules += 1 + tbl.set( "test_subnet|acl_test_rule%s" % subnet_mask_rules, fvs ) + + time.sleep(1) + + atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") + keys = atbl.getKeys() + + acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] + assert len(acl_entry) == subnet_mask_rules + + #match each entry to its corresponding verification + matched_masks = 0 + for entry in acl_entry: + (status, fvs) = atbl.get(entry) + assert status == True + assert len(fvs) == 6 + #helper function + if self.check_rule_existence(dict(fvs), rules, verifs): + matched_masks += 1 + + assert matched_masks == subnet_mask_rules + + while subnet_mask_rules > 0: + tbl._del("test_subnet|acl_test_rule%s" % subnet_mask_rules) + subnet_mask_rules -= 1 + + time.sleep(1) + + (status, fvs) = atbl.get(acl_entry[0]) + assert status == False + + tbl = swsscommon.Table(db, "ACL_TABLE") + tbl._del("test_subnet") + + time.sleep(1) + + atbl = swsscommon.Table(adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") + keys = atbl.getKeys() + assert len(keys) == 1 + def test_AclTableCreationOnLAGMember(self, dvs): # prepare db and tables self.clean_up_left_over(dvs) @@ -1118,4 +1203,4 @@ def test_AclTableCreationBeforeLAG(self, dvs): self.verify_acl_lag_binding(adb, lag_ids) tbl = swsscommon.Table(db, "ACL_TABLE") - tbl._del("test_LAG_2") + tbl._del("test_LAG_2") \ No newline at end of file