diff --git a/orchagent/Makefile.am b/orchagent/Makefile.am index fe94cc4ae865..0b7f9265faed 100644 --- a/orchagent/Makefile.am +++ b/orchagent/Makefile.am @@ -8,7 +8,9 @@ dist_swss_DATA = \ pfc_detect_mellanox.lua \ pfc_detect_broadcom.lua \ pfc_detect_barefoot.lua \ - pfc_restore.lua + pfc_restore.lua \ + watermark_queue.lua \ + watermark_pg.lua bin_PROGRAMS = orchagent routeresync orchagent_restart_check @@ -46,6 +48,7 @@ orchagent_SOURCES = \ vnetorch.cpp \ dtelorch.cpp \ flexcounterorch.cpp \ + watermarkorch.cpp \ acltable.h \ aclorch.h \ bufferorch.h \ @@ -76,7 +79,8 @@ orchagent_SOURCES = \ countercheckorch.h \ vxlanorch.h \ vnetorch.h \ - flexcounterorch.h + flexcounterorch.h \ + watermarkorch.h orchagent_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) orchagent_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) diff --git a/orchagent/flexcounterorch.cpp b/orchagent/flexcounterorch.cpp index 5aaec6b986b5..b844fce6df7b 100644 --- a/orchagent/flexcounterorch.cpp +++ b/orchagent/flexcounterorch.cpp @@ -16,6 +16,8 @@ unordered_map flexCounterGroupMap = {"PORT", PORT_STAT_COUNTER_FLEX_COUNTER_GROUP}, {"QUEUE", QUEUE_STAT_COUNTER_FLEX_COUNTER_GROUP}, {"PFCWD", PFC_WD_FLEX_COUNTER_GROUP}, + {"QUEUE_WATERMARK", QUEUE_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP}, + {"PG_WATERMARK", PG_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP}, }; @@ -75,6 +77,7 @@ void FlexCounterOrch::doTask(Consumer &consumer) // Currently the counters are disabled by default // The queue maps will be generated as soon as counters are enabled gPortsOrch->generateQueueMap(); + gPortsOrch->generatePriorityGroupMap(); vector fieldValues; fieldValues.emplace_back(FLEX_COUNTER_STATUS_FIELD, value); diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 1a103ed3f6e9..856bb03e6689 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -138,6 +138,8 @@ bool OrchDaemon::init() CFG_DTEL_EVENT_TABLE_NAME }; + WatermarkOrch *wm_orch = new WatermarkOrch(m_configDb, CFG_WATERMARK_TABLE_NAME); + /* * The order of the orch list is important for state restore of warm start and * the queued processing in m_toSync map after gPortsOrch->isInitDone() is set. @@ -146,7 +148,8 @@ bool OrchDaemon::init() * when iterating ConsumerMap. * That is ensured implicitly by the order of map key, "LAG_TABLE" is smaller than "VLAN_TABLE" in lexicographic order. 
*/ - m_orchList = { gSwitchOrch, gCrmOrch, gBufferOrch, gPortsOrch, gIntfsOrch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch}; + m_orchList = { gSwitchOrch, gCrmOrch, gBufferOrch, gPortsOrch, gIntfsOrch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, wm_orch }; + bool initialize_dtel = false; if (platform == BFN_PLATFORM_SUBSTRING || platform == VS_PLATFORM_SUBSTRING) diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index b398dba05833..09342088b1e6 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -25,6 +25,7 @@ #include "vnetorch.h" #include "countercheckorch.h" #include "flexcounterorch.h" +#include "watermarkorch.h" #include "directory.h" using namespace swss; diff --git a/orchagent/pfcwdorch.cpp b/orchagent/pfcwdorch.cpp index 58bb35125de2..ae2c50a2d853 100644 --- a/orchagent/pfcwdorch.cpp +++ b/orchagent/pfcwdorch.cpp @@ -650,6 +650,7 @@ PfcWdSwOrch::PfcWdSwOrch( vector fieldValues; fieldValues.emplace_back(QUEUE_PLUGIN_FIELD, detectSha + "," + restoreSha); fieldValues.emplace_back(POLL_INTERVAL_FIELD, to_string(m_pollInterval)); + fieldValues.emplace_back(STATS_MODE_FIELD, STATS_MODE_READ); m_flexCounterGroupTable->set(PFC_WD_FLEX_COUNTER_GROUP, fieldValues); } catch (...) diff --git a/orchagent/portsorch.cpp b/orchagent/portsorch.cpp index e907606d0106..24a646363d7d 100644 --- a/orchagent/portsorch.cpp +++ b/orchagent/portsorch.cpp @@ -16,6 +16,7 @@ #include "logger.h" #include "schema.h" +#include "redisapi.h" #include "converter.h" #include "sai_serialize.h" #include "crmorch.h" @@ -40,6 +41,9 @@ extern BufferOrch *gBufferOrch; #define DEFAULT_VLAN_ID 1 #define PORT_FLEX_STAT_COUNTER_POLL_MSECS "1000" #define QUEUE_FLEX_STAT_COUNTER_POLL_MSECS "10000" +#define QUEUE_WATERMARK_FLEX_STAT_COUNTER_POLL_MSECS "1000" +#define PG_WATERMARK_FLEX_STAT_COUNTER_POLL_MSECS "1000" + static map fec_mode_map = { @@ -105,6 +109,17 @@ static const vector queueStatIds = SAI_QUEUE_STAT_DROPPED_BYTES, }; +static const vector queueWatermarkStatIds = +{ + SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES, +}; + +static const vector ingressPriorityGroupWatermarkStatIds = +{ + SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES, + SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES, +}; + static char* hostif_vlan_tag[] = { [SAI_HOSTIF_VLAN_TAG_STRIP] = "SAI_HOSTIF_VLAN_TAG_STRIP", [SAI_HOSTIF_VLAN_TAG_KEEP] = "SAI_HOSTIF_VLAN_TAG_KEEP", @@ -141,17 +156,53 @@ PortsOrch::PortsOrch(DBConnector *db, vector &tableNames) m_queueIndexTable = unique_ptr(new Table(m_counter_db.get(), COUNTERS_QUEUE_INDEX_MAP)); m_queueTypeTable = unique_ptr
<Table>(new Table(m_counter_db.get(), COUNTERS_QUEUE_TYPE_MAP)); + /* Initialize ingress priority group tables */ + m_pgTable = unique_ptr<Table>
(new Table(m_counter_db.get(), COUNTERS_PG_NAME_MAP)); + m_pgPortTable = unique_ptr<Table>
(new Table(m_counter_db.get(), COUNTERS_PG_PORT_MAP)); + m_pgIndexTable = unique_ptr<Table>
(new Table(m_counter_db.get(), COUNTERS_PG_INDEX_MAP)); + m_flex_db = shared_ptr(new DBConnector(FLEX_COUNTER_DB, DBConnector::DEFAULT_UNIXSOCKET, 0)); m_flexCounterTable = unique_ptr(new ProducerTable(m_flex_db.get(), FLEX_COUNTER_TABLE)); m_flexCounterGroupTable = unique_ptr(new ProducerTable(m_flex_db.get(), FLEX_COUNTER_GROUP_TABLE)); vector fields; fields.emplace_back(POLL_INTERVAL_FIELD, PORT_FLEX_STAT_COUNTER_POLL_MSECS); + fields.emplace_back(STATS_MODE_FIELD, STATS_MODE_READ); m_flexCounterGroupTable->set(PORT_STAT_COUNTER_FLEX_COUNTER_GROUP, fields); fields.emplace_back(POLL_INTERVAL_FIELD, QUEUE_FLEX_STAT_COUNTER_POLL_MSECS); + fields.emplace_back(STATS_MODE_FIELD, STATS_MODE_READ); m_flexCounterGroupTable->set(QUEUE_STAT_COUNTER_FLEX_COUNTER_GROUP, fields); + string queueWmSha, pgWmSha; + string queueWmPluginName = "watermark_queue.lua"; + string pgWmPluginName = "watermark_pg.lua"; + + try + { + string queueLuaScript = swss::loadLuaScript(queueWmPluginName); + queueWmSha = swss::loadRedisScript(m_counter_db.get(), queueLuaScript); + + string pgLuaScript = swss::loadLuaScript(pgWmPluginName); + pgWmSha = swss::loadRedisScript(m_counter_db.get(), pgLuaScript); + + vector fieldValues; + fieldValues.emplace_back(QUEUE_PLUGIN_FIELD, queueWmSha); + fieldValues.emplace_back(POLL_INTERVAL_FIELD, QUEUE_WATERMARK_FLEX_STAT_COUNTER_POLL_MSECS); + fieldValues.emplace_back(STATS_MODE_FIELD, STATS_MODE_READ_AND_CLEAR); + m_flexCounterGroupTable->set(QUEUE_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP, fieldValues); + + fieldValues.clear(); + fieldValues.emplace_back(PG_PLUGIN_FIELD, pgWmSha); + fieldValues.emplace_back(POLL_INTERVAL_FIELD, PG_WATERMARK_FLEX_STAT_COUNTER_POLL_MSECS); + fieldValues.emplace_back(STATS_MODE_FIELD, STATS_MODE_READ_AND_CLEAR); + m_flexCounterGroupTable->set(PG_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP, fieldValues); + } + catch (...) 
+ { + SWSS_LOG_WARN("Watermark flex counter groups were not set successfully"); + } + uint32_t i, j; sai_status_t status; sai_attribute_t attr; @@ -1248,6 +1299,16 @@ string PortsOrch::getQueueFlexCounterTableKey(string key) return string(QUEUE_STAT_COUNTER_FLEX_COUNTER_GROUP) + ":" + key; } +string PortsOrch::getQueueWatermarkFlexCounterTableKey(string key) +{ + return string(QUEUE_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP) + ":" + key; +} + +string PortsOrch::getPriorityGroupWatermarkFlexCounterTableKey(string key) +{ + return string(PG_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP) + ":" + key; +} + bool PortsOrch::initPort(const string &alias, const set &lane_set) { SWSS_LOG_ENTER(); @@ -1287,7 +1348,7 @@ bool PortsOrch::initPort(const string &alias, const set &lane_set) for (const auto &id: portStatIds) { counters_stream << delimiter << sai_serialize_port_stat(id); - delimiter = ","; + delimiter = comma; } fields.clear(); @@ -2873,6 +2934,7 @@ void PortsOrch::generateQueueMapPerPort(const Port& port) queueIndexVector.emplace_back(id, to_string(queueRealIndex)); } + /* add ordinary Queue stat counters */ string key = getQueueFlexCounterTableKey(id); std::string delimiter = ""; @@ -2880,13 +2942,29 @@ void PortsOrch::generateQueueMapPerPort(const Port& port) for (const auto& it: queueStatIds) { counters_stream << delimiter << sai_serialize_queue_stat(it); - delimiter = ","; + delimiter = comma; } vector fieldValues; fieldValues.emplace_back(QUEUE_COUNTER_ID_LIST, counters_stream.str()); m_flexCounterTable->set(key, fieldValues); + + /* add watermark queue counters */ + key = getQueueWatermarkFlexCounterTableKey(id); + + delimiter = ""; + counters_stream.str(""); + for (const auto& it: queueWatermarkStatIds) + { + counters_stream << delimiter << sai_serialize_queue_stat(it); + delimiter = comma; + } + + fieldValues.clear(); + fieldValues.emplace_back(QUEUE_COUNTER_ID_LIST, counters_stream.str()); + + m_flexCounterTable->set(key, fieldValues); } m_queueTable->set("", queueVector); @@ -2897,6 +2975,67 @@ void PortsOrch::generateQueueMapPerPort(const Port& port) CounterCheckOrch::getInstance().addPort(port); } +void PortsOrch::generatePriorityGroupMap() +{ + if (m_isPriorityGroupMapGenerated) + { + return; + } + + for (const auto& it: m_portList) + { + if (it.second.m_type == Port::PHY) + { + generatePriorityGroupMapPerPort(it.second); + } + } + + m_isPriorityGroupMapGenerated = true; +} + +void PortsOrch::generatePriorityGroupMapPerPort(const Port& port) +{ + /* Create the PG map in the Counter DB */ + /* Add stat counters to flex_counter */ + vector pgVector; + vector pgPortVector; + vector pgIndexVector; + + for (size_t pgIndex = 0; pgIndex < port.m_priority_group_ids.size(); ++pgIndex) + { + std::ostringstream name; + name << port.m_alias << ":" << pgIndex; + + const auto id = sai_serialize_object_id(port.m_priority_group_ids[pgIndex]); + + pgVector.emplace_back(name.str(), id); + pgPortVector.emplace_back(id, sai_serialize_object_id(port.m_port_id)); + pgIndexVector.emplace_back(id, to_string(pgIndex)); + + string key = getPriorityGroupWatermarkFlexCounterTableKey(id); + + std::string delimiter = ""; + std::ostringstream counters_stream; + /* Add watermark counters to flex_counter */ + for (const auto& it: ingressPriorityGroupWatermarkStatIds) + { + counters_stream << delimiter << sai_serialize_ingress_priority_group_stat(it); + delimiter = comma; + } + + vector fieldValues; + fieldValues.emplace_back(PG_COUNTER_ID_LIST, counters_stream.str()); + + m_flexCounterTable->set(key, 
fieldValues); + } + + m_pgTable->set("", pgVector); + m_pgPortTable->set("", pgPortVector); + m_pgIndexTable->set("", pgIndexVector); + + CounterCheckOrch::getInstance().addPort(port); +} + void PortsOrch::doTask(NotificationConsumer &consumer) { SWSS_LOG_ENTER(); diff --git a/orchagent/portsorch.h b/orchagent/portsorch.h index d7ef5c366cce..d55485c7a35b 100644 --- a/orchagent/portsorch.h +++ b/orchagent/portsorch.h @@ -14,6 +14,8 @@ #define VLAN_TAG_LEN 4 #define PORT_STAT_COUNTER_FLEX_COUNTER_GROUP "PORT_STAT_COUNTER" #define QUEUE_STAT_COUNTER_FLEX_COUNTER_GROUP "QUEUE_STAT_COUNTER" +#define QUEUE_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP "QUEUE_WATERMARK_STAT_COUNTER" +#define PG_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP "PG_WATERMARK_STAT_COUNTER" typedef std::vector PortSupportedSpeeds; @@ -76,7 +78,10 @@ class PortsOrch : public Orch, public Subject bool setPortPfc(sai_object_id_t portId, uint8_t pfc_bitmask); void generateQueueMap(); + void generatePriorityGroupMap(); + void refreshPortStatus(); + private: unique_ptr
<Table> m_counterTable; unique_ptr<Table>
m_portTable; @@ -84,11 +89,16 @@ class PortsOrch : public Orch, public Subject unique_ptr<Table>
m_queuePortTable; unique_ptr<Table>
m_queueIndexTable; unique_ptr<Table>
m_queueTypeTable; + unique_ptr<Table>
m_pgTable; + unique_ptr<Table>
m_pgPortTable; + unique_ptr<Table>
m_pgIndexTable; unique_ptr<ProducerTable> m_flexCounterTable; unique_ptr<ProducerTable> m_flexCounterGroupTable; std::string getQueueFlexCounterTableKey(std::string s); + std::string getQueueWatermarkFlexCounterTableKey(std::string s); std::string getPortFlexCounterTableKey(std::string s); + std::string getPriorityGroupWatermarkFlexCounterTableKey(std::string s); shared_ptr<DBConnector> m_counter_db; shared_ptr<DBConnector> m_flex_db; @@ -166,6 +176,9 @@ class PortsOrch : public Orch, public Subject bool m_isQueueMapGenerated = false; void generateQueueMapPerPort(const Port& port); + bool m_isPriorityGroupMapGenerated = false; + void generatePriorityGroupMapPerPort(const Port& port); + bool setPortAutoNeg(sai_object_id_t id, int an); bool setPortFecMode(sai_object_id_t id, int fec); diff --git a/orchagent/watermark_pg.lua b/orchagent/watermark_pg.lua new file mode 100644 index 000000000000..cdbe0c450d22 --- /dev/null +++ b/orchagent/watermark_pg.lua @@ -0,0 +1,49 @@ +-- KEYS - PG IDs +-- ARGV[1] - counters db index +-- ARGV[2] - counters table name +-- ARGV[3] - poll time interval +-- return nothing for now + +local counters_db = ARGV[1] +local counters_table_name = "COUNTERS" + +local periodic_table_name = "PERIODIC_WATERMARKS" +local persistent_table_name = "PERSISTENT_WATERMARKS" +local user_table_name = "USER_WATERMARKS" + +local rets = {} + +redis.call('SELECT', counters_db) + +-- Iterate through each PG +local n = table.getn(KEYS) +for i = n, 1, -1 do + -- Get the new WM value + local pg_shared_wm = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES') + local pg_headroom_wm = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES') + + -- Get the last values from the other tables (nil if the entry does not exist yet) + local periodic_shared_wm = redis.call('HGET', periodic_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES') + local persistent_shared_wm = redis.call('HGET', persistent_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES') + local user_shared_wm = redis.call('HGET', user_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES') + + local periodic_headroom_wm = redis.call('HGET', periodic_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES') + local persistent_headroom_wm = redis.call('HGET', persistent_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES') + local user_headroom_wm = redis.call('HGET', user_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES') + + -- Store the max of the new and stored values into the other tables; if the value is absent in COUNTERS, skip it + if (pg_shared_wm) then + redis.call('HSET', periodic_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES', periodic_shared_wm and math.max(tonumber(pg_shared_wm), tonumber(periodic_shared_wm)) or pg_shared_wm) + redis.call('HSET', persistent_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES', persistent_shared_wm and math.max(tonumber(pg_shared_wm), tonumber(persistent_shared_wm)) or pg_shared_wm) + redis.call('HSET', user_table_name .. ':' ..
KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES', user_shared_wm and math.max(tonumber(pg_shared_wm), tonumber(user_shared_wm)) or pg_shared_wm) + end + + if (pg_headroom_wm) then + redis.call('HSET', periodic_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES', periodic_headroom_wm and math.max(tonumber(pg_headroom_wm), tonumber(periodic_headroom_wm)) or pg_headroom_wm) + redis.call('HSET', persistent_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES', persistent_headroom_wm and math.max(tonumber(pg_headroom_wm), tonumber(persistent_headroom_wm)) or pg_headroom_wm) + redis.call('HSET', user_table_name .. ':' .. KEYS[i], 'SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES', user_headroom_wm and math.max(tonumber(pg_headroom_wm), tonumber(user_headroom_wm)) or pg_headroom_wm) + end + +end + +return rets diff --git a/orchagent/watermark_queue.lua b/orchagent/watermark_queue.lua new file mode 100644 index 000000000000..7cbc30c3eee7 --- /dev/null +++ b/orchagent/watermark_queue.lua @@ -0,0 +1,37 @@ +-- KEYS - queue IDs +-- ARGV[1] - counters db index +-- ARGV[2] - counters table name +-- ARGV[3] - poll time interval +-- return nothing for now + +local counters_db = ARGV[1] +local counters_table_name = "COUNTERS" + +local periodic_table_name = "PERIODIC_WATERMARKS" +local persistent_table_name = "PERSISTENT_WATERMARKS" +local user_table_name = "USER_WATERMARKS" + +local rets = {} + +redis.call('SELECT', counters_db) + +-- Iterate through each queue +local n = table.getn(KEYS) +for i = n, 1, -1 do + -- Get the new WM value + local queue_shared_wm = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES') + + -- Get the last values from the other tables (nil if the entry does not exist yet) + local periodic_shared_wm = redis.call('HGET', periodic_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES') + local persistent_shared_wm = redis.call('HGET', persistent_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES') + local user_shared_wm = redis.call('HGET', user_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES') + + -- Store the max of the new and stored values into the other tables; if the value is absent in COUNTERS, skip it + if tonumber(queue_shared_wm) then + redis.call('HSET', periodic_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES', periodic_shared_wm and math.max(queue_shared_wm, periodic_shared_wm) or queue_shared_wm) + redis.call('HSET', persistent_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES', persistent_shared_wm and math.max(queue_shared_wm, persistent_shared_wm) or queue_shared_wm) + redis.call('HSET', user_table_name .. ':' ..
KEYS[i], 'SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES', user_shared_wm and math.max(queue_shared_wm, user_shared_wm) or queue_shared_wm) + end +end + +return rets diff --git a/orchagent/watermarkorch.cpp b/orchagent/watermarkorch.cpp new file mode 100644 index 000000000000..4e4bda916fa8 --- /dev/null +++ b/orchagent/watermarkorch.cpp @@ -0,0 +1,224 @@ +#include "watermarkorch.h" +#include "sai_serialize.h" +#include "portsorch.h" +#include "notifier.h" +#include "converter.h" + +#define DEFAULT_TELEMETRY_INTERVAL 120 + +#define CLEAR_PG_HEADROOM_REQUEST "PG_HEADROOM" +#define CLEAR_PG_SHARED_REQUEST "PG_SHARED" +#define CLEAR_QUEUE_SHARED_UNI_REQUEST "Q_SHARED_UNI" +#define CLEAR_QUEUE_SHARED_MULTI_REQUEST "Q_SHARED_MULTI" + +extern PortsOrch *gPortsOrch; + + +WatermarkOrch::WatermarkOrch(DBConnector *db, const string tableName): + Orch(db, tableName) +{ + SWSS_LOG_ENTER(); + + m_countersDb = make_shared(COUNTERS_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); + m_appDb = make_shared(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); + m_countersTable = make_shared
<Table>(m_countersDb.get(), COUNTERS_TABLE); + m_periodicWatermarkTable = make_shared<Table>
(m_countersDb.get(), PERIODIC_WATERMARKS_TABLE); + m_persistentWatermarkTable = make_shared<Table>
(m_countersDb.get(), PERSISTENT_WATERMARKS_TABLE); + m_userWatermarkTable = make_shared<Table>
(m_countersDb.get(), USER_WATERMARKS_TABLE); + + m_clearNotificationConsumer = new swss::NotificationConsumer( + m_appDb.get(), + "WATERMARK_CLEAR_REQUEST"); + auto clearNotifier = new Notifier(m_clearNotificationConsumer, this, "WM_CLEAR_NOTIFIER"); + Orch::addExecutor(clearNotifier); + + auto intervT = timespec { .tv_sec = DEFAULT_TELEMETRY_INTERVAL , .tv_nsec = 0 }; + m_telemetryTimer = new SelectableTimer(intervT); + auto executorT = new ExecutableTimer(m_telemetryTimer, this, "WM_TELEMETRY_TIMER"); + Orch::addExecutor(executorT); + m_telemetryTimer->start(); + + m_telemetryInterval = DEFAULT_TELEMETRY_INTERVAL; +} + +WatermarkOrch::~WatermarkOrch() +{ + SWSS_LOG_ENTER(); +} + +void WatermarkOrch::doTask(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + + if (!gPortsOrch->isInitDone()) + { + return; + } + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + string op = kfvOp(t); + std::vector fvt = kfvFieldsValues(t); + + if (op == SET_COMMAND) + { + if (key == "TELEMETRY_INTERVAL") + { + for (std::pair, std::basic_string > i: fvt) + { + if (i.first == "interval") + { + m_telemetryInterval = to_uint(i.second.c_str()); + } + else + { + SWSS_LOG_WARN("Unsupported key: %s", i.first.c_str()); + } + } + } + } + else if (op == DEL_COMMAND) + { + SWSS_LOG_WARN("Unsupported op %s", op.c_str()); + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s\n", op.c_str()); + } + + consumer.m_toSync.erase(it++); + } +} + +void WatermarkOrch::doTask(NotificationConsumer &consumer) +{ + if (!gPortsOrch->isInitDone()) + { + return; + } + + std::string op; + std::string data; + std::vector values; + + consumer.pop(op, data, values); + + if(&consumer == m_clearNotificationConsumer){ + string wm_name = ""; + vector &obj_ids = m_pg_ids; + if(data == CLEAR_PG_HEADROOM_REQUEST) + { + wm_name = "SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES"; + obj_ids = m_pg_ids; + } + else if(data == CLEAR_PG_SHARED_REQUEST) + { + wm_name = "SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES"; + obj_ids = m_pg_ids; + } + else if(data == CLEAR_QUEUE_SHARED_UNI_REQUEST) + { + wm_name = "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES"; + obj_ids = m_unicast_queue_ids; + } + else if(data == CLEAR_QUEUE_SHARED_MULTI_REQUEST) + { + wm_name = "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES"; + obj_ids = m_multicast_queue_ids; + } + if(op == "USER") + { + clearSingleWm(m_userWatermarkTable.get(), wm_name, obj_ids); + } + else if (op == "PERSISTENT") + { + clearSingleWm(m_persistentWatermarkTable.get(), wm_name, obj_ids); + } + else + { + SWSS_LOG_WARN("Unknown watermark clear request op: %s", op.c_str()); + } + } +} + +void WatermarkOrch::doTask(SelectableTimer &timer) +{ + SWSS_LOG_ENTER(); + + if (m_pg_ids.empty() or m_multicast_queue_ids.empty() or m_unicast_queue_ids.empty()) + { + init_pg_ids(); + init_queue_ids(); + } + + if (&timer == m_telemetryTimer) + { + /* If the interval was changed */ + auto intervT = timespec { .tv_sec = m_telemetryInterval , .tv_nsec = 0 }; + m_telemetryTimer->setInterval(intervT); + m_telemetryTimer->reset(); + + /* TODO: replace with removing all PG and queue entries? 
*/ + clearSingleWm(m_periodicWatermarkTable.get(), "SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES", m_pg_ids); + clearSingleWm(m_periodicWatermarkTable.get(), "SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES", m_pg_ids); + clearSingleWm(m_periodicWatermarkTable.get(), "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES", m_unicast_queue_ids); + clearSingleWm(m_periodicWatermarkTable.get(), "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES", m_multicast_queue_ids); + SWSS_LOG_INFO("Periodic watermark cleared by timer!"); + } + +} + +void WatermarkOrch::init_pg_ids() +{ + SWSS_LOG_ENTER(); + std::vector values; + Table pg_index_table(m_countersDb.get(), COUNTERS_PG_INDEX_MAP); + pg_index_table.get("", values); + for (auto fv: values) + { + sai_object_id_t id; + sai_deserialize_object_id(fv.first, id); + m_pg_ids.push_back(id); + } +} + +void WatermarkOrch::init_queue_ids() +{ + SWSS_LOG_ENTER(); + std::vector values; + Table m_queue_type_table(m_countersDb.get(), COUNTERS_QUEUE_TYPE_MAP); + m_queue_type_table.get("", values); + for (auto fv: values) + { + sai_object_id_t id; + sai_deserialize_object_id(fv.first, id); + if (fv.second == "SAI_QUEUE_TYPE_UNICAST") + { + m_unicast_queue_ids.push_back(id); + } + else + { + m_multicast_queue_ids.push_back(id); + } + } +} + +void WatermarkOrch::clearSingleWm(Table *table, string wm_name, vector &obj_ids) +{ + /* Zero-out some WM in some table for some vector of object ids*/ + SWSS_LOG_ENTER(); + SWSS_LOG_DEBUG("clear WM %s, for %ld obj ids", wm_name.c_str(), obj_ids.size()); + + vector vfvt = {{wm_name, "0"}}; + + for (sai_object_id_t id: obj_ids) + { + table->set(sai_serialize_object_id(id), vfvt); + } +} + + diff --git a/orchagent/watermarkorch.h b/orchagent/watermarkorch.h new file mode 100644 index 000000000000..b2a9847debdd --- /dev/null +++ b/orchagent/watermarkorch.h @@ -0,0 +1,54 @@ +#ifndef WATERMARKORCH_H +#define WATERMARKORCH_H + +#include "orch.h" +#include "port.h" + +#include "notificationconsumer.h" +#include "timer.h" + + +class WatermarkOrch : public Orch +{ +public: + WatermarkOrch(DBConnector *db, const std::string tableName); + virtual ~WatermarkOrch(void); + + void doTask(Consumer &consumer); + void doTask(NotificationConsumer &consumer); + void doTask(SelectableTimer &timer); + + void init_pg_ids(); + void init_queue_ids(); + + void clearSingleWm(Table *table, string wm_name, vector &obj_ids); + + shared_ptr
<Table> getCountersTable(void) + { + return m_countersTable; + } + + shared_ptr<DBConnector> getCountersDb(void) + { + return m_countersDb; + } + +private: + shared_ptr<DBConnector> m_countersDb = nullptr; + shared_ptr<DBConnector> m_appDb = nullptr; + shared_ptr<Table>
m_countersTable = nullptr; + shared_ptr<Table>
m_periodicWatermarkTable = nullptr; + shared_ptr<Table>
m_persistentWatermarkTable = nullptr; + shared_ptr<Table>
m_userWatermarkTable = nullptr; + + NotificationConsumer* m_clearNotificationConsumer = nullptr; + SelectableTimer* m_telemetryTimer = nullptr; + + vector m_unicast_queue_ids; + vector m_multicast_queue_ids; + vector m_pg_ids; + + int m_telemetryInterval; +}; + +#endif // WATERMARKORCH_H diff --git a/tests/test_watermark.py b/tests/test_watermark.py new file mode 100644 index 000000000000..a3dc8ecf0fc2 --- /dev/null +++ b/tests/test_watermark.py @@ -0,0 +1,214 @@ +from swsscommon import swsscommon +import os +import re +import time +import json +import redis + + +class SaiWmStats: + queue_shared = "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES" + pg_shared = "SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES" + pg_headroom = "SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES" + + +class WmTables: + persistent = "PERSISTENT_WATERMARKS" + periodic = "PERIODIC_WATERMARKS" + user = "USER_WATERMARKS" + + +class TestWatermark(object): + + DEFAULT_TELEMETRY_INTERVAL = 120 + NEW_INTERVAL = 5 + + def enable_unittests(self, dvs, status): + db = swsscommon.DBConnector(swsscommon.ASIC_DB, dvs.redis_sock, 0) + ntf = swsscommon.NotificationProducer(db, "SAI_VS_UNITTEST_CHANNEL") + fvp = swsscommon.FieldValuePairs() + ntf.send("enable_unittests", status, fvp) + + def set_counter(self, dvs, obj_type, obj_id, attr, val): + + db = swsscommon.DBConnector(swsscommon.ASIC_DB, dvs.redis_sock, 0) + ntf = swsscommon.NotificationProducer(db, "SAI_VS_UNITTEST_CHANNEL") + + r = redis.Redis(unix_socket_path=dvs.redis_sock, db=swsscommon.ASIC_DB) + rid = r.hget("VIDTORID", obj_id) + + assert rid is not None + + fvp = swsscommon.FieldValuePairs([(attr, val)]) + key = rid + + ntf.send("set_stats", key, fvp) + + def populate_asic(self, dvs, obj_type, attr, val): + + db = swsscommon.DBConnector(swsscommon.ASIC_DB, dvs.redis_sock, 0) + + oids = self.qs if obj_type == "SAI_OBJECT_TYPE_QUEUE" else self.pgs + + for obj_id in oids: + self.set_counter(dvs, obj_type, obj_id, attr, val) + + def populate_asic_all(self, dvs, val): + self.populate_asic(dvs, "SAI_OBJECT_TYPE_QUEUE", SaiWmStats.queue_shared, val) + self.populate_asic(dvs, "SAI_OBJECT_TYPE_INGRESS_PRIORITY_GROUP", SaiWmStats.pg_shared, val) + self.populate_asic(dvs, "SAI_OBJECT_TYPE_INGRESS_PRIORITY_GROUP", SaiWmStats.pg_headroom, val) + time.sleep(1) + + def verify_value(self, dvs, obj_ids, table_name, watermark_name, expected_value): + + counters_db = swsscommon.DBConnector(swsscommon.COUNTERS_DB, dvs.redis_sock, 0) + table = swsscommon.Table(counters_db, table_name) + + for obj_id in obj_ids: + + ret = table.get(obj_id) + + status = ret[0] + assert status + keyvalues = ret[1] + found = False + for key, value in keyvalues: + if key == watermark_name: + assert value == expected_value + found = True + assert found, "no such watermark found" + + def get_oids(self, dvs, obj_type): + + db = swsscommon.DBConnector(swsscommon.ASIC_DB, dvs.redis_sock, 0) + tbl = swsscommon.Table(db, "ASIC_STATE:{0}".format(obj_type)) + keys = tbl.getKeys() + return keys + + def set_up_flex_counter(self, dvs): + for q in self.qs: + dvs.runcmd("redis-cli -n 5 hset 'FLEX_COUNTER_TABLE:QUEUE_WATERMARK_STAT_COUNTER:{}' ".format(q) + \ + "QUEUE_COUNTER_ID_LIST SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES") + + for pg in self.pgs: + dvs.runcmd("redis-cli -n 5 hset 'FLEX_COUNTER_TABLE:PG_WATERMARK_STAT_COUNTER:{}' ".format(pg) + \ + "PG_COUNTER_ID_LIST 'SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES,SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES'") + + dvs.runcmd("redis-cli -n 4 
hset 'FLEX_COUNTER_TABLE|PG_WATERMARK' 'FLEX_COUNTER_STATUS' 'enable'") + dvs.runcmd("redis-cli -n 4 hset 'FLEX_COUNTER_TABLE|QUEUE_WATERMARK' 'FLEX_COUNTER_STATUS' 'enable'") + + self.populate_asic(dvs, "SAI_OBJECT_TYPE_QUEUE", SaiWmStats.queue_shared, "0") + self.populate_asic(dvs, "SAI_OBJECT_TYPE_INGRESS_PRIORITY_GROUP", SaiWmStats.pg_shared, "0") + self.populate_asic(dvs, "SAI_OBJECT_TYPE_INGRESS_PRIORITY_GROUP", SaiWmStats.pg_headroom, "0") + + time.sleep(self.DEFAULT_TELEMETRY_INTERVAL*2) + + def set_up(self, dvs): + + self.qs = self.get_oids(dvs, "SAI_OBJECT_TYPE_QUEUE") + self.pgs = self.get_oids(dvs, "SAI_OBJECT_TYPE_INGRESS_PRIORITY_GROUP") + + db = swsscommon.DBConnector(swsscommon.COUNTERS_DB, dvs.redis_sock, 0) + tbl = swsscommon.Table(db, "COUNTERS_QUEUE_TYPE_MAP") + + self.uc_q = [] + self.mc_q = [] + + for q in self.qs: + if self.qs.index(q) % 16 < 8: + tbl.set('', [(q, "SAI_QUEUE_TYPE_UNICAST")]) + self.uc_q.append(q) + else: + tbl.set('', [(q, "SAI_QUEUE_TYPE_MULTICAST")]) + self.mc_q.append(q) + + def test_telemetry_period(self, dvs): + + self.set_up(dvs) + self.set_up_flex_counter(dvs) + self.enable_unittests(dvs, "true") + + self.populate_asic_all(dvs, "100") + + time.sleep(self.DEFAULT_TELEMETRY_INTERVAL + 1) + + self.verify_value(dvs, self.pgs, WmTables.periodic, SaiWmStats.pg_shared, "0") + self.verify_value(dvs, self.pgs, WmTables.periodic, SaiWmStats.pg_headroom, "0") + self.verify_value(dvs, self.qs, WmTables.periodic, SaiWmStats.queue_shared, "0") + + self.populate_asic_all(dvs, "123") + + dvs.runcmd("config watermark telemetry interval {}".format(5)) + + time.sleep(self.DEFAULT_TELEMETRY_INTERVAL + 1) + time.sleep(self.NEW_INTERVAL + 1) + + self.verify_value(dvs, self.pgs, WmTables.periodic, SaiWmStats.pg_shared, "0") + self.verify_value(dvs, self.pgs, WmTables.periodic, SaiWmStats.pg_headroom, "0") + self.verify_value(dvs, self.qs, WmTables.periodic, SaiWmStats.queue_shared, "0") + + self.enable_unittests(dvs, "false") + + def test_lua_plugins(self, dvs): + + self.set_up(dvs) + self.set_up_flex_counter(dvs) + self.enable_unittests(dvs, "true") + + self.populate_asic_all(dvs, "192") + + for table_name in [WmTables.user, WmTables.persistent]: + self.verify_value(dvs, self.qs, table_name, SaiWmStats.queue_shared, "192") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_headroom, "192") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_shared, "192") + + self.populate_asic_all(dvs, "96") + + for table_name in [WmTables.user, WmTables.persistent]: + self.verify_value(dvs, self.qs, table_name, SaiWmStats.queue_shared, "192") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_headroom, "192") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_shared, "192") + + self.populate_asic_all(dvs, "288") + + for table_name in [WmTables.user, WmTables.persistent]: + self.verify_value(dvs, self.qs, table_name, SaiWmStats.queue_shared, "288") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_headroom, "288") + self.verify_value(dvs, self.pgs, table_name, SaiWmStats.pg_shared, "288") + + self.enable_unittests(dvs, "false") + + def test_clear(self, dvs): + + self.set_up(dvs) + self.enable_unittests(dvs, "true") + + self.populate_asic_all(dvs, "288") + + # clear pg shared watermark, and verify that headroom watermark and persistent watermarks are not affected + + dvs.runcmd("sonic-clear priority-group watermark shared") + + # make sure it cleared + self.verify_value(dvs, self.pgs, WmTables.user, SaiWmStats.pg_shared, "0") + + # 
make sure the rest is untouched + + self.verify_value(dvs, self.pgs, WmTables.user, SaiWmStats.pg_headroom, "288") + self.verify_value(dvs, self.pgs, WmTables.persistent, SaiWmStats.pg_shared, "288") + self.verify_value(dvs, self.pgs, WmTables.persistent, SaiWmStats.pg_headroom, "288") + + # clear queue unicast persistent watermark, and verify that multicast watermark and user watermarks are not affected + + dvs.runcmd("sonic-clear queue persistent-watermark unicast") + + # make sure it cleared + self.verify_value(dvs, self.uc_q, WmTables.persistent, SaiWmStats.queue_shared, "0") + + # make sure the rest is untouched + + self.verify_value(dvs, self.mc_q, WmTables.persistent, SaiWmStats.queue_shared, "288") + self.verify_value(dvs, self.uc_q, WmTables.user, SaiWmStats.queue_shared, "288") + self.verify_value(dvs, self.mc_q, WmTables.user, SaiWmStats.queue_shared, "288") + + self.enable_unittests(dvs, "false")
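
The tests above enable the new counter groups with raw redis-cli writes into CONFIG_DB (db index 4). For reference, a minimal Python sketch of the same step through swsscommon; the FLEX_COUNTER_TABLE keys QUEUE_WATERMARK and PG_WATERMARK and the FLEX_COUNTER_STATUS field are the ones flexcounterorch.cpp reacts to (which in turn triggers generateQueueMap()/generatePriorityGroupMap() in PortsOrch), while the redis socket path is an assumption and not part of this patch:

    from swsscommon import swsscommon

    REDIS_SOCK = "/var/run/redis/redis.sock"  # assumed default socket path

    cfg_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, REDIS_SOCK, 0)
    flex_tbl = swsscommon.Table(cfg_db, "FLEX_COUNTER_TABLE")

    for group in ("QUEUE_WATERMARK", "PG_WATERMARK"):
        # FlexCounterOrch::doTask() maps these keys to the *_WATERMARK_STAT_COUNTER groups
        flex_tbl.set(group, [("FLEX_COUNTER_STATUS", "enable")])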
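
Once the plugins run, each queue or PG OID gets a hash in COUNTERS_DB under USER_WATERMARKS, PERSISTENT_WATERMARKS and PERIODIC_WATERMARKS whose fields are the SAI stat names used above. A small sketch of reading the user watermarks back, under the same socket-path assumption:

    from swsscommon import swsscommon

    counters_db = swsscommon.DBConnector(swsscommon.COUNTERS_DB, "/var/run/redis/redis.sock", 0)
    user_wm = swsscommon.Table(counters_db, "USER_WATERMARKS")

    for oid in user_wm.getKeys():
        status, fvs = user_wm.get(oid)
        if not status:
            continue
        for field, value in fvs:
            # field is a SAI stat name, e.g. SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES; value is in bytes
            print(oid, field, value)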
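
Clearing goes through the WATERMARK_CLEAR_REQUEST notification channel in APPL_DB that WatermarkOrch::doTask(NotificationConsumer&) listens on: the notification op selects the table ("USER" or "PERSISTENT") and the data selects the counters ("PG_HEADROOM", "PG_SHARED", "Q_SHARED_UNI" or "Q_SHARED_MULTI"). sonic-clear, as used in test_clear, is the intended entry point; the sketch below sends the raw notification directly, again assuming the default socket path:

    from swsscommon import swsscommon

    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, "/var/run/redis/redis.sock", 0)
    ntf = swsscommon.NotificationProducer(appl_db, "WATERMARK_CLEAR_REQUEST")

    # zero out the persistent unicast-queue shared watermarks
    ntf.send("PERSISTENT", "Q_SHARED_UNI", swsscommon.FieldValuePairs())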