Skip to content

Commit

Permalink
Implement DBInterface/SonicV2Connector in C++ (sonic-net#387)
Browse files Browse the repository at this point in the history
* [pyext] Add more OUTPUT type
* Refactor: add new class RedisConnector
* dbconnector: remove emtpy line
* Refine script functions parameter
* Add copy constructor to RedisConnector
* Optimize DBConnector ctor
* Revert back m_namespace
* Refactor: change name
* Fix build
* Extract psubscribe and subscribe function into DBConnector class
* Implement _subscribe_keyspace_notification, _unsubscribe_keyspace_notification and _connection_error_handler
* Implement blockable
* Implement connect with retry
* Implement DBConnector::publish(),
* Use c++11 syntax instead of c++14
* Implement blocking for get and del
* Add to pyext
* Add set_redis_kwargs(), fix _onetime_connect()
* Fix LGTM: delete implicitly-declared copy assignment operator
* update DBInterface redis_client index from db_id to db_name
* Add DBInterface::delete_all_by_pattern()
* Add SonicV2Connector class
* Add unit test for SonicV2Connector
* Make const strings public because they are used as public method default parameters
* SWIG supports keyword arguments in generated python module
* Add python namespace property to DBConnector class, solve the paramter conflicting with C++ keyword by customizing python code generation
* Move SonicV2Connector to standalone .h/.cpp files
* Add missing include statements into SWIG inteface file
* Add pytest unit test for DBInterface and SonicV2Connector
* Fix swig customization on SonicV2Connector ctor
* Add attrib SonicV2Connector.namespace
* Remove debug code
* Use EXPECT_NE to simplify test
* Remove unused code
  • Loading branch information
qiluo-msft authored and kktheballer committed Dec 21, 2020
1 parent c6e2b28 commit 5a4f355
Show file tree
Hide file tree
Showing 19 changed files with 983 additions and 131 deletions.
2 changes: 2 additions & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ libswsscommon_la_SOURCES = \
logger.cpp \
redisreply.cpp \
dbconnector.cpp \
dbinterface.cpp \
sonicv2connector.cpp \
table.cpp \
json.cpp \
producertable.cpp \
Expand Down
237 changes: 162 additions & 75 deletions common/dbconnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

using json = nlohmann::json;
using namespace std;

namespace swss {
using namespace swss;

void SonicDBConfig::parseDatabaseConfig(const string &file,
std::unordered_map<std::string, RedisInstInfo> &inst_entry,
Expand Down Expand Up @@ -296,6 +295,22 @@ vector<string> SonicDBConfig::getNamespaces()
return list;
}

std::vector<std::string> SonicDBConfig::getDbList(const std::string &netns)
{
if (!m_init)
{
initialize();
}
validateNamespace(netns);

std::vector<std::string> dbNames;
for (auto& imap: m_db_info.at(netns))
{
dbNames.push_back(imap.first);
}
return dbNames;
}

constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_CONFIG_FILE;
constexpr const char *SonicDBConfig::DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE;
unordered_map<string, unordered_map<string, RedisInstInfo>> SonicDBConfig::m_inst_info;
Expand All @@ -304,7 +319,100 @@ unordered_map<string, unordered_map<int, string>> SonicDBConfig::m_db_separator;
bool SonicDBConfig::m_init = false;
bool SonicDBConfig::m_global_init = false;

constexpr const char *DBConnector::DEFAULT_UNIXSOCKET;
constexpr const char *RedisContext::DEFAULT_UNIXSOCKET;

RedisContext::~RedisContext()
{
redisFree(m_conn);
}

RedisContext::RedisContext()
{
}

RedisContext::RedisContext(const RedisContext &other)
{
auto octx = other.getContext();
const char *unixPath = octx->unix_sock.path;
if (unixPath)
{
initContext(unixPath, *octx->timeout);
}
else
{
initContext(octx->tcp.host, octx->tcp.port, *octx->timeout);
}
}

RedisContext::RedisContext(const string& hostname, int port,
unsigned int timeout)
{
struct timeval tv = {0, (suseconds_t)timeout * 1000};
initContext(hostname.c_str(), port, tv);
}

RedisContext::RedisContext(const string& unixPath, unsigned int timeout)
{
struct timeval tv = {0, (suseconds_t)timeout * 1000};
initContext(unixPath.c_str(), tv);
}

void RedisContext::initContext(const char *host, int port, const timeval& tv)
{
m_conn = redisConnectWithTimeout(host, port, tv);

if (m_conn->err)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect to redis");
}

void RedisContext::initContext(const char *path, const timeval &tv)
{
m_conn = redisConnectUnixWithTimeout(path, tv);

if (m_conn->err)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect to redis (unix-socket)");
}

redisContext *RedisContext::getContext() const
{
return m_conn;
}

void RedisContext::setContext(redisContext *ctx)
{
m_conn = ctx;
}

void RedisContext::setClientName(const string& clientName)
{
string command("CLIENT SETNAME ");
command += clientName;

RedisReply r(this, command, REDIS_REPLY_STATUS);
r.checkStatusOK();
}

string RedisContext::getClientName()
{
string command("CLIENT GETNAME");

RedisReply r(this, command);

auto ctx = r.getContext();
if (ctx->type == REDIS_REPLY_STRING)
{
return r.getReply<std::string>();
}
else
{
if (ctx->type != REDIS_REPLY_NIL)
SWSS_LOG_ERROR("Unable to obtain Redis client name");

return "";
}
}

void DBConnector::select(DBConnector *db)
{
Expand All @@ -315,45 +423,36 @@ void DBConnector::select(DBConnector *db)
r.checkStatusOK();
}

DBConnector::~DBConnector()
DBConnector::DBConnector(const DBConnector &other)
: RedisContext(other)
, m_dbId(other.m_dbId)
, m_namespace(other.m_namespace)
{
redisFree(m_conn);
select(this);
}

DBConnector::DBConnector(int dbId, const RedisContext& ctx)
: RedisContext(ctx)
, m_dbId(dbId)
, m_namespace(EMPTY_NAMESPACE)
{
select(this);
}

DBConnector::DBConnector(int dbId, const string& hostname, int port,
unsigned int timeout) :
RedisContext(hostname, port, timeout),
m_dbId(dbId),
m_namespace(EMPTY_NAMESPACE)
{
struct timeval tv = {0, (suseconds_t)timeout * 1000};

if (timeout)
m_conn = redisConnectWithTimeout(hostname.c_str(), port, tv);
else
m_conn = redisConnect(hostname.c_str(), port);

if (m_conn->err)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect to redis");

select(this);
}

DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout) :
RedisContext(unixPath, timeout),
m_dbId(dbId),
m_namespace(EMPTY_NAMESPACE)
{
struct timeval tv = {0, (suseconds_t)timeout * 1000};

if (timeout)
m_conn = redisConnectUnixWithTimeout(unixPath.c_str(), tv);
else
m_conn = redisConnectUnix(unixPath.c_str());

if (m_conn->err)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect to redis (unix-socket)");

select(this);
}

Expand All @@ -364,25 +463,15 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC
{
struct timeval tv = {0, (suseconds_t)timeout * 1000};

if (timeout)
if (isTcpConn)
{
if (isTcpConn)
m_conn = redisConnectWithTimeout(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns), tv);
else
m_conn = redisConnectUnixWithTimeout(SonicDBConfig::getDbSock(dbName, netns).c_str(), tv);
initContext(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns), tv);
}
else
{
if (isTcpConn)
m_conn = redisConnect(SonicDBConfig::getDbHostname(dbName, netns).c_str(), SonicDBConfig::getDbPort(dbName, netns));
else
m_conn = redisConnectUnix(SonicDBConfig::getDbSock(dbName, netns).c_str());
initContext(SonicDBConfig::getDbSock(dbName, netns).c_str(), tv);
}

