Skip to content

Commit

Permalink
Add ProducerStateTable temp view implementation and UT (#247)
Browse files Browse the repository at this point in the history
* Add ProducerStateTable temp view implementation and UT
  • Loading branch information
taoyl-ms authored Nov 16, 2018
1 parent 41408f2 commit 9918ae6
Show file tree
Hide file tree
Showing 7 changed files with 679 additions and 11 deletions.
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ lib_LTLIBRARIES = libswsscommon.la
EXTRA_DIST = \
consumer_state_table_pops.lua \
consumer_table_pops.lua \
producer_state_table_apply_view.lua \
table_dump.lua

swssdir = $(datadir)/swss
Expand Down
39 changes: 39 additions & 0 deletions common/producer_state_table_apply_view.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
--[[
Sample args format:
KEYS:
SAMPLE_CHANNEL
SAMPLE_KEY_SET
SAMPLE_DEL_KEY_SET
_SAMPLE:key_0
_SAMPLE:key_1
ARGV:
G (String to be published to channel)
2 (Count of objects to set)
key_0
key_1
0 (Count of objects to del)
2 (Count of A/V pair of object 0)
attribute_0
value_0
attribute_1
value_1
1 (Count of A/V pair of object 1)
attribute_0
value_0
]]
local arg_start = 2
for i = 1, ARGV[arg_start] do
redis.call('SADD', KEYS[2], ARGV[arg_start + i])
end
arg_start = arg_start + ARGV[arg_start] + 1
for i = 1, ARGV[arg_start] do
redis.call('SADD', KEYS[3], ARGV[arg_start + i])
end
arg_start = arg_start + ARGV[arg_start] + 1
for j = 4, #KEYS do
for i = 1, ARGV[arg_start] do
redis.call('HSET', KEYS[j], ARGV[arg_start + i * 2 - 1], ARGV[arg_start + i * 2])
end
arg_start = arg_start + 2 * ARGV[arg_start] + 1
end
redis.call('PUBLISH', KEYS[1], ARGV[1])
184 changes: 184 additions & 0 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
, TableName_KeySet(tableName)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
string luaSet =
Expand All @@ -49,6 +50,9 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
Expand All @@ -67,6 +71,16 @@ void ProducerStateTable::setBuffered(bool buffered)
void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
const string &op /*= SET_COMMAND*/, const string &prefix)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto& iv: values)
{
m_tempViewState[key][fvField(iv)] = fvValue(iv);
}
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
Expand Down Expand Up @@ -101,6 +115,13 @@ void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &v

void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
m_tempViewState.erase(key);
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
Expand Down Expand Up @@ -168,4 +189,167 @@ void ProducerStateTable::clear()
m_pipe->flush();
}

void ProducerStateTable::create_temp_view()
{
if (m_tempViewActive)
{
SWSS_LOG_WARN("create_temp_view() called for table %s when another temp view is under work, %zd objects in existing temp view will be discarded.", getTableName().c_str(), m_tempViewState.size());
}
m_tempViewActive = true;
m_tempViewState.clear();
}

void ProducerStateTable::apply_temp_view()
{
if (!m_tempViewActive)
{
SWSS_LOG_THROW("apply_temp_view() called for table %s, however no temp view was created.", getTableName().c_str());
}

// Drop all pending operation first
clear();

TableDump currentState;
{
Table mainTable(m_pipe, getTableName(), false);
mainTable.dump(currentState);
}

// Print content of current view and temp view as debug log
SWSS_LOG_INFO("View switch of table %s required.", getTableName().c_str());
SWSS_LOG_INFO("Objects in current view:");
for (auto const & kfvPair : currentState)
{
SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size());
}
SWSS_LOG_INFO("Objects in target view:");
for (auto const & kfvPair : m_tempViewState)
{
SWSS_LOG_INFO(" %s: %zd fields;", kfvPair.first.c_str(), kfvPair.second.size());
}


std::vector<std::string> keysToSet;
std::vector<std::string> keysToDel;

