Skip to content

Commit

Permalink
[everflow] Add retry mechanism for mirror sessions and policers (soni…
Browse files Browse the repository at this point in the history
…c-net#1486)

Signed-off-by: Danny Allen <daall@microsoft.com>
  • Loading branch information
daall authored Dec 10, 2020
1 parent 376acfe commit b7e4410
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 86 deletions.
23 changes: 16 additions & 7 deletions orchagent/aclorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,12 +1083,6 @@ bool AclRuleMirror::validateAddAction(string attr_name, string attr_value)

m_sessionName = attr_value;

if (!m_pMirrorOrch->sessionExists(m_sessionName))
{
SWSS_LOG_ERROR("Mirror rule reference mirror session that does not exists %s", m_sessionName.c_str());
return false;
}

// insert placeholder value, we'll set the session oid in AclRuleMirror::create()
m_actions[action] = sai_attribute_value_t{};

Expand Down Expand Up @@ -1178,6 +1172,12 @@ bool AclRuleMirror::create()
sai_object_id_t oid = SAI_NULL_OBJECT_ID;
bool state = false;

if (!m_pMirrorOrch->sessionExists(m_sessionName))
{
SWSS_LOG_ERROR("Mirror rule references mirror session \"%s\" that does not exist yet", m_sessionName.c_str());
return false;
}

if (!m_pMirrorOrch->getSessionStatus(m_sessionName, state))
{
SWSS_LOG_THROW("Failed to get mirror session state for session %s", m_sessionName.c_str());
Expand Down Expand Up @@ -3124,7 +3124,16 @@ void AclOrch::doAclRuleTask(Consumer &consumer)
}


newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
try
{
newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
}
catch (exception &e)
{
SWSS_LOG_ERROR("Error while creating ACL rule %s: %s", rule_id.c_str(), e.what());
it = consumer.m_toSync.erase(it);
return;
}

for (const auto& itr : kfvFieldsValues(t))
{
Expand Down
2 changes: 2 additions & 0 deletions orchagent/aclorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ class AclOrch : public Orch, public Observer
// Get the OID for the ACL bind point for a given port
static bool getAclBindPortId(Port& port, sai_object_id_t& port_id);

using Orch::doTask; // Allow access to the basic doTask

private:
SwitchOrch *m_switchOrch;
void doTask(Consumer &consumer);
Expand Down
73 changes: 28 additions & 45 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ bool MirrorOrch::bake()
{
SWSS_LOG_ENTER();

// Freeze the route update during orchagent restoration
m_freeze = true;

deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
m_mirrorTable.getKeys(keys);
Expand Down Expand Up @@ -134,23 +131,6 @@ bool MirrorOrch::bake()
return Orch::bake();
}

bool MirrorOrch::postBake()
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Start MirrorOrch post-baking");

// Unfreeze the route update
m_freeze = false;

Orch::doTask();

// Clean up the recovery cache
m_recoverySessionMap.clear();

return Orch::postBake();
}

void MirrorOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -340,7 +320,7 @@ bool MirrorOrch::validateSrcPortList(const string& srcPortList)
return true;
}

void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
task_process_status MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
{
SWSS_LOG_ENTER();

Expand All @@ -349,7 +329,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_NOTICE("Failed to create session, session %s already exists",
key.c_str());
return;
return task_process_status::task_duplicated;
}

string platform = getenv("platform") ? getenv("platform") : "";
Expand All @@ -364,7 +344,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.srcIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s source IP address", key.c_str());
return;
return task_process_status::task_invalid_entry;
}
}
else if (fvField(i) == MIRROR_SESSION_DST_IP)
Expand All @@ -373,7 +353,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.dstIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s destination IP address", key.c_str());
return;
return task_process_status::task_invalid_entry;
}
}
else if (fvField(i) == MIRROR_SESSION_GRE_TYPE)
Expand All @@ -398,7 +378,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_ERROR("Failed to get policer %s",
fvValue(i).c_str());
return;
return task_process_status::task_need_retry;
}

m_policerOrch->increaseRefCount(fvValue(i));
Expand All @@ -409,7 +389,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!validateSrcPortList(fvValue(i)))
{
SWSS_LOG_ERROR("Failed to get valid source port list %s", fvValue(i).c_str());
return;
return task_process_status::task_invalid_entry;
}
entry.src_port = fvValue(i);
}
Expand All @@ -418,7 +398,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!validateDstPort(fvValue(i)))
{
SWSS_LOG_ERROR("Failed to get valid destination port %s", fvValue(i).c_str());
return;
return task_process_status::task_invalid_entry;
}
entry.dst_port = fvValue(i);
}
Expand All @@ -428,7 +408,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
|| fvValue(i) == MIRROR_BOTH_DIRECTION))
{
SWSS_LOG_ERROR("Failed to get valid direction %s", fvValue(i).c_str());
return;
return task_process_status::task_invalid_entry;
}
entry.direction = fvValue(i);
}
Expand All @@ -439,18 +419,18 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
else
{
SWSS_LOG_ERROR("Failed to parse session %s configuration. Unknown attribute %s", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_invalid_entry;
}
}
catch (const exception& e)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s error: %s.", key.c_str(), fvField(i).c_str(), e.what());
return;
return task_process_status::task_invalid_entry;
}
catch (...)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s. Unknown error has been occurred", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_failed;
}
}

Expand All @@ -470,6 +450,8 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
// Attach the destination IP to the routeOrch
m_routeOrch->attach(this, entry.dstIp);
}

return task_process_status::task_success;
}

