diff --git a/Server/CMakeLists.txt b/Server/CMakeLists.txt index 9a2f57a..ec7c9e0 100644 --- a/Server/CMakeLists.txt +++ b/Server/CMakeLists.txt @@ -15,7 +15,6 @@ if(ENABLE_REDIS) add_definitions(-DREDIS_ENABLED) endif() - # Find and set the env for the mysql c++ connector set(HINT_ROOT_DIR "${HINT_ROOT_DIR}" @@ -40,30 +39,57 @@ find_library(LIBYAML_CPP_LIBRARY lib64 lib) -find_path(LIBRDKAFKA_INCLUDE_DIR - librdkafka/rdkafkacpp.h - HINTS - ${HINT_ROOT_DIR} - PATH_SUFFIXES - include) - -find_library(LIBRDKAFKA_LIBRARY - NAMES - librdkafka.a rdkafka - HINTS - ${HINT_ROOT_DIR} - PATH_SUFFIXES - lib64 - lib) - -find_library(LIBRDKAFKA_CPP_LIBRARY - NAMES - librdkafka++.a rdkafka++ - HINTS - ${HINT_ROOT_DIR} - PATH_SUFFIXES - lib64 - lib) +if (NOT ENABLE_REDIS) + find_path(LIBRDKAFKA_INCLUDE_DIR + librdkafka/rdkafkacpp.h + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + include) + + find_library(LIBRDKAFKA_LIBRARY + NAMES + librdkafka.a rdkafka + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + lib64 + lib) + + find_library(LIBRDKAFKA_CPP_LIBRARY + NAMES + librdkafka++.a rdkafka++ + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + lib64 + lib) +else () + find_path(LIBSWSSCOMMON_INCLUDE_DIR + swss/dbconnector.h + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + include) + + find_library(LIBHIREDIS_LIBRARY + NAMES + hiredis + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + lib64 + lib) + + find_library(LIBSWSSCOMMON_LIBRARY + NAMES + swsscommon + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + lib64 + lib) +endif () find_library(LIBRT_LIBRARY NAMES @@ -74,10 +100,23 @@ find_library(LIBRT_LIBRARY lib64 lib) -if (NOT LIBRDKAFKA_INCLUDE_DIR OR NOT LIBRDKAFKA_LIBRARY OR NOT LIBRDKAFKA_CPP_LIBRARY) - Message (FATAL_ERROR "Librdkafka was not found, cannot proceed. Visit https://github.com/edenhill/librdkafka for details on how to install it.") -#else () -# Message ("lib = " ${LIBRDKAFKA_LIBRARY}) +if (ENABLE_REDIS) + if (NOT LIBHIREDIS_LIBRARY) + Message (FATAL_ERROR "Libhiredis was not found, cannot proceed. Visit https://github.com/redis/hiredis for details on how to install it.") + else () + Message ("lib = " ${LIBHIREDIS_LIBRARY}) + endif() + if (NOT LIBSWSSCOMMON_INCLUDE_DIR OR NOT LIBSWSSCOMMON_LIBRARY) + Message (FATAL_ERROR "swsscommon was not found, cannot proceed. Visit https://github.com/sonic-net/sonic-swss-common for details on how to install it.") + else () + Message ("lib = " ${LIBSWSSCOMMON_LIBRARY}) + endif() +else () + if (NOT LIBRDKAFKA_INCLUDE_DIR OR NOT LIBRDKAFKA_LIBRARY OR NOT LIBRDKAFKA_CPP_LIBRARY) + Message (FATAL_ERROR "Librdkafka was not found, cannot proceed. Visit https://github.com/edenhill/librdkafka for details on how to install it.") + else () + Message ("lib = " ${LIBRDKAFKA_LIBRARY}) + endif() endif() if (NOT LIBYAML_CPP_INCLUDE_DIR OR NOT LIBYAML_CPP_LIBRARY) @@ -89,7 +128,11 @@ if (NOT LIBRT_LIBRARY AND NOT MACOSX) endif() # Update the include dir -include_directories(${LIBRDKAFKA_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/kafka) +if (NOT ENABLE_REDIS) + include_directories(${LIBRDKAFKA_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/kafka) +else() + include_directories(${LIBSWSSCOMMON_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/redis) +endif() #link_directories(${LIBRDKAFKA_LIBRARY}) @@ -97,11 +140,6 @@ include_directories(${LIBRDKAFKA_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ sr set (SRC_FILES src/bmp/BMPListener.cpp src/bmp/BMPReader.cpp - src/kafka/MsgBusImpl_kafka.cpp - src/kafka/KafkaEventCallback.cpp - src/kafka/KafkaDeliveryReportCallback.cpp - src/kafka/KafkaTopicSelector.cpp - src/kafka/KafkaPeerPartitionerCallback.cpp src/openbmp.cpp src/bmp/parseBMP.cpp src/md5.cpp @@ -121,36 +159,51 @@ set (SRC_FILES src/bgp/linkstate/MPLinkStateAttr.cpp ) +# Add specific files used +if (NOT ENABLE_REDIS) + # Add Kafka-specific source files + file(GLOB KAFKA_FILES src/kafka/MsgBusImpl_kafka.cpp src/kafka/KafkaEventCallback.cpp src/kafka/KafkaDeliveryReportCallback.cpp src/kafka/KafkaTopicSelector.cpp src/kafka/KafkaPeerPartitionerCallback.cpp) + list(APPEND SRC_FILES ${KAFKA_FILES}) +else () + # Add Redis-specific source files + file(GLOB REDIS_FILES src/RedisManager.cpp src/redis/MsgBusImpl_redis.cpp) + list(APPEND SRC_FILES ${REDIS_FILES}) +endif () + # Disable warnings add_definitions ("-Wno-unused-result") -# Add C++11 +# Add C++14 if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" OR CMAKE_COMPILER_IS_GNUCXX) include(CheckCXXCompilerFlag) - check_cxx_compiler_flag(--std=c++11 SUPPORTS_STD_CXX11) + check_cxx_compiler_flag(--std=c++14 SUPPORTS_STD_CXX14) check_cxx_compiler_flag(--std=c++0x SUPPORTS_STD_CXX01) - if(SUPPORTS_STD_CXX11) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11") - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c++11") + if(SUPPORTS_STD_CXX14) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++14") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c++14") elseif(SUPPORTS_STD_CXX01) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++0x") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c++0x") else() - message(ERROR "Compiler does not support --std=c++11 or --std=c++0x. Upgrade gcc 4.7 or greater") + message(ERROR "Compiler does not support --std=c++14 or --std=c++0x. Upgrade gcc 4.7 or greater") endif() endif() # Set the libs to link -set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBRDKAFKA_CPP_LIBRARY} ${LIBRDKAFKA_LIBRARY} z ${SSL_LIBS} dl) +if (NOT ENABLE_REDIS) + set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBRDKAFKA_CPP_LIBRARY} ${LIBRDKAFKA_LIBRARY} z ${SSL_LIBS} dl zstd) +else () + set (LIBS pthread ${LIBYAML_CPP_LIBRARY} z ${SSL_LIBS} dl) +endif () # Set the binary add_executable (openbmpd ${SRC_FILES}) # Link the binary -target_link_libraries (openbmpd ${LIBS}) +target_link_libraries (openbmpd ${LIBS} ${LIBSWSSCOMMON_LIBRARY}) if (LIBRT_LIBRARY) - target_link_libraries(openbmpd ${LIBRT_LIBRARY}) + target_link_libraries(openbmpd ${LIBRT_LIBRARY} ${LIBSWSSCOMMON_LIBRARY}) endif() # Install the binary and configs diff --git a/Server/src/Config.cpp b/Server/src/Config.cpp index c5ecbf5..d3f966c 100644 --- a/Server/src/Config.cpp +++ b/Server/src/Config.cpp @@ -21,7 +21,10 @@ #include #include "Config.h" + +#ifndef REDIS_ENABLED #include "kafka/KafkaTopicSelector.h" +#endif /*********************************************************************//** * Constructor for class @@ -62,6 +65,7 @@ Config::Config() { * The keys match the configuration node/vars. Topic name nodes will be ignored if * not initialized here. */ +#ifndef REDIS_ENABLED topic_names_map[MSGBUS_TOPIC_VAR_COLLECTOR] = MSGBUS_TOPIC_COLLECTOR; topic_names_map[MSGBUS_TOPIC_VAR_ROUTER] = MSGBUS_TOPIC_ROUTER; topic_names_map[MSGBUS_TOPIC_VAR_PEER] = MSGBUS_TOPIC_PEER; @@ -74,6 +78,7 @@ Config::Config() { topic_names_map[MSGBUS_TOPIC_VAR_LS_PREFIX] = MSGBUS_TOPIC_LS_PREFIX; topic_names_map[MSGBUS_TOPIC_VAR_L3VPN] = MSGBUS_TOPIC_L3VPN; topic_names_map[MSGBUS_TOPIC_VAR_EVPN] = MSGBUS_TOPIC_EVPN; +#endif } /*********************************************************************//** diff --git a/Server/src/RedisManager.cpp b/Server/src/RedisManager.cpp new file mode 100644 index 0000000..6cb79d3 --- /dev/null +++ b/Server/src/RedisManager.cpp @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2024 Microsoft, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +#include "RedisManager.h" + + +/*********************************************************************//** + * Constructor for class + ***********************************************************************/ +RedisManager::RedisManager() { + exit_ = false; +} + +/*********************************************************************//** + * Constructor for class + ***********************************************************************/ +RedisManager::~RedisManager() { +} + + +/********************************************************************* + * Setup for this class + * + * \param [in] logPtr logger pointer + ***********************************************************************/ +void RedisManager::Setup(Logger *logPtr) { + logger = logPtr; + if (!swss::SonicDBConfig::isInit()) { + swss::SonicDBConfig::initialize(); + } + + stateDb_ = std::make_shared(BMP_DB_NAME, 0, false); + separator_ = swss::SonicDBConfig::getSeparator(BMP_DB_NAME); +} + + + +/** + * Get Key separator for deletion + * + * \param [in] N/A + */ +std::string RedisManager::GetKeySeparator() { + return separator_; +} + + +/** + * WriteBMPTable + * + * \param [in] table Reference to table name + * \param [in] key Reference to various keys list + * \param [in] fieldValues Reference to field-value pairs + */ +bool RedisManager::WriteBMPTable(const std::string& table, const std::vector& keys, const std::vector fieldValues) { + + if (enabledTables_.find(table) == enabledTables_.end()) { + LOG_INFO("RedisManager %s is disabled", table.c_str()); + return false; + } + std::unique_ptr stateBMPTable = std::make_unique(stateDb_.get(), table); + std::ostringstream oss; + for (const auto& key : keys) { + oss << key << separator_; + } + std::string fullKey = oss.str(); + fullKey.pop_back(); + + DEBUG("RedisManager WriteBMPTable key = %s", fullKey.c_str()); + + stateBMPTable->set(fullKey, fieldValues); + return true; +} + + +/** + * RemoveEntityFromBMPTable + * + * \param [in] keys Reference to various keys + */ +bool RedisManager::RemoveEntityFromBMPTable(const std::vector& keys) { + + for (const auto& key : keys) { + DEBUG("RedisManager RemoveEntityFromBMPTable key = %s", key.c_str()); + } + stateDb_->del(keys); + return true; +} + + +/** + * ExitRedisManager + * + * \param [in] N/A + */ +void RedisManager::ExitRedisManager() { + exit_ = true; +} + + +/** + * InitBMPConfig, read config_db for table enablement setting. + * + * \param [in] N/A + */ +bool RedisManager::InitBMPConfig() { + std::shared_ptr cfgDb = + std::make_shared("CONFIG_DB", 0, false); + std::unique_ptr cfgTable = std::make_unique(cfgDb.get(), BMP_CFG_TABLE_NAME); + std::vector fvt; + cfgTable->get(BMP_CFG_TABLE_KEY, fvt); + for (const auto& item : fvt) { + if (item.second == "true") { + enabledTables_.insert(item.first); + } + } + return true; +} + + +/** + * Reset ResetBMPTable, this will flush redis + * + * \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ +void RedisManager::ResetBMPTable(const std::string & table) { + + LOG_INFO("RedisManager ResetBMPTable %s", table.c_str()); + std::unique_ptr stateBMPTable = std::make_unique(stateDb_.get(), table); + std::vector keys; + stateBMPTable->getKeys(keys); + stateDb_->del(keys); +} + + + +/** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ +void RedisManager::ResetAllTables() { + LOG_INFO("RedisManager ResetAllTables"); + + for (const auto& enabledTable : enabledTables_) { + ResetBMPTable(enabledTable); + } +} \ No newline at end of file diff --git a/Server/src/RedisManager.h b/Server/src/RedisManager.h new file mode 100644 index 0000000..c03375c --- /dev/null +++ b/Server/src/RedisManager.h @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2024 Microsoft, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +#ifndef REDISMANAGER_H_ +#define REDISMANAGER_H_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include "Logger.h" +#include "Config.h" + + +/** + * BMP_TABLE_* defines the default table name prefix + */ +#define BMP_DB_NAME "BMP_STATE_DB" +#define BMP_TABLE_NEI "BGP_NEIGHBOR_TABLE" +#define BMP_TABLE_RIB_IN "BGP_RIB_IN_TABLE" +#define BMP_TABLE_RIB_OUT "BGP_RIB_OUT_TABLE" +#define BMP_TABLE_NEI_PREFIX "BGP_NEIGHBOR" + + +/** + * BMP_CFG_TABLE_* defines config db tables. + */ +#define BMP_CFG_TABLE_NAME "BMP" +#define BMP_CFG_TABLE_KEY "table" +#define BMP_CFG_TABLE_NEI "bgp_neighbor_table" +#define BMP_CFG_TABLE_RIB_IN "bgp-rib-in-table" +#define BMP_CFG_TABLE_RIB_OUT "bgp-rib-out-table" + +/** + * \class RedisManager + * + * \brief RedisManager class for openbmpd + * \details + * Encapsulate redis operation in this class instance. + */ +class RedisManager { + +public: + /*********************************************************************** + * Constructor for class + ***********************************************************************/ + RedisManager(); + + /*********************************************************************//** + * Destructor for class + ***********************************************************************/ + ~RedisManager(); + + /*********************************************************************** + * Setup logger for this class + * + * \param [in] logPtr logger pointer + */ + void Setup(Logger *logPtr); + + + /** + * ExitRedisManager + * + * \param [in] N/A + */ + void ExitRedisManager(); + + /** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ + void ResetAllTables(); + + /** + * Reset ResetBMPTable, this will flush redis + * + * \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ + void ResetBMPTable(const std::string & table); + + /** + * WriteBMPTable + * + * \param [in] table Reference to table name + * \param [in] key Reference to various keys list + * \param [in] fieldValues Reference to field-value pairs + */ + bool WriteBMPTable(const std::string& table, const std::vector& keys, const std::vector fieldValues); + + /** + * InitBMPConfig, read config_db for table enablement setting. + * + * \param [in] N/A + */ + bool InitBMPConfig(); + + /** + * RemoveEntityFromBMPTable + * + * \param [in] table Reference to table name + * \param [in] args Reference to various keys + */ + bool RemoveEntityFromBMPTable(const std::vector& keys); + + /** + * Get Key separator for deletion + * + * \param [in] N/A + */ + std::string GetKeySeparator(); + +private: + std::shared_ptr stateDb_; + std::string separator_; + Logger *logger; + std::unordered_set enabledTables_; + bool exit_; +}; + + +#endif /* RedisManager_H_ */ diff --git a/Server/src/bgp/parseBGP.cpp b/Server/src/bgp/parseBGP.cpp index 0c4c459..797be6d 100644 --- a/Server/src/bgp/parseBGP.cpp +++ b/Server/src/bgp/parseBGP.cpp @@ -350,24 +350,30 @@ void parseBGP::UpdateDB(bgp_msg::UpdateMsg::parsed_update_data &parsed_data) { /* * Update the path attributes */ + #ifndef REDIS_ENABLED UpdateDBAttrs(parsed_data.attrs); + #endif /* * Update the bgp-ls data */ + #ifndef REDIS_ENABLED UpdateDbBgpLs(false, parsed_data.ls, parsed_data.ls_attrs); UpdateDbBgpLs(true, parsed_data.ls_withdrawn, parsed_data.ls_attrs); + #endif /* * Update the advertised prefixes (both ipv4 and ipv6) */ UpdateDBAdvPrefixes(parsed_data.advertised, parsed_data.attrs); + #ifndef REDIS_ENABLED UpdateDBL3Vpn(false,parsed_data.vpn, parsed_data.attrs); UpdateDBL3Vpn(true,parsed_data.vpn_withdrawn, parsed_data.attrs); UpdateDBeVPN(false, parsed_data.evpn, parsed_data.attrs); UpdateDBeVPN(true, parsed_data.evpn_withdrawn, parsed_data.attrs); + #endif /* * Update withdraws (both ipv4 and ipv6) diff --git a/Server/src/client_thread.cpp b/Server/src/client_thread.cpp index 88ba63b..4b72eda 100644 --- a/Server/src/client_thread.cpp +++ b/Server/src/client_thread.cpp @@ -55,11 +55,12 @@ void ClientThread_cancel(void *arg) { delete cInfo->bmp_reader_thread; cInfo->bmp_reader_thread = NULL; } - +#ifndef REDIS_ENABLED if (cInfo->mbus != NULL) { delete cInfo->mbus; cInfo->mbus = NULL; } +#endif } } @@ -78,7 +79,9 @@ void *ClientThread(void *arg) { // Setup the client thread info struct ClientThreadInfo cInfo; +#ifndef REDIS_ENABLED cInfo.mbus = NULL; +#endif cInfo.client = &thr->client; cInfo.log = thr->log; cInfo.closing = false; @@ -94,12 +97,17 @@ void *ClientThread(void *arg) { pthread_cleanup_push(ClientThread_cancel, &cInfo); try { +#ifndef REDIS_ENABLED // connect to message bus cInfo.mbus = new msgBus_kafka(logger, thr->cfg, thr->cfg->c_hash_id); if (thr->cfg->debug_msgbus) cInfo.mbus->enableDebug(); - +#else + // connect to redis + cInfo.redis = std::make_shared(logger, thr->cfg, cInfo.client); + cInfo.redis->ResetAllTables(); +#endif BMPReader rBMP(logger, thr->cfg); LOG_INFO("Thread started to monitor BMP from router %s using socket %d buffer in bytes = %u", cInfo.client->c_ip, cInfo.client->c_sock, thr->cfg->bmp_buffer_size); @@ -113,10 +121,14 @@ void *ClientThread(void *arg) { * Create and start the reader thread to monitor the pipe fd (read end) */ bool bmp_run = true; +#ifndef REDIS_ENABLED //cInfo.bmp_reader_thread = new std::thread([&] {rBMP.readerThreadLoop(bmp_run,cInfo.client, cInfo.bmp_reader_thread = new std::thread(&BMPReader::readerThreadLoop, &rBMP, std::ref(bmp_run), cInfo.client, (MsgBusInterface *)cInfo.mbus ); - +#else + cInfo.bmp_reader_thread = new std::thread(&BMPReader::readerThreadLoop, &rBMP, std::ref(bmp_run), cInfo.client, + (MsgBusInterface *)cInfo.redis.get()); +#endif // Variables to handle circular buffer sock_buf = new unsigned char[thr->cfg->bmp_buffer_size]; int bytes_read = 0; @@ -275,11 +287,12 @@ void *ClientThread(void *arg) { cInfo.bmp_reader_thread = NULL; } - +#ifndef REDIS_ENABLED if (cInfo.mbus != NULL) { delete cInfo.mbus; cInfo.mbus = NULL; } +#endif } // Exit the thread diff --git a/Server/src/client_thread.h b/Server/src/client_thread.h index d816d0f..0365318 100644 --- a/Server/src/client_thread.h +++ b/Server/src/client_thread.h @@ -10,7 +10,13 @@ #ifndef CLIENT_THREAD_H_ #define CLIENT_THREAD_H_ +#ifndef REDIS_ENABLED #include "MsgBusImpl_kafka.h" +#else +#include "redis/MsgBusImpl_redis.h" +#include +#endif + #include "BMPListener.h" #include "Logger.h" #include "Config.h" @@ -28,7 +34,11 @@ struct ThreadMgmt { }; struct ClientThreadInfo { +#ifndef REDIS_ENABLED msgBus_kafka *mbus; +#else + std::shared_ptr redis; +#endif BMPListener::ClientInfo *client; Logger *log; diff --git a/Server/src/openbmp.cpp b/Server/src/openbmp.cpp index 447867a..3f706c3 100644 --- a/Server/src/openbmp.cpp +++ b/Server/src/openbmp.cpp @@ -16,7 +16,9 @@ */ #include "BMPListener.h" +#ifndef REDIS_ENABLED #include "MsgBusImpl_kafka.h" +#endif #include "MsgBusInterface.hpp" #include "client_thread.h" #include "openbmpd_version.h" @@ -358,6 +360,40 @@ bool ReadCmdArgs(int argc, char **argv, Config &cfg) { return false; } +/** + * Collector Update Message + * + * \param [in] cfg Pointer to config instance + * \param [in] cfg Reference to configuration + * \param [in] code reason code for the update + */ +#ifdef REDIS_ENABLED +void collector_update_msg(Config &cfg, + MsgBusInterface::collector_action_code code) { + MsgBusInterface::obj_collector oc; + + snprintf(oc.admin_id, sizeof(oc.admin_id), "%s", cfg.admin_id); + + oc.router_count = thr_list.size(); + + string router_ips; + for (int i=0; i < thr_list.size(); i++) { + //MsgBusInterface::hash_toStr(thr_list.at(i)->client.hash_id, hash_str); + if (router_ips.size() > 0) + router_ips.append(", "); + + router_ips.append(thr_list.at(i)->client.c_ip); + } + + snprintf(oc.routers, sizeof(oc.routers), "%s", router_ips.c_str()); + + timeval tv; + gettimeofday(&tv, NULL); + oc.timestamp_secs = tv.tv_sec; + oc.timestamp_us = tv.tv_usec; +} +#endif + /** * Collector Update Message * @@ -365,9 +401,9 @@ bool ReadCmdArgs(int argc, char **argv, Config &cfg) { * \param [in] cfg Reference to configuration * \param [in] code reason code for the update */ +#ifndef REDIS_ENABLED void collector_update_msg(msgBus_kafka *kafka, Config &cfg, MsgBusInterface::collector_action_code code) { - MsgBusInterface::obj_collector oc; snprintf(oc.admin_id, sizeof(oc.admin_id), "%s", cfg.admin_id); @@ -392,6 +428,7 @@ void collector_update_msg(msgBus_kafka *kafka, Config &cfg, kafka->update_Collector(oc, code); } +#endif /** * Run Server loop @@ -399,7 +436,9 @@ void collector_update_msg(msgBus_kafka *kafka, Config &cfg, * \param [in] cfg Reference to the config options */ void runServer(Config &cfg) { +#ifndef REDIS_ENABLED msgBus_kafka *kafka; +#endif int active_connections = 0; // Number of active connections/threads int concurrent_routers = 0; // Number of concurrent routers time_t last_heartbeat_time = 0; @@ -417,13 +456,19 @@ void runServer(Config &cfg) { memcpy(cfg.c_hash_id, hash_raw, 16); delete[] hash_raw; +#ifndef REDIS_ENABLED // Kafka connection kafka = new msgBus_kafka(logger, &cfg, cfg.c_hash_id); +#endif // allocate and start a new bmp server BMPListener *bmp_svr = new BMPListener(logger, &cfg); +#ifndef REDIS_ENABLED collector_update_msg(kafka, cfg, MsgBusInterface::COLLECTOR_ACTION_STARTED); +#else + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_STARTED); +#endif last_heartbeat_time = time(NULL); LOG_INFO("Ready. Waiting for connections"); @@ -449,9 +494,13 @@ void runServer(Config &cfg) { delete thr_list.at(i); thr_list.erase(thr_list.begin() + i); +#ifndef REDIS_ENABLED collector_update_msg(kafka, cfg, MsgBusInterface::COLLECTOR_ACTION_CHANGE); - +#else + collector_update_msg(cfg, + MsgBusInterface::COLLECTOR_ACTION_CHANGE); +#endif } else if (!thr_list.at(i)->baselineTimeout) { @@ -518,9 +567,13 @@ void runServer(Config &cfg) { // Free attribute pthread_attr_destroy(&thr_attr); +#ifndef REDIS_ENABLED collector_update_msg(kafka, cfg, MsgBusInterface::COLLECTOR_ACTION_CHANGE); - +#else + collector_update_msg(cfg, + MsgBusInterface::COLLECTOR_ACTION_CHANGE); +#endif last_heartbeat_time = time(NULL); } else { @@ -528,7 +581,11 @@ void runServer(Config &cfg) { // Send heartbeat if needed if ( (time(NULL) - last_heartbeat_time) >= cfg.heartbeat_interval) { +#ifndef REDIS_ENABLED collector_update_msg(kafka, cfg, MsgBusInterface::COLLECTOR_ACTION_HEARTBEAT); +#else + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_HEARTBEAT); +#endif last_heartbeat_time = time(NULL); } @@ -542,9 +599,12 @@ void runServer(Config &cfg) { } } +#ifndef REDIS_ENABLED collector_update_msg(kafka, cfg, MsgBusInterface::COLLECTOR_ACTION_STOPPED); delete kafka; - +#else + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_STOPPED); +#endif } catch (char const *str) { LOG_WARN(str); } diff --git a/Server/src/redis/MsgBusImpl_redis.cpp b/Server/src/redis/MsgBusImpl_redis.cpp new file mode 100644 index 0000000..061e3a4 --- /dev/null +++ b/Server/src/redis/MsgBusImpl_redis.cpp @@ -0,0 +1,244 @@ +#include +#include +#include + +#include + +#include +#include + +#include +#include + +#include "MsgBusImpl_redis.h" +#include "RedisManager.h" + +using namespace std; + +/******************************************************************//** + * \brief This function will initialize and connect to Kafka. + * + * \details It is expected that this class will start off with a new connection. + * + * \param [in] logPtr Pointer to Logger instance + * \param [in] cfg Pointer to the config instance + ********************************************************************/ +MsgBusImpl_redis::MsgBusImpl_redis(Logger *logPtr, Config *cfg, BMPListener::ClientInfo *client) { + logger = logPtr; + this->cfg = cfg; + redisMgr_.Setup(logPtr); + redisMgr_.InitBMPConfig(); +} + +/** + * Destructor + */ +MsgBusImpl_redis::~MsgBusImpl_redis() { + redisMgr_.ExitRedisManager(); +} + +/** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ +void MsgBusImpl_redis::ResetAllTables() { + redisMgr_.ResetAllTables(); +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code) { + + // Below attributes will be populated if exists, and no matter bgp neighbor is up or down + vector fieldValues; + fieldValues.reserve(MAX_ATTRIBUTES_COUNT); + vector keys; + keys.emplace_back(peer.peer_addr); + + fieldValues.emplace_back(make_pair("peer_addr", peer.peer_addr)); + fieldValues.emplace_back(make_pair("peer_asn", to_string(peer.peer_as))); + fieldValues.emplace_back(make_pair("peer_rd", peer.peer_rd)); + fieldValues.emplace_back(make_pair("remote_port", to_string(up->remote_port))); + fieldValues.emplace_back(make_pair("local_asn", to_string(up->local_asn))); + fieldValues.emplace_back(make_pair("local_ip", up->local_ip)); + fieldValues.emplace_back(make_pair("local_port", to_string(up->local_port))); + fieldValues.emplace_back(make_pair("sent_cap", up->sent_cap)); + fieldValues.emplace_back(make_pair("recv_cap", up->recv_cap)); + + switch (code) { + case PEER_ACTION_DOWN: + { + // PEER DOWN only + fieldValues.emplace_back(make_pair("bgp_err_code", to_string(down->bgp_err_code))); + fieldValues.emplace_back(make_pair("bgp_err_subcode", to_string(down->bgp_err_subcode))); + fieldValues.emplace_back(make_pair("error_text", down->error_text)); + + } + break; + } + for (const auto& fieldValue : fieldValues) { + const std::string& field = std::get<0>(fieldValue); + const std::string& value = std::get<1>(fieldValue); + DEBUG("MsgBusImpl_redis update_Peer field = %s, value = %s", field.c_str(), value.c_str()); + } + + redisMgr_.WriteBMPTable(BMP_TABLE_NEI, keys, fieldValues); +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, vector &rib, + obj_path_attr *attr, unicast_prefix_action_code code) { + if (attr == NULL) + return; + + vector del_keys; + string neigh = peer.peer_addr; + + for (size_t i = 0; i < rib.size(); i++) { + // Loop through the vector array of rib entries + vector addFieldValues; + addFieldValues.reserve(MAX_ATTRIBUTES_COUNT); + + // rib table schema as BGP_RIB_OUT_TABLE|192.181.168.0/25|10.0.0.59 + vector keys; + string redisMgr_pfx = rib[i].prefix; + redisMgr_pfx += "/"; + redisMgr_pfx += to_string(rib[i].prefix_len); + keys.reserve(MAX_ATTRIBUTES_COUNT); + keys.emplace_back(redisMgr_pfx); + keys.emplace_back(peer.peer_addr); + + switch (code) { + + case UNICAST_PREFIX_ACTION_ADD: + { + addFieldValues.emplace_back(make_pair("origin", attr->origin)); + addFieldValues.emplace_back(make_pair("as_path", attr->as_path)); + addFieldValues.emplace_back(make_pair("as_path_count", to_string(attr->as_path_count))); + addFieldValues.emplace_back(make_pair("origin_as", to_string(attr->origin_as))); + addFieldValues.emplace_back(make_pair("next_hop", attr->next_hop)); + addFieldValues.emplace_back(make_pair("local_pref", to_string(attr->local_pref))); + addFieldValues.emplace_back(make_pair("community_list", attr->community_list)); + addFieldValues.emplace_back(make_pair("ext_community_list", attr->ext_community_list)); + addFieldValues.emplace_back(make_pair("large_community_list", attr->large_community_list)); + addFieldValues.emplace_back(make_pair("originator_id", attr->originator_id)); + + for (const auto& fieldValue : addFieldValues) { + const std::string& field = std::get<0>(fieldValue); + const std::string& value = std::get<1>(fieldValue); + DEBUG("MsgBusImpl_redis update_unicastPrefix field = %s, value = %s", field.c_str(), value.c_str()); + } + if(peer.isAdjIn) + { + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_IN, keys, addFieldValues); + } + else + { + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_OUT, keys, addFieldValues); + } + } + break; + + case UNICAST_PREFIX_ACTION_DEL: + { + string com_key; + if(peer.isAdjIn) + { + com_key = BMP_TABLE_RIB_IN; + } + else + { + com_key = BMP_TABLE_RIB_OUT; + } + com_key += redisMgr_.GetKeySeparator(); + com_key += redisMgr_pfx; + com_key += redisMgr_.GetKeySeparator(); + com_key += neigh; + del_keys.push_back(com_key); + } + break; + } + } + + if (!del_keys.empty()) { + redisMgr_.RemoveEntityFromBMPTable(del_keys); + } +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Collector(obj_collector &c_object, collector_action_code action_code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Router(obj_router &r_object, router_action_code code) { +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code) { + +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_L3Vpn(obj_bgp_peer &peer, vector &vpn, + obj_path_attr *attr, vpn_action_code code) { + +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_eVPN(obj_bgp_peer &peer, vector &vpn, + obj_path_attr *attr, vpn_action_code code) { +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list &nodes, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list &links, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list &prefixes, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + * + * TODO: Consolidate this to single produce method + */ +void MsgBusImpl_redis::send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len) { +} \ No newline at end of file diff --git a/Server/src/redis/MsgBusImpl_redis.h b/Server/src/redis/MsgBusImpl_redis.h new file mode 100644 index 0000000..56d0df8 --- /dev/null +++ b/Server/src/redis/MsgBusImpl_redis.h @@ -0,0 +1,77 @@ +#ifndef MSGBUSIMPL_REDIS_H_ +#define MSGBUSIMPL_REDIS_H_ + +#define HASH_SIZE 16 + +#include "MsgBusInterface.hpp" +#include "RedisManager.h" +#include "BMPListener.h" + +#include "Logger.h" +#include +#include +#include +#include + + + +#include "Config.h" + +/** + * \class MsgBusImpl_redis + * + * \brief Kafka message bus implementation + */ +class MsgBusImpl_redis: public MsgBusInterface { +public: + const int MAX_ATTRIBUTES_COUNT = 100; + + /******************************************************************//** + * \brief This function will initialize and connect to Kafka. + * + * \details It is expected that this class will start off with a new connection. + * + * \param [in] logPtr Pointer to Logger instance + * \param [in] cfg Pointer to the config instance + ********************************************************************/ + MsgBusImpl_redis(Logger *logPtr, Config *cfg, BMPListener::ClientInfo *client); + ~MsgBusImpl_redis(); + + /** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ + void ResetAllTables(); + + /* + * abstract methods implemented + * See MsgBusInterface.hpp for method details + */ + void update_Collector(struct obj_collector &c_obj, collector_action_code action_code); + void update_Router(struct obj_router &r_entry, router_action_code code); + void update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code); + void update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code); + void update_unicastPrefix(obj_bgp_peer &peer, std::vector &rib, obj_path_attr *attr, unicast_prefix_action_code code); + void add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats); + + void update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list &nodes, + ls_action_code code); + void update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list &links, + ls_action_code code); + void update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list &prefixes, + ls_action_code code); + + void update_L3Vpn(obj_bgp_peer &peer, std::vector &vpn, obj_path_attr *attr, vpn_action_code code); + + void update_eVPN(obj_bgp_peer &peer, std::vector &vpn, obj_path_attr *attr, vpn_action_code code); + + void send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len); + +private: + Logger *logger; ///< Logging class pointer + Config *cfg; ///< Pointer to config instance + RedisManager redisMgr_; +}; + +#endif /* MSGBUSIMPL_REDIS_H_ */ diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 92a51b0..226d23e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -19,13 +19,11 @@ variables: ${{ else }}: value: $(Build.SourceBranchName) -resources: - repositories: - - repository: sonic-swss-common - type: github - name: sonic-net/sonic-swss-common - endpoint: sonic-net - ref: refs/heads/$(BUILD_BRANCH) + - name: CHECKOUT_BRANCH + ${{ if eq(variables['Build.Reason'], 'PullRequest') }}: + value: $(System.PullRequest.SourceBranch) + ${{ else }}: + value: $(Build.SourceBranchName) parameters: - name: arch @@ -47,7 +45,8 @@ parameters: - name: sonic_slave type: string - default: sonic-slave-bullseye + default: sonic-slave-bookworm + schedules: - cron: "0 0 * * *" @@ -63,63 +62,118 @@ jobs: pool: vmImage: 'ubuntu-20.04' container: - image: sonicdev-microsoft.azurecr.io:443/${{ parameters.sonic_slave }}:master - + image: sonicdev-microsoft.azurecr.io:443/${{ parameters.sonic_slave }}:latest steps: - - checkout: none - script: | - set -ex sudo apt-get update - sudo apt-get install -y gcc g++ libboost-dev cmake zlib1g-dev libsasl2-2 libsasl2-dev - sudo apt-get remove -y libcurl4-openssl-dev + sudo apt-get install -qq -y \ + libhiredis-dev \ + libnl-3-dev \ + libnl-genl-3-dev \ + libnl-route-3-dev \ + libnl-nf-3-dev \ + swig + + sudo apt-get install -y libsasl2-dev \ + libcurl4-openssl-dev \ + libssl-dev \ + gcc \ + g++ \ + libboost-dev \ + cmake \ + zlib1g-dev \ + libsasl2-2 \ + libzstd-dev git clone https://github.com/jbeder/yaml-cpp.git - pushd yaml-cpp + cd yaml-cpp mkdir build - pushd build + cd build cmake -DBUILD_SHARED_LIBS=OFF .. make sudo make install - popd - popd + cd ../../ + sudo apt-get remove -y libcurl4-openssl-dev git clone https://github.com/edenhill/librdkafka.git pushd librdkafka ./configure make sudo make install popd + displayName: 'Install openbmp dependencies' + + - checkout: self + clean: true + submodules: recursive + displayName: 'Checkout code' + + - task: DownloadPipelineArtifact@2 + inputs: + source: specific + project: build + pipeline: 142 + artifact: sonic-buildimage.vs + runVersion: 'latestFromBranch' + runBranch: 'refs/heads/$(BUILD_BRANCH)' + patterns: | + target/debs/bookworm/libyang*.deb + target/debs/bookworm/libnl*.deb + target/python-wheels/bookworm/sonic_yang_models*.whl + displayName: "Download bookworm debs" - # Sanity check - mkdir original_bmp - pushd original_bmp - git clone https://github.com/sonic-net/sonic-bmp.git - pushd sonic-bmp - git checkout $(BUILD_BRANCH) + - script: | + # LIBSWSSCOMMON + sudo apt-get -y purge libnl-3-dev libnl-route-3-dev + sudo dpkg -i ../target/debs/bookworm/libnl-3-200_*.deb + sudo dpkg -i ../target/debs/bookworm/libnl-genl-3-200_*.deb + sudo dpkg -i ../target/debs/bookworm/libnl-route-3-200_*.deb + sudo dpkg -i ../target/debs/bookworm/libnl-nf-3-200_*.deb + # LIBYANG + sudo dpkg -i ../target/debs/bookworm/libyang*1.0.73*.deb + # SONIC YANGS + set -ex + sudo pip3 install ../target/python-wheels/bookworm/sonic_yang_models-1.0-py3-none-any.whl + displayName: "Install libswsscommon dependencies" + + - task: DownloadPipelineArtifact@2 + inputs: + source: specific + project: build + pipeline: Azure.sonic-swss-common + artifact: sonic-swss-common-bookworm + runVersion: 'latestFromBranch' + runBranch: 'refs/heads/$(BUILD_BRANCH)' + displayName: "Download sonic-swss-common" + + - script: | + set -ex + # LIBSWSSCOMMON + sudo dpkg -i libswsscommon_1.0.0_amd64.deb + sudo dpkg -i libswsscommon-dev_1.0.0_amd64.deb + sudo dpkg -i python3-swsscommon_1.0.0_amd64.deb + workingDirectory: $(Pipeline.Workspace)/ + displayName: 'Install libswsscommon package' + + - script: | + ls -alh + git branch mkdir build pushd build cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr ../ make popd - popd - popd + displayName: 'Build original openbmp for sanity check' - # sonic option - mkdir sonic_bmp - pushd sonic_bmp - git clone https://github.com/sonic-net/sonic-bmp.git - pushd sonic-bmp - git checkout $(BUILD_BRANCH) + - script: | + rm -rf build/ mkdir build pushd build - cmake -DENABLE_REDIS=ON -DCMAKE_INSTALL_PREFIX:PATH=/usr ../ + cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr -DENABLE_REDIS=ON ../ make - popd - popd - popd + displayName: 'Build sonic-bmp' - publish: $(System.DefaultWorkingDirectory)/ artifact: sonic-bmp.amd64.ubuntu20_04 - displayName: "Archive sonic-bmp packages" - + displayName: "Archive sonic-bmp packages" \ No newline at end of file