Skip to content

Commit

Permalink
[orchagent]: Support LAG_MEMBER_TABLE (#187)
Browse files Browse the repository at this point in the history
* [orchagent]: Support LAG_MEMBER_TABLE

- Add LAG_MEMBER_TABLE in teamsyncd
- Add LAG_MEMBER_TABLE in orchagent

* Handle invalid index before indexing
* Replace KeyOpFieldValueTuples with auto &

Signed-off-by: Shuotian Cheng <shuche@microsoft.com>
  • Loading branch information
Shuotian Cheng authored Apr 13, 2017
1 parent a89c4a2 commit c5cbc69
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 114 deletions.
3 changes: 2 additions & 1 deletion orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ bool OrchDaemon::init()
vector<string> ports_tables = {
APP_PORT_TABLE_NAME,
APP_VLAN_TABLE_NAME,
APP_LAG_TABLE_NAME
APP_LAG_TABLE_NAME,
APP_LAG_MEMBER_TABLE_NAME
};

gPortsOrch = new PortsOrch(m_applDb, ports_tables);
Expand Down
214 changes: 111 additions & 103 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void PortsOrch::doPortTask(Consumer &consumer)
auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;
auto &t = it->second;

string alias = kfvKey(t);
string op = kfvOp(t);
Expand Down Expand Up @@ -397,7 +397,7 @@ void PortsOrch::doVlanTask(Consumer &consumer)
auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;
auto &t = it->second;

string key = kfvKey(t);

Expand Down Expand Up @@ -527,144 +527,150 @@ void PortsOrch::doLagTask(Consumer &consumer)
auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;
auto &t = it->second;

string lag_alias = kfvKey(t);
string op = kfvOp(t);

if (op == SET_COMMAND)
{
/* Duplicate entry */
if (m_portList.find(lag_alias) != m_portList.end())
{
it = consumer.m_toSync.erase(it);
continue;
}

if (addLag(lag_alias))
it = consumer.m_toSync.erase(it);
else
it++;
}
else if (op == DEL_COMMAND)
{
Port lag;
/* Cannot locate LAG */
if (!getPort(lag_alias, lag))
{
it = consumer.m_toSync.erase(it);
continue;
}

if (removeLag(lag))
it = consumer.m_toSync.erase(it);
else
it++;
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
it = consumer.m_toSync.erase(it);
}
}
}

void PortsOrch::doLagMemberTask(Consumer &consumer)
{
if (!isInitDone())
return;

auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
{
auto &t = it->second;

/* Retrieve LAG alias and LAG member alias from key */
string key = kfvKey(t);
size_t found = key.find(':');
string lag_alias, port_alias;
/* Return if the format of key is wrong */
if (found == string::npos)
lag_alias = key;
else
{
lag_alias = key.substr(0, found);
port_alias = key.substr(found+1);
SWSS_LOG_ERROR("Failed to parse %s", key.c_str());
return;
}
string lag_alias = key.substr(0, found);
string port_alias = key.substr(found+1);

string op = kfvOp(t);

/* Manipulate LAG when port_alias is empty */
if (port_alias == "")
Port lag, port;
if (!getPort(lag_alias, lag))
{
if (op == SET_COMMAND)
SWSS_LOG_INFO("Failed to locate LAG %s", lag_alias.c_str());
it++;
continue;
}

if (!getPort(port_alias, port))
{
SWSS_LOG_ERROR("Failed to locate port %s", port_alias.c_str());
it = consumer.m_toSync.erase(it);
continue;
}

/* Update a LAG member */
if (op == SET_COMMAND)
{
string status;
for (auto i : kfvFieldsValues(t))
{
if (fvField(i) == "status")
status = fvValue(i);
}

/* Sync an enabled member */
if (status == "enabled")
{
/* Duplicate entry */
if (m_portList.find(lag_alias) != m_portList.end())
if (lag.m_members.find(port_alias) != lag.m_members.end())
{
it = consumer.m_toSync.erase(it);
continue;
}

if (addLag(lag_alias))
/* Assert the port doesn't belong to any LAG */
assert(!port.m_lag_id && !port.m_lag_member_id);

if (addLagMember(lag, port))
it = consumer.m_toSync.erase(it);
else
it++;
}
else if (op == DEL_COMMAND)
/* Sync an disabled member */
else /* status == "disabled" */
{
Port lag;
/* Cannot locate LAG */
if (!getPort(lag_alias, lag))
/* "status" is "disabled" at start when m_lag_id and
* m_lag_member_id are absent */
if (!port.m_lag_id || !port.m_lag_member_id)
{
it = consumer.m_toSync.erase(it);
continue;
}

if (removeLag(lag))
if (removeLagMember(lag, port))
it = consumer.m_toSync.erase(it);
else
it++;
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
it = consumer.m_toSync.erase(it);
}
}
/* Manipulate a LAG member */
else
/* Remove a LAG member */
else if (op == DEL_COMMAND)
{
assert(m_portList.find(lag_alias) != m_portList.end());
Port lag, port;
/* Assert the LAG member exists */
assert(lag.m_members.find(port_alias) != lag.m_members.end());

/* When LAG member is to be created before LAG is created */
if (!getPort(lag_alias, lag))
{
SWSS_LOG_INFO("Failed to locate LAG %s", lag_alias.c_str());
it++;
continue;
}
/* Assert the port belongs to a LAG */
assert(port.m_lag_id && port.m_lag_member_id);

if (!getPort(port_alias, port))
{
SWSS_LOG_ERROR("Failed to locate port %s", port_alias.c_str());
if (removeLagMember(lag, port))
it = consumer.m_toSync.erase(it);
continue;
}

/* Add a LAG member */
if (op == SET_COMMAND)
{
string status;
for (auto i : kfvFieldsValues(t))
{
if (fvField(i) == "status")
status = fvValue(i);
}

/* Sync an enabled member */
if (status == "enabled")
{
/* Duplicate entry */
if (lag.m_members.find(port_alias) != lag.m_members.end())
{
it = consumer.m_toSync.erase(it);
continue;
}

/* Assert the port doesn't belong to any LAG */
assert(!port.m_lag_id && !port.m_lag_member_id);

if (addLagMember(lag, port))
it = consumer.m_toSync.erase(it);
else
it++;
}
/* Sync an disabled member */
else /* status == "disabled" */
{
/* "status" is "disabled" at start when m_lag_id and
* m_lag_member_id are absent */
if (!port.m_lag_id || !port.m_lag_member_id)
{
it = consumer.m_toSync.erase(it);
continue;
}

if (removeLagMember(lag, port))
it = consumer.m_toSync.erase(it);
else
it++;
}
}
/* Remove a LAG member */
else if (op == DEL_COMMAND)
{
/* Assert the LAG member exists */
assert(lag.m_members.find(port_alias) != lag.m_members.end());

/* Assert the port belongs to a LAG */
assert(port.m_lag_id && port.m_lag_member_id);

if (removeLagMember(lag, port))
it = consumer.m_toSync.erase(it);
else
it++;
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
it = consumer.m_toSync.erase(it);
}
it++;
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
it = consumer.m_toSync.erase(it);
}
}
}
Expand All @@ -681,6 +687,8 @@ void PortsOrch::doTask(Consumer &consumer)
doVlanTask(consumer);
else if (table_name == APP_LAG_TABLE_NAME)
doLagTask(consumer);
else if (table_name == APP_LAG_MEMBER_TABLE_NAME)
doLagMemberTask(consumer);
}