if (m_conn->err)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect to redis");

select(this);
}

Expand All @@ -392,11 +481,6 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC
// Empty contructor
}

redisContext *DBConnector::getContext() const
{
return m_conn;
}

int DBConnector::getDbId() const
{
return m_dbId;
Expand All @@ -407,6 +491,11 @@ string DBConnector::getDbName() const
return m_dbName;
}

void DBConnector::setNamespace(const string& netns)
{
m_namespace = netns;
}

string DBConnector::getNamespace() const
{
return m_namespace;
Expand All @@ -427,40 +516,11 @@ DBConnector *DBConnector::newConnector(unsigned int timeout) const
timeout);

ret->m_dbName = m_dbName;
ret->m_namespace = m_namespace;
ret->setNamespace(getNamespace());

return ret;
}

void DBConnector::setClientName(const string& clientName)
{
string command("CLIENT SETNAME ");
command += clientName;

RedisReply r(this, command, REDIS_REPLY_STATUS);
r.checkStatusOK();
}

string DBConnector::getClientName()
{
string command("CLIENT GETNAME");

RedisReply r(this, command);

auto ctx = r.getContext();
if (ctx->type == REDIS_REPLY_STRING)
{
return r.getReply<std::string>();
}
else
{
if (ctx->type != REDIS_REPLY_NIL)
SWSS_LOG_ERROR("Unable to obtain Redis client name");

return "";
}
}

int64_t DBConnector::del(const string &key)
{
RedisCommand sdel;
Expand Down Expand Up @@ -512,6 +572,13 @@ void DBConnector::set(const string &key, const string &value)
RedisReply r(this, sset, REDIS_REPLY_STATUS);
}

void DBConnector::config_set(const std::string &key, const std::string &value)
{
RedisCommand sset;
sset.format("CONFIG SET %s %s", key.c_str(), value.c_str());
RedisReply r(this, sset, REDIS_REPLY_STATUS);
}

unordered_map<string, string> DBConnector::hgetall(const string &key)
{
unordered_map<string, string> map;
Expand Down Expand Up @@ -622,4 +689,24 @@ shared_ptr<string> DBConnector::blpop(const string &list, int timeout)
throw runtime_error("GET failed, memory exception");
}

void DBConnector::subscribe(const std::string &pattern)
{
std::string s("SUBSCRIBE ");
s += pattern;
RedisReply r(this, s, REDIS_REPLY_ARRAY);
}

void DBConnector::psubscribe(const std::string &pattern)
{
std::string s("PSUBSCRIBE ");
s += pattern;
RedisReply r(this, s, REDIS_REPLY_ARRAY);
}

int64_t DBConnector::publish(const string &channel, const string &message)
{
RedisCommand publish;
publish.format("PUBLISH %s %s", channel.c_str(), message.c_str());
RedisReply r(this, publish, REDIS_REPLY_INTEGER);
return r.getReply<long long int>();
}
Loading

0 comments on commit 5a4f355

Please sign in to comment.