Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async time coordinator #2384

Merged
merged 10 commits into from
Jul 1, 2022
4 changes: 4 additions & 0 deletions docs/references/configuration_options_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ String used to define the configuration of the broker if one is autogenerated. S
- `--file_log_level=` - Specifies the level of logging to file for this broker.
- `--console_log_level=` - Specifies the level of logging to file for this broker.
- `--dumplog` - Captures a record of all logging messages and writes them out to file or console when the broker terminates.
- `--globaltime` - Specify that the broker should use a globalTime coordinator to coordinate a master clock time with all federates.
- `--asynctime` - Specify that the federation should use the asynchronous time coordinator (only minimal time management is handled in HELICS and federates are allowed to operate independently).
- `--timing = ("async"|"global"|"default"|"distributed")` - specify the timing mode to use for time coordination
- `--tick=` - Heartbeat period in ms. When brokers fail to respond after 2 ticks secondary actions are taking to confirm the broker is still connected to the federation. Times can also be entered as strings such as "15s" or "75ms".
- `--timeout=` milliseconds to wait for all the federates to connect to the broker (can also be entered as a time like '10s' or '45ms')
- `--network_timeout=` - Time to establish a socket connection in ms. Times can also be entered as strings such as "15s" or "75ms".
Expand Down Expand Up @@ -385,6 +388,7 @@ _Valid values:_
- `interfaces` - `HELICS_LOG_LEVEL_INTERFACES`
- `timing` - `HELICS_LOG_LEVEL_TIMING`
- `data` - `HELICS_LOG_LEVEL_DATA`
- `debug` - `HELICS_LOG_LEVEL_DEBUG`
- `trace` - `HELICS_LOG_LEVEL_TRACE`

Determines the level of detail for log messages. In the list above, the keywords on the left can be used when specifying the logging level via a JSON configuration file. The enumerations on the right are used when configuring via the API.
Expand Down
10 changes: 5 additions & 5 deletions src/helics/application_api/MessageOperators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ MessageConditionalOperator::MessageConditionalOperator(
}

void MessageConditionalOperator::setConditionFunction(
std::function<bool(const Message*)> userConditionalFunction)
std::function<bool(const Message*)> userConditionFunction)
{
evalFunction = std::move(userConditionalFunction);
evalFunction = std::move(userConditionFunction);
}