void PortsOrch::initializeQueues(Port &port)
Expand Down
1 change: 1 addition & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class PortsOrch : public Orch, public Subject
void doPortTask(Consumer &consumer);
void doVlanTask(Consumer &consumer);
void doLagTask(Consumer &consumer);
void doLagMemberTask(Consumer &consumer);

bool initializePort(Port &port);
void initializePriorityGroups(Port &port);
Expand Down
17 changes: 9 additions & 8 deletions teamsyncd/teamsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ using namespace swss;

TeamSync::TeamSync(DBConnector *db, Select *select) :
m_select(select),
m_lagTable(db, APP_LAG_TABLE_NAME)
m_lagTable(db, APP_LAG_TABLE_NAME),
m_lagMemberTable(db, APP_LAG_MEMBER_TABLE_NAME)
{
}

Expand Down Expand Up @@ -68,9 +69,9 @@ void TeamSync::addLag(const string &lagName, int ifindex, bool admin_state,
return;

/* Start track the team instance */
TeamPortSync *sync = new TeamPortSync(lagName, ifindex, &m_lagTable);
m_select->addSelectable(sync);
m_teamPorts[lagName] = shared_ptr<TeamPortSync>(sync);
auto sync = make_shared<TeamPortSync>(lagName, ifindex, &m_lagMemberTable);
m_select->addSelectable(sync.get());
m_teamPorts[lagName] = sync;
}

void TeamSync::removeLag(const string &lagName)
Expand All @@ -95,8 +96,8 @@ const struct team_change_handler TeamSync::TeamPortSync::gPortChangeHandler = {
};

TeamSync::TeamPortSync::TeamPortSync(const string &lagName, int ifindex,
ProducerStateTable *lagTable) :
m_lagTable(lagTable),
ProducerStateTable *lagMemberTable) :
m_lagMemberTable(lagMemberTable),
m_lagName(lagName),
m_ifindex(ifindex)
{
Expand Down Expand Up @@ -171,7 +172,7 @@ int TeamSync::TeamPortSync::onChange()
vector<FieldValueTuple> v;
FieldValueTuple l("status", it.second ? "enabled" : "disabled");
v.push_back(l);
m_lagTable->set(key, v);
m_lagMemberTable->set(key, v);
}
}

Expand All @@ -180,7 +181,7 @@ int TeamSync::TeamPortSync::onChange()
if (tmp_lag_members.find(it.first) == tmp_lag_members.end())
{
string key = m_lagName + ":" + it.first;
m_lagTable->del(key);
m_lagMemberTable->del(key);
}
}

Expand Down
5 changes: 3 additions & 2 deletions teamsyncd/teamsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TeamSync : public NetMsg
public:
enum { MAX_IFNAME = 64 };
TeamPortSync(const std::string &lagName, int ifindex,
ProducerStateTable *lagTable);
ProducerStateTable *lagMemberTable);
~TeamPortSync();

virtual void addFd(fd_set *fd);
Expand All @@ -43,7 +43,7 @@ class TeamSync : public NetMsg
team_change_type_mask_t type_mask);
static const struct team_change_handler gPortChangeHandler;
private:
ProducerStateTable *m_lagTable;
ProducerStateTable *m_lagMemberTable;
struct team_handle *m_team;
std::string m_lagName;
int m_ifindex;
Expand All @@ -58,6 +58,7 @@ class TeamSync : public NetMsg
private:
Select *m_select;
ProducerStateTable m_lagTable;
ProducerStateTable m_lagMemberTable;
std::map<std::string, std::shared_ptr<TeamPortSync> > m_teamPorts;
};

Expand Down

0 comments on commit c5cbc69

Please sign in to comment.