Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[orchagent]: Remove global lock caused by notifications running in another thread #478

Merged
merged 7 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ fpmsyncd/fpmsyncd
intfsyncd/intfsyncd
cfgmgr/intfmgrd
cfgmgr/vlanmgrd
cfgmgr/buffermanager
neighsyncd/neighsyncd
portsyncd/portsyncd
orchagent/orchagent
orchagent/routeresync
swssconfig/swssconfig
swssconfig/swssplayer
tests/tests

91 changes: 63 additions & 28 deletions orchagent/fdborch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "fdborch.h"
#include "crmorch.h"
#include "notifier.h"
#include "sai_serialize.h"

extern sai_fdb_api_t *sai_fdb_api;

Expand All @@ -24,9 +25,15 @@ FdbOrch::FdbOrch(DBConnector *db, string tableName, PortsOrch *port) :
m_table(Table(db, tableName))
{
m_portsOrch->attach(this);
auto consumer = new NotificationConsumer(db, "FLUSHFDBREQUEST");
auto fdbNotification = new Notifier(consumer, this);
Orch::addExecutor("", fdbNotification);
m_flushNotificationsConsumer = new NotificationConsumer(db, "FLUSHFDBREQUEST");
auto flushNotifier = new Notifier(m_flushNotificationsConsumer, this);
Orch::addExecutor("", flushNotifier);

/* Add FDB notifications support from ASIC */
DBConnector *notificationsDb = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
m_fdbNotificationConsumer = new swss::NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto fdbNotifier = new Notifier(m_fdbNotificationConsumer, this);
Orch::addExecutor("FDB_NOTIFICATIONS", fdbNotifier);
}

void FdbOrch::update(sai_fdb_event_t type, const sai_fdb_entry_t* entry, sai_object_id_t bridge_port_id)
Expand Down Expand Up @@ -290,36 +297,64 @@ void FdbOrch::doTask(NotificationConsumer& consumer)

consumer.pop(op, data, values);

if (op == "ALL")
if (&consumer == m_flushNotificationsConsumer)
{
/*
* so far only support flush all the FDB entris
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
if (op == "ALL")
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}
/*
* so far only support flush all the FDB entris
* flush per port and flush per vlan will be added later.
*/
status = sai_fdb_api->flush_fdb_entries(gSwitchId, 0, NULL);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Flush fdb failed, return code %x", status);
}

