diff --git a/tests/conftest.py b/tests/conftest.py index f8d6202eb85e..8793c086e369 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1494,6 +1494,8 @@ def dvs_acl(request, dvs) -> DVSAcl: dvs.get_state_db(), dvs.get_counters_db()) +# FIXME: The rest of these also need to be reverted back to normal fixtures to +# appease the linter. @pytest.yield_fixture(scope="class") def dvs_lag_manager(request, dvs): request.cls.dvs_lag = dvs_lag.DVSLag(dvs.get_asic_db(), diff --git a/tests/dvslib/dvs_acl.py b/tests/dvslib/dvs_acl.py index 6016cce7836d..914e9ed40a70 100644 --- a/tests/dvslib/dvs_acl.py +++ b/tests/dvslib/dvs_acl.py @@ -31,6 +31,11 @@ class DVSAcl: "egress": "SAI_ACL_ENTRY_ATTR_ACTION_MIRROR_EGRESS" } + ADB_PORT_ATTR_LOOKUP = { + "ingress": "SAI_PORT_ATTR_INGRESS_ACL", + "egress": "SAI_PORT_ATTR_EGRESS_ACL" + } + def __init__(self, asic_db, config_db, state_db, counters_db): """Create a new DVS ACL Manager.""" self.asic_db = asic_db @@ -64,6 +69,25 @@ def create_acl_table( self.config_db.create_entry(self.CDB_ACL_TABLE_NAME, table_name, table_attrs) + def create_control_plane_acl_table( + self, + table_name: str, + services: List[str] + ) -> None: + """Create a new Control Plane ACL table in Config DB. + + Args: + table_name: The name for the new ACL table. + services: A list of services to bind to the ACL table. + """ + table_attrs = { + "policy_desc": table_name, + "type": "CTRLPLANE", + "services": ",".join(services) + } + + self.config_db.create_entry(self.CDB_ACL_TABLE_NAME, table_name, table_attrs) + def update_acl_table_port_list(self, table_name: str, ports: List[str]) -> None: """Update the port binding list for a given ACL table. @@ -172,13 +196,20 @@ def verify_acl_table_group_members(self, acl_table_id: str, acl_table_group_ids: assert set(member_groups) == set(acl_table_group_ids) - def verify_acl_table_port_binding(self, acl_table_id: str, bind_ports: List[str], num_tables: int) -> None: + def verify_acl_table_port_binding( + self, + acl_table_id: str, + bind_ports: List[str], + num_tables: int, + stage: str = "ingress" + ) -> None: """Verify that the ACL table has been bound to the given list of ports. Args: acl_table_id: The ACL table that is being checked. bind_ports: The ports that should be bound to the given ACL table. num_tables: The total number of ACL tables in ASIC DB. + stage: The stage of the ACL table that was created. """ acl_table_group_ids = self.asic_db.wait_for_n_keys(self.ADB_ACL_GROUP_TABLE_NAME, len(bind_ports)) @@ -187,7 +218,7 @@ def verify_acl_table_port_binding(self, acl_table_id: str, bind_ports: List[str] port_oid = self.counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "").get(port) fvs = self.asic_db.wait_for_entry("ASIC_STATE:SAI_OBJECT_TYPE_PORT", port_oid) - acl_table_group_id = fvs.pop("SAI_PORT_ATTR_INGRESS_ACL", None) + acl_table_group_id = fvs.pop(self.ADB_PORT_ATTR_LOOKUP[stage], None) assert acl_table_group_id in acl_table_group_ids port_groups.append(acl_table_group_id) diff --git a/tests/dvslib/dvs_database.py b/tests/dvslib/dvs_database.py index 5289695b572c..e5d9f99fbbbe 100644 --- a/tests/dvslib/dvs_database.py +++ b/tests/dvslib/dvs_database.py @@ -1,4 +1,10 @@ -"""Utilities for interacting with redis when writing VS tests.""" +"""Utilities for interacting with redis when writing VS tests. + +FIXME: + - Reference DBs by name rather than ID/socket + - Move DEFAULT_POLLING_CONFIG to Common + - Add support for ProducerStateTable +""" from typing import Dict, List from swsscommon import swsscommon from dvslib.dvs_common import wait_for_result, PollingConfig diff --git a/tests/test_acl.py b/tests/test_acl.py index 192f853f849f..281572174c58 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -11,7 +11,7 @@ L3V6_RULE_NAME = "L3V6_TEST_RULE" -class TestAcl(): +class TestAcl: @pytest.yield_fixture def l3_acl_table(self, dvs_acl): try: @@ -57,6 +57,7 @@ def test_AclTableCreationDeletion(self, dvs_acl): acl_table_id = dvs_acl.get_acl_table_ids(1)[0] acl_table_group_ids = dvs_acl.get_acl_table_group_ids(len(L3_BIND_PORTS)) + dvs_acl.verify_acl_table_group_members(acl_table_id, acl_table_group_ids, 1) dvs_acl.verify_acl_table_port_binding(acl_table_id, L3_BIND_PORTS, 1) finally: @@ -413,7 +414,7 @@ def test_AclRuleRedirect(self, dvs, dvs_acl, l3_acl_table, setup_teardown_neighb dvs_acl.verify_no_acl_rules() -class TestAclRuleValidation(): +class TestAclRuleValidation: """Test class for cases that check if orchagent corectly validates ACL rules input.""" SWITCH_CAPABILITY_TABLE = "SWITCH_CAPABILITY" diff --git a/tests/test_acl_ctrl.py b/tests/test_acl_ctrl.py index 96b9cd8f932a..7d8eea315366 100644 --- a/tests/test_acl_ctrl.py +++ b/tests/test_acl_ctrl.py @@ -1,74 +1,24 @@ -import time -import pytest +TABLE_NAME = "CTRL_ACL_TEST" +RULE_NAME = "CTRL_ACL_TEST_RULE" -from swsscommon import swsscommon +class TestPortChannelAcl: + def test_AclCtrl(self, dvs_acl): + # Create ACL table and ACL rule + dvs_acl.create_control_plane_acl_table(TABLE_NAME, ["SNMP"]) + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, {"L4_SRC_PORT": "8888"}, priority="88") -class TestPortChannelAcl(object): - def setup_db(self, dvs): - self.pdb = swsscommon.DBConnector(0, dvs.redis_sock, 0) - self.adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - self.cdb = swsscommon.DBConnector(4, dvs.redis_sock, 0) + # Verify that no ASIC rules are created + dvs_acl.verify_acl_table_count(0) + dvs_acl.verify_no_acl_rules() - def create_acl_table(self, dvs): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - fvs = swsscommon.FieldValuePairs([("POLICY_DESC", "CTRL_ACL_TEST"), - ("TYPE", "CTRLPLANE"), - ("SERVICES@", "SNMP")]) - tbl.set("CTRL_ACL_TABLE", fvs) - time.sleep(1) - - def remove_acl_table(self, dvs): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - tbl._del("CTRL_ACL_TABLE") - time.sleep(1) - - def create_acl_rule(self, dvs): - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs([("PRIORITY", "88"), - ("PACKET_ACTION", "FORWARD"), - ("L4_SRC_PORT", "8888")]) - tbl.set("CTRL_ACL_TABLE|CTRL_ACL_RULE", fvs) - time.sleep(1) - - def remove_acl_rule(self, dvs): - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - tbl._del("CTRL_ACL_TABLE|CTRL_ACL_RULE") - time.sleep(1) - - def check_asic_table_absent(self, dvs): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - acl_tables = tbl.getKeys() - for key in dvs.asicdb.default_acl_tables: - assert key in acl_tables - acl_tables = [k for k in acl_tables if k not in dvs.asicdb.default_acl_tables] - - assert len(acl_tables) == 0 - - def check_asic_rule_absent(self, dvs): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - acl_entries = tbl.getKeys() - for key in dvs.asicdb.default_acl_entries: - assert key in acl_entries - acl_entries = [k for k in acl_entries if k not in dvs.asicdb.default_acl_entries] - - assert len(acl_entries) == 0 - - def test_AclCtrl(self, dvs): - self.setup_db(dvs) - - # create ACL table and ACL rule - self.create_acl_table(dvs) - self.create_acl_rule(dvs) - - # check ASIC table - self.check_asic_table_absent(dvs) - self.check_asic_rule_absent(dvs) - - # remove ACL table - self.remove_acl_table(dvs) - self.remove_acl_rule(dvs) + # Cleanup ACL data from Config DB + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.remove_acl_table(TABLE_NAME) + # Verify that the ASIC DB is clean + dvs_acl.verify_acl_table_count(0) + dvs_acl.verify_no_acl_rules() # Add Dummy always-pass test at end as workaroud diff --git a/tests/test_acl_egress_table.py b/tests/test_acl_egress_table.py index 9341a7738d9e..baab97bbbf59 100644 --- a/tests/test_acl_egress_table.py +++ b/tests/test_acl_egress_table.py @@ -1,345 +1,141 @@ -import time import pytest -from swsscommon import swsscommon - - -class TestEgressAclTable(object): - def setup_db(self, dvs): - self.pdb = swsscommon.DBConnector(0, dvs.redis_sock, 0) - self.adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) - self.cdb = swsscommon.DBConnector(4, dvs.redis_sock, 0) - - def create_egress_acl_table(self, table_name, ports): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - - fvs = swsscommon.FieldValuePairs([("POLICY_DESC", "EGRESS_ACL_TEST"), - ("TYPE", "L3"), - ("PORTS", ports), - ("stage", "EGRESS")]) - tbl.set(table_name, fvs) - time.sleep(1) - - def create_acl_rule(self, fv_pairs, rule_name): - rule_tbl = swsscommon.Table(self.cdb, "ACL_RULE") - fvs = swsscommon.FieldValuePairs(fv_pairs) - rule_tbl.set("egress_acl_table|" + rule_name, fvs) - time.sleep(1) - - def remove_acl_table(self, table_name): - tbl = swsscommon.Table(self.cdb, "ACL_TABLE") - tbl._del(table_name) - time.sleep(1) - - def get_acl_table_id(self, dvs): - atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - keys = atbl.getKeys() - for k in dvs.asicdb.default_acl_tables: - assert k in keys - acl_tables = [k for k in keys if k not in dvs.asicdb.default_acl_tables] - - assert len(acl_tables) == 1 - - return acl_tables[0] - - def remove_acl_rule(self, table_name, rule_name): - tbl = swsscommon.Table(self.cdb, "ACL_RULE") - tbl._del(table_name + "|" + rule_name) - time.sleep(1) - - def verify_acl_asic_table(self, dvs, bind_ports): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - acl_table_groups = tbl.getKeys() - assert len(acl_table_groups) == len(bind_ports) - - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") - port_groups = [] - for p in [dvs.asicdb.portnamemap[portname] for portname in bind_ports]: - (status, fvs) = tbl.get(p) - for fv in fvs: - if fv[0] == "SAI_PORT_ATTR_EGRESS_ACL": - assert fv[1] in acl_table_groups - port_groups.append(fv[1]) - - assert len(port_groups) == len(bind_ports) - assert set(port_groups) == set(acl_table_groups) - - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP") - for port_group in port_groups: - (status, fvs) = tbl.get(port_group) - assert status == True - assert len(fvs) == 3 - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_STAGE": - assert fv[1] == "SAI_ACL_STAGE_EGRESS" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_ACL_BIND_POINT_TYPE_LIST": - assert fv[1] == "1:SAI_ACL_BIND_POINT_TYPE_PORT" - elif fv[0] == "SAI_ACL_TABLE_GROUP_ATTR_TYPE": - assert fv[1] == "SAI_ACL_TABLE_GROUP_TYPE_PARALLEL" - else: - assert False - - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE_GROUP_MEMBER") - member = tbl.getKeys()[0] - (status, fvs) = tbl.get(member) - assert status == True - assert len(fvs) == 3 - for fv in fvs: - if fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_GROUP_ID": - assert fv[1] in port_groups - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_ID": - table_id = fv[1] - elif fv[0] == "SAI_ACL_TABLE_GROUP_MEMBER_ATTR_PRIORITY": - assert fv[1] == "100" - else: - assert False - - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - (status, fvs) = tbl.get(table_id) - assert status == True - - def verify_acl_rule_asic_fvs(self, dvs, fv_tuple): - # Verify Acl entry in ASIC DB - test_acl_table_id = self.get_acl_table_id(dvs) - acl_tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = acl_tbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = acl_tbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "1000" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == fv_tuple[0]: - assert fv[1] == fv_tuple[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_DROP" - else: - assert False - - def verify_acl_rule_with_L4PortRange_asic_fvs(self, dvs, fv_tuple): - # Verify Acl entry in ASIC DB - test_acl_table_id = self.get_acl_table_id(dvs) - acl_tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - keys = acl_tbl.getKeys() - - acl_entry = [k for k in keys if k not in dvs.asicdb.default_acl_entries] - assert len(acl_entry) == 1 - - (status, fvs) = acl_tbl.get(acl_entry[0]) - assert status == True - assert len(fvs) == 6 - for fv in fvs: - if fv[0] == "SAI_ACL_ENTRY_ATTR_TABLE_ID": - assert fv[1] == test_acl_table_id - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ADMIN_STATE": - assert fv[1] == "true" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_PRIORITY": - assert fv[1] == "999" - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_COUNTER": - assert True - elif fv[0] == "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": - aclrange = fv[1] - elif fv[0] == "SAI_ACL_ENTRY_ATTR_ACTION_PACKET_ACTION": - assert fv[1] == "SAI_PACKET_ACTION_FORWARD" - else: - assert False - - # Verify Acl range in ASIC DB - acl_tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_RANGE") - aclrange_obj = aclrange.split(":", 1)[1] - - (status, fvs) = acl_tbl.get(aclrange_obj) - assert status == True - assert len(fvs) == 2 - for fv in fvs: - if fv[0] == "SAI_ACL_RANGE_ATTR_TYPE": - assert fv[1] == fv_tuple[0] - elif fv[0] == "SAI_ACL_RANGE_ATTR_LIMIT": - assert fv[1] == fv_tuple[1] - else: - assert False - - def check_asic_table_absent(self, dvs): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE") - acl_tables = tbl.getKeys() - for key in dvs.asicdb.default_acl_tables: - assert key in acl_tables - acl_tables = [k for k in acl_tables if k not in dvs.asicdb.default_acl_tables] - - assert len(acl_tables) == 0 - - def check_asic_rule_absent(self, dvs): - tbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY") - acl_entries = tbl.getKeys() - for key in dvs.asicdb.default_acl_entries: - assert key in acl_entries - acl_entries = [k for k in acl_entries if k not in dvs.asicdb.default_acl_entries] - - assert len(acl_entries) == 0 - - def test_EgressAclTableCreation(self, dvs): - self.setup_db(dvs) - - # Create ACL_TABLE in config db - bind_ports = ["Ethernet0", "Ethernet4"] - self.create_egress_acl_table("egress_acl_table", ",".join(bind_ports)) - - time.sleep(1) - - # Check acl table in asic db - self.verify_acl_asic_table(dvs, bind_ports) - - def test_EgressAclRuleL4SrcPortRange(self, dvs): - self.setup_db(dvs) - - # Create L4 SrcPortRange Acl rule - fvPairs = [("priority", "999"), ("PACKET_ACTION", "FORWARD"), ("L4_SRC_PORT_RANGE", "0-1001")] - self.create_acl_rule(fvPairs, "L4SrcPortRange_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_RANGE_TYPE_L4_SRC_PORT_RANGE", "0,1001") - self.verify_acl_rule_with_L4PortRange_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "L4SrcPortRange_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclRuleL4DstPortRange(self, dvs): - self.setup_db(dvs) - - # Create L4 DstPortRange Acl rule - fvPairs = [("priority", "999"), ("PACKET_ACTION", "FORWARD"), ("L4_DST_PORT_RANGE", "1003-6666")] - self.create_acl_rule(fvPairs, "L4DstPortRange_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_RANGE_TYPE_L4_DST_PORT_RANGE", "1003,6666") - self.verify_acl_rule_with_L4PortRange_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "L4DstPortRange_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclRuleL2EthType(self, dvs): - self.setup_db(dvs) - - # Create L4 L2EthType Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("ETHER_TYPE", "8000")] - self.create_acl_rule(fvPairs, "L2EthType_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_ETHER_TYPE", "8000&mask:0xffff") - self.verify_acl_rule_asic_fvs( dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "L2EthType_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclRuleTunnelVNI(self, dvs): - self.setup_db(dvs) +TABLE_TYPE = "L3" +TABLE_NAME = "EGRESS_TEST" +BIND_PORTS = ["Ethernet0", "Ethernet4"] +RULE_NAME = "EGRESS_TEST_RULE" + + +class TestEgressAclTable: + @pytest.yield_fixture + def egress_acl_table(self, dvs_acl): + try: + dvs_acl.create_acl_table(TABLE_NAME, TABLE_TYPE, BIND_PORTS, stage="egress") + yield dvs_acl.get_acl_table_ids(1)[0] + finally: + dvs_acl.remove_acl_table(TABLE_NAME) + dvs_acl.verify_acl_table_count(0) + + def test_EgressAclTableCreationDeletion(self, dvs_acl): + try: + dvs_acl.create_acl_table(TABLE_NAME, TABLE_TYPE, BIND_PORTS, stage="egress") + + acl_table_id = dvs_acl.get_acl_table_ids(1)[0] + acl_table_group_ids = dvs_acl.get_acl_table_group_ids(len(BIND_PORTS)) + + dvs_acl.verify_acl_table_group_members(acl_table_id, acl_table_group_ids, 1) + dvs_acl.verify_acl_table_port_binding(acl_table_id, BIND_PORTS, 1, stage="egress") + finally: + dvs_acl.remove_acl_table(TABLE_NAME) + dvs_acl.verify_acl_table_count(0) + + def test_EgressAclRuleL4SrcPortRange(self, dvs_acl, egress_acl_table): + config_qualifiers = {"L4_SRC_PORT_RANGE": "0-1001"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": dvs_acl.get_acl_range_comparator("SAI_ACL_RANGE_TYPE_L4_SRC_PORT_RANGE", "0,1001") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, priority="999") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, priority="999") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclRuleL4DstPortRange(self, dvs_acl, egress_acl_table): + config_qualifiers = {"L4_DST_PORT_RANGE": "1003-6666"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ACL_RANGE_TYPE": dvs_acl.get_acl_range_comparator("SAI_ACL_RANGE_TYPE_L4_DST_PORT_RANGE", "1003,6666") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, priority="999") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, priority="999") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclRuleL2EthType(self, dvs_acl, egress_acl_table): + config_qualifiers = {"ETHER_TYPE": "8000"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_ETHER_TYPE": dvs_acl.get_simple_qualifier_comparator("8000&mask:0xffff") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclRuleTunnelVNI(self, dvs_acl, egress_acl_table): + config_qualifiers = {"TUNNEL_VNI": "5000"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_TUNNEL_VNI": dvs_acl.get_simple_qualifier_comparator("5000&mask:0xffffffff") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclRuleTC(self, dvs_acl, egress_acl_table): + config_qualifiers = {"TC": "1"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_TC": dvs_acl.get_simple_qualifier_comparator("1&mask:0xff") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclInnerIPProtocol(self, dvs_acl, egress_acl_table): + config_qualifiers = {"INNER_IP_PROTOCOL": "8"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_INNER_IP_PROTOCOL": dvs_acl.get_simple_qualifier_comparator("8&mask:0xff") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclInnerEthType(self, dvs_acl, egress_acl_table): + config_qualifiers = {"INNER_ETHER_TYPE": "8000"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_INNER_ETHER_TYPE": dvs_acl.get_simple_qualifier_comparator("8000&mask:0xffff") + } + + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") + + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclInnerL4SrcPort(self, dvs_acl, egress_acl_table): + config_qualifiers = {"INNER_L4_SRC_PORT": "999"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_INNER_L4_SRC_PORT": dvs_acl.get_simple_qualifier_comparator("999&mask:0xffff") + } - # Create Tunnel VNI Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("TUNNEL_VNI", "5000")] - self.create_acl_rule(fvPairs, "TunnelVNI_rule") + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_TUNNEL_VNI", "5000&mask:0xffffffff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() + + def test_EgressAclInnerL4DstPort(self, dvs_acl, egress_acl_table): + config_qualifiers = {"INNER_L4_DST_PORT": "999"} + expected_sai_qualifiers = { + "SAI_ACL_ENTRY_ATTR_FIELD_INNER_L4_DST_PORT": dvs_acl.get_simple_qualifier_comparator("999&mask:0xffff") + } - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "TunnelVNI_rule") - self.check_asic_rule_absent(dvs) + dvs_acl.create_acl_rule(TABLE_NAME, RULE_NAME, config_qualifiers, action="DROP", priority="1000") + dvs_acl.verify_acl_rule(expected_sai_qualifiers, action="DROP", priority="1000") - def test_EgressAclRuleTC(self, dvs): - self.setup_db(dvs) - - # Create TC Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("TC", "1")] - self.create_acl_rule(fvPairs, "TC_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_TC", "1&mask:0xff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "TC_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclInnerIPProtocol(self, dvs): - self.setup_db(dvs) - - # Create InnerIPProtocol Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("INNER_IP_PROTOCOL", "8")] - self.create_acl_rule(fvPairs, "InnerIPProtocol_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_INNER_IP_PROTOCOL", "8&mask:0xff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "InnerIPProtocol_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclInnerEthType(self, dvs): - self.setup_db(dvs) - - # Create InnerEthernetType Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("INNER_ETHER_TYPE", "8000")] - self.create_acl_rule(fvPairs, "InnerEthType_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_INNER_ETHER_TYPE", "8000&mask:0xffff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "InnerEthType_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclInnerL4SrcPort(self, dvs): - self.setup_db(dvs) - - # Create InnerL4SrcPort Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("INNER_L4_SRC_PORT", "999")] - self.create_acl_rule(fvPairs, "InnerL4SrcPort_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_INNER_L4_SRC_PORT", "999&mask:0xffff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "InnerL4SrcPort_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclInnerL4DstPort(self, dvs): - self.setup_db(dvs) - - # Create InnerL4DstPort Acl rule - fvPairs = [("priority", "1000"), ("PACKET_ACTION", "DROP"), ("INNER_L4_DST_PORT", "999")] - self.create_acl_rule(fvPairs, "InnerL4DstPort_rule") - - # Verify Acl rule in ASIC DB - fv_tuple = ("SAI_ACL_ENTRY_ATTR_FIELD_INNER_L4_DST_PORT", "999&mask:0xffff") - self.verify_acl_rule_asic_fvs(dvs, fv_tuple) - - # Remove Acl rule - self.remove_acl_rule("egress_acl_table", "InnerL4DstPort_rule") - self.check_asic_rule_absent(dvs) - - def test_EgressAclTableDeletion(self, dvs): - self.setup_db(dvs) - - # Remove Acl table - self.remove_acl_table("egress_acl_table") - self.check_asic_table_absent(dvs) + dvs_acl.remove_acl_rule(TABLE_NAME, RULE_NAME) + dvs_acl.verify_no_acl_rules() # Add Dummy always-pass test at end as workaroud