Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NotificationProducer] add pipeline support #708

Merged
merged 6 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cc_library(
"common/*.hpp",
]),
copts = [
"-std=c++14",
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c++14

Other places are using c++11. Is it a concern?
Like: pyext/py3/Makefile.am
And sairedis using c++11 depends on swss-common. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft I don't see a problem. Everything builds successfully. I don't think other projects need to stay with c++11. c++14 has been around for a long time and adopted by compiler. As long as the headers are c++11 compatible and ABI does not change - everything works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a blocking issue?
If the change is really needed, I prefer changing the c++NN option in a consistent way, and change dependent libraries/applications first (swss->sairedis->swss-common). And change pyext/py[23]/Makefiles.am at the same time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sairedis is c++14 - https://github.com/sonic-net/sonic-sairedis/blob/master/configure.ac#L121
swss is c++14 - https://github.com/sonic-net/sonic-swss/blob/master/configure.ac#L57
I believe not every app using swss-common is now c++14, not to mention app extensions.
My point is that swss-common may use c++-14 in .cpp but must have c++-11 compatible headers for those apps that might not be able to migrate yet. Python swss-common bindings compile a wrapper cpp based of the headers, that's why py[23]/Makefile.am aren't changed and stay c++-11.

"-I/usr/include/libnl3", # Expected location in the SONiC build container"
],
includes = [
Expand Down
25 changes: 23 additions & 2 deletions common/notificationproducer.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#include "notificationproducer.h"

#define NON_BUFFERED_COMMAND_BUFFER_SIZE 1

swss::NotificationProducer::NotificationProducer(swss::DBConnector *db, const std::string &channel):
m_db(db), m_channel(channel)
m_ownedpipe(std::make_unique<swss::RedisPipeline>(db, NON_BUFFERED_COMMAND_BUFFER_SIZE)), m_pipe(m_ownedpipe.get()), m_channel(channel), m_buffered(false)
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_ownedpipe

You are assuming m_pipe initialized after m_ownedpipe initialized. The sequence is not well known knowledge. Suggest you move m_pipe to function body for simplicity purpose. #Closed

Copy link
Contributor Author

@stepanblyschak stepanblyschak Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft The order of member initialization is defined by the standard and is well known. Member variables are initialized in the order they are defined in the class definition. If by any chance members are reordered in the definition any modern compiler will raise a warning (-Wreorder in gcc) and many of them should be treated as errors. Since we use -Wall + -Werror it is going to be catched.

{
}

swss::NotificationProducer::NotificationProducer(swss::RedisPipeline *pipeline, const std::string &channel, bool buffered):
m_pipe(pipeline), m_channel(channel), m_buffered(buffered)
{
}

Expand All @@ -19,5 +26,19 @@ int64_t swss::NotificationProducer::send(const std::string &op, const std::strin

SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str());

return m_db->publish(m_channel, msg);
RedisCommand command;
command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str());

if (m_buffered)
{
m_pipe->push(command, REDIS_REPLY_INTEGER);
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_pipe

You may use m_ownedpipe directly. Are you defining two member variables for better performance? If yes, I prefer one variable for simplicity.
Then you can change var name to m_pipe. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft NotificationProducer has two invariants 1) Created from DBConnector and having it's own RedisPipeline in m_ownedpipe smart pointer and m_pipe pointing to it 2) NotificationProducer created from RedisPipeline which is not owned - m_ownedpipe is nullptr, m_pipe is a non-owning raw pointer to RedisPipeline object.
So, m_pipe is always pointing to RedisPipeline that needs to be used regardless of how NotificationProducer has been created.

