forked from sonic-net/sonic-platform-common
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sfputilhelper.py
216 lines (171 loc) · 8.04 KB
/
sfputilhelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# sfputilbase.py
#
# Base class for creating platform-specific SFP transceiver interfaces for SONiC
#
from __future__ import print_function
try:
import os
import re
import sys
from collections import OrderedDict
from natsort import natsorted
from portconfig import get_port_config
from sonic_py_common import device_info
from sonic_py_common.interface import backplane_prefix, inband_prefix
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))
# Global Variable
PLATFORM_JSON = 'platform.json'
PORT_CONFIG_INI = 'port_config.ini'
class SfpUtilHelper(object):
# List to specify filter for sfp_ports
# Needed by platforms like dni-6448 which
# have only a subset of ports that support sfp
sfp_ports = []
# List of logical port names available on a system
""" ["swp1", "swp5", "swp6", "swp7", "swp8" ...] """
logical = []
# Mapping of logical port names available on a system to ASIC num
logical_to_asic = {}
# dicts for easier conversions between logical, physical and bcm ports
logical_to_physical = {}
physical_to_logical = {}
physical_to_phyaddrs = {}
def __init__(self):
pass
def read_porttab_mappings(self, porttabfile, asic_inst=0):
logical = []
logical_to_physical = {}
physical_to_logical = {}
last_fp_port_index = 0
last_portname = ""
first = 1
port_pos_in_file = 0
parse_fmt_port_config_ini = False
parse_fmt_platform_json = False
parse_fmt_port_config_ini = (os.path.basename(porttabfile) == PORT_CONFIG_INI)
parse_fmt_platform_json = (os.path.basename(porttabfile) == PLATFORM_JSON)
(platform, hwsku) = device_info.get_platform_and_hwsku()
if(parse_fmt_platform_json):
ports, _, _ = get_port_config(hwsku, platform)
if not ports:
print('Failed to get port config', file=sys.stderr)
sys.exit(1)
else:
logical_list = []
for intf in ports.keys():
logical_list.append(intf)
logical = natsorted(logical_list, key=lambda y: y.lower())
logical_to_physical, physical_to_logical = OrderedDict(), OrderedDict()
for intf_name in logical:
bcm_port = str(port_pos_in_file)
if 'index' in ports[intf_name].keys():
fp_port_index = int(ports[intf_name]['index'])
logical_to_physical[intf_name] = [fp_port_index]
if physical_to_logical.get(fp_port_index) is None:
physical_to_logical[fp_port_index] = [intf_name]
else:
physical_to_logical[fp_port_index].append(intf_name)
# Mapping of logical port names available on a system to ASIC instance
self.logical_to_asic[intf_name] = asic_inst
port_pos_in_file +=1
self.logical = logical
self.logical_to_physical = logical_to_physical
self.physical_to_logical = physical_to_logical
"""
print("logical: {}".format(self.logical))
print("logical to physical: {}".format(self.logical_to_physical))
print("physical to logical: {}".format( self.physical_to_logical))
"""
return None
try:
f = open(porttabfile)
except Exception:
raise
# Read the porttab file and generate dicts
# with mapping for future reference.
#
# TODO: Refactor this to use the portconfig.py module that now
# exists as part of the sonic-config-engine package.
title = []
for line in f:
line.strip()
if re.search("^#", line) is not None:
# The current format is: # name lanes alias index speed
# Where the ordering of the columns can vary
title = line.split()[1:]
continue
# Parsing logic for 'port_config.ini' file
if (parse_fmt_port_config_ini):
# bcm_port is not explicitly listed in port_config.ini format
# Currently we assume ports are listed in numerical order according to bcm_port
# so we use the port's position in the file (zero-based) as bcm_port
portname = line.split()[0]
# Ignore if this is an internal backplane interface and Inband interface
if portname.startswith(backplane_prefix()) or portname.startswith(inband_prefix()):
continue
bcm_port = str(port_pos_in_file)
if "index" in title:
fp_port_index = int(line.split()[title.index("index")])
# Leave the old code for backward compatibility
elif "asic_port_name" not in title and len(line.split()) >= 4:
fp_port_index = int(line.split()[3])
else:
fp_port_index = portname.split("Ethernet").pop()
fp_port_index = int(fp_port_index.split("s").pop(0))/4
else: # Parsing logic for older 'portmap.ini' file
(portname, bcm_port) = line.split("=")[1].split(",")[:2]
fp_port_index = portname.split("Ethernet").pop()
fp_port_index = int(fp_port_index.split("s").pop(0))/4
if ((len(self.sfp_ports) > 0) and (fp_port_index not in self.sfp_ports)):
continue
if first == 1:
# Initialize last_[physical|logical]_port
# to the first valid port
last_fp_port_index = fp_port_index
last_portname = portname
first = 0
logical.append(portname)
# Mapping of logical port names available on a system to ASIC instance
self.logical_to_asic[portname] = asic_inst
logical_to_physical[portname] = [fp_port_index]
if physical_to_logical.get(fp_port_index) is None:
physical_to_logical[fp_port_index] = [portname]
else:
physical_to_logical[fp_port_index].append(portname)
last_fp_port_index = fp_port_index
last_portname = portname
port_pos_in_file += 1
self.logical.extend(logical)
self.logical_to_physical.update(logical_to_physical)
self.physical_to_logical.update(physical_to_logical)
"""
print("logical: " + self.logical)
print("logical to physical: " + self.logical_to_physical)
print("physical to logical: " + self.physical_to_logical)
"""
def read_all_porttab_mappings(self, platform_dir, num_asic_inst):
# In multi asic scenario, get all the port_config files for different asics
for inst in range(num_asic_inst):
port_map_dir = os.path.join(platform_dir, str(inst))
port_map_file = os.path.join(port_map_dir, PORT_CONFIG_INI)
if os.path.exists(port_map_file):
self.read_porttab_mappings(port_map_file, inst)
else:
port_json_file = os.path.join(port_map_dir, PLATFORM_JSON)
if os.path.exists(port_json_file):
self.read_porttab_mappings(port_json_file, inst)
def get_physical_to_logical(self, port_num):
"""Returns list of logical ports for the given physical port"""
return self.physical_to_logical.get(port_num)
def get_logical_to_physical(self, logical_port):
"""Returns list of physical ports for the given logical port"""
return self.logical_to_physical[logical_port]
def is_logical_port(self, port):
if port in self.logical:
return 1
else:
return 0
def get_asic_id_for_logical_port(self, logical_port):
"""Returns the asic_id list of physical ports for the given logical port"""
return self.logical_to_asic.get(logical_port)