From 56db1623407bd2ebc198b6df88fc3ada624c793e Mon Sep 17 00:00:00 2001 From: Dmytro Date: Wed, 2 Jun 2021 04:38:22 +0300 Subject: [PATCH] [config] Fix config int add incorrect ip (#1414) * Added ip and mask check to config int Signed-off-by: d-dashkov --- config/main.py | 28 ++++++- tests/ip_config_test.py | 157 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 2 deletions(-) mode change 100755 => 100644 config/main.py create mode 100644 tests/ip_config_test.py diff --git a/config/main.py b/config/main.py old mode 100755 new mode 100644 index 102927d0b885..828c447e5f95 --- a/config/main.py +++ b/config/main.py @@ -779,6 +779,22 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port, return True +def validate_ip_mask(ctx, ip_addr): + split_ip_mask = ip_addr.split("/") + # Check if the IP address is correct or if there are leading zeros. + ip_obj = ipaddress.ip_address(split_ip_mask[0]) + + # Check if the mask is correct + mask_range = 33 if isinstance(ip_obj, ipaddress.IPv4Address) else 129 + # If mask is not specified + if len(split_ip_mask) < 2: + return 0 + + if not int(split_ip_mask[1]) in range(1, mask_range): + return 0 + + return str(ip_obj) + '/' + str(int(split_ip_mask[1])) + def cli_sroute_to_config(ctx, command_str, strict_nh = True): if len(command_str) < 2 or len(command_str) > 9: ctx.fail("argument is not in pattern prefix [vrf ] nexthop <[vrf ] >|>!") @@ -3473,6 +3489,10 @@ def add(ctx, interface_name, ip_addr, gw): if '/' not in ip_addr: ip_addr = str(net) + ip_addr = validate_ip_mask(ctx, ip_addr) + if not ip_addr: + raise ValueError('') + if interface_name == 'eth0': # Configuring more than 1 IPv4 or more than 1 IPv6 address fails. @@ -3509,7 +3529,7 @@ def add(ctx, interface_name, ip_addr, gw): config_db.set_entry(table_name, interface_name, {"NULL": "NULL"}) config_db.set_entry(table_name, (interface_name, ip_addr), {"NULL": "NULL"}) except ValueError: - ctx.fail("'ip_addr' is not valid.") + ctx.fail("ip address or mask is not valid.") # # 'del' subcommand @@ -3533,6 +3553,10 @@ def remove(ctx, interface_name, ip_addr): net = ipaddress.ip_network(ip_addr, strict=False) if '/' not in ip_addr: ip_addr = str(net) + + ip_addr = validate_ip_mask(ctx, ip_addr) + if not ip_addr: + raise ValueError('') if interface_name == 'eth0': config_db.set_entry("MGMT_INTERFACE", (interface_name, ip_addr), None) @@ -3572,7 +3596,7 @@ def remove(ctx, interface_name, ip_addr): command = "ip neigh flush dev {} {}".format(interface_name, ip_addr) clicommon.run_command(command) except ValueError: - ctx.fail("'ip_addr' is not valid.") + ctx.fail("ip address or mask is not valid.") # diff --git a/tests/ip_config_test.py b/tests/ip_config_test.py new file mode 100644 index 000000000000..d08a03ca8fe5 --- /dev/null +++ b/tests/ip_config_test.py @@ -0,0 +1,157 @@ +import os +import traceback +from unittest import mock + +from click.testing import CliRunner + +import config.main as config +import show.main as show +from utilities_common.db import Db + +ERROR_MSG = ''' +Error: ip address or mask is not valid. +''' + +class TestConfigIP(object): + @classmethod + def setup_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "1" + print("SETUP") + + ''' Tests for IPv4 ''' + + def test_add_del_interface_valid_ipv4(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet64 10.10.10.1/24 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet64", "10.10.10.1/24"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code == 0 + assert ('Ethernet64', '10.10.10.1/24') in db.cfgdb.get_table('INTERFACE') + + # config int ip remove Ethernet64 10.10.10.1/24 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet64", "10.10.10.1/24"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ('Ethernet64', '10.10.10.1/24') not in db.cfgdb.get_table('INTERFACE') + + def test_add_interface_invalid_ipv4(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet64 10000.10.10.1/24 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet64", "10000.10.10.1/24"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ERROR_MSG in result.output + + def test_add_interface_ipv4_invalid_mask(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet64 10.10.10.1/37 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet64", "10.10.10.1/37"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ERROR_MSG in result.output + + def test_add_del_interface_ipv4_with_leading_zeros(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet68 10.10.10.002/24 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "10.10.10.002/24"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code == 0 + assert ('Ethernet68', '10.10.10.2/24') in db.cfgdb.get_table('INTERFACE') + + # config int ip remove Ethernet68 10.10.10.002/24 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "10.10.10.002/24"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ('Ethernet68', '10.10.10.2/24') not in db.cfgdb.get_table('INTERFACE') + + ''' Tests for IPv6 ''' + + def test_add_del_interface_valid_ipv6(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet72 2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code == 0 + assert ('Ethernet72', '2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34') in db.cfgdb.get_table('INTERFACE') + + # config int ip remove Ethernet72 2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet72", "2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ('Ethernet72', '2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34') not in db.cfgdb.get_table('INTERFACE') + + def test_add_interface_invalid_ipv6(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet72 20001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet72", "20001:0db8:11a3:19d7:1f34:8a2e:17a0:765d/34"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ERROR_MSG in result.output + + def test_add_interface_ipv6_invalid_mask(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip add Ethernet72 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/200 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet72", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/200"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ERROR_MSG in result.output + + def test_add_del_interface_ipv6_with_leading_zeros(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip del Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code == 0 + assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') in db.cfgdb.get_table('INTERFACE') + + # config int ip remove Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') not in db.cfgdb.get_table('INTERFACE') + + def test_add_del_interface_shortened_ipv6_with_leading_zeros(self): + db = Db() + runner = CliRunner() + obj = {'config_db':db.cfgdb} + + # config int ip del Ethernet68 3000::001/64 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "3000::001/64"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code == 0 + assert ('Ethernet68', '3000::1/64') in db.cfgdb.get_table('INTERFACE') + + # config int ip remove Ethernet68 3000::001/64 + result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "3000::001/64"], obj=obj) + print(result.exit_code, result.output) + assert result.exit_code != 0 + assert ('Ethernet68', '3000::1/64') not in db.cfgdb.get_table('INTERFACE') + + @classmethod + def teardown_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "0" + print("TEARDOWN")