Skip to content

Commit

Permalink
Merge pull request #516 from noironetworks/persist
Browse files Browse the repository at this point in the history
Add support for coming up with a persist policy
  • Loading branch information
mchalla authored May 9, 2024
2 parents 81e16aa + a1e4e3c commit 425c6dd
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 7 deletions.
27 changes: 27 additions & 0 deletions agent-ovs/lib/Agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ void Agent::setProperties(const boost::property_tree::ptree& properties) {
static const std::string BEHAVIOR_L34FLOWS_WITHOUT_SUBNET("behavior.l34flows-without-subnet");
static const std::string OPFLEX_ASYC_JSON("opflex.asyncjson.enabled");
static const std::string OVS_ASYNC_JSON("ovs.asyncjson.enabled");
static const std::string OPFLEX_POLICY_FILE("opflex.startup.policy-file");
static const std::string OPFLEX_LOCAL_RESOLVE_AFTER_CONNECTION("opflex.startup.resolve-aft-conn");
static const std::string OPFLEX_STARTUP_POLICY_DURATION("opflex.startup.policy-duration");

// set feature flags to true
clearFeatureFlags();
Expand Down Expand Up @@ -523,6 +526,27 @@ void Agent::setProperties(const boost::property_tree::ptree& properties) {
if (ovsAsyncJsonEnabled.get() == true)
setenv("OVS_USE_ASYNC_JSON", "", true);
}

optional<std::string> policyFile =
properties.get_optional<std::string>(OPFLEX_POLICY_FILE);
if (policyFile) {
opflexPolicyFile = policyFile;
LOG(INFO) << "Startup policy file set to " << opflexPolicyFile;
}

optional<uint64_t> startup_policy_duration_present =
properties.get_optional<uint64_t>(OPFLEX_STARTUP_POLICY_DURATION);
if (startup_policy_duration_present) {
startupPolicyDuration = startup_policy_duration_present.get()*1000;
LOG(INFO) << "Startup policy duration set to " << startupPolicyDuration << " ms";
}

optional<bool> lResolveAftConn =
properties.get_optional<bool>(OPFLEX_LOCAL_RESOLVE_AFTER_CONNECTION);
if (lResolveAftConn) {
localResolveAftConn = lResolveAftConn.get();
LOG(INFO) << "Startup policy resolve after connection set to " << localResolveAftConn;
}
}

void Agent::applyProperties() {
Expand Down Expand Up @@ -581,6 +605,9 @@ void Agent::applyProperties() {
framework.setPolicyRetryDelayTimerDuration(policy_retry_delay_timer*1000);
framework.setHandshakeTimeout(peerHandshakeTimeout);
framework.setKeepaliveTimeout(keepaliveTimeout);
framework.setStartupPolicy(opflexPolicyFile, modelgbp::getMetadata(),
startupPolicyDuration,
localResolveAftConn);
}

void Agent::start() {
Expand Down
5 changes: 5 additions & 0 deletions agent-ovs/lib/include/opflexagent/Agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ typedef opflex::ofcore::OFConstants::OpflexElementMode opflex_elem_t;
/* How long to wait from platform config to switch Sync */
uint32_t switch_sync_delay = 5; /* seconds */
uint32_t switch_sync_dynamic = 0; /* dynamic retry default 0 no retry */
// startup policy duration from new connection in seconds
uint64_t startupPolicyDuration = 0; /* seconds */
bool localResolveAftConn = false; /* local resolve after conn estb */

std::set<std::string> endpointSourceFSPaths;
std::set<std::string> disabledFeaturesSet;
Expand Down Expand Up @@ -418,6 +421,8 @@ typedef opflex::ofcore::OFConstants::OpflexElementMode opflex_elem_t;
std::unordered_set<std::string> prometheusEpAttributes;
bool behaviorL34FlowsWithoutSubnet;
LogParams logParams;
/* Persistent policy from disk */
boost::optional<std::string> opflexPolicyFile;
};

} /* namespace opflexagent */
Expand Down
19 changes: 19 additions & 0 deletions agent-ovs/opflex-agent-ovs.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@