std::unique_ptr<Message> MessageConditionalOperator::process(std::unique_ptr<Message> message)
Expand Down Expand Up @@ -151,17 +151,17 @@ std::unique_ptr<Message> FirewallOperator::process(std::unique_ptr<Message> mess
break;
case operations::set_flag1:
if (res) {
setActionFlag(*message, extra_flag1);
setActionFlag(*message, user_custom_message_flag1);
}
break;
case operations::set_flag2:
if (res) {
setActionFlag(*message, extra_flag2);
setActionFlag(*message, user_custom_message_flag2);
}
break;
case operations::set_flag3:
if (res) {
setActionFlag(*message, extra_flag3);
setActionFlag(*message, user_custom_message_flag3);
}
break;
case operations::none:
Expand Down
128 changes: 128 additions & 0 deletions src/helics/core/AsyncTimeCoordinator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright (c) 2017-2022,
Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
*/

#include "AsyncTimeCoordinator.hpp"

#include "../common/fmt_format.h"
#include "flagOperations.hpp"
#include "helics_definitions.hpp"

#include "json/json.h"
#include <algorithm>
#include <iostream>
#include <set>
#include <string>
#include <vector>

namespace helics {

bool AsyncTimeCoordinator::updateTimeFactors()
{
auto timeStream = generateMinTimeTotal(dependencies, true, mSourceId, NoIgnoredFederates, 0);
currentMinTime = timeStream.next;
currentTimeState = timeStream.mTimeState;

return false;
}

void AsyncTimeCoordinator::generateDebuggingTimeInfo(Json::Value& base) const
{
base["type"] = "global";
base["nextEvent"] = static_cast<double>(nextEvent);
addTimeState(base, currentTimeState);
base["minTime"] = static_cast<double>(currentMinTime);
BaseTimeCoordinator::generateDebuggingTimeInfo(base);
}

std::string AsyncTimeCoordinator::printTimeStatus() const
{
return fmt::format(R"raw({{"time_next":{}, "Te":{}}})raw",
static_cast<double>(currentMinTime),
static_cast<double>(nextEvent));
}

MessageProcessingResult AsyncTimeCoordinator::checkExecEntry(GlobalFederateId /*triggerFed*/)
{
auto ret = MessageProcessingResult::CONTINUE_PROCESSING;

executionMode = true;
ret = MessageProcessingResult::NEXT_STEP;

currentMinTime = timeZero;
currentTimeState = TimeState::time_granted;
nextEvent = timeZero;

ActionMessage execgrant(CMD_EXEC_GRANT);
execgrant.source_id = mSourceId;
transmitTimingMessagesDownstream(execgrant);
transmitTimingMessagesUpstream(execgrant);
return ret;
}

void AsyncTimeCoordinator::transmitTimingMessagesUpstream(ActionMessage& msg) const
{
if (!sendMessageFunction) {
return;
}

for (const auto& dep : dependencies) {
if (dep.connection == ConnectionType::child) {
continue;
}
if (!dep.dependent) {
continue;
}
msg.dest_id = dep.fedID;
if (msg.action() == CMD_EXEC_REQUEST) {
msg.setExtraDestData(dep.sequenceCounter);
}
sendMessageFunction(msg);
}
}

void AsyncTimeCoordinator::transmitTimingMessagesDownstream(ActionMessage& msg,
GlobalFederateId skipFed) const
{
if (!sendMessageFunction) {
return;
}
if ((msg.action() == CMD_TIME_REQUEST || msg.action() == CMD_TIME_GRANT)) {
for (const auto& dep : dependencies) {
if (dep.connection != ConnectionType::child) {
continue;
}
if (!dep.dependent) {
continue;
}
if (dep.fedID == skipFed) {
continue;
}
if (dep.dependency) {
if (dep.next > msg.actionTime) {
continue;
}
}
msg.dest_id = dep.fedID;
sendMessageFunction(msg);
}
} else {
for (const auto& dep : dependencies) {
if (dep.dependent) {
if (dep.fedID == skipFed) {
continue;
}
if (msg.action() == CMD_EXEC_REQUEST) {
msg.setExtraDestData(dep.sequenceCounter);
}
msg.dest_id = dep.fedID;
sendMessageFunction(msg);
}
}
}
}

} // namespace helics
68 changes: 68 additions & 0 deletions src/helics/core/AsyncTimeCoordinator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright (c) 2017-2022,
Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once

#include "ActionMessage.hpp"
#include "BaseTimeCoordinator.hpp"
#include "CoreFederateInfo.hpp"
#include "TimeDependencies.hpp"

#include "json/forwards.h"
#include <atomic>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace helics {

/** class implementing a time coordinator that explicitly allows asynchronous operation of the
federates -- that is, the federates are not time synchronized and are allowed to operate ahead of
others. Potential uses are: 1) the use of an external time coordination mechanism, 2) a purely
command driven system, 3) all federates are operated in real time mode, 4) you don't care about
the data and just want to see how fast all the federates go on their own.

This time coordinator does only minimal time keeping for entry to execution mode and allows
asynchronous operation of the federation
*/
class AsyncTimeCoordinator: public BaseTimeCoordinator {
private:
// the variables for time coordination
Time currentMinTime{Time::minVal()};
TimeState currentTimeState{TimeState::initialized};
Time nextEvent{Time::maxVal()};

protected:
bool iterating{false}; //!< flag indicating that the min dependency is iterating

public:
AsyncTimeCoordinator() = default;

/** compute updates to time values
and send an update if needed
*/
virtual bool updateTimeFactors() override;

private:
void transmitTimingMessagesUpstream(ActionMessage& msg) const;
void transmitTimingMessagesDownstream(ActionMessage& msg,
GlobalFederateId skipFed = GlobalFederateId{}) const;

public:
/** check if entry to the executing state can be granted*/
virtual MessageProcessingResult
checkExecEntry(GlobalFederateId triggerFed = GlobalFederateId{}) override;

/** generate a string with the current time status*/
virtual std::string printTimeStatus() const override;
/** generate debugging time information*/
virtual void generateDebuggingTimeInfo(Json::Value& base) const override;

/** get the current next time*/
virtual Time getNextTime() const override { return currentMinTime; }
};
} // namespace helics
15 changes: 2 additions & 13 deletions src/helics/core/BasicHandleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ SPDX-License-Identifier: BSD-3-Clause

