Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MirrorOrch]: Mirror Session Retention across Warm Reboot #1054

Merged
merged 2 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 172 additions & 24 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include "swssnet.h"
#include "converter.h"
#include "mirrororch.h"
#include "tokenize.h"

#define MIRROR_SESSION_STATUS "status"
#define MIRROR_SESSION_STATUS_ACTIVE "active"
#define MIRROR_SESSION_STATUS_INACTIVE "inactive"
#define MIRROR_SESSION_NEXT_HOP_IP "next_hop_ip"
#define MIRROR_SESSION_SRC_IP "src_ip"
#define MIRROR_SESSION_DST_IP "dst_ip"
#define MIRROR_SESSION_GRE_TYPE "gre_type"
Expand All @@ -23,7 +25,7 @@
#define MIRROR_SESSION_DST_MAC_ADDRESS "dst_mac"
#define MIRROR_SESSION_MONITOR_PORT "monitor_port"
#define MIRROR_SESSION_ROUTE_PREFIX "route_prefix"
#define MIRROR_SESSION_VLAN_HEADER_VALID "vlan_header_valid"
#define MIRROR_SESSION_VLAN_ID "vlan_id"
#define MIRROR_SESSION_POLICER "policer"

#define MIRROR_SESSION_DEFAULT_VLAN_PRI 0
Expand Down Expand Up @@ -58,6 +60,7 @@ MirrorEntry::MirrorEntry(const string& platform) :
}

nexthopInfo.prefix = IpPrefix("0.0.0.0/0");
nexthopInfo.nexthop = IpAddress("0.0.0.0");
}

MirrorOrch::MirrorOrch(TableConnector stateDbConnector, TableConnector confDbConnector,
Expand All @@ -75,6 +78,74 @@ MirrorOrch::MirrorOrch(TableConnector stateDbConnector, TableConnector confDbCon
m_fdbOrch->attach(this);
}

bool MirrorOrch::bake()
{
SWSS_LOG_ENTER();

// Freeze the route update during orchagent restoration
m_freeze = true;

deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
m_mirrorTable.getKeys(keys);
for (const auto &key : keys)
{
vector<FieldValueTuple> tuples;
m_mirrorTable.get(key, tuples);

bool active = false;
string monitor_port;
string next_hop_ip;

for (const auto &tuple : tuples)
{
if (fvField(tuple) == MIRROR_SESSION_STATUS)
{
active = fvValue(tuple) == MIRROR_SESSION_STATUS_ACTIVE;
}
else if (fvField(tuple) == MIRROR_SESSION_MONITOR_PORT)
{
monitor_port = fvValue(tuple);
}
else if (fvField(tuple) == MIRROR_SESSION_NEXT_HOP_IP)
{
next_hop_ip = fvValue(tuple);
}
}

if (!active)
{
continue;
}

SWSS_LOG_NOTICE("Found mirror session %s active before warm reboot",
key.c_str());

// Recover saved active session's monitor port
m_recoverySessionMap.emplace(
key, monitor_port + state_db_key_delimiter + next_hop_ip);
}

return Orch::bake();
}

bool MirrorOrch::postBake()
Copy link
Contributor

@lguohan lguohan Sep 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current algorithm has a flaw. for example, let's say previous orchagent has a bug which cacluates a wrong session results (wrong port), in this algorithm, the warm reboot will give the same wrong port, and if the route is not update later, the wrong port will not be corrected.

the correct algorithm will need to check if the recorded results is in the newly calculated results or not, if it is in then use it, if it is not, the warm reboot need to correct that error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found current approach is difficult to achieve above results.

the best approach is the move mirror orch after route orch in the dotask loop.

Then in the mirror orch dotask loop, caculate the new mirror session results, check if the old results is contained in the new results, if yes, then use old results. It should also results simpler code since the postBake() introduce new logics which can be avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will leads back to the original issue that we are facing - if there are more than 1 mirror sessions require destination update, comparison logic will not be able to handle such cases, and the processes will crash. If it is expected to fix the wrong session results during the warm reboot, such logic shall be introduced after the comparison logic.

{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Start MirrorOrch post-baking");

// Unfreeze the route update
m_freeze = false;

Orch::doTask();

// Clean up the recovery cache
m_recoverySessionMap.clear();

return Orch::postBake();
}

void MirrorOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -320,10 +391,11 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,
{
SWSS_LOG_ENTER();

SWSS_LOG_INFO("Setting mirroring sessions %s state\n", name.c_str());
SWSS_LOG_INFO("Update mirroring sessions %s state", name.c_str());

vector<FieldValueTuple> fvVector;
string value;

if (attr.empty() || attr == MIRROR_SESSION_STATUS)
{
value = session.status ? MIRROR_SESSION_STATUS_ACTIVE : MIRROR_SESSION_STATUS_INACTIVE;
Expand All @@ -332,8 +404,9 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,

if (attr.empty() || attr == MIRROR_SESSION_MONITOR_PORT)
{
value = sai_serialize_object_id(session.neighborInfo.portId);
fvVector.emplace_back(MIRROR_SESSION_MONITOR_PORT, value);
Port port;
m_portsOrch->getPort(session.neighborInfo.portId, port);
fvVector.emplace_back(MIRROR_SESSION_MONITOR_PORT, port.m_alias);
}

if (attr.empty() || attr == MIRROR_SESSION_DST_MAC_ADDRESS)
Expand All @@ -348,10 +421,16 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,
fvVector.emplace_back(MIRROR_SESSION_ROUTE_PREFIX, value);
}

if (attr.empty() || attr == MIRROR_SESSION_VLAN_HEADER_VALID)
if (attr.empty() || attr == MIRROR_SESSION_VLAN_ID)
{
value = to_string(session.neighborInfo.port.m_vlan_info.vlan_id);
fvVector.emplace_back(MIRROR_SESSION_VLAN_ID, value);
}

if (attr.empty() || attr == MIRROR_SESSION_NEXT_HOP_IP)
{
value = to_string(session.neighborInfo.port.m_type == Port::VLAN);
fvVector.emplace_back(MIRROR_SESSION_VLAN_HEADER_VALID, value);
value = session.nexthopInfo.nexthop.to_string();
fvVector.emplace_back(MIRROR_SESSION_NEXT_HOP_IP, value);
stcheng marked this conversation as resolved.
Show resolved Hide resolved
}

m_mirrorTable.set(name, fvVector);
Expand Down Expand Up @@ -396,32 +475,68 @@ bool MirrorOrch::getNeighborInfo(const string& name, MirrorEntry& session)
return false;
}

// Get the firt member of the LAG
Port member;
const auto& first_member_alias = *session.neighborInfo.port.m_members.begin();
m_portsOrch->getPort(first_member_alias, member);
// Recover the LAG member monitor port picked before warm reboot
// to minimalize the data plane changes across warm reboot.
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
string alias = tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[0];
Port member;
m_portsOrch->getPort(alias, member);

SWSS_LOG_NOTICE("Recover mirror session %s with LAG member port %s",
name.c_str(), alias.c_str());
session.neighborInfo.portId = member.m_port_id;
}
else
{
// Get the firt member of the LAG
Port member;
string first_member_alias = *session.neighborInfo.port.m_members.begin();
m_portsOrch->getPort(first_member_alias, member);

session.neighborInfo.portId = member.m_port_id;
}

