From 5c625b2fb557a1b33261f7327b1eac4da79fa3bf Mon Sep 17 00:00:00 2001 From: Shi Su <67605788+shi-su@users.noreply.github.com> Date: Tue, 25 May 2021 18:11:35 -0700 Subject: [PATCH] [Bulk mode] Limit the size of bulker (#1744) What I did Limit the size of the bulker. Why I did it Without a constraint on the size of bulkers, the bulked operation may take longer than the SAI timeout to finish. Such behavior would result in errors and swss crashes. This PR limits the bulk size and avoids SAI timeout in bulk mode. --- orchagent/bulker.h | 390 ++++++++++++++++++++++----------- orchagent/main.cpp | 21 +- orchagent/orchdaemon.cpp | 3 + orchagent/routeorch.cpp | 6 +- tests/mock_tests/bulker_ut.cpp | 5 +- 5 files changed, 298 insertions(+), 127 deletions(-) diff --git a/orchagent/bulker.h b/orchagent/bulker.h index 0578c6cc316e..cd1812861b14 100644 --- a/orchagent/bulker.h +++ b/orchagent/bulker.h @@ -158,7 +158,8 @@ class EntityBulker using Ts = SaiBulkerTraits; using Te = typename Ts::entry_t; - EntityBulker(typename Ts::api_t *api) + EntityBulker(typename Ts::api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { throw std::logic_error("Not implemented"); } @@ -279,30 +280,15 @@ class EntityBulker if (*object_status == SAI_STATUS_NOT_EXECUTED) { rs.push_back(entry); - } - } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush removing_entries %zu\n", removing_entries.size()); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", - removing_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t ir = 0; ir < count; ir++) - { - auto& entry = rs[ir]; - sai_status_t *object_status = removing_entries[entry]; - if (object_status) - { - *object_status = statuses[ir]; + if (rs.size() >= max_bulk_size) + { + flush_removing_entries(rs); + } } } + flush_removing_entries(rs); + removing_entries.clear(); } @@ -323,31 +309,15 @@ class EntityBulker rs.push_back(entry); tss.push_back(attrs.data()); cs.push_back((uint32_t)attrs.size()); - } - } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*create_entries)((uint32_t)count, rs.data(), cs.data(), tss.data() - , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush creating_entries %zu\n", creating_entries.size()); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush create entries failed, number of entries to create: %zu, status: %s", - creating_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t ir = 0; ir < count; ir++) - { - auto& entry = rs[ir]; - sai_status_t *object_status = creating_entries[entry].second; - if (object_status) - { - *object_status = statuses[ir]; + if (rs.size() >= max_bulk_size) + { + flush_creating_entries(rs, tss, cs); + } } } + flush_creating_entries(rs, tss, cs); + creating_entries.clear(); } @@ -371,32 +341,16 @@ class EntityBulker rs.push_back(entry); ts.push_back(attr); status_vector.push_back(object_status); + + if (rs.size() >= max_bulk_size) + { + flush_setting_entries(rs, ts, status_vector); + } } } } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() - , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush setting_entries %zu, count %zu\n", setting_entries.size(), count); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", - setting_entries.size(), sai_serialize_status(status).c_str()); - } + flush_setting_entries(rs, ts, status_vector); - for (size_t ir = 0; ir < count; ir++) - { - sai_status_t *object_status = status_vector[ir]; - if (object_status) - { - SWSS_LOG_INFO("EntityBulker.flush setting_entries status[%zu]=%d(0x%8p)\n", ir, statuses[ir], object_status); - *object_status = statuses[ir]; - } - } setting_entries.clear(); } } @@ -452,13 +406,131 @@ class EntityBulker sai_status_t * // OUT object_status > removing_entries; + size_t max_bulk_size; + typename Ts::bulk_create_entry_fn create_entries; typename Ts::bulk_remove_entry_fn remove_entries; typename Ts::bulk_set_entry_attribute_fn set_entries_attribute; + + sai_status_t flush_removing_entries( + _Inout_ std::vector &rs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush removing_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + auto& entry = rs[ir]; + sai_status_t *object_status = removing_entries[entry]; + if (object_status) + { + *object_status = statuses[ir]; + } + } + + rs.clear(); + + return status; + } + + sai_status_t flush_creating_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &tss, + _Inout_ std::vector &cs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*create_entries)((uint32_t)count, rs.data(), cs.data(), tss.data() + , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush creating_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush create entries failed, number of entries to create: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + auto& entry = rs[ir]; + sai_status_t *object_status = creating_entries[entry].second; + if (object_status) + { + *object_status = statuses[ir]; + } + } + + rs.clear(); + tss.clear(); + cs.clear(); + + return status; + } + + sai_status_t flush_setting_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &ts, + _Inout_ std::vector &status_vector) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() + , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush setting_entries, count %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + sai_status_t *object_status = status_vector[ir]; + if (object_status) + { + SWSS_LOG_INFO("EntityBulker.flush setting_entries status[%zu]=%d(0x%8p)\n", ir, statuses[ir], object_status); + *object_status = statuses[ir]; + } + } + + rs.clear(); + ts.clear(); + status_vector.clear(); + + return status; + } }; template <> -inline EntityBulker::EntityBulker(sai_route_api_t *api) +inline EntityBulker::EntityBulker(sai_route_api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { create_entries = api->create_route_entries; remove_entries = api->remove_route_entries; @@ -466,7 +538,8 @@ inline EntityBulker::EntityBulker(sai_route_api_t *api) } template <> -inline EntityBulker::EntityBulker(sai_fdb_api_t *api) +inline EntityBulker::EntityBulker(sai_fdb_api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { // TODO: implement after create_fdb_entries() is available in SAI throw std::logic_error("Not implemented"); @@ -483,7 +556,8 @@ class ObjectBulker public: using Ts = SaiBulkerTraits; - ObjectBulker(typename Ts::api_t* next_hop_group_api, sai_object_id_t switch_id) + ObjectBulker(typename Ts::api_t* next_hop_group_api, sai_object_id_t switch_id, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { throw std::logic_error("Not implemented"); } @@ -564,33 +638,22 @@ class ObjectBulker if (*object_status == SAI_STATUS_NOT_EXECUTED) { rs.push_back(entry); + + if (rs.size() >= max_bulk_size) + { + flush_removing_entries(rs); + } } } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush removing_entries %zu rc=%d statuses[0]=%d\n", removing_entries.size(), status, statuses[0]); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", - removing_entries.size(), sai_serialize_status(status).c_str()); - } + flush_removing_entries(rs); - for (size_t i = 0; i < count; i++) - { - auto const& entry = rs[i]; - sai_status_t object_status = statuses[i]; - *removing_entries[entry] = object_status; - } removing_entries.clear(); } // Creating if (!creating_entries.empty()) { + std::vector rs; std::vector tss; std::vector cs; @@ -600,30 +663,17 @@ class ObjectBulker auto const& attrs = std::get<1>(i); if (*pid == SAI_NULL_OBJECT_ID) { + rs.push_back(pid); tss.push_back(attrs.data()); cs.push_back((uint32_t)attrs.size()); - } - } - size_t count = creating_entries.size(); - std::vector object_ids(count); - std::vector statuses(count); - sai_status_t status = (*create_entries)(switch_id, (uint32_t)count, cs.data(), tss.data() - , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, object_ids.data(), statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush creating_entries %zu\n", creating_entries.size()); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush create entries failed, number of entries to create: %zu, status: %s", - creating_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t i = 0; i < count; i++) - { - sai_object_id_t *pid = std::get<0>(creating_entries[i]); - *pid = (statuses[i] == SAI_STATUS_SUCCESS) ? object_ids[i] : SAI_NULL_OBJECT_ID; + if (rs.size() >= max_bulk_size) + { + flush_creating_entries(rs, tss, cs); + } + } } + flush_creating_entries(rs, tss, cs); creating_entries.clear(); } @@ -644,21 +694,14 @@ class ObjectBulker { rs.push_back(entry); ts.push_back(attr); + + if (rs.size() >= max_bulk_size) + { + flush_setting_entries(rs, ts); + } } } - size_t count = setting_entries.size(); - std::vector statuses(count); - sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() - , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush setting_entries %zu\n", setting_entries.size()); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", - setting_entries.size(), sai_serialize_status(status).c_str()); - } + flush_setting_entries(rs, ts); setting_entries.clear(); } @@ -702,6 +745,8 @@ class ObjectBulker sai_object_id_t switch_id; + size_t max_bulk_size; + std::vector // - attrs @@ -723,11 +768,112 @@ class ObjectBulker typename Ts::bulk_remove_entry_fn remove_entries; // TODO: wait until available in SAI //typename Ts::bulk_set_entry_attribute_fn set_entries_attribute; + + sai_status_t flush_removing_entries( + _Inout_ std::vector &rs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush removing_entries %zu rc=%d statuses[0]=%d\n", removing_entries.size(), status, statuses[0]); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", + removing_entries.size(), sai_serialize_status(status).c_str()); + } + + for (size_t i = 0; i < count; i++) + { + auto const& entry = rs[i]; + sai_status_t object_status = statuses[i]; + *removing_entries[entry] = object_status; + } + + rs.clear(); + + return status; + } + + sai_status_t flush_creating_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &tss, + _Inout_ std::vector &cs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector object_ids(count); + std::vector statuses(count); + sai_status_t status = (*create_entries)(switch_id, (uint32_t)count, cs.data(), tss.data() + , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, object_ids.data(), statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush creating_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush create entries failed, number of entries to create: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t i = 0; i < count; i++) + { + sai_object_id_t *pid = rs[i]; + *pid = (statuses[i] == SAI_STATUS_SUCCESS) ? object_ids[i] : SAI_NULL_OBJECT_ID; + } + + rs.clear(); + tss.clear(); + cs.clear(); + + return status; + } + + // TODO: wait until available in SAI + /* + sai_status_t flush_setting_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &ts) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() + , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush setting_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + rs.clear(); + ts.clear(); + + return status; + } + */ }; template <> -inline ObjectBulker::ObjectBulker(SaiBulkerTraits::api_t *api, sai_object_id_t switch_id) - : switch_id(switch_id) +inline ObjectBulker::ObjectBulker(SaiBulkerTraits::api_t *api, sai_object_id_t switch_id, size_t max_bulk_size) : + switch_id(switch_id), + max_bulk_size(max_bulk_size) { create_entries = api->create_next_hop_group_members; remove_entries = api->remove_next_hop_group_members; diff --git a/orchagent/main.cpp b/orchagent/main.cpp index 23d93bd291c0..018af5999b13 100644 --- a/orchagent/main.cpp +++ b/orchagent/main.cpp @@ -47,6 +47,8 @@ sai_object_id_t gSwitchId = SAI_NULL_OBJECT_ID; MacAddress gMacAddress; MacAddress gVxlanMacAddress; +extern size_t gMaxBulkSize; + #define DEFAULT_BATCH_SIZE 128 int gBatchSize = DEFAULT_BATCH_SIZE; @@ -72,7 +74,7 @@ string gMyAsicName = ""; void usage() { - cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode]" << endl; + cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size]" << endl; cout << " -h: display this message" << endl; cout << " -r record_type: record orchagent logs with type (default 3)" << endl; cout << " 0: do not record logs" << endl; @@ -87,6 +89,7 @@ void usage() cout << " -z: redis communication mode (redis_async|redis_sync|zmq_sync), default: redis_async" << endl; cout << " -f swss_rec_filename: swss record log filename(default 'swss.rec')" << endl; cout << " -j sairedis_rec_filename: sairedis record log filename(default sairedis.rec)" << endl; + cout << " -k max bulk size in bulk mode (default 1000)"; } void sighup_handler(int signo) @@ -319,7 +322,7 @@ int main(int argc, char **argv) string swss_rec_filename = "swss.rec"; string sairedis_rec_filename = "sairedis.rec"; - while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:")) != -1) + while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:")) != -1) { switch (opt) { @@ -397,6 +400,20 @@ int main(int argc, char **argv) sairedis_rec_filename = optarg; } break; + case 'k': + { + auto limit = atoi(optarg); + if (limit > 0) + { + gMaxBulkSize = limit; + SWSS_LOG_NOTICE("Setting maximum bulk size in bulk mode as %zu", gMaxBulkSize); + } + else + { + SWSS_LOG_ERROR("Invalid input for maximum bulk size in bulk mode: %d. Ignoring.", limit); + } + } + break; default: /* '?' */ exit(EXIT_FAILURE); } diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index fa4acacd9190..f28908418a59 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -43,6 +43,9 @@ MACsecOrch *gMacsecOrch; bool gIsNatSupported = false; +#define DEFAULT_MAX_BULK_SIZE 1000 +size_t gMaxBulkSize = DEFAULT_MAX_BULK_SIZE; + OrchDaemon::OrchDaemon(DBConnector *applDb, DBConnector *configDb, DBConnector *stateDb, DBConnector *chassisAppDb) : m_applDb(applDb), m_configDb(configDb), diff --git a/orchagent/routeorch.cpp b/orchagent/routeorch.cpp index fc5b2276a8c8..28f6eaef4d3a 100644 --- a/orchagent/routeorch.cpp +++ b/orchagent/routeorch.cpp @@ -18,6 +18,8 @@ extern PortsOrch *gPortsOrch; extern CrmOrch *gCrmOrch; extern Directory gDirectory; +extern size_t gMaxBulkSize; + /* Default maximum number of next hop groups */ #define DEFAULT_NUMBER_OF_ECMP_GROUPS 128 #define DEFAULT_MAX_ECMP_GROUP_SIZE 32 @@ -25,8 +27,8 @@ extern Directory gDirectory; const int routeorch_pri = 5; RouteOrch::RouteOrch(DBConnector *db, string tableName, SwitchOrch *switchOrch, NeighOrch *neighOrch, IntfsOrch *intfsOrch, VRFOrch *vrfOrch, FgNhgOrch *fgNhgOrch) : - gRouteBulker(sai_route_api), - gNextHopGroupMemberBulker(sai_next_hop_group_api, gSwitchId), + gRouteBulker(sai_route_api, gMaxBulkSize), + gNextHopGroupMemberBulker(sai_next_hop_group_api, gSwitchId, gMaxBulkSize), Orch(db, tableName, routeorch_pri), m_switchOrch(switchOrch), m_neighOrch(neighOrch), diff --git a/tests/mock_tests/bulker_ut.cpp b/tests/mock_tests/bulker_ut.cpp index 1cdabcdd148b..a2cdaa07a300 100644 --- a/tests/mock_tests/bulker_ut.cpp +++ b/tests/mock_tests/bulker_ut.cpp @@ -29,9 +29,12 @@ namespace bulker_test TEST_F(BulkerTest, BulkerAttrOrder) { // Create bulker - EntityBulker gRouteBulker(sai_route_api); + EntityBulker gRouteBulker(sai_route_api, 1000); deque object_statuses; + // Check max bulk size + ASSERT_EQ(gRouteBulker.max_bulk_size, 1000); + // Create a dummy route entry sai_route_entry_t route_entry; route_entry.destination.addr_family = SAI_IP_ADDR_FAMILY_IPV4;