diff --git a/orchagent/bulker.h b/orchagent/bulker.h index 0578c6cc316e..cd1812861b14 100644 --- a/orchagent/bulker.h +++ b/orchagent/bulker.h @@ -158,7 +158,8 @@ class EntityBulker using Ts = SaiBulkerTraits; using Te = typename Ts::entry_t; - EntityBulker(typename Ts::api_t *api) + EntityBulker(typename Ts::api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { throw std::logic_error("Not implemented"); } @@ -279,30 +280,15 @@ class EntityBulker if (*object_status == SAI_STATUS_NOT_EXECUTED) { rs.push_back(entry); - } - } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush removing_entries %zu\n", removing_entries.size()); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", - removing_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t ir = 0; ir < count; ir++) - { - auto& entry = rs[ir]; - sai_status_t *object_status = removing_entries[entry]; - if (object_status) - { - *object_status = statuses[ir]; + if (rs.size() >= max_bulk_size) + { + flush_removing_entries(rs); + } } } + flush_removing_entries(rs); + removing_entries.clear(); } @@ -323,31 +309,15 @@ class EntityBulker rs.push_back(entry); tss.push_back(attrs.data()); cs.push_back((uint32_t)attrs.size()); - } - } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*create_entries)((uint32_t)count, rs.data(), cs.data(), tss.data() - , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush creating_entries %zu\n", creating_entries.size()); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush create entries failed, number of entries to create: %zu, status: %s", - creating_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t ir = 0; ir < count; ir++) - { - auto& entry = rs[ir]; - sai_status_t *object_status = creating_entries[entry].second; - if (object_status) - { - *object_status = statuses[ir]; + if (rs.size() >= max_bulk_size) + { + flush_creating_entries(rs, tss, cs); + } } } + flush_creating_entries(rs, tss, cs); + creating_entries.clear(); } @@ -371,32 +341,16 @@ class EntityBulker rs.push_back(entry); ts.push_back(attr); status_vector.push_back(object_status); + + if (rs.size() >= max_bulk_size) + { + flush_setting_entries(rs, ts, status_vector); + } } } } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() - , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("EntityBulker.flush setting_entries %zu, count %zu\n", setting_entries.size(), count); - } - else - { - SWSS_LOG_ERROR("EntityBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", - setting_entries.size(), sai_serialize_status(status).c_str()); - } + flush_setting_entries(rs, ts, status_vector); - for (size_t ir = 0; ir < count; ir++) - { - sai_status_t *object_status = status_vector[ir]; - if (object_status) - { - SWSS_LOG_INFO("EntityBulker.flush setting_entries status[%zu]=%d(0x%8p)\n", ir, statuses[ir], object_status); - *object_status = statuses[ir]; - } - } setting_entries.clear(); } } @@ -452,13 +406,131 @@ class EntityBulker sai_status_t * // OUT object_status > removing_entries; + size_t max_bulk_size; + typename Ts::bulk_create_entry_fn create_entries; typename Ts::bulk_remove_entry_fn remove_entries; typename Ts::bulk_set_entry_attribute_fn set_entries_attribute; + + sai_status_t flush_removing_entries( + _Inout_ std::vector &rs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush removing_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + auto& entry = rs[ir]; + sai_status_t *object_status = removing_entries[entry]; + if (object_status) + { + *object_status = statuses[ir]; + } + } + + rs.clear(); + + return status; + } + + sai_status_t flush_creating_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &tss, + _Inout_ std::vector &cs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*create_entries)((uint32_t)count, rs.data(), cs.data(), tss.data() + , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush creating_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush create entries failed, number of entries to create: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + auto& entry = rs[ir]; + sai_status_t *object_status = creating_entries[entry].second; + if (object_status) + { + *object_status = statuses[ir]; + } + } + + rs.clear(); + tss.clear(); + cs.clear(); + + return status; + } + + sai_status_t flush_setting_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &ts, + _Inout_ std::vector &status_vector) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() + , SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("EntityBulker.flush setting_entries, count %zu\n", count); + } + else + { + SWSS_LOG_ERROR("EntityBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t ir = 0; ir < count; ir++) + { + sai_status_t *object_status = status_vector[ir]; + if (object_status) + { + SWSS_LOG_INFO("EntityBulker.flush setting_entries status[%zu]=%d(0x%8p)\n", ir, statuses[ir], object_status); + *object_status = statuses[ir]; + } + } + + rs.clear(); + ts.clear(); + status_vector.clear(); + + return status; + } }; template <> -inline EntityBulker::EntityBulker(sai_route_api_t *api) +inline EntityBulker::EntityBulker(sai_route_api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { create_entries = api->create_route_entries; remove_entries = api->remove_route_entries; @@ -466,7 +538,8 @@ inline EntityBulker::EntityBulker(sai_route_api_t *api) } template <> -inline EntityBulker::EntityBulker(sai_fdb_api_t *api) +inline EntityBulker::EntityBulker(sai_fdb_api_t *api, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { // TODO: implement after create_fdb_entries() is available in SAI throw std::logic_error("Not implemented"); @@ -483,7 +556,8 @@ class ObjectBulker public: using Ts = SaiBulkerTraits; - ObjectBulker(typename Ts::api_t* next_hop_group_api, sai_object_id_t switch_id) + ObjectBulker(typename Ts::api_t* next_hop_group_api, sai_object_id_t switch_id, size_t max_bulk_size) : + max_bulk_size(max_bulk_size) { throw std::logic_error("Not implemented"); } @@ -564,33 +638,22 @@ class ObjectBulker if (*object_status == SAI_STATUS_NOT_EXECUTED) { rs.push_back(entry); + + if (rs.size() >= max_bulk_size) + { + flush_removing_entries(rs); + } } } - size_t count = rs.size(); - std::vector statuses(count); - sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush removing_entries %zu rc=%d statuses[0]=%d\n", removing_entries.size(), status, statuses[0]); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", - removing_entries.size(), sai_serialize_status(status).c_str()); - } + flush_removing_entries(rs); - for (size_t i = 0; i < count; i++) - { - auto const& entry = rs[i]; - sai_status_t object_status = statuses[i]; - *removing_entries[entry] = object_status; - } removing_entries.clear(); } // Creating if (!creating_entries.empty()) { + std::vector rs; std::vector tss; std::vector cs; @@ -600,30 +663,17 @@ class ObjectBulker auto const& attrs = std::get<1>(i); if (*pid == SAI_NULL_OBJECT_ID) { + rs.push_back(pid); tss.push_back(attrs.data()); cs.push_back((uint32_t)attrs.size()); - } - } - size_t count = creating_entries.size(); - std::vector object_ids(count); - std::vector statuses(count); - sai_status_t status = (*create_entries)(switch_id, (uint32_t)count, cs.data(), tss.data() - , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, object_ids.data(), statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush creating_entries %zu\n", creating_entries.size()); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush create entries failed, number of entries to create: %zu, status: %s", - creating_entries.size(), sai_serialize_status(status).c_str()); - } - for (size_t i = 0; i < count; i++) - { - sai_object_id_t *pid = std::get<0>(creating_entries[i]); - *pid = (statuses[i] == SAI_STATUS_SUCCESS) ? object_ids[i] : SAI_NULL_OBJECT_ID; + if (rs.size() >= max_bulk_size) + { + flush_creating_entries(rs, tss, cs); + } + } } + flush_creating_entries(rs, tss, cs); creating_entries.clear(); } @@ -644,21 +694,14 @@ class ObjectBulker { rs.push_back(entry); ts.push_back(attr); + + if (rs.size() >= max_bulk_size) + { + flush_setting_entries(rs, ts); + } } } - size_t count = setting_entries.size(); - std::vector statuses(count); - sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() - , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); - if (status == SAI_STATUS_SUCCESS) - { - SWSS_LOG_INFO("ObjectBulker.flush setting_entries %zu\n", setting_entries.size()); - } - else - { - SWSS_LOG_ERROR("ObjectBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", - setting_entries.size(), sai_serialize_status(status).c_str()); - } + flush_setting_entries(rs, ts); setting_entries.clear(); } @@ -702,6 +745,8 @@ class ObjectBulker sai_object_id_t switch_id; + size_t max_bulk_size; + std::vector // - attrs @@ -723,11 +768,112 @@ class ObjectBulker typename Ts::bulk_remove_entry_fn remove_entries; // TODO: wait until available in SAI //typename Ts::bulk_set_entry_attribute_fn set_entries_attribute; + + sai_status_t flush_removing_entries( + _Inout_ std::vector &rs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*remove_entries)((uint32_t)count, rs.data(), SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush removing_entries %zu rc=%d statuses[0]=%d\n", removing_entries.size(), status, statuses[0]); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush remove entries failed, number of entries to remove: %zu, status: %s", + removing_entries.size(), sai_serialize_status(status).c_str()); + } + + for (size_t i = 0; i < count; i++) + { + auto const& entry = rs[i]; + sai_status_t object_status = statuses[i]; + *removing_entries[entry] = object_status; + } + + rs.clear(); + + return status; + } + + sai_status_t flush_creating_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &tss, + _Inout_ std::vector &cs) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector object_ids(count); + std::vector statuses(count); + sai_status_t status = (*create_entries)(switch_id, (uint32_t)count, cs.data(), tss.data() + , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, object_ids.data(), statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush creating_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush create entries failed, number of entries to create: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + for (size_t i = 0; i < count; i++) + { + sai_object_id_t *pid = rs[i]; + *pid = (statuses[i] == SAI_STATUS_SUCCESS) ? object_ids[i] : SAI_NULL_OBJECT_ID; + } + + rs.clear(); + tss.clear(); + cs.clear(); + + return status; + } + + // TODO: wait until available in SAI + /* + sai_status_t flush_setting_entries( + _Inout_ std::vector &rs, + _Inout_ std::vector &ts) + { + if (rs.empty()) + { + return SAI_STATUS_SUCCESS; + } + size_t count = rs.size(); + std::vector statuses(count); + sai_status_t status = (*set_entries_attribute)((uint32_t)count, rs.data(), ts.data() + , SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR, statuses.data()); + if (status == SAI_STATUS_SUCCESS) + { + SWSS_LOG_INFO("ObjectBulker.flush setting_entries %zu\n", count); + } + else + { + SWSS_LOG_ERROR("ObjectBulker.flush set entry attribute failed, number of entries to set: %zu, status: %s", + count, sai_serialize_status(status).c_str()); + } + + rs.clear(); + ts.clear(); + + return status; + } + */ }; template <> -inline ObjectBulker::ObjectBulker(SaiBulkerTraits::api_t *api, sai_object_id_t switch_id) - : switch_id(switch_id) +inline ObjectBulker::ObjectBulker(SaiBulkerTraits::api_t *api, sai_object_id_t switch_id, size_t max_bulk_size) : + switch_id(switch_id), + max_bulk_size(max_bulk_size) { create_entries = api->create_next_hop_group_members; remove_entries = api->remove_next_hop_group_members; diff --git a/orchagent/main.cpp b/orchagent/main.cpp index 23d93bd291c0..018af5999b13 100644 --- a/orchagent/main.cpp +++ b/orchagent/main.cpp @@ -47,6 +47,8 @@ sai_object_id_t gSwitchId = SAI_NULL_OBJECT_ID; MacAddress gMacAddress; MacAddress gVxlanMacAddress; +extern size_t gMaxBulkSize; + #define DEFAULT_BATCH_SIZE 128 int gBatchSize = DEFAULT_BATCH_SIZE; @@ -72,7 +74,7 @@ string gMyAsicName = ""; void usage() { - cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode]" << endl; + cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size]" << endl; cout << " -h: display this message" << endl; cout << " -r record_type: record orchagent logs with type (default 3)" << endl; cout << " 0: do not record logs" << endl; @@ -87,6 +89,7 @@ void usage() cout << " -z: redis communication mode (redis_async|redis_sync|zmq_sync), default: redis_async" << endl; cout << " -f swss_rec_filename: swss record log filename(default 'swss.rec')" << endl; cout << " -j sairedis_rec_filename: sairedis record log filename(default sairedis.rec)" << endl; + cout << " -k max bulk size in bulk mode (default 1000)"; } void sighup_handler(int signo) @@ -319,7 +322,7 @@ int main(int argc, char **argv) string swss_rec_filename = "swss.rec"; string sairedis_rec_filename = "sairedis.rec"; - while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:")) != -1) + while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:")) != -1) { switch (opt) { @@ -397,6 +400,20 @@ int main(int argc, char **argv) sairedis_rec_filename = optarg; } break; + case 'k': + { + auto limit = atoi(optarg); + if (limit > 0) + { + gMaxBulkSize = limit; + SWSS_LOG_NOTICE("Setting maximum bulk size in bulk mode as %zu", gMaxBulkSize); + } + else + { + SWSS_LOG_ERROR("Invalid input for maximum bulk size in bulk mode: %d. Ignoring.", limit); + } + } + break; default: /* '?' */ exit(EXIT_FAILURE); } diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index fa4acacd9190..f28908418a59 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -43,6 +43,9 @@ MACsecOrch *gMacsecOrch; bool gIsNatSupported = false; +#define DEFAULT_MAX_BULK_SIZE 1000 +size_t gMaxBulkSize = DEFAULT_MAX_BULK_SIZE; + OrchDaemon::OrchDaemon(DBConnector *applDb, DBConnector *configDb, DBConnector *stateDb, DBConnector *chassisAppDb) : m_applDb(applDb), m_configDb(configDb), diff --git a/orchagent/routeorch.cpp b/orchagent/routeorch.cpp index fc5b2276a8c8..28f6eaef4d3a 100644 --- a/orchagent/routeorch.cpp +++ b/orchagent/routeorch.cpp @@ -18,6 +18,8 @@ extern PortsOrch *gPortsOrch; extern CrmOrch *gCrmOrch; extern Directory gDirectory; +extern size_t gMaxBulkSize; + /* Default maximum number of next hop groups */ #define DEFAULT_NUMBER_OF_ECMP_GROUPS 128 #define DEFAULT_MAX_ECMP_GROUP_SIZE 32 @@ -25,8 +27,8 @@ extern Directory gDirectory; const int routeorch_pri = 5; RouteOrch::RouteOrch(DBConnector *db, string tableName, SwitchOrch *switchOrch, NeighOrch *neighOrch, IntfsOrch *intfsOrch, VRFOrch *vrfOrch, FgNhgOrch *fgNhgOrch) : - gRouteBulker(sai_route_api), - gNextHopGroupMemberBulker(sai_next_hop_group_api, gSwitchId), + gRouteBulker(sai_route_api, gMaxBulkSize), + gNextHopGroupMemberBulker(sai_next_hop_group_api, gSwitchId, gMaxBulkSize), Orch(db, tableName, routeorch_pri), m_switchOrch(switchOrch), m_neighOrch(neighOrch), diff --git a/tests/mock_tests/bulker_ut.cpp b/tests/mock_tests/bulker_ut.cpp index 1cdabcdd148b..a2cdaa07a300 100644 --- a/tests/mock_tests/bulker_ut.cpp +++ b/tests/mock_tests/bulker_ut.cpp @@ -29,9 +29,12 @@ namespace bulker_test TEST_F(BulkerTest, BulkerAttrOrder) { // Create bulker - EntityBulker gRouteBulker(sai_route_api); + EntityBulker gRouteBulker(sai_route_api, 1000); deque object_statuses; + // Check max bulk size + ASSERT_EQ(gRouteBulker.max_bulk_size, 1000); + // Create a dummy route entry sai_route_entry_t route_entry; route_entry.destination.addr_family = SAI_IP_ADDR_FAMILY_IPV4;