session.neighborInfo.portId = member.m_port_id;
return true;
}
case Port::VLAN:
{
SWSS_LOG_NOTICE("Get mirror session destination IP neighbor VLAN %d",
session.neighborInfo.port.m_vlan_info.vlan_id);
Port member;
if (!m_fdbOrch->getPort(session.neighborInfo.mac, session.neighborInfo.port.m_vlan_info.vlan_id, member))

// Recover the VLAN member monitor port picked before warm reboot
// since the FDB entries are not yet learned on the hardware
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
SWSS_LOG_NOTICE("Waiting to get FDB entry MAC %s under VLAN %s",
session.neighborInfo.mac.to_string().c_str(),
session.neighborInfo.port.m_alias.c_str());
return false;
string alias = tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[0];
Port member;
m_portsOrch->getPort(alias, member);

SWSS_LOG_NOTICE("Recover mirror session %s with VLAN member port %s",
name.c_str(), alias.c_str());
session.neighborInfo.portId = member.m_port_id;
}
else
{
// Update monitor port
session.neighborInfo.portId = member.m_port_id;
return true;
Port member;
if (!m_fdbOrch->getPort(session.neighborInfo.mac,
session.neighborInfo.port.m_vlan_info.vlan_id, member))
{
SWSS_LOG_NOTICE("Waiting to get FDB entry MAC %s under VLAN %s",
session.neighborInfo.mac.to_string().c_str(),
session.neighborInfo.port.m_alias.c_str());
return false;
}
else
{
// Update monitor port
session.neighborInfo.portId = member.m_port_id;
}
}