return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
return;
}
else if (op == "PORT")
{
/*place holder for flush port fdb*/
SWSS_LOG_ERROR("Received unsupported flush port fdb request");
return;
}
else if (op == "VLAN")
{
/*place holder for flush vlan fdb*/
SWSS_LOG_ERROR("Received unsupported flush vlan fdb request");
return;
}
else
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
}
}
else
else if (&consumer == m_fdbNotificationConsumer && op == "fdb_event")
{
SWSS_LOG_ERROR("Received unknown flush fdb request");
return;
uint32_t count;
sai_fdb_event_notification_data_t *fdbevent = nullptr;

sai_deserialize_fdb_event_ntf(data, count, &fdbevent);

for (uint32_t i = 0; i < count; ++i)
{
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

for (uint32_t j = 0; j < fdbevent[i].attr_count; ++j)
{
if (fdbevent[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
{
oid = fdbevent[i].attr[j].value.oid;
break;
}
}

this->update(fdbevent[i].event_type, &fdbevent[i].fdb_entry, oid);

sai_deserialize_free_fdb_event_ntf(count, fdbevent);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions orchagent/fdborch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class FdbOrch: public Orch, public Subject, public Observer
set<FdbEntry> m_entries;
fdb_entries_by_port_t saved_fdb_entries;
Table m_table;
NotificationConsumer* m_flushNotificationsConsumer;
NotificationConsumer* m_fdbNotificationConsumer;

void doTask(Consumer& consumer);
void doTask(NotificationConsumer& consumer);
Expand Down
4 changes: 0 additions & 4 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ extern "C" {
#include <iostream>
#include <unordered_map>
#include <map>
#include <mutex>
#include <thread>
#include <chrono>
#include <getopt.h>
Expand Down Expand Up @@ -47,9 +46,6 @@ bool gLogRotate = false;
ofstream gRecordOfs;
string gRecordFile;

/* Global database mutex */
mutex gDbMutex;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-b batch_size] [-m MAC]" << endl;
Expand Down
60 changes: 4 additions & 56 deletions orchagent/notifications.cpp
Original file line number Diff line number Diff line change
@@ -1,72 +1,20 @@
#include <unordered_map>
#include <mutex>
#include <assert.h>

#include "portsorch.h"
#include "fdborch.h"

extern "C" {
#include "sai.h"
}

#include "logger.h"
#include "notifications.h"

extern mutex gDbMutex;
extern PortsOrch *gPortsOrch;
extern FdbOrch *gFdbOrch;

void on_fdb_event(uint32_t count, sai_fdb_event_notification_data_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gFdbOrch)
{
SWSS_LOG_NOTICE("gFdbOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; ++i)
{
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

for (uint32_t j = 0; j < data[i].attr_count; ++j)
{
if (data[i].attr[j].id == SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID)
{
oid = data[i].attr[j].value.oid;
break;
}
}

gFdbOrch->update(data[i].event_type, &data[i].fdb_entry, oid);
}
// don't use this event handler, because it runs by libsairedis in a separate thread
// which causes concurrency access to the DB
}

void on_port_state_change(uint32_t count, sai_port_oper_status_notification_t *data)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);

if (!gPortsOrch)
{
SWSS_LOG_NOTICE("gPortsOrch is not initialized");
return;
}

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t id = data[i].port_id;
sai_port_oper_status_t status = data[i].port_state;

SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

gPortsOrch->updateDbPortOperStatus(id, status);
gPortsOrch->setHostIntfsOperStatus(id, status == SAI_PORT_OPER_STATUS_UP);
}
// don't use this event handler, because it runs by libsairedis in a separate thread
// which causes concurrency access to the DB
}

void on_switch_shutdown_request()
Expand Down
6 changes: 0 additions & 6 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <fstream>
#include <iostream>
#include <mutex>
#include <sys/time.h>
#include "timestamp.h"
#include "orch.h"
Expand All @@ -15,8 +14,6 @@ using namespace swss;

extern int gBatchSize;

extern mutex gDbMutex;

extern bool gSwssRecord;
extern ofstream gRecordOfs;
extern bool gLogRotate;
Expand Down Expand Up @@ -73,9 +70,6 @@ void Consumer::execute()
{
SWSS_LOG_ENTER();

// TODO: remove DbMutex when there is only single thread
lock_guard<mutex> lock(gDbMutex);

std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);

Expand Down
50 changes: 50 additions & 0 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "sai_serialize.h"
#include "crmorch.h"
#include "countercheckorch.h"
#include "notifier.h"

extern sai_switch_api_t *sai_switch_api;
extern sai_bridge_api_t *sai_bridge_api;
Expand Down Expand Up @@ -235,6 +236,12 @@ PortsOrch::PortsOrch(DBConnector *db, vector<table_name_with_pri_t> &tableNames)

removeDefaultVlanMembers();
removeDefaultBridgePorts();

/* Add port oper status notification support */
DBConnector *notificationsDb = new DBConnector(ASIC_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
m_portStatusNotificationConsumer = new swss::NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto portStatusNotificatier = new Notifier(m_portStatusNotificationConsumer, this);
Orch::addExecutor("PORT_STATUS_NOTIFICATIONS", portStatusNotificatier);
}

void PortsOrch::removeDefaultVlanMembers()
Expand Down Expand Up @@ -2330,3 +2337,46 @@ bool PortsOrch::removeLagMember(Port &lag, Port &port)

return true;
}

void PortsOrch::doTask(NotificationConsumer &consumer)
{
SWSS_LOG_ENTER();

/* Wait for all ports to be initialized */
if (!isInitDone())
{
return;
}

std::string op;
std::string data;
std::vector<swss::FieldValueTuple> values;

consumer.pop(op, data, values);

if (&consumer != m_portStatusNotificationConsumer)
{
return;
}

if (op == "port_state_change")
{
uint32_t count;
sai_port_oper_status_notification_t *portoperstatus = nullptr;

sai_deserialize_port_oper_status_ntf(data, count, &portoperstatus);

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t id = portoperstatus[i].port_id;
sai_port_oper_status_t status = portoperstatus[i].port_state;

SWSS_LOG_NOTICE("Get port state change notification id:%lx status:%d", id, status);

this->updateDbPortOperStatus(id, status);
this->setHostIntfsOperStatus(id, status == SAI_PORT_OPER_STATUS_UP);
}

sai_deserialize_free_port_oper_status_ntf(count, portoperstatus);
}
}
4 changes: 4 additions & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ class PortsOrch : public Orch, public Subject
map<set<int>, tuple<string, uint32_t>> m_lanesAliasSpeedMap;
map<string, Port> m_portList;

NotificationConsumer* m_portStatusNotificationConsumer;

void doTask(Consumer &consumer);
void doPortTask(Consumer &consumer);
void doVlanTask(Consumer &consumer);
void doVlanMemberTask(Consumer &consumer);
void doLagTask(Consumer &consumer);
void doLagMemberTask(Consumer &consumer);

void doTask(NotificationConsumer &consumer);

void removeDefaultVlanMembers();
void removeDefaultBridgePorts();

Expand Down