diff --git a/dockers/docker-orchagent/tunnel_packet_handler.py b/dockers/docker-orchagent/tunnel_packet_handler.py index 398d7956abf6..8d1b775ec9ce 100755 --- a/dockers/docker-orchagent/tunnel_packet_handler.py +++ b/dockers/docker-orchagent/tunnel_packet_handler.py @@ -8,11 +8,14 @@ destination IP to trigger the process of obtaining neighbor information """ import subprocess +import sys import time from datetime import datetime from ipaddress import ip_interface +from queue import Queue -from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector +from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector, \ + DBConnector, Select, SubscriberStateTable from sonic_py_common import logger as log from pyroute2 import IPRoute @@ -25,18 +28,35 @@ logger = log.Logger() STATE_DB = 'STATE_DB' +APPL_DB = 'APPL_DB' PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE' TUNNEL_TABLE = 'TUNNEL' PEER_SWITCH_TABLE = 'PEER_SWITCH' INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}' +LAG_TABLE = 'LAG_TABLE' STATE_KEY = 'state' TUNNEL_TYPE_KEY = 'tunnel_type' DST_IP_KEY = 'dst_ip' ADDRESS_IPV4_KEY = 'address_ipv4' +OPER_STATUS_KEY = 'oper_status' IPINIP_TUNNEL = 'ipinip' - RTM_NEWLINK = 'RTM_NEWLINK' +SELECT_TIMEOUT = 1000 + +nl_msgs = Queue() +portchannel_intfs = None + +def add_msg_to_queue(target, msg): + """ + Adds a netlink message to a queue + Args: + target: unused, needed by NDB API + msg: a netlink message + """ + + if msg.get_attr('IFLA_IFNAME') in portchannel_intfs: + nl_msgs.put(msg) class TunnelPacketHandler(object): """ @@ -55,7 +75,10 @@ def __init__(self): self.sniffer = None self.self_ip = '' self.packet_filter = '' - self.sniff_intfs = [] + self.sniff_intfs = set() + + global portchannel_intfs + portchannel_intfs = [name for name, _ in self.portchannel_intfs] @property def portchannel_intfs(self): @@ -95,17 +118,6 @@ def get_intf_name(self, msg): return '' - def netlink_msg_is_for_portchannel(self, msg): - """ - Determines if a netlink message is about a PortChannel interface - - Returns: - (list) integers representing kernel indices - """ - ifname = self.get_intf_name(msg) - - return ifname in [name for name, _ in self.portchannel_intfs] - def get_up_portchannels(self): """ Returns the portchannels which are operationally up @@ -125,11 +137,11 @@ def get_up_portchannels(self): logger.log_notice("Skipping non-existent interface {}".format(intf)) continue link_statuses.append(status[0]) - up_portchannels = [] + up_portchannels = set() for status in link_statuses: - if status['state'] == 'up': - up_portchannels.append(self.get_intf_name(status)) + if status.get_attr('IFLA_OPERSTATE').lower() == 'up': + up_portchannels.add(status.get_attr('IFLA_IFNAME')) return up_portchannels @@ -242,52 +254,47 @@ def get_inner_pkt_type(self, packet): return IPv6 return False - def wait_for_netlink_msgs(self): - """ - Gathers any RTM_NEWLINK messages - - Returns: - (list) containing any received messages - """ - msgs = [] - with IPRoute() as ipr: - ipr.bind() - for msg in ipr.get(): - if msg['event'] == RTM_NEWLINK: - msgs.append(msg) - - return msgs - - def sniffer_restart_required(self, messages): + def sniffer_restart_required(self, lag, fvs): """ Determines if the packet sniffer needs to be restarted - A restart is required if all of the following conditions are met: - 1. A netlink message of type RTM_NEWLINK is received - (this is checked by `wait_for_netlink_msgs`) - 2. The interface index of the message corresponds to a portchannel - interface - 3. The state of the interface in the message is 'up' - Here, we do not care about an interface going down since - the sniffer is able to continue sniffing on the other - interfaces. However, if an interface has gone down and - come back up, we need to restart the sniffer to be able - to sniff traffic on the interface that has come back up. + The sniffer needs to be restarted when a portchannel interface transitions + from down to up. When a portchannel interface goes down, the sniffer is + able to continue sniffing on other portchannels. """ - for msg in messages: - if self.netlink_msg_is_for_portchannel(msg): - if msg['state'] == 'up': - logger.log_info('{} came back up, sniffer restart required' - .format(self.get_intf_name(msg))) - return True - return False + oper_status = dict(fvs).get(OPER_STATUS_KEY) + if lag not in self.sniff_intfs and oper_status == 'up': + logger.log_info('{} came back up, sniffer restart required' + .format(lag)) + # Don't need to modify self.sniff_intfs here since it is repopulated + # by self.get_up_portchannels() + return True + elif lag in self.sniff_intfs and oper_status == 'down': + # A portchannel interface went down, remove it from the list of + # sniffed interfaces so we can detect when it comes back up + self.sniff_intfs.remove(lag) + return False + else: + return False def start_sniffer(self): """ Starts an AsyncSniffer and waits for it to inititalize fully """ + start = datetime.now() + + self.sniff_intfs = self.get_up_portchannels() + + while not self.sniff_intfs: + logger.log_info('No portchannels are up yet...') + if (datetime.now() - start).seconds > 180: + logger.log_error('All portchannels failed to come up within 3 minutes, exiting...') + sys.exit(1) + self.sniff_intfs = self.get_up_portchannels() + time.sleep(10) + self.sniffer = AsyncSniffer( - iface=self.sniff_intfs, + iface=list(self.sniff_intfs), filter=self.packet_filter, prn=self.ping_inner_dst, store=0 @@ -332,18 +339,35 @@ def listen_for_tunnel_pkts(self): logger.log_notice('Starting tunnel packet handler for {}' .format(self.packet_filter)) - self.sniff_intfs = self.get_up_portchannels() - logger.log_info("Listening on interfaces {}".format(self.sniff_intfs)) + + app_db = DBConnector(APPL_DB, 0) + lag_table = SubscriberStateTable(app_db, LAG_TABLE) + sel = Select() + sel.addSelectable(lag_table) self.start_sniffer() + logger.log_info("Listening on interfaces {}".format(self.sniff_intfs)) while True: - msgs = self.wait_for_netlink_msgs() - if self.sniffer_restart_required(msgs): - self.sniffer.stop() - sniff_intfs = self.get_up_portchannels() - logger.log_notice('Restarting tunnel packet handler on ' - 'interfaces {}'.format(sniff_intfs)) - self.start_sniffer() + rc, _ = sel.select(SELECT_TIMEOUT) + + if rc == Select.TIMEOUT: + continue + elif rc == Select.ERROR: + raise Exception("Select() error") + else: + lag, op, fvs = lag_table.pop() + if self.sniffer_restart_required(lag, fvs): + self.sniffer.stop() + start = datetime.now() + # wait up to 3 seconds for the kernel interface to be synced with APPL_DB status + while (datetime.now() - start).seconds < 3: + self.sniff_intfs = self.get_up_portchannels() + if lag in self.sniff_intfs: + break + time.sleep(0.1) + logger.log_notice('Restarting tunnel packet handler on ' + 'interfaces {}'.format(self.sniff_intfs)) + self.start_sniffer() def run(self): """ @@ -360,4 +384,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file