task_process_status MirrorOrch::deleteEntry(const string& name)
Expand Down Expand Up @@ -1412,11 +1394,6 @@ void MirrorOrch::doTask(Consumer& consumer)
{
SWSS_LOG_ENTER();

if (m_freeze)
{
return;
}

if (!gPortsOrch->allPortsReady())
{
return;
Expand All @@ -1429,26 +1406,32 @@ void MirrorOrch::doTask(Consumer& consumer)

string key = kfvKey(t);
string op = kfvOp(t);
task_process_status task_status = task_process_status::task_failed;

if (op == SET_COMMAND)
{
createEntry(key, kfvFieldsValues(t));
task_status = createEntry(key, kfvFieldsValues(t));
}
else if (op == DEL_COMMAND)
{
auto task_status = deleteEntry(key);
// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
continue;
}
task_status = deleteEntry(key);
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}

consumer.m_toSync.erase(it++);
// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
}
else
{
consumer.m_toSync.erase(it++);
}
}

// Clear any recovery state that might be leftover from warm reboot
m_recoverySessionMap.clear();
}
7 changes: 3 additions & 4 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ class MirrorOrch : public Orch, public Observer, public Subject
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);

bool bake() override;
bool postBake() override;
void update(SubjectType, void *);
bool sessionExists(const string&);
bool getSessionStatus(const string&, bool&);
bool getSessionOid(const string&, sai_object_id_t&);
bool increaseRefCount(const string&);
bool decreaseRefCount(const string&);

using Orch::doTask; // Allow access to the basic doTask

private:
PortsOrch *m_portsOrch;
RouteOrch *m_routeOrch;
Expand All @@ -101,9 +102,7 @@ class MirrorOrch : public Orch, public Observer, public Subject
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

bool m_freeze = false;

void createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status deleteEntry(const string&);

bool activateSession(const string&, MirrorEntry&);
Expand Down
7 changes: 0 additions & 7 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,6 @@ bool Orch::bake()
return true;
}

bool Orch::postBake()
{
SWSS_LOG_ENTER();

return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down
5 changes: 2 additions & 3 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ typedef enum
task_invalid_entry,
task_failed,
task_need_retry,
task_ignore
task_ignore,
task_duplicated
} task_process_status;

typedef struct
Expand Down Expand Up @@ -204,8 +205,6 @@ class Orch
// Prepare for warm start if Redis contains valid input data
// otherwise fallback to cold start
virtual bool bake();
// Clean up the state set in bake()
virtual bool postBake();

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
virtual void doTask();
Expand Down
23 changes: 15 additions & 8 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ NeighOrch *gNeighOrch;
RouteOrch *gRouteOrch;
FgNhgOrch *gFgNhgOrch;
AclOrch *gAclOrch;
MirrorOrch *gMirrorOrch;
CrmOrch *gCrmOrch;
BufferOrch *gBufferOrch;
SwitchOrch *gSwitchOrch;
Expand Down Expand Up @@ -177,7 +178,7 @@ bool OrchDaemon::init()

TableConnector stateDbMirrorSession(m_stateDb, STATE_MIRROR_SESSION_TABLE_NAME);
TableConnector confDbMirrorSession(m_configDb, CFG_MIRROR_SESSION_TABLE_NAME);
MirrorOrch *mirror_orch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);
gMirrorOrch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);

TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_TABLE_NAME);
TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
Expand Down Expand Up @@ -273,10 +274,10 @@ bool OrchDaemon::init()
dtel_orch = new DTelOrch(m_configDb, dtel_tables, gPortsOrch);
m_orchList.push_back(dtel_orch);
}
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, mirror_orch, gNeighOrch, gRouteOrch, dtel_orch);
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, gMirrorOrch, gNeighOrch, gRouteOrch, dtel_orch);

m_orchList.push_back(gFdbOrch);
m_orchList.push_back(mirror_orch);
m_orchList.push_back(gMirrorOrch);
m_orchList.push_back(gAclOrch);
m_orchList.push_back(chassis_frontend_orch);
m_orchList.push_back(vrf_orch);
Expand Down Expand Up @@ -548,18 +549,24 @@ bool OrchDaemon::warmRestoreAndSyncUp()

for (auto it = 0; it < 3; it++)
{
SWSS_LOG_DEBUG("The current iteration is %d", it);
SWSS_LOG_DEBUG("The current doTask iteration is %d", it);

for (Orch *o : m_orchList)
{
if (o == gMirrorOrch) {
SWSS_LOG_DEBUG("Skipping mirror processing until the end");
continue;
}

o->doTask();
}
}

for (Orch *o : m_orchList)
{
o->postBake();
}
// MirrorOrch depends on everything else being settled before it can run,
// and mirror ACL rules depend on MirrorOrch, so run these two at the end
// after the rest of the data has been processed.
gMirrorOrch->doTask();
gAclOrch->doTask();

/*
* At this point, all the pre-existing data should have been processed properly, and
Expand Down
13 changes: 11 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,16 +1021,25 @@ def remove_neighbor(self, interface, ip):
tbl._del(interface + ":" + ip)
time.sleep(1)

# deps: mirror_port_erspan
# deps: mirror_port_erspan, warm_reboot
def add_route(self, prefix, nexthop):
self.runcmd("ip route add " + prefix + " via " + nexthop)
time.sleep(1)

# deps: mirror_port_erspan
# deps: mirror_port_erspan, warm_reboot
def change_route(self, prefix, nexthop):
self.runcmd("ip route change " + prefix + " via " + nexthop)
time.sleep(1)

# deps: warm_reboot
def change_route_ecmp(self, prefix, nexthops):
cmd = ""
for nexthop in nexthops:
cmd += " nexthop via " + nexthop

self.runcmd("ip route change " + prefix + cmd)
time.sleep(1)

# deps: acl, mirror_port_erspan
def remove_route(self, prefix):
self.runcmd("ip route del " + prefix)
Expand Down
Loading

0 comments on commit b7e4410

Please sign in to comment.