// Configuration related to the OpFlex protocol
"opflex": {
// // This section controls how persistant config is treated
// // during opflex startup
// "startup": {
// // Location of the pol.json that will be read and used on startup
// // will be provided by the orchestrator
// "policy-file": "/foo/bar/pol.json",
// // How long to use the above file after agent connects to the leaf
// // default is 0 meaning as soon as agent connects to leaf we will
// // stop using the local policy for future resolves
// "policy-duration": 0,
// // Wait till opflex connects to leaf before using the local policy
// // default 0, use in combination with policy-duration > 0
// // This is useful if you want to preserve the old flows until
// // the leaf connection succeeds and not overwrite them before we
// // connect to the leaf using the local policy
// // A related knob timers.switch-sync-delay controls after connection
// // how much more longer to freeze the flow tables
// "resolve-aft-conn": false
// },
// The policy domain for this agent.
"domain": "openstack",

Expand Down
8 changes: 5 additions & 3 deletions libopflex/engine/MOSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void MOSerializer::deserialize_enum(modb::mointernal::StoreClient& client,
void MOSerializer::deserialize(const rapidjson::Value& mo,
modb::mointernal::StoreClient& client,
bool replaceChildren,
/* out */ modb::mointernal::StoreClient::notif_t* notifs) {
/* out */ modb::mointernal::StoreClient::notif_t* notifs,
bool skiplocal) {
if (!mo.IsObject()
|| !mo.HasMember("uri")
|| !mo.HasMember("subject")) return;
Expand All @@ -120,6 +121,7 @@ void MOSerializer::deserialize(const rapidjson::Value& mo,
try {
URI uri(uriv.GetString());
const ClassInfo& ci = store->getClassInfo(classv.GetString());
if (skiplocal && ci.getOwner() != "policyreg") return;
std::shared_ptr<ObjectInstance> oi =
std::make_shared<ObjectInstance>(ci.getId(), false);
if (mo.HasMember("properties")) {
Expand Down Expand Up @@ -412,7 +414,7 @@ void MOSerializer::dumpMODB(const std::string& file, bool excludeObservable) {
LOG(INFO) << "Wrote MODB to " << file;
}

size_t MOSerializer::readMOs(FILE* pfile, StoreClient& client) {
size_t MOSerializer::readMOs(FILE* pfile, StoreClient& client, bool skiplocal) {
char buffer[1024];
rapidjson::FileReadStream f(pfile, buffer, sizeof(buffer));
rapidjson::Document d;
Expand All @@ -425,7 +427,7 @@ size_t MOSerializer::readMOs(FILE* pfile, StoreClient& client) {
size_t i = 0;
for (moit = d.Begin(); moit != d.End(); ++ moit) {
const rapidjson::Value& mo = *moit;
deserialize(mo, client, true, NULL);
deserialize(mo, client, true, NULL, skiplocal);
i += 1;
}
return i;
Expand Down
134 changes: 132 additions & 2 deletions libopflex/engine/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <limits>
#include <cmath>
#include <random>
#include <rapidjson/document.h>
#include <rapidjson/filereadstream.h>

#include <boost/generator_iterator.hpp>
#include <boost/tuple/tuple.hpp>
Expand Down Expand Up @@ -66,7 +68,8 @@ Processor::Processor(ObjectStore* store_, ThreadManager& threadManager_)
processingDelay(DEFAULT_PROC_DELAY),
retryDelay(DEFAULT_RETRY_DELAY),
proc_loop(nullptr),
proc_active(false) {
proc_active(false),
startupdb(s_threadManager) {
cleanup_async = {};
proc_async = {};
connect_async = {};
Expand Down Expand Up @@ -189,7 +192,8 @@ bool Processor::isObjNew(const URI& uri) {
// ancestor that has a zero refcount.
bool Processor::isOrphan(const item& item) {
// simplest case: refcount is nonzero or item is local
if (item.details->local || item.details->refcount > 0)
// since startupdb is known state skip orphan check when using it
if (item.details->local || item.details->refcount > 0 || shouldResolveLocal())
return false;

try {
Expand Down Expand Up @@ -269,6 +273,83 @@ void Processor::sendToRole(const item& i, uint64_t& newexp,
}
}

bool Processor::shouldResolveLocal() {
uint64_t curtime = now(proc_loop);
// check >= for duration 0 because now wont update till next uv_loop
if (!opflexPolicyFile
|| ((newConnectiontime != 0)
&& (curtime >= (newConnectiontime + startupPolicyDuration)))
|| ((newConnectiontime == 0)
&& local_resolve_after_connection))
return false;
else
return true;
}

void Processor::resolveObjLocal(const modb::class_id_t& class_id,
const modb::URI& uri,
StoreClient::notif_t& notifs) {
if (!shouldResolveLocal()) return;
std::shared_ptr<const modb::mointernal::ObjectInstance> oi;
StoreClient& s_client = startupdb.getReadOnlyStoreClient();
// Get from startupdb
s_client.get(class_id, uri, oi);
if (!oi) {
LOG(DEBUG) << "Local policy missing for " << class_id
<< " " << uri;
} else {
LOG(DEBUG) << "Local policy resolved for " << class_id
<< " " << uri;
// Put in activedb
if (client->putIfModified(class_id, uri, oi)) {
client->queueNotification(class_id, uri, notifs);
LOG(DEBUG) << "QUEUE NOTIF for " << class_id
<< " : " << uri;
}
// check if this mo has a parent
try {
std::pair<modb::URI, modb::prop_id_t> parent(modb::URI::ROOT, 0);
if (s_client.getParent(class_id, uri, parent)) {
const modb::ClassInfo& parent_class =
startupdb.getPropClassInfo(parent.second);
const modb::PropertyInfo& parent_prop =
parent_class.getProperty(parent.second);
if (client->isPresent(parent_class.getId(), parent.first)) {
if (client->addChild(parent_class.getId(),
parent.first,
parent_prop.getId(),
class_id,
uri)) {
client->queueNotification(parent_class.getId(),
parent.first,
notifs);
}
}
}
} catch (const std::out_of_range& e) {
// no parent class or property found
LOG(ERROR) << "Invalid parent or property for "
<< uri;
}
// recursively add children
const ClassInfo& ci = startupdb.getClassInfo(class_id);
const ClassInfo::property_map_t& pmap = ci.getProperties();
ClassInfo::property_map_t::const_iterator it;
for (it = pmap.begin(); it != pmap.end(); ++it) {
if (it->second.getType() == PropertyInfo::COMPOSITE) {
class_id_t prop_class = it->second.getClassId();
prop_id_t prop_id = it->second.getId();
std::vector<URI> children;
s_client.getChildren(class_id, uri, prop_id, prop_class, children);
std::vector<URI>::iterator cit;
for (cit = children.begin(); cit != children.end(); ++cit) {
resolveObjLocal(prop_class, *cit, notifs);
}
}
}
}
}

bool Processor::resolveObj(ClassInfo::class_type_t type, const item& i,
uint64_t& newexp, bool checkTime) {
uint64_t curTime = now(proc_loop);
Expand All @@ -286,6 +367,11 @@ bool Processor::resolveObj(ClassInfo::class_type_t type, const item& i,
case ClassInfo::POLICY:
{
LOG(DEBUG) << "Resolving policy " << i.uri;
if (checkTime && shouldResolveLocal()) {
StoreClient::notif_t notifs;
resolveObjLocal(i.details->class_id, i.uri, notifs);
client->deliverNotifications(notifs);
}
i.details->resolve_time = curTime;
vector<reference_t> refs;
refs.emplace_back(i.details->class_id, i.uri);
Expand All @@ -298,6 +384,11 @@ bool Processor::resolveObj(ClassInfo::class_type_t type, const item& i,
case ClassInfo::REMOTE_ENDPOINT:
{
LOG(DEBUG) << "Resolving remote endpoint " << i.uri;
if (checkTime && shouldResolveLocal()) {
StoreClient::notif_t notifs;
resolveObjLocal(i.details->class_id, i.uri, notifs);
client->deliverNotifications(notifs);
}
i.details->resolve_time = curTime;
vector<reference_t> refs;
refs.emplace_back(i.details->class_id, i.uri);
Expand Down Expand Up @@ -604,13 +695,47 @@ bool Processor::waitForPendingItems(uint32_t& wait) {
return pool.waitForPendingItems(wait);
}

void Processor::setStartupPolicy(boost::optional<std::string>& file,
const modb::ModelMetadata& model,
uint64_t& duration,
bool& resolve_after_connection) {
opflexPolicyFile = file;
if (file) {
startupdb.init(model);
startupPolicyDuration = duration;
local_resolve_after_connection = resolve_after_connection;
}
}

size_t Processor::readStartupPolicy() {
if (!opflexPolicyFile) {
LOG(DEBUG) << "Skip missing startup policy read";
return 0;
}

FILE* pfile = fopen(opflexPolicyFile.get().c_str(), "r");
if (pfile == NULL) {
LOG(ERROR) << "Could not open policy file "
<< opflexPolicyFile.get() << " for reading";
return 0;
}

startupdb.start();
MOSerializer s_serializer(&startupdb);
StoreClient& s_client = startupdb.getStoreClient("_SYSTEM_");
return s_serializer.readMOs(pfile, s_client, true);
}

void Processor::start(ofcore::OFConstants::OpflexElementMode agent_mode) {
if (proc_active) return;
proc_active = true;
pool.setClientMode(agent_mode);

LOG(DEBUG) << "Starting OpFlex Processor";

size_t objs = readStartupPolicy();
LOG(DEBUG) << "Read " << objs << " objects from startup policy";

client = &store->getStoreClient("_SYSTEM_");
store->forEachClass(&register_listeners, this);

Expand Down Expand Up @@ -739,6 +864,11 @@ void Processor::handleNewConnections() {
const std::lock_guard<std::mutex> lock(item_mutex);
obj_state_by_uri& uri_index = obj_state.get<uri_tag>();

// Only set this for the first connection after startup
if (newConnectiontime == 0) {
newConnectiontime = now(proc_loop);
}

for (const item& i : obj_state) {
uint64_t newexp = i.expiration;
const ClassInfo& ci = store->getClassInfo(i.details->class_id);
Expand Down
38 changes: 38 additions & 0 deletions libopflex/engine/include/opflex/engine/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,17 @@ class Processor : public internal::AbstractObjectListener,
*/
bool waitForPendingItems(uint32_t& wait);

/**
* Set startup policy
* @param file startup policy file name or boost::none
* @param model the model for initializing startupdb
* @param duration the amount of time in ms from new
* connection to continue using startupdb
*/
void setStartupPolicy(boost::optional<std::string>& file,
const modb::ModelMetadata& model,
uint64_t& duration,
bool& resolve_after_connection);
/**
* Enable/Disable reporting of observable changes to registered observers
*
Expand Down Expand Up @@ -576,11 +587,33 @@ class Processor : public internal::AbstractObjectListener,
*/
uint64_t policyRefTimerDuration = 1000*DEFAULT_PRR_TIMER_DURATION/2;

/**
* new connection timestamp in msecs
*/
volatile int64_t newConnectiontime = 0;

/**
* local resolves only after a new connection till startupPolicyDuration
*/
volatile bool local_resolve_after_connection;

/**
* Startup policy timeout in ms. This is the amount of time
* from new connection that we would continue using the startup
* policy to resolve Mos from startupdb.
*/
uint64_t startupPolicyDuration;

/**
* Processing thread
*/
uv_loop_t* proc_loop;
boost::atomic<bool> proc_active;
/* Persistent policy from disk */
boost::optional<std::string> opflexPolicyFile;
util::ThreadManager s_threadManager;
modb::ObjectStore startupdb;

uv_async_t cleanup_async;
uv_async_t proc_async;
uv_async_t connect_async;
Expand Down Expand Up @@ -608,6 +641,11 @@ class Processor : public internal::AbstractObjectListener,
bool declareObj(modb::ClassInfo::class_type_t type, const item& it,
uint64_t& newexp);
void handleNewConnections();
size_t readStartupPolicy();
bool shouldResolveLocal();
void resolveObjLocal(const modb::class_id_t& class_id,
const modb::URI& uri,
modb::mointernal::StoreClient::notif_t& notifs);
};

} /* namespace engine */
Expand Down
Loading

0 comments on commit 425c6dd

Please sign in to comment.