return true;
}
default:
{
Expand Down Expand Up @@ -741,7 +856,7 @@ bool MirrorOrch::updateSessionType(const string& name, MirrorEntry& session)
SWSS_LOG_NOTICE("Update mirror session %s VLAN to %s",
name.c_str(), session.neighborInfo.port.m_alias.c_str());

setSessionState(name, session, MIRROR_SESSION_VLAN_HEADER_VALID);
setSessionState(name, session, MIRROR_SESSION_VLAN_ID);

return true;
}
Expand Down Expand Up @@ -782,7 +897,35 @@ void MirrorOrch::updateNextHop(const NextHopUpdate& update)

if (update.nexthopGroup != IpAddresses())
{
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
SWSS_LOG_NOTICE(" next hop IPs: %s", update.nexthopGroup.to_string().c_str());

// Recover the session based on the state database information
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
IpAddress nexthop = IpAddress(tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[1]);

// Check if recovered next hop IP is within the update's next hop IPs
if (update.nexthopGroup.getIpAddresses().count(nexthop))
{
SWSS_LOG_NOTICE("Recover mirror session %s with next hop %s",
name.c_str(), nexthop.to_string().c_str());
session.nexthopInfo.nexthop = nexthop;
}
else
{
// Correct the next hop IP
SWSS_LOG_NOTICE("Correct mirror session %s next hop from %s to %s",
name.c_str(), session.nexthopInfo.nexthop.to_string().c_str(),
nexthop.to_string().c_str());
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
}
}
else
{
// Pick the first one from the next hop group
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
}
}
else
{
Expand Down Expand Up @@ -968,6 +1111,11 @@ void MirrorOrch::doTask(Consumer& consumer)
{
SWSS_LOG_ENTER();

if (m_freeze)
{
return;
}

if (!gPortsOrch->allPortsReady())
{
return;
Expand All @@ -991,7 +1139,7 @@ void MirrorOrch::doTask(Consumer& consumer)
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s\n", op.c_str());
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}

consumer.m_toSync.erase(it++);
Expand Down
6 changes: 6 additions & 0 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class MirrorOrch : public Orch, public Observer, public Subject
MirrorOrch(TableConnector appDbConnector, TableConnector confDbConnector,
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);

bool bake() override;
bool postBake() override;
void update(SubjectType, void *);
bool sessionExists(const string&);
bool getSessionStatus(const string&, bool&);
Expand All @@ -86,6 +88,10 @@ class MirrorOrch : public Orch, public Observer, public Subject
Table m_mirrorTable;

MirrorTable m_syncdMirrors;
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

bool m_freeze = false;

void createEntry(const string&, const vector<FieldValueTuple>&);
void deleteEntry(const string&);
Expand Down
19 changes: 13 additions & 6 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ Orch::Orch(DBConnector *db, const string tableName, int pri)

Orch::Orch(DBConnector *db, const vector<string> &tableNames)
{
for(auto it : tableNames)
for (auto it : tableNames)
{
addConsumer(db, it, default_orch_pri);
}
}

Orch::Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNames_with_pri)
{
for(const auto& it : tableNames_with_pri)
for (const auto& it : tableNames_with_pri)
{
addConsumer(db, it.first, it.second);
}
Expand All @@ -60,7 +60,7 @@ Orch::~Orch()
vector<Selectable *> Orch::getSelectables()
{
vector<Selectable *> selectables;
for(auto& it : m_consumerMap)
for (auto& it : m_consumerMap)
{
selectables.push_back(it.second.get());
}
Expand Down Expand Up @@ -240,7 +240,7 @@ bool Orch::bake()
{
SWSS_LOG_ENTER();

for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
string executorName = it.first;
auto executor = it.second;
Expand All @@ -257,6 +257,13 @@ bool Orch::bake()
return true;
}

bool Orch::postBake()
{
SWSS_LOG_ENTER();

return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down Expand Up @@ -365,15 +372,15 @@ ref_resolve_status Orch::resolveFieldRefValue(

void Orch::doTask()
{
for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
it.second->drain();
}
}

void Orch::dumpPendingTasks(vector<string> &ts)
{
for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
Consumer* consumer = dynamic_cast<Consumer *>(it.second.get());
if (consumer == NULL)
Expand Down
2 changes: 2 additions & 0 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class Orch
// Prepare for warm start if Redis contains valid input data
// otherwise fallback to cold start
virtual bool bake();
// Clean up the state set in bake()
virtual bool postBake();
stcheng marked this conversation as resolved.
Show resolved Hide resolved

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
virtual void doTask();
Expand Down
Loading