// Compare based on existing objects.
// Please note that this comparation is literal not contextual -
// e.g. {nexthop: 10.1.1.1, 10.1.1.2} and {nexthop: 10.1.1.2, 10.1.1.1} will be treated as different.
// Application will need to handle it, to make sure contextually identical field values also literally identical.
for (auto const & kfvPair : currentState)
{
const string& key = kfvPair.first;
const TableMap& fieldValueMap = kfvPair.second;
// DEL is needed if object does not exist in new state, or any field is not presented in new state
// SET is almost always needed, unless old state and new state exactly match each other
// (All old fields exists in new state, values match, and there is no additional field in new state)
if (m_tempViewState.find(key) == m_tempViewState.end()) // Key does not exist in new view
{
keysToDel.emplace_back(key);
keysToSet.emplace_back(key);
continue;
}
const TableMap& newFieldValueMap = m_tempViewState[key];
bool needDel = false;
bool needSet = false;
for (auto const& fvPair : fieldValueMap)
{
const string& field = fvPair.first;
const string& value = fvPair.second;
if (newFieldValueMap.find(field) == newFieldValueMap.end()) // Field does not exist in new view
{
needDel = true;
needSet = true;
break;
}
if (newFieldValueMap.at(field) != value) // Field value changed
{
needSet = true;
}
}
if (newFieldValueMap.size() > fieldValueMap.size()) // New field added
{
needSet = true;
}

if (needDel)
{
keysToDel.emplace_back(key);
}
if (needSet)
{
keysToSet.emplace_back(key);
}
else // If exactly match, no need to sync new state to StateHash in DB
{
m_tempViewState.erase(key);
}
}
// Objects that do not exist currently need to be created
for (auto const & kfvPair : m_tempViewState)
{
const string& key = kfvPair.first;
if (currentState.find(key) == currentState.end())
{
keysToSet.emplace_back(key);
}
}

// Assembly redis command args into a string vector
// See comment in producer_state_table_apply_view.lua for argument format
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaApplyView);
args.emplace_back(to_string(m_tempViewState.size() + 3));
args.emplace_back(getChannelName());
args.emplace_back(getKeySetName());
args.emplace_back(getDelKeySetName());

vector<string> argvs;
argvs.emplace_back("G");
argvs.emplace_back(to_string(keysToSet.size()));
argvs.insert(argvs.end(), keysToSet.begin(), keysToSet.end());
argvs.emplace_back(to_string(keysToDel.size()));
argvs.insert(argvs.end(), keysToDel.begin(), keysToDel.end());
for (auto const & kfvPair : m_tempViewState)
{
const string& key = kfvPair.first;
const TableMap& fieldValueMap = kfvPair.second;
args.emplace_back(getStateHashPrefix() + getKeyName(key));
argvs.emplace_back(to_string(fieldValueMap.size()));
for (auto const& fvPair : fieldValueMap)
{
const string& field = fvPair.first;
const string& value = fvPair.second;
argvs.emplace_back(field);
argvs.emplace_back(value);
}
}
args.insert(args.end(), argvs.begin(), argvs.end());

// Log arguments for debug
{
std::stringstream ss;
for (auto const & item : args)
{
ss << item << " ";
}
SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str());
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
m_pipe->flush();

// Clear state, temp view operation is now finished
m_tempViewState.clear();
m_tempViewActive = false;
}

}
7 changes: 7 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
int64_t count();

void clear();

void create_temp_view();

void apply_temp_view();
private:
bool m_buffered;
bool m_pipeowned;
bool m_tempViewActive;
RedisPipeline *m_pipe;
std::string m_shaSet;
std::string m_shaDel;
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;
};

}
5 changes: 0 additions & 5 deletions common/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,6 @@ void Table::dump(TableDump& tableDump)
{
SWSS_LOG_ENTER();

// note that this function is not efficient
// it can take ~100ms for entire asic dump
// but it's not intended to be efficient
// since it will not be used many times

static std::string luaScript = loadLuaScript("table_dump.lua");

static std::string sha = m_pipe->loadRedisScript(luaScript);
Expand Down
10 changes: 4 additions & 6 deletions common/table_dump.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
local keys = redis.call("keys", KEYS[1] .. ":*")
local keys = redis.call("KEYS", KEYS[1] .. ":*")
local res = {}

for i,k in pairs(keys) do

local skeys = redis.call("HKEYS", k)
local sres={}

for j,sk in pairs(skeys) do
sres[sk] = redis.call("HGET", k, sk)
local flat_map = redis.call('HGETALL', k)
for j = 1, #flat_map, 2 do
sres[flat_map[j]] = flat_map[j + 1]
end

res[k] = sres

end

return cjson.encode(res)
Loading

0 comments on commit 9918ae6

Please sign in to comment.