Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queuestat changes for aggregate VOQ counters #3617

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 121 additions & 20 deletions scripts/queuestat
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import sys
from collections import namedtuple, OrderedDict
from natsort import natsorted
from tabulate import tabulate
from sonic_py_common import multi_asic
from sonic_py_common import multi_asic, device_info
from redis import Redis, exceptions
from swsscommon import swsscommon

# mock the redis for unit test purposes #
try:
Expand Down Expand Up @@ -77,6 +79,39 @@ cnstat_dir = 'N/A'
cnstat_fqn_file = 'N/A'


def get_redis_ips(db):
db.connect(db.STATE_DB)
redis_ips = []
chassis_midplane_table = db.keys(db.STATE_DB, "CHASSIS_MIDPLANE_TABLE*")
lc_metadata = []
for lc in chassis_midplane_table:
lc_metadata.append(db.get_all(db.STATE_DB, lc))

db.connect(db.CHASSIS_STATE_DB)
for lc in lc_metadata:
# skip if LC is offline
if lc['access'] == "False":
continue

slot_id = int(lc['ip_address'].split(".")[2]) - 1
num_asics = db.get(db.CHASSIS_STATE_DB, f"CHASSIS_MODULE_TABLE|LINE-CARD{slot_id}", 'num_asics')

# Skip if pmon hasn't started on LC yet
if num_asics == None:
continue

# No namespace in single ASIC LC
if num_asics == "1":
redis_ips.append(lc['ip_address'])
else:
prefix, _ = lc['ip_address'].rsplit(".", maxsplit=1)
for i in range(int(num_asics)):
prefix, _, _ = lc['ip_address'].rpartition(".")
redis_ips.append(f"{prefix}.{10+i}")

return redis_ips


def build_json(port, cnstat, voq=False):
def ports_stats(k):
p = {}
Expand All @@ -102,6 +137,19 @@ def build_json(port, cnstat, voq=False):
out.update(ports_stats(k))
return out


def run_queuestat(save_fresh_stats, port_to_show_stats, json_opt, non_zero, ns, db, voq):
queuestat = Queuestat(ns, db, voq)
if save_fresh_stats:
queuestat.save_fresh_stats()
return

if port_to_show_stats != None:
queuestat.get_print_port_stat(port_to_show_stats, json_opt, non_zero)
else:
queuestat.get_print_all_stat(json_opt, non_zero)


class QueuestatWrapper(object):
"""A wrapper to execute queuestat cmd over the correct namespaces"""
def __init__(self, namespace, voq):
Expand All @@ -114,22 +162,18 @@ class QueuestatWrapper(object):

@multi_asic_util.run_on_multi_asic
def run(self, save_fresh_stats, port_to_show_stats, json_opt, non_zero):
queuestat = Queuestat(self.multi_asic.current_namespace, self.db, self.voq)
if save_fresh_stats:
queuestat.save_fresh_stats()
return

if port_to_show_stats != None:
queuestat.get_print_port_stat(port_to_show_stats, json_opt, non_zero)
else:
queuestat.get_print_all_stat(json_opt, non_zero)

run_queuestat(save_fresh_stats, port_to_show_stats, json_opt, non_zero, \
self.multi_asic.current_namespace, self.db, self.voq)

class Queuestat(object):
def __init__(self, namespace, db, voq=False):
self.db = db
self.voq = voq
self.voq_stats = {}
self.namespace = namespace
if namespace is None:
self.db = SonicV2Connector(use_unix_socket_path=False)
self.db.connect(self.db.COUNTERS_DB)
self.namespace_str = f" for {namespace}" if namespace else ''

def get_queue_port(table_id):
Expand All @@ -142,7 +186,9 @@ class Queuestat(object):

# Get all ports
if voq:
self.counter_port_name_map = self.db.get_all(self.db.COUNTERS_DB, COUNTERS_SYSTEM_PORT_NAME_MAP)
# counter_port_name_map is assigned later for supervisor as a list
self.counter_port_name_map = [] if device_info.is_supervisor() else \
self.db.get_all(self.db.COUNTERS_DB, COUNTERS_SYSTEM_PORT_NAME_MAP)
else:
self.counter_port_name_map = self.db.get_all(self.db.COUNTERS_DB, COUNTERS_PORT_NAME_MAP)

Expand All @@ -157,6 +203,14 @@ class Queuestat(object):
self.port_queues_map[port] = {}
self.port_name_map[self.counter_port_name_map[port]] = port

if self.voq:
counter_bucket_dict.update(voq_counter_bucket_dict)

if device_info.is_supervisor():
self.aggregate_voq_stats()
self.counter_port_name_map = self.voq_stats.keys()
return

