Skip to content

Commit

Permalink
[ResponsePublisher] add pipeline support (sonic-net#2511)
Browse files Browse the repository at this point in the history
* [ResponsePublisher] add pipeline support

Why I did it

I did it to improve performance when sending many responses. Responses are buffered in redis client before beeing sent out to redis server, while orchagent has no more pending tasks to do, responses are flushed to redis.
  • Loading branch information
stepanblyschak authored Feb 9, 2023
1 parent 44ea6a0 commit 4df5cab
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ swssconfig/swssplayer
tlm_teamd/tlm_teamd
teamsyncd/teamsyncd
tests/tests
tests/mock_tests/tests_response_publisher
tests/mock_tests/tests_fpmsyncd


Expand Down
5 changes: 5 additions & 0 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ void Orch::dumpPendingTasks(vector<string> &ts)
}
}

void Orch::flushResponses()
{
m_publisher.flush();
}

void Orch::logfileReopen()
{
gRecordOfs.close();
Expand Down
5 changes: 5 additions & 0 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ class Orch
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);

void dumpPendingTasks(std::vector<std::string> &ts);

/**
* @brief Flush pending responses
*/
void flushResponses();
protected:
ConsumerMap m_consumerMap;

Expand Down
5 changes: 5 additions & 0 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ void OrchDaemon::flush()
SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
handleSaiFailure(true);
}

for (auto* orch: m_orchList)
{
orch->flushResponses();
}
}

/* Release the file handle so the log can be rotated */
Expand Down
37 changes: 22 additions & 15 deletions orchagent/response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key,

} // namespace

ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0)
ResponsePublisher::ResponsePublisher(bool buffered) :
m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)),
m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())),
m_buffered(buffered)
{
}

Expand All @@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
}

std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL";
if (m_notifiers.find(table) == m_notifiers.end())
{
m_notifiers[table] = std::make_unique<swss::NotificationProducer>(&m_db, response_channel);
}
swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered};

auto intent_attrs_copy = intent_attrs;
// Add error message as the first field-value-pair.
swss::FieldValueTuple err_str("err_str", PrependedComponent(status) + status.message());
intent_attrs_copy.insert(intent_attrs_copy.begin(), err_str);
// Sends the response to the notification channel.
m_notifiers[table]->send(status.codeStr(), key, intent_attrs_copy);
notificationProducer.send(status.codeStr(), key, intent_attrs_copy);
RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr());
}

Expand All @@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
void ResponsePublisher::writeToDB(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
{
if (m_tables.find(table) == m_tables.end())
{
m_tables[table] = std::make_unique<swss::Table>(&m_db, table);
}
swss::Table applStateTable{m_pipe.get(), table, m_buffered};

auto attrs = values;
if (op == SET_COMMAND)
{
if (replace)
{
m_tables[table]->del(key);
applStateTable.del(key);
}
if (!values.size())
{
Expand All @@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
// Write to DB only if the key does not exist or non-NULL attributes are
// being written to the entry.
std::vector<swss::FieldValueTuple> fv;
if (!m_tables[table]->get(key, fv))
if (!applStateTable.get(key, fv))
{
m_tables[table]->set(key, attrs);
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
return;
}
Expand All @@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
}
if (attrs.size())
{
m_tables[table]->set(key, attrs);
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
}
}
else if (op == DEL_COMMAND)
{
m_tables[table]->del(key);
applStateTable.del(key);
RecordDBWrite(table, key, {}, op);
}
}

void ResponsePublisher::flush()
{
m_pipe->flush();
}

void ResponsePublisher::setBuffered(bool buffered)
{
m_buffered = buffered;
}
24 changes: 18 additions & 6 deletions orchagent/response_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
class ResponsePublisher : public ResponsePublisherInterface
{
public:
explicit ResponsePublisher();
explicit ResponsePublisher(bool buffered = false);

virtual ~ResponsePublisher() = default;

// Intent attributes are the attributes sent in the notification into the
Expand All @@ -42,10 +43,21 @@ class ResponsePublisher : public ResponsePublisherInterface
void writeToDB(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
const std::string &op, bool replace = false) override;

/**
* @brief Flush pending responses
*/
void flush();

/**
* @brief Set buffering mode
*
* @param buffered Flag whether responses are buffered
*/
void setBuffered(bool buffered);

private:
swss::DBConnector m_db;
// Maps table names to tables.
std::unordered_map<std::string, std::unique_ptr<swss::Table>> m_tables;
// Maps table names to notifiers.
std::unordered_map<std::string, std::unique_ptr<swss::NotificationProducer>> m_notifiers;
std::unique_ptr<swss::DBConnector> m_db;
std::unique_ptr<swss::RedisPipeline> m_pipe;

bool m_buffered{false};
};
21 changes: 19 additions & 2 deletions tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ P4_ORCH_DIR = $(top_srcdir)/orchagent/p4orch

CFLAGS_SAI = -I /usr/include/sai

TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher

noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher

LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis

Expand Down Expand Up @@ -183,3 +183,20 @@ tests_fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST
tests_fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_fpmsyncd_INCLUDES)
tests_fpmsyncd_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main

## response publisher unit tests

tests_response_publisher_SOURCES = response_publisher/response_publisher_ut.cpp \
$(top_srcdir)/orchagent/response_publisher.cpp \
mock_orchagent_main.cpp \
mock_dbconnector.cpp \
mock_table.cpp \
mock_hiredis.cpp \
mock_redisreply.cpp

tests_response_publisher_INCLUDES = $(tests_INCLUDES)
tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI)
tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES)
tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread

6 changes: 5 additions & 1 deletion tests/mock_tests/fake_response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "response_publisher.h"

ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) {}
ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)), m_buffered(buffered) {}

void ResponsePublisher::publish(
const std::string& table, const std::string& key,
Expand All @@ -20,3 +20,7 @@ void ResponsePublisher::writeToDB(
const std::string& table, const std::string& key,
const std::vector<swss::FieldValueTuple>& values, const std::string& op,
bool replace) {}

void ResponsePublisher::flush() {}

void ResponsePublisher::setBuffered(bool buffered) {}
37 changes: 37 additions & 0 deletions tests/mock_tests/response_publisher/response_publisher_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include "response_publisher.h"

#include <gtest/gtest.h>

bool gResponsePublisherRecord{false};
bool gResponsePublisherLogRotate{false};
std::ofstream gResponsePublisherRecordOfs;
std::string gResponsePublisherRecordFile;

using namespace swss;

TEST(ResponsePublisher, TestPublish)
{
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
ASSERT_EQ(value, "value");
}

TEST(ResponsePublisher, TestPublishBuffered)
{
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};

publisher.setBuffered(true);

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
publisher.flush();
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
ASSERT_EQ(value, "value");
}

0 comments on commit 4df5cab

Please sign in to comment.