I see the same pattern beeing used for Table class (https://github.com/sonic-net/sonic-swss-common/blob/master/common/table.cpp#L38) and it was a motivation to follow the same desing.

However, the table class uses raw pointer and manual memory allocation/deallocation + a flag to indicate wether we own the RedisPipeline object. So I came up with a bit simpler approach.


// if operating in buffered mode return -1 as we can't know the number
// of client's that have read the message immediately
return -1;
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1

Is it better to use 1? Client will think it send it successsfully. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft As stated by the comment on send() method the return code does not indicate the success or failure but the number of clients that read the message. In buffered mode we can't imidatelly tell how many clients read the message, so returning 1 might fool the user of this API that his consumer has already read message.

}

RedisReply reply = m_pipe->push(command);
reply.checkReplyType(REDIS_REPLY_INTEGER);
return reply.getReply<long long int>();
}
13 changes: 12 additions & 1 deletion common/notificationproducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "logger.h"
#include "table.h"
#include "redisreply.h"
#include "redispipeline.h"
#include "json.h"

namespace swss {
Expand All @@ -18,6 +19,14 @@ class NotificationProducer
public:
NotificationProducer(swss::DBConnector *db, const std::string &channel);

/**
* @brief Create NotificationProducer using RedisPipeline
* @param pipeline Pointer to RedisPipeline
* @param channel Channel name
* @param buffered Whether NotificationProducer will work in buffered mode
*/
NotificationProducer(RedisPipeline *pipeline, const std::string &channel, bool buffered = false);

// Returns: the number of clients that received the message
int64_t send(const std::string &op, const std::string &data, std::vector<FieldValueTuple> &values);

Expand All @@ -26,8 +35,10 @@ class NotificationProducer
NotificationProducer(const NotificationProducer &other);
NotificationProducer& operator = (const NotificationProducer &other);

swss::DBConnector *m_db;
std::unique_ptr<RedisPipeline> m_ownedpipe{};
RedisPipeline *m_pipe;
std::string m_channel;
bool m_buffered{false};
};

}
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ AC_PATH_PROGS(SWIG, [swig4.0 swig3.0 swig])
CFLAGS_COMMON=""
CFLAGS_COMMON+=" -ansi"
CFLAGS_COMMON+=" -fPIC"
CFLAGS_COMMON+=" -std=c++11"
CFLAGS_COMMON+=" -std=c++14"
CFLAGS_COMMON+=" -Wall"
CFLAGS_COMMON+=" -Wcast-align"
CFLAGS_COMMON+=" -Wcast-qual"
Expand Down
51 changes: 51 additions & 0 deletions tests/ntf_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,54 @@ TEST(Notifications, peek)
int rc = nc.peek();
EXPECT_EQ(rc, 0);
}

TEST(Notifications, pipelineProducer)
{
SWSS_LOG_ENTER();

swss::DBConnector dbNtf("ASIC_DB", 0, true);
swss::RedisPipeline pipeline{&dbNtf};
swss::NotificationConsumer nc(&dbNtf, "NOTIFICATIONS", 100, (size_t)10);
const bool buffered = true;
swss::NotificationProducer notifications(&pipeline, "NOTIFICATIONS", buffered);

std::vector<swss::FieldValueTuple> entry;
for(int i = 0; i < messages; i++)
{
auto s = std::to_string(i+1);
auto sentClients = notifications.send("ntf", s, entry);
// In buffered mode we get -1 in return
EXPECT_EQ(sentClients, -1);
}

// Flush the pipeline
pipeline.flush();

// Pop all the notifications
std::deque<swss::KeyOpFieldsValuesTuple> vkco;
size_t popped = 0;
size_t npop = 10000;
int collected = 0;
while(nc.peek() > 0 && popped < npop)
{
nc.pops(vkco);
popped += vkco.size();

for (auto& kco : vkco)
{
collected++;
auto data = kfvKey(kco);
auto op = kfvOp(kco);

EXPECT_EQ(op, "ntf");
int i = stoi(data);
EXPECT_EQ(i, collected);
}
}
EXPECT_EQ(popped, (size_t)messages);

// Peek and get nothing more
int rc = nc.peek();
EXPECT_EQ(rc, 0);
}