diff --git a/ansible/roles/test/files/ptftests/py3/dhcp_relay_test.py b/ansible/roles/test/files/ptftests/py3/dhcp_relay_test.py index 474ef4c129..216ce4a1d9 100644 --- a/ansible/roles/test/files/ptftests/py3/dhcp_relay_test.py +++ b/ansible/roles/test/files/ptftests/py3/dhcp_relay_test.py @@ -90,6 +90,11 @@ class DHCPTest(DataplaneBaseTest): DHCP_LEASE_TIME_LEN = 6 LEASE_TIME = 86400 DHCP_PKT_BOOTP_MIN_LEN = 300 + DHCP_ETHER_TYPE_IP = 0x0800 + DHCP_BOOTP_OP_REPLY = 2 + DHCP_BOOTP_HTYPE_ETHERNET = 1 + DHCP_BOOTP_HLEN_ETHERNET = 6 + DHCP_BOOTP_FLAGS_BROADCAST_REPLY = 0x8000 def __init__(self): DataplaneBaseTest.__init__(self) @@ -240,8 +245,78 @@ def create_dhcp_discover_relayed_packet(self): pkt = ether / ip / udp / bootp return pkt + def dhcp_offer_packet(self, + eth_server="00:01:02:03:04:05", + eth_dst="06:07:08:09:10:11", + eth_client="12:13:14:15:16:17", + ip_server="0.1.2.3", + ip_dst="255.255.255.255", + ip_offered="8.9.10.11", + port_dst=DHCP_CLIENT_PORT, + netmask_client="255.255.255.0", + ip_gateway=DEFAULT_ROUTE_IP, + dhcp_lease=256, + padding_bytes=0, + set_broadcast_bit=False, + ): + """ + Return a DHCPOFFER packet + Supports a few parameters: + @param eth_server MAC address of DHCP server + @param eth_dst MAC address of destination (DHCP Client, Relay agent) or broadcast (ff:ff:ff:ff:ff:ff) + @param eth_client MAC address of DHCP client + @param ip_server IP address of DHCP server + @param ip_dst IP address of destination (DHCP Client, Relay agent) or broadcast (255.255.255.255) + @param ip_offered IP address that server is assigning to client + @param ip_gateway Gateway IP Address, address of relay agent if encountered + @param port_dst Destination port of packet (default: DHCP_PORT_CLIENT) + @param netmask_client Subnet mask of client + @param dhcp_lease Time in seconds of DHCP lease + @param padding_bytes Number of '\x00' bytes to append to end of packet + Destination IP can be unicast or broadcast (255.255.255.255) + Source port is always 67 (DHCP server port) + Destination port by default is 68 (DHCP client port), but can be also be 67 (DHCP server port) if being sent to a DHCP relay agent + """ + my_chaddr = binascii.unhexlify(eth_client.replace(':', '')) + my_chaddr += b'\x00\x00\x00\x00\x00\x00' + + pkt = scapy.Ether(dst=eth_dst, src=eth_server, type=self.DHCP_ETHER_TYPE_IP) + pkt /= scapy.IP(src=ip_server, dst=ip_dst, ttl=128, id=0) + pkt /= scapy.UDP(sport=self.DHCP_SERVER_PORT, dport=port_dst) + pkt /= scapy.BOOTP( + op=self.DHCP_BOOTP_OP_REPLY, + htype=self.DHCP_BOOTP_HTYPE_ETHERNET, + hlen=self.DHCP_BOOTP_HLEN_ETHERNET, + hops=0, + xid=0, + secs=0, + flags=self.DHCP_BOOTP_FLAGS_BROADCAST_REPLY if set_broadcast_bit else 0, + ciaddr=self.DEFAULT_ROUTE_IP, + yiaddr=ip_offered, + siaddr=ip_server, + giaddr=ip_gateway, + chaddr=my_chaddr, + ) + # The length of option82 is 41 bytes, and dhcp relay will strip option82, + # when the length of next option is bigger than 42 bytes, + # it could introduce the overwritten issue. + pkt /= scapy.DHCP( + options=[ + ("message-type", "offer"), + ("server_id", ip_server), + ("lease_time", int(dhcp_lease)), + ("subnet_mask", netmask_client), + (82, self.option82), + ("vendor_class_id", "http://0.0.0.0/this_is_a_very_very_long_path/test.bin".encode('utf-8')), + ("end"), + ] + ) + if padding_bytes: + pkt /= scapy.PADDING("\x00" * padding_bytes) + return pkt + def create_dhcp_offer_packet(self): - return testutils.dhcp_offer_packet(eth_server=self.server_iface_mac, + return self.dhcp_offer_packet(eth_server=self.server_iface_mac, eth_dst=self.uplink_mac, eth_client=self.client_mac, ip_server=self.server_ip, @@ -266,8 +341,8 @@ def create_dhcp_offer_relayed_packet(self): # 4.) Replaces the destination IP with broadcast (255.255.255.255) # 5.) Replaces the destination port with the DHCP client port (68) ether = scapy.Ether(dst=self.BROADCAST_MAC, src=self.relay_iface_mac, type=0x0800) - ip = scapy.IP(src=self.relay_iface_ip, dst=self.BROADCAST_IP, len=290, ttl=64) - udp = scapy.UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_CLIENT_PORT, len=262) + ip = scapy.IP(src=self.relay_iface_ip, dst=self.BROADCAST_IP, ttl=64) + udp = scapy.UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_CLIENT_PORT) bootp = scapy.BOOTP(op=2, htype=1, hlen=6, @@ -284,13 +359,13 @@ def create_dhcp_offer_relayed_packet(self): ('server_id', self.server_ip), ('lease_time', self.LEASE_TIME), ('subnet_mask', self.client_subnet), + ("vendor_class_id", "http://0.0.0.0/this_is_a_very_very_long_path/test.bin".encode('utf-8')), ('end')]) - # TODO: Need to add this to the packet creation functions in PTF code first! # If our bootp layer is too small, pad it - #pad_bytes = self.DHCP_PKT_BOOTP_MIN_LEN - len(bootp) - #if pad_bytes > 0: - # bootp /= scapy.PADDING('\x00' * pad_bytes) + pad_bytes = self.DHCP_PKT_BOOTP_MIN_LEN - len(bootp) + if pad_bytes > 0: + bootp /= scapy.PADDING('\x00' * pad_bytes) pkt = ether / ip / udp / bootp return pkt