counter_queue_name_map = None
# Get Queues for each port
if voq:
Expand All @@ -172,6 +226,44 @@ class Queuestat(object):
port = self.port_name_map[get_queue_port(counter_queue_name_map[queue])]
self.port_queues_map[port][queue] = counter_queue_name_map[queue]

def aggregate_voq_stats(self):
redis_ips = get_redis_ips(self.db)
self.voq_stats = {}

for ip in redis_ips:
asic_counters_db = Redis(host=ip, db=swsscommon.COUNTERS_DB, decode_responses=True)
try:
counters_voq_name_map = asic_counters_db.hgetall(COUNTERS_VOQ_NAME_MAP)
if counters_voq_name_map is None:
continue
for voq in counters_voq_name_map:
# key LINECARD|ASIC|EthernetXXX:INDEX
sysPort, idx = voq.split(":")
for counter_name in counter_bucket_dict:
self.voq_stats.setdefault(sysPort, {}).setdefault(idx, {}).setdefault(counter_name, 0)
oid = counters_voq_name_map[voq]
counter_data = asic_counters_db.hget("COUNTERS:"+oid, counter_name)
if counter_data is not None:
self.voq_stats[sysPort][idx][counter_name] += int(counter_data)

except exceptions.ConnectionError as e:
# Skip further operations for this redis-instance
continue

def get_aggregate_port_stats(self, port):
# Build a dictionary of stats
cnstat_dict = OrderedDict()
cnstat_dict['time'] = datetime.datetime.now()
for idx in sorted(self.voq_stats[port].keys()):
fields = ["0"]*len(voq_header)
fields[0] = idx
fields[1] = QUEUE_TYPE_VOQ
for counter_name, pos in counter_bucket_dict.items():
fields[pos] = str(self.voq_stats[port][idx][counter_name])
cntr = VoqStats._make(fields)._asdict()
cnstat_dict[port+":"+idx] = cntr
return cnstat_dict

def get_cnstat(self, queue_map):
"""
Get the counters info from database.
Expand Down Expand Up @@ -214,8 +306,6 @@ class Queuestat(object):

counter_dict = {}
counter_dict.update(counter_bucket_dict)
if self.voq:
counter_dict.update(voq_counter_bucket_dict)

for counter_name, pos in counter_dict.items():
full_table_id = COUNTER_TABLE_PREFIX + table_id
Expand Down Expand Up @@ -272,7 +362,8 @@ class Queuestat(object):
else:
hdr = voq_header if self.voq else header
if table:
print(f"For namespace {self.namespace}:")
if not device_info.is_supervisor():
print(f"For namespace {self.namespace}:")
print(tabulate(table, hdr, tablefmt='simple', stralign='right'))
print()

Expand Down Expand Up @@ -345,7 +436,10 @@ class Queuestat(object):
json_output = {}
for port in natsorted(self.counter_port_name_map):
json_output[port] = {}
cnstat_dict = self.get_cnstat(self.port_queues_map[port])
if self.voq and device_info.is_supervisor():
cnstat_dict = self.get_aggregate_port_stats(port)
else:
cnstat_dict = self.get_cnstat(self.port_queues_map[port])

cnstat_fqn_file_name = cnstat_fqn_file + port
if os.path.isfile(cnstat_fqn_file_name):
Expand All @@ -372,12 +466,16 @@ class Queuestat(object):
Get stat for the port
If JSON option is True print data in JSON format
"""
if not port in self.port_queues_map:
if port not in self.port_queues_map and port not in self.voq_stats:
print("Port doesn't exist!", port)
sys.exit(1)

# Get stat for the port queried
cnstat_dict = self.get_cnstat(self.port_queues_map[port])
if self.voq and device_info.is_supervisor():
cnstat_dict = self.get_aggregate_port_stats(port)
else:
cnstat_dict = self.get_cnstat(self.port_queues_map[port])

cnstat_fqn_file_name = cnstat_fqn_file + port
json_output = {}
json_output[port] = {}
Expand Down Expand Up @@ -449,8 +547,11 @@ def main(port, clear, delete, json_opt, voq, non_zero, namespace):
if delete_stats:
cache.remove()

queuestat_wrapper = QueuestatWrapper(namespace, voq)
queuestat_wrapper.run(save_fresh_stats, port_to_show_stats, json_opt, non_zero)
if device_info.is_supervisor() and namespace is None:
run_queuestat(save_fresh_stats, port_to_show_stats, json_opt, non_zero, namespace, None, voq)
else:
queuestat_wrapper = QueuestatWrapper(namespace, voq)
queuestat_wrapper.run(save_fresh_stats, port_to_show_stats, json_opt, non_zero)

sys.exit(0)

Expand Down
Loading