diff --git a/.gitignore b/.gitignore index 13debec21ad4..04c3b514c769 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,7 @@ swssconfig/swssplayer tlm_teamd/tlm_teamd teamsyncd/teamsyncd tests/tests +tests/mock_tests/tests_response_publisher tests/mock_tests/tests_fpmsyncd diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index 26093354c1db..5690d85dd417 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -562,6 +562,11 @@ void Orch::dumpPendingTasks(vector &ts) } } +void Orch::flushResponses() +{ + m_publisher.flush(); +} + void Orch::logfileReopen() { gRecordOfs.close(); diff --git a/orchagent/orch.h b/orchagent/orch.h index 6c620a3ef4e4..efee98a73c29 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -223,6 +223,11 @@ class Orch static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple); void dumpPendingTasks(std::vector &ts); + + /** + * @brief Flush pending responses + */ + void flushResponses(); protected: ConsumerMap m_consumerMap; diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 18295a9f59a8..5f432502566d 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -677,6 +677,11 @@ void OrchDaemon::flush() SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status); handleSaiFailure(true); } + + for (auto* orch: m_orchList) + { + orch->flushResponses(); + } } /* Release the file handle so the log can be rotated */ diff --git a/orchagent/response_publisher.cpp b/orchagent/response_publisher.cpp index 5d0490167c57..169075faa495 100644 --- a/orchagent/response_publisher.cpp +++ b/orchagent/response_publisher.cpp @@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key, } // namespace -ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) +ResponsePublisher::ResponsePublisher(bool buffered) : + m_db(std::make_unique("APPL_STATE_DB", 0)), + m_pipe(std::make_unique(m_db.get())), + m_buffered(buffered) { } @@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key } std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL"; - if (m_notifiers.find(table) == m_notifiers.end()) - { - m_notifiers[table] = std::make_unique(&m_db, response_channel); - } + swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered}; auto intent_attrs_copy = intent_attrs; // Add error message as the first field-value-pair. swss::FieldValueTuple err_str("err_str", PrependedComponent(status) + status.message()); intent_attrs_copy.insert(intent_attrs_copy.begin(), err_str); // Sends the response to the notification channel. - m_notifiers[table]->send(status.codeStr(), key, intent_attrs_copy); + notificationProducer.send(status.codeStr(), key, intent_attrs_copy); RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr()); } @@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key void ResponsePublisher::writeToDB(const std::string &table, const std::string &key, const std::vector &values, const std::string &op, bool replace) { - if (m_tables.find(table) == m_tables.end()) - { - m_tables[table] = std::make_unique(&m_db, table); - } + swss::Table applStateTable{m_pipe.get(), table, m_buffered}; auto attrs = values; if (op == SET_COMMAND) { if (replace) { - m_tables[table]->del(key); + applStateTable.del(key); } if (!values.size()) { @@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k // Write to DB only if the key does not exist or non-NULL attributes are // being written to the entry. std::vector fv; - if (!m_tables[table]->get(key, fv)) + if (!applStateTable.get(key, fv)) { - m_tables[table]->set(key, attrs); + applStateTable.set(key, attrs); RecordDBWrite(table, key, attrs, op); return; } @@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k } if (attrs.size()) { - m_tables[table]->set(key, attrs); + applStateTable.set(key, attrs); RecordDBWrite(table, key, attrs, op); } } else if (op == DEL_COMMAND) { - m_tables[table]->del(key); + applStateTable.del(key); RecordDBWrite(table, key, {}, op); } } + +void ResponsePublisher::flush() +{ + m_pipe->flush(); +} + +void ResponsePublisher::setBuffered(bool buffered) +{ + m_buffered = buffered; +} diff --git a/orchagent/response_publisher.h b/orchagent/response_publisher.h index cd688112e860..db882d9c705a 100644 --- a/orchagent/response_publisher.h +++ b/orchagent/response_publisher.h @@ -16,7 +16,8 @@ class ResponsePublisher : public ResponsePublisherInterface { public: - explicit ResponsePublisher(); + explicit ResponsePublisher(bool buffered = false); + virtual ~ResponsePublisher() = default; // Intent attributes are the attributes sent in the notification into the @@ -42,10 +43,21 @@ class ResponsePublisher : public ResponsePublisherInterface void writeToDB(const std::string &table, const std::string &key, const std::vector &values, const std::string &op, bool replace = false) override; + /** + * @brief Flush pending responses + */ + void flush(); + + /** + * @brief Set buffering mode + * + * @param buffered Flag whether responses are buffered + */ + void setBuffered(bool buffered); + private: - swss::DBConnector m_db; - // Maps table names to tables. - std::unordered_map> m_tables; - // Maps table names to notifiers. - std::unordered_map> m_notifiers; + std::unique_ptr m_db; + std::unique_ptr m_pipe; + + bool m_buffered{false}; }; diff --git a/tests/mock_tests/Makefile.am b/tests/mock_tests/Makefile.am index 02bb54dd25ba..6b648d5f4547 100644 --- a/tests/mock_tests/Makefile.am +++ b/tests/mock_tests/Makefile.am @@ -4,9 +4,9 @@ P4_ORCH_DIR = $(top_srcdir)/orchagent/p4orch CFLAGS_SAI = -I /usr/include/sai -TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd +TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher -noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd +noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis @@ -183,3 +183,20 @@ tests_fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST tests_fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_fpmsyncd_INCLUDES) tests_fpmsyncd_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ -lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main + +## response publisher unit tests + +tests_response_publisher_SOURCES = response_publisher/response_publisher_ut.cpp \ + $(top_srcdir)/orchagent/response_publisher.cpp \ + mock_orchagent_main.cpp \ + mock_dbconnector.cpp \ + mock_table.cpp \ + mock_hiredis.cpp \ + mock_redisreply.cpp + +tests_response_publisher_INCLUDES = $(tests_INCLUDES) +tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) +tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES) +tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ + -lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread + diff --git a/tests/mock_tests/fake_response_publisher.cpp b/tests/mock_tests/fake_response_publisher.cpp index 94480913d50e..4c2c2b037098 100644 --- a/tests/mock_tests/fake_response_publisher.cpp +++ b/tests/mock_tests/fake_response_publisher.cpp @@ -3,7 +3,7 @@ #include "response_publisher.h" -ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) {} +ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique("APPL_STATE_DB", 0)), m_buffered(buffered) {} void ResponsePublisher::publish( const std::string& table, const std::string& key, @@ -20,3 +20,7 @@ void ResponsePublisher::writeToDB( const std::string& table, const std::string& key, const std::vector& values, const std::string& op, bool replace) {} + +void ResponsePublisher::flush() {} + +void ResponsePublisher::setBuffered(bool buffered) {} diff --git a/tests/mock_tests/response_publisher/response_publisher_ut.cpp b/tests/mock_tests/response_publisher/response_publisher_ut.cpp new file mode 100644 index 000000000000..3738ac6d8752 --- /dev/null +++ b/tests/mock_tests/response_publisher/response_publisher_ut.cpp @@ -0,0 +1,37 @@ +#include "response_publisher.h" + +#include + +bool gResponsePublisherRecord{false}; +bool gResponsePublisherLogRotate{false}; +std::ofstream gResponsePublisherRecordOfs; +std::string gResponsePublisherRecordFile; + +using namespace swss; + +TEST(ResponsePublisher, TestPublish) +{ + DBConnector conn{"APPL_STATE_DB", 0}; + Table stateTable{&conn, "SOME_TABLE"}; + std::string value; + ResponsePublisher publisher{}; + + publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS)); + ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value)); + ASSERT_EQ(value, "value"); +} + +TEST(ResponsePublisher, TestPublishBuffered) +{ + DBConnector conn{"APPL_STATE_DB", 0}; + Table stateTable{&conn, "SOME_TABLE"}; + std::string value; + ResponsePublisher publisher{}; + + publisher.setBuffered(true); + + publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS)); + publisher.flush(); + ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value)); + ASSERT_EQ(value, "value"); +}