namespace helics {

/** define extra flag definitions*/
enum handle_flag_definitions {
mapped_flag = extra_flag1,
/// indicator that an endpoint or message has a source filter
has_source_filter_flag = extra_flag2,
/// indicator that an endpoint or message has a destination filter
has_dest_filter_flag = extra_flag3,
/// indicator that the endpoint or filter has a destination filter that alters the message
has_non_cloning_dest_filter_flag = extra_flag4
};

/** class defining and capturing basic information about a handle*/
class BasicHandleInfo {
public:
Expand All @@ -49,8 +38,8 @@ class BasicHandleInfo {
LocalFederateId local_fed_id{}; //!< the local federate id of the handle
const InterfaceType handleType{InterfaceType::UNKNOWN}; //!< the type of the handle
bool used{false}; //!< indicator that the handle is being used to link with another federate
uint16_t flags{
0}; //!< flags corresponding to the flags used in ActionMessages +some extra ones
/// flags corresponding to the flags used in ActionMessages +some extra ones
uint16_t flags{0};

const std::string key; //!< the name of the handle
const std::string type; //!< the type of data used by the handle
Expand Down
24 changes: 23 additions & 1 deletion src/helics/core/BrokerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SPDX-License-Identifier: BSD-3-Clause

#include "../common/fmt_format.h"
#include "../common/logging.hpp"
#include "AsyncTimeCoordinator.hpp"
#include "ForwardingTimeCoordinator.hpp"
#include "GlobalTimeCoordinator.hpp"
#include "LogManager.hpp"
Expand Down Expand Up @@ -150,6 +151,24 @@ std::shared_ptr<helicsCLI11App> BrokerBase::generateBaseCLI()
"--globaltime",
globalTime,
"specify that the broker should use a globalTime coordinator to coordinate a master clock time with all federates");
hApp->add_flag(
"--asynctime",
asyncTime,
"specify that the federation should use the asynchronous time coordinator (only minimal time management is handled in HELICS and federates are allowed to operate independently)");
hApp->add_option_function<std::string>(
"--timing",
[this](const std::string& arg) {
if (arg == "async") {
asyncTime = true;
} else if (arg == "global") {
globalTime = true;
} else {
asyncTime = false;
globalTime = false;
}
},
"specify the timing method to use in the broker")
->check(CLI::IsMember({"async", "global", "distributed"}));
phlptp marked this conversation as resolved.
Show resolved Hide resolved
hApp->add_flag("--observer",
observer,
"specify that the broker/core should be added as an observer only");
Expand Down Expand Up @@ -295,7 +314,10 @@ void BrokerBase::configureBase()
uuid_like = true;
}
}
if (globalTime) {
if (asyncTime) {
timeCoord = std::make_unique<AsyncTimeCoordinator>();
hasTimeDependency = true;
} else if (globalTime) {
timeCoord = std::make_unique<GlobalTimeCoordinator>();
hasTimeDependency = true;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/helics/core/BrokerBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class BrokerBase {
bool observer{false};
/// flag indicating that the broker should use a global time coordinator
bool globalTime{false};
/// flag indicating the use of async time keeping
bool asyncTime{false};

private:
/// flag indicating that the main processing loop is running
Expand Down
2 changes: 2 additions & 0 deletions src/helics/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ set(SRC_FILES
BaseTimeCoordinator.cpp
ForwardingTimeCoordinator.cpp
GlobalTimeCoordinator.cpp
AsyncTimeCoordinator.cpp
TimeDependencies.cpp
HandleManager.cpp
FilterInfo.cpp
Expand Down Expand Up @@ -69,6 +70,7 @@ set(INCLUDE_FILES
BaseTimeCoordinator.hpp
ForwardingTimeCoordinator.hpp
GlobalTimeCoordinator.hpp
AsyncTimeCoordinator.hpp
loggingHelper.hpp
GlobalFederateId.hpp
basic_CoreTypes.hpp
Expand Down
35 changes: 20 additions & 15 deletions src/helics/core/CommonCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3048,8 +3048,11 @@ void CommonCore::processPriorityCommand(ActionMessage&& command)
if (checkActionFlag(command, slow_responding_flag)) {
timeoutMon->disableParentPing();
}
if (checkActionFlag(command, indicator_flag)) {
if (checkActionFlag(command, global_timing_flag)) {
globalTime = true;
if (checkActionFlag(command, async_timing_flag)) {
asyncTime = true;
}
}
timeoutMon->reset();
if (delayInitCounter < 0 && minFederateCount == 0 && minChildCount == 0) {
Expand Down Expand Up @@ -3843,20 +3846,22 @@ void CommonCore::connectFilterTiming()

auto fid = filterFedID.load();
if (globalTime) {
ActionMessage ad(CMD_ADD_DEPENDENT);
setActionFlag(ad, parent_flag);
ad.dest_id = fid;
ad.source_id = gRootBrokerID;
filterFed->handleMessage(ad);

ad.setAction(CMD_ADD_DEPENDENCY);
filterFed->handleMessage(ad);
clearActionFlag(ad, parent_flag);
setActionFlag(ad, child_flag);
ad.swapSourceDest();
transmit(parent_route_id, ad);
ad.setAction(CMD_ADD_DEPENDENT);
transmit(parent_route_id, ad);
if (!asyncTime) {
ActionMessage ad(CMD_ADD_DEPENDENT);
setActionFlag(ad, parent_flag);
ad.dest_id = fid;
ad.source_id = gRootBrokerID;
filterFed->handleMessage(ad);

ad.setAction(CMD_ADD_DEPENDENCY);
filterFed->handleMessage(ad);
clearActionFlag(ad, parent_flag);
setActionFlag(ad, child_flag);
ad.swapSourceDest();
transmit(parent_route_id, ad);
ad.setAction(CMD_ADD_DEPENDENT);
transmit(parent_route_id, ad);
}
} else {
if (timeCoord->addDependent(higher_broker_id)) {
ActionMessage add(CMD_ADD_INTERDEPENDENCY, global_broker_id_local, higher_broker_id);
Expand Down
Loading