Skip to content

Commit

Permalink
[tph]: Detect LAG flaps from APPL_DB (#16879)
Browse files Browse the repository at this point in the history
Why I did it
A race condition exists while the TPH is processing a netlink message - if a second netlink message arrives during processing it will be missed since TPH is not listening for other messages.
Another bug was found where TPH was unnecessarily restarting since it was checking admin status instead of operational status of portchannels.

How I did it
Subscribe to APPL_DB for updates on LAG operational state
Track currently sniffed interfaces

How to verify it
Send tunnel packets with destination IP of an unresolved neighbor, verify that ping commands are run
Shut down a portchannel interface, verify that sniffer does not restart
Send tunnel packets, verify ping commands are still run
Bring up portchannel interface, verify that sniffer restarts

Signed-off-by: Lawrence Lee <lawlee@microsoft.com>
  • Loading branch information
theasianpianist authored and mssonicbld committed Nov 15, 2023
1 parent 6696a26 commit 795ac1a
Showing 1 changed file with 87 additions and 63 deletions.
150 changes: 87 additions & 63 deletions dockers/docker-orchagent/tunnel_packet_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
destination IP to trigger the process of obtaining neighbor information
"""
import subprocess
import sys
import time
from datetime import datetime
from ipaddress import ip_interface
from queue import Queue

from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector
from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector, \
DBConnector, Select, SubscriberStateTable
from sonic_py_common import logger as log

from pyroute2 import IPRoute
Expand All @@ -25,18 +28,35 @@
logger = log.Logger()

STATE_DB = 'STATE_DB'
APPL_DB = 'APPL_DB'
PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
TUNNEL_TABLE = 'TUNNEL'
PEER_SWITCH_TABLE = 'PEER_SWITCH'
INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}'
LAG_TABLE = 'LAG_TABLE'
STATE_KEY = 'state'
TUNNEL_TYPE_KEY = 'tunnel_type'
DST_IP_KEY = 'dst_ip'
ADDRESS_IPV4_KEY = 'address_ipv4'
OPER_STATUS_KEY = 'oper_status'
IPINIP_TUNNEL = 'ipinip'

RTM_NEWLINK = 'RTM_NEWLINK'
SELECT_TIMEOUT = 1000

nl_msgs = Queue()
portchannel_intfs = None

def add_msg_to_queue(target, msg):
"""
Adds a netlink message to a queue
Args:
target: unused, needed by NDB API
msg: a netlink message
"""

if msg.get_attr('IFLA_IFNAME') in portchannel_intfs:
nl_msgs.put(msg)

class TunnelPacketHandler(object):
"""
Expand All @@ -55,7 +75,10 @@ def __init__(self):
self.sniffer = None
self.self_ip = ''
self.packet_filter = ''
self.sniff_intfs = []
self.sniff_intfs = set()

global portchannel_intfs
portchannel_intfs = [name for name, _ in self.portchannel_intfs]

@property
def portchannel_intfs(self):
Expand Down Expand Up @@ -95,17 +118,6 @@ def get_intf_name(self, msg):

return ''

def netlink_msg_is_for_portchannel(self, msg):
"""
Determines if a netlink message is about a PortChannel interface
Returns:
(list) integers representing kernel indices
"""
ifname = self.get_intf_name(msg)

return ifname in [name for name, _ in self.portchannel_intfs]

def get_up_portchannels(self):
"""
Returns the portchannels which are operationally up
Expand All @@ -125,11 +137,11 @@ def get_up_portchannels(self):
logger.log_notice("Skipping non-existent interface {}".format(intf))
continue
link_statuses.append(status[0])
up_portchannels = []
up_portchannels = set()

for status in link_statuses:
if status['state'] == 'up':
up_portchannels.append(self.get_intf_name(status))
if status.get_attr('IFLA_OPERSTATE').lower() == 'up':
up_portchannels.add(status.get_attr('IFLA_IFNAME'))

return up_portchannels

Expand Down Expand Up @@ -242,52 +254,47 @@ def get_inner_pkt_type(self, packet):
return IPv6
return False

def wait_for_netlink_msgs(self):
"""
Gathers any RTM_NEWLINK messages
Returns:
(list) containing any received messages
"""
msgs = []
with IPRoute() as ipr:
ipr.bind()
for msg in ipr.get():
if msg['event'] == RTM_NEWLINK:
msgs.append(msg)

return msgs

def sniffer_restart_required(self, messages):
def sniffer_restart_required(self, lag, fvs):
"""
Determines if the packet sniffer needs to be restarted
A restart is required if all of the following conditions are met:
1. A netlink message of type RTM_NEWLINK is received
(this is checked by `wait_for_netlink_msgs`)
2. The interface index of the message corresponds to a portchannel
interface
3. The state of the interface in the message is 'up'
Here, we do not care about an interface going down since
the sniffer is able to continue sniffing on the other
interfaces. However, if an interface has gone down and
come back up, we need to restart the sniffer to be able
to sniff traffic on the interface that has come back up.
The sniffer needs to be restarted when a portchannel interface transitions
from down to up. When a portchannel interface goes down, the sniffer is
able to continue sniffing on other portchannels.
"""
for msg in messages:
if self.netlink_msg_is_for_portchannel(msg):
if msg['state'] == 'up':
logger.log_info('{} came back up, sniffer restart required'
.format(self.get_intf_name(msg)))
return True
return False
oper_status = dict(fvs).get(OPER_STATUS_KEY)
if lag not in self.sniff_intfs and oper_status == 'up':
logger.log_info('{} came back up, sniffer restart required'
.format(lag))
# Don't need to modify self.sniff_intfs here since it is repopulated
# by self.get_up_portchannels()
return True
elif lag in self.sniff_intfs and oper_status == 'down':
# A portchannel interface went down, remove it from the list of
# sniffed interfaces so we can detect when it comes back up
self.sniff_intfs.remove(lag)
return False
else:
return False

def start_sniffer(self):
"""
Starts an AsyncSniffer and waits for it to inititalize fully
"""
start = datetime.now()

self.sniff_intfs = self.get_up_portchannels()

while not self.sniff_intfs:
logger.log_info('No portchannels are up yet...')
if (datetime.now() - start).seconds > 180:
logger.log_error('All portchannels failed to come up within 3 minutes, exiting...')
sys.exit(1)
self.sniff_intfs = self.get_up_portchannels()
time.sleep(10)

self.sniffer = AsyncSniffer(
iface=self.sniff_intfs,
iface=list(self.sniff_intfs),
filter=self.packet_filter,
prn=self.ping_inner_dst,
store=0
Expand Down Expand Up @@ -332,18 +339,35 @@ def listen_for_tunnel_pkts(self):
logger.log_notice('Starting tunnel packet handler for {}'
.format(self.packet_filter))

self.sniff_intfs = self.get_up_portchannels()
logger.log_info("Listening on interfaces {}".format(self.sniff_intfs))

app_db = DBConnector(APPL_DB, 0)
lag_table = SubscriberStateTable(app_db, LAG_TABLE)
sel = Select()
sel.addSelectable(lag_table)

self.start_sniffer()
logger.log_info("Listening on interfaces {}".format(self.sniff_intfs))
while True:
msgs = self.wait_for_netlink_msgs()
if self.sniffer_restart_required(msgs):
self.sniffer.stop()
sniff_intfs = self.get_up_portchannels()
logger.log_notice('Restarting tunnel packet handler on '
'interfaces {}'.format(sniff_intfs))
self.start_sniffer()
rc, _ = sel.select(SELECT_TIMEOUT)

if rc == Select.TIMEOUT:
continue
elif rc == Select.ERROR:
raise Exception("Select() error")
else:
lag, op, fvs = lag_table.pop()
if self.sniffer_restart_required(lag, fvs):
self.sniffer.stop()
start = datetime.now()
# wait up to 3 seconds for the kernel interface to be synced with APPL_DB status
while (datetime.now() - start).seconds < 3:
self.sniff_intfs = self.get_up_portchannels()
if lag in self.sniff_intfs:
break
time.sleep(0.1)
logger.log_notice('Restarting tunnel packet handler on '
'interfaces {}'.format(self.sniff_intfs))
self.start_sniffer()

def run(self):
"""
Expand All @@ -360,4 +384,4 @@ def main():


if __name__ == "__main__":
main()
main()

0 comments on commit 795ac1a

Please sign in to comment.