From 74945f0044c7a789b71da8d2e57bc7fb52c0e471 Mon Sep 17 00:00:00 2001 From: Scott Hemmert Date: Wed, 25 Sep 2024 12:28:35 -0600 Subject: [PATCH] Initial implementation of parallel checkpointing. (#1132) * Initial implemenation of parallel checkpointing. * Removed use of std::filesystem until gcc9 is our minimum gcc version supported. * Remove #include for . * Fixed uninitialized variable causing spurious creation of checkpoint directories. * Some format and intialization fixes. --- .clang-format | 2 + src/sst/core/action.h | 2 + src/sst/core/activity.h | 3 + src/sst/core/checkpointAction.cc | 242 +++++++++++++++- src/sst/core/checkpointAction.h | 32 ++- src/sst/core/config.cc | 74 ++++- src/sst/core/config.h | 3 + src/sst/core/event.h | 2 + src/sst/core/link.cc | 267 +++++++++++++++--- src/sst/core/link.h | 4 + src/sst/core/main.cc | 84 +++++- src/sst/core/oneshot.h | 3 +- src/sst/core/realtime.cc | 2 + src/sst/core/realtime.h | 28 +- src/sst/core/realtimeAction.h | 14 +- src/sst/core/simulation.cc | 230 +++++++++++---- src/sst/core/simulation_impl.h | 27 +- src/sst/core/statapi/statoutput.cc | 2 + src/sst/core/sync/Makefile.inc | 3 +- src/sst/core/sync/rankSyncParallelSkip.cc | 81 ++---- src/sst/core/sync/rankSyncParallelSkip.h | 17 +- src/sst/core/sync/rankSyncSerialSkip.cc | 59 ++-- src/sst/core/sync/rankSyncSerialSkip.h | 17 +- src/sst/core/sync/syncManager.cc | 128 ++++----- src/sst/core/sync/syncManager.h | 54 ++-- src/sst/core/sync/syncQueue.cc | 26 +- src/sst/core/sync/syncQueue.h | 64 ++++- src/sst/core/sync/threadSyncDirectSkip.cc | 17 -- src/sst/core/sync/threadSyncDirectSkip.h | 5 +- src/sst/core/sync/threadSyncQueue.h | 60 ---- src/sst/core/sync/threadSyncSimpleSkip.cc | 22 +- src/sst/core/sync/threadSyncSimpleSkip.h | 5 +- .../coreTest_SharedObjectComponent.h | 3 + tests/subcomponent_tests/test_sc_2a.py | 13 + tests/subcomponent_tests/test_sc_2u.py | 13 + tests/subcomponent_tests/test_sc_2u2a.py | 13 + tests/subcomponent_tests/test_sc_2u2u.py | 14 +- tests/subcomponent_tests/test_sc_2ua.py | 13 + tests/subcomponent_tests/test_sc_2uu.py | 13 + tests/subcomponent_tests/test_sc_a.py | 13 + tests/subcomponent_tests/test_sc_u.py | 13 + tests/subcomponent_tests/test_sc_u2a.py | 13 + tests/subcomponent_tests/test_sc_u2u.py | 13 + tests/subcomponent_tests/test_sc_ua.py | 13 + tests/subcomponent_tests/test_sc_uu.py | 13 + tests/test_Checkpoint.py | 11 + tests/test_SharedObject.py | 6 + tests/testsuite_default_Checkpoint.py | 78 ++--- 48 files changed, 1317 insertions(+), 517 deletions(-) delete mode 100644 src/sst/core/sync/threadSyncQueue.h diff --git a/.clang-format b/.clang-format index 3addcde47..d330c075c 100644 --- a/.clang-format +++ b/.clang-format @@ -157,6 +157,7 @@ StatementMacros: - Q_UNUSED - QT_REQUIRE_VERSION - ImplementSerializable + - NotSerializable TabWidth: 8 UseCRLF: false UseTab: Never @@ -194,6 +195,7 @@ WhitespaceSensitiveMacros: - SST_ELI_REGISTER_SUBCOMPONENT - SST_ELI_REGISTER_SUBCOMPONENT_DERIVED - SST_ELI_REGISTER_SUBCOMPONENT_DERIVED_API + - SST_ELI_REGISTER_REALTIME_ACTION - STRINGIZE - PP_STRINGIZE - BOOST_PP_STRINGIZE diff --git a/src/sst/core/action.h b/src/sst/core/action.h index 69c7cb334..98f0c2b41 100644 --- a/src/sst/core/action.h +++ b/src/sst/core/action.h @@ -29,6 +29,8 @@ class Action : public Activity Action() {} ~Action() {} + bool isAction() final { return true; } + protected: /** Called to signal to the Simulation object to end the simulation */ void endSimulation(); diff --git a/src/sst/core/activity.h b/src/sst/core/activity.h index 5a87f5492..cb3f776c6 100644 --- a/src/sst/core/activity.h +++ b/src/sst/core/activity.h @@ -155,6 +155,9 @@ class Activity : public SST::Core::MemPoolItem /** Returns the queue order associated with this activity */ inline uint64_t getQueueOrder() const { return queue_order; } + virtual bool isEvent() { return false; } + virtual bool isAction() { return false; } + /** Get a string represenation of the event. The default version * will just use the name of the class, retrieved through the * cls_name() function inherited from the serialzable class, which diff --git a/src/sst/core/checkpointAction.cc b/src/sst/core/checkpointAction.cc index 0d883dd9a..2ebcb1198 100644 --- a/src/sst/core/checkpointAction.cc +++ b/src/sst/core/checkpointAction.cc @@ -15,11 +15,17 @@ #include "sst/core/component.h" #include "sst/core/mempoolAccessor.h" +#include "sst/core/objectComms.h" #include "sst/core/simulation_impl.h" #include "sst/core/stringize.h" #include "sst/core/timeConverter.h" #include "sst/core/warnmacros.h" +// #include +#include +#include + + #ifdef SST_CONFIG_HAVE_MPI DISABLE_WARN_MISSING_OVERRIDE #include @@ -38,13 +44,29 @@ CheckpointAction::CheckpointAction( next_sim_time_ = 0; last_cpu_time_ = 0; + // If period_ is set, then we have a simtime checkpoint period if ( period_ ) { + // Compute the first checkpoint time next_sim_time_ = (period_->getFactor() * (sim->getCurrentSimCycle() / period_->getFactor())) + period_->getFactor(); - sim->insertActivity(next_sim_time_, this); + + // If this is a serial job, we also need to insert this into + // the time TimeVortex. If it is parallel, then the + // CheckpointAction is managed by the SyncManager. + RankInfo num_ranks = sim->getNumRanks(); + if ( num_ranks.rank == 1 && num_ranks.thread == 1 ) { sim->insertActivity(next_sim_time_, this); } + } + else { + next_sim_time_ = MAX_SIMTIME_T; } if ( (0 == this_rank.rank) ) { last_cpu_time_ = sst_get_cpu_time(); } + // Set the priority to be the same as the SyncManager so that + // checkpointing happens in the same place for both serial and + // parallel runs. We will never have both a SyncManager and a + // CheckpointAction in the TimeVortex at the same time since the + // SyncManager manages the CheckpointAction on parallel runs. + setPriority(SYNCPRIORITY); } CheckpointAction::~CheckpointAction() {} @@ -62,7 +84,6 @@ CheckpointAction::execute(void) void CheckpointAction::createCheckpoint(Simulation_impl* sim) { - if ( 0 == rank_.rank ) { const double now = sst_get_cpu_time(); sim->getSimulationOutput().output( @@ -72,16 +93,111 @@ CheckpointAction::createCheckpoint(Simulation_impl* sim) last_cpu_time_ = now; } - sim->checkpoint(); + // Need to create a directory for this checkpoint + std::string prefix = sim->checkpoint_prefix_; + std::string basename = prefix + "_" + std::to_string(checkpoint_id) + "_" + std::to_string(sim->currentSimCycle); + + // Directory is shared across threads. Make it a static and make + // sure we barrier in the right places + static std::string directory; + + // Only thread 0 will participate in setup + if ( rank_.thread == 0 ) { + // Rank 0 will create the directory for this checkpoint + if ( rank_.rank == 0 ) { + directory = Checkpointing::createUniqueDirectory(sim->checkpoint_directory_ + "/" + basename); +#ifdef SST_CONFIG_HAVE_MPI + Comms::broadcast(directory, 0); +#endif + } + else { + // Get directory name (really just a barrier since each + // rank already knows the name and it shouldn't have to + // create a unique one) +#ifdef SST_CONFIG_HAVE_MPI + Comms::broadcast(directory, 0); +#endif + } + } + barrier.wait(); + if ( rank_.thread == 0 ) checkpoint_id++; + + std::string filename = + directory + "/" + basename + "_" + std::to_string(rank_.rank) + "_" + std::to_string(rank_.thread) + ".bin"; + + // Write out the checkpoints for the partitions + sim->checkpoint(filename); + + // Write out the registry. Rank 0 thread 0 will write the global + // state and its registry, then each thread will take a turn + // writing its part of the registry + RankInfo num_ranks = sim->getNumRanks(); + + std::string registry_name = directory + "/" + basename + ".sstcpt"; + + if ( rank_.rank == 0 && rank_.thread == 0 ) { + // Need to write out the globals + std::string globals_name = directory + "/" + basename + "_globals.bin"; + sim->checkpoint_write_globals(checkpoint_id - 1, registry_name, globals_name); + } + // No need to barrier here since rank 0 thread 0 will be the first + // to execute in the loop below and everything else will wait + + for ( uint32_t r = 0; r < num_ranks.rank; ++r ) { + if ( r == rank_.rank ) { + // If this is my rank go ahead + for ( uint32_t t = 0; t < num_ranks.thread; ++t ) { + // If this is my thread go ahead + if ( t == rank_.thread ) { + sim->checkpoint_append_registry(registry_name, filename); + barrier.wait(); + } + else { + barrier.wait(); + } + } +#ifdef SST_CONFIG_HAVE_MPI + if ( rank_.thread == 0 ) { + MPI_Barrier(MPI_COMM_WORLD); + barrier.wait(); + } + else { + barrier.wait(); + } +#endif + } + else { +#ifdef SST_CONFIG_HAVE_MPI + // If this isn't my rank, just barrier and wait until my + // turn + if ( rank_.thread == 0 ) { + MPI_Barrier(MPI_COMM_WORLD); + barrier.wait(); + } + else { + barrier.wait(); + } +#endif + } + } } -void -CheckpointAction::check() +SimTime_t +CheckpointAction::check(SimTime_t current_time) { - // TODO add logic for simulation-interval checkpoints in parallel - Simulation_impl* sim = Simulation_impl::getSimulation(); - if ( generate_ ) { createCheckpoint(sim); } - generate_ = false; + // The if-logic is a little weird, but it's trying to minimize the + // number of branches in the normal case of no checkpoint being + // initiated. This will also handle the case where both a sim and + // real-time trigger happened at the same time + if ( (current_time == next_sim_time_) || generate_ ) { + Simulation_impl* sim = Simulation_impl::getSimulation(); + createCheckpoint(sim); + generate_ = false; + // Only add to the simulation-interval checkpoint time if it + // was what triggered this + if ( current_time == next_sim_time_ ) { next_sim_time_ += period_->getFactor(); } + } + return next_sim_time_; } void @@ -96,4 +212,112 @@ CheckpointAction::getNextCheckpointSimTime() return next_sim_time_; } +Core::ThreadSafe::Barrier CheckpointAction::barrier; +uint32_t CheckpointAction::checkpoint_id = 0; + +namespace Checkpointing { + +/** + Function to see if a directory exists. We need this bacause + std::filesystem isn't fully supported until GCC9. + + @param dirName name of directory to check + + @param include_files will also return true if a filename matches + the input name + + @return true if dirName is a directory in the filesystem. If + include_files is set to true, this will also return true if a file + name matches the passed in name (i.e. the name is available for + use) + +*/ +bool +doesDirectoryExist(const std::string& dirName, bool include_files) +{ + struct stat info; + if ( stat(dirName.c_str(), &info) != 0 ) { + // Directory does not exist + return false; + } + else if ( info.st_mode & S_IFDIR ) { + // Directory exists + return true; + } + else { + // Exists but it's a file, so return include_files + return include_files; + } +} + +/** + Function to create a directory. We need this bacause + std::filesystem isn't fully supported until GCC9 +*/ +bool +createDirectory(const std::string& dirName) +{ + if ( mkdir(dirName.c_str(), 0755) == 0 ) { + return true; // Directory created successfully + } + else { + return false; // Failed to create directory + } +} + +std::string +createUniqueDirectory(const std::string basename) +{ + std::string dirName = basename; + + // Check if the directory exists + // if ( std::filesystem::exists(dirName) ) { + if ( doesDirectoryExist(dirName, true) ) { + // Append a unique random set of characters to the directory name + std::string newDirName; + int num = 0; + do { + ++num; + newDirName = dirName + "_" + std::to_string(num); + // } while ( std::filesystem::exists(newDirName) ); // Ensure the new directory name is unique + } while ( doesDirectoryExist(newDirName, true) ); // Ensure the new directory name is unique + + dirName = newDirName; + } + + // Create the directory + // if ( !std::filesystem::create_directory(dirName) ) { + if ( !createDirectory(dirName) ) { + Simulation_impl::getSimulationOutput().fatal( + CALL_INFO_LONG, 1, "Failed to create directory: %s\n", dirName.c_str()); + } + return dirName; +} + +void +removeDirectory(const std::string UNUSED(name)) +{ + // Implement when adding logic to keep only N checkpoints +} + +std::string +initializeCheckpointInfrastructure(Config* cfg, bool rt_can_ckpt, int myRank) +{ + ////// Check to see if checkpointing is enabled ////// + if ( !cfg->canInitiateCheckpoint() && !rt_can_ckpt ) return ""; + + std::string checkpoint_dir_name = ""; + + if ( myRank == 0 ) { checkpoint_dir_name = createUniqueDirectory(cfg->checkpoint_prefix()); } +#ifdef SST_CONFIG_HAVE_MPI + // Broadcast the directory name + Comms::broadcast(checkpoint_dir_name, 0); +#endif + + return checkpoint_dir_name; +} + + +} // namespace Checkpointing + } // namespace SST diff --git a/src/sst/core/checkpointAction.h b/src/sst/core/checkpointAction.h index 71b6e9e58..6b93dec50 100644 --- a/src/sst/core/checkpointAction.h +++ b/src/sst/core/checkpointAction.h @@ -18,6 +18,7 @@ #include "sst/core/output.h" #include "sst/core/rankInfo.h" #include "sst/core/sst_types.h" +#include "sst/core/threadsafe.h" #include @@ -26,6 +27,32 @@ namespace SST { class Simulation_impl; class TimeConverter; +namespace Checkpointing { +/* Utility functions needed to manage directories */ + +/** + Creates a directory of the specified basename. If a directory named + basename already exists, it will append an _N to the end, + incrementing N from 1 until it finds an unused name. + */ +std::string createUniqueDirectory(const std::string basename); + +/** + Removes a directory. For safety, this will recurse and remove each + file individually instead of issuing an rm -r. It will not follow + links, but will simply remove the link. + */ +void removeDirectory(const std::string name); + +/** + Initializes the infrastructure needed for checkpointing. Uses the + createUniqueDirectory() function to create the directory, then + broadcasts the name to all ranks. + */ +std::string initializeCheckpointInfrastructure(Config* cfg, bool rt_can_ckpt, int myRank); + +} // namespace Checkpointing + /** \class CheckpointAction A recurring event to trigger checkpoint generation @@ -46,11 +73,14 @@ class CheckpointAction : public Action void execute(void) override; /** Called by SyncManager to check whether a checkpoint should be generated */ - void check(); + SimTime_t check(SimTime_t current_time); /** Return next checkpoint time */ SimTime_t getNextCheckpointSimTime(); + static Core::ThreadSafe::Barrier barrier; + static uint32_t checkpoint_id; + NotSerializable(SST::CheckpointAction); private: diff --git a/src/sst/core/config.cc b/src/sst/core/config.cc index e50bc6522..62edb7ae9 100644 --- a/src/sst/core/config.cc +++ b/src/sst/core/config.cc @@ -614,10 +614,68 @@ class ConfigHelper // Set the prefix for checkpoint files static int setCheckpointPrefix(Config* cfg, const std::string& arg) { + if ( arg == "" ) { + fprintf(stderr, "Error, checkpoint-prefix must not be an empty string\n"); + return -1; + } cfg->checkpoint_prefix_ = arg; return 0; } + static std::string getCheckpointPrefixExtHelp() + { + std::string msg = "Checkpointing:\n\n"; + msg.append("The checkpoint prefix is used in the naming of the directories and files created by the " + "checkpoint engine. If no checkpoint prefix is set, sst will simply use \"checkpoint\"." + "In the following explanation, will be used to represent the " + "prefix set with the --checkpoint-prefix option. On sst start, the checkpoint engine will " + "create a directory with the name to hold all the checkpoint files. If " + "already exists, the it will append _N, where N starts at 1 and increases by one until a " + "directory name that doesn't already exist is reached (i.e. _1, _2, etc.).\n"); + + msg.append("\nWithin the checkpoint directory, each checkpoint will create its own subdirectory with " + "the form __, where checkpoint_id starts at 0 and " + "increments by one for each checkpoint. Within this directory, there are three types of " + "files:\n\n"); + + msg.append("Registry file: The file containes a list of some " + "of the global parameters from the sst run, followed by a list of all other files that " + "are a part of the checkpoint. The two files, described below, are the globals file and " + "the serialized data from each of the threads in the simulation. After each of the serialized " + "data files, each Component that was in that partition is listed, along with its offset to the " + "location in the file for the Components serialized data. this file is named the same as the " + "directory with a .sstcpt extension:\n" + " __.sstcpt.\n\n"); + + msg.append("Globals file: This contains the serialized binary data needed at sst startup time that is " + "needed by all partitions. This file is named:\n" + " ___globals.bin\n\n"); + + msg.append("Serialized data files: these are the files that hold all of the data for each thread of " + "execution in the original run. The files are named by rank:\n" + " ____.bin\n\n"); + + msg.append("A sample directory structure using a checkpoint prefix of \"checkpoint\" using two ranks " + "with one thread each would look something like:\n\n" + "current working directory\n" + "|--checkpoint\n" + " |--checkpoint_0_1000\n" + " |--checkpoint_0_1000.sstcpt\n" + " |--checkpoint_0_1000_globals.bin\n" + " |--checkpoint_0_1000_0_0.bin\n" + " |--checkpoint_0_1000_1_0.bin\n" + " |--checkpoint_1_2000\n" + " |--checkpoint_1_2000.sstcpt\n" + " |--checkpoint_1_2000_globals.bin\n" + " |--checkpoint_1_2000_0_0.bin\n" + " |--checkpoint_1_2000_1_0.bin\n\n"); + + msg.append("When restarting from a checkpoint, the registry file (*.sstcpt) should be specified as the " + "input file.\n"); + + return msg; + } + // Advanced options - environment // Disable signal handlers @@ -993,9 +1051,12 @@ Config::insertOptions() "load-checkpoint", 0, "Load checkpoint and continue simulation. Specified SDL file will be used as the checkpoint file.", std::bind(&ConfigHelper::setLoadFromCheckpoint, this, _1), false); - DEF_ARG( - "checkpoint-prefix", 0, "PREFIX", "Set prefix for checkpoint filenames.", - std::bind(&ConfigHelper::setCheckpointPrefix, this, _1), true); + DEF_ARG_EH( + "checkpoint-prefix", 0, "PREFIX", + "Set prefix for checkpoint filenames. The checkpoint prefix defaults to checkpoint if this option is not set " + "and checkpointing is enabled.", + std::bind(&ConfigHelper::setCheckpointPrefix, this, _1), std::bind(&ConfigHelper::getCheckpointPrefixExtHelp), + true); enableDashDashSupport(std::bind(&ConfigHelper::setModelOptions, this, _1)); addPositionalCallback(std::bind(&Config::positionalCallback, this, _1, _2)); @@ -1072,5 +1133,12 @@ Config::setOptionFromModel(const string& entryName, const string& value) return false; } +bool +Config::canInitiateCheckpoint() +{ + if ( checkpoint_wall_period_ != 0 ) return true; + if ( checkpoint_sim_period_ != "" ) return true; + return false; +} } // namespace SST diff --git a/src/sst/core/config.h b/src/sst/core/config.h index 19f7f6815..294d911b8 100644 --- a/src/sst/core/config.h +++ b/src/sst/core/config.h @@ -372,6 +372,9 @@ class Config : public ConfigShared, public SST::Core::Serialization::serializabl // executes. + /** Get whether or not any of the checkpoint options were turned on */ + bool canInitiateCheckpoint(); + /** Print to stdout the current configuration */ void print(); diff --git a/src/sst/core/event.h b/src/sst/core/event.h index ee46b08d2..92b694d98 100644 --- a/src/sst/core/event.h +++ b/src/sst/core/event.h @@ -119,6 +119,8 @@ class Event : public Activity #endif + bool isEvent() final { return true; } + void serialize_order(SST::Core::Serialization::serializer& ser) override { Activity::serialize_order(ser); diff --git a/src/sst/core/link.cc b/src/sst/core/link.cc index 32b35bde7..28b0046d5 100644 --- a/src/sst/core/link.cc +++ b/src/sst/core/link.cc @@ -20,6 +20,8 @@ #include "sst/core/profile/eventHandlerProfileTool.h" #include "sst/core/simulation_impl.h" #include "sst/core/ssthandler.h" +#include "sst/core/sync/syncManager.h" +#include "sst/core/sync/syncQueue.h" #include "sst/core/timeConverter.h" #include "sst/core/timeLord.h" #include "sst/core/timeVortex.h" @@ -40,6 +42,7 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: // 0 - nullptr // 1 - Link // 2 - SelfLink + // 3 - Sync Link Pair int16_t type; switch ( ser.mode() ) { @@ -51,6 +54,7 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: ser& type; return; } + self_link = (s == s->pair_link); if ( self_link ) { type = 2; @@ -76,22 +80,23 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: ser & s->tag; // Profile tools not yet supported // ser & s->profile_tools; + return; } - else { - // Regular link - type = 1; - ser& type; - // Need to put a uintptr_t in for my pointer and my pair's - // pointer. This will be used to identify link - // connections on restart + // Check to see if this is a SYNC link pair. If so, we will + // serialize all the info together so we can do the + // registerLink() calls on deserialization. The serialization + // call will always be on the non-sync link of the pair. + if ( s->pair_link->type == Link::SYNC ) { + type = 3; + ser& type; // MULTI-PARELLEL RESTART: When supporting different // restart parallelism, will also need to store the rank // of the links in order to have unique identifies for - // each link. For pair links on another rank, we will - // use the delivery_info field as the pointer part of the - // tag (this is the uintptr_t representation of the link + // each link. For pair links on another rank, we will use + // the delivery_info field as the pointer part of the tag + // (this is the uintptr_t representation of the link // pointer on the remote rank). This will also require // that the remote rank be stored somewhere in the link // object. The most likely place is in the tag, since @@ -99,6 +104,80 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: // connected to a sync object (No ordering in the // SyncQueue and the real tag will be added on the remote // side). + + // No need to keep pointers as tags since we'll have all + // the data in the serialization stream + + // Store info for the non-sync link + ser & s->type; + ser & s->mode; + ser & s->tag; + + if ( s->type == Link::POLL ) { + // If I'm a polling link, I need to serialize my + // pair's send_queue. For HANDLER and SYNC links, the + // send_queue will be reinitialized after restart + PollingLinkQueue* queue = dynamic_cast(s->pair_link->send_queue); + ser& queue; + } + + // Now serialize the handler + Event::HandlerBase* handler = reinterpret_cast(s->pair_link->delivery_info); + + // Need to serialize both the uintptr_t + // (delivery_info) and the pointer because we'll need + // the numerical value of the pointer as a tag when + // restarting. + ser & s->pair_link->delivery_info; + ser& handler; + + ser & s->defaultTimeBase; + ser & s->latency; + + // Store data for sync link + + // We'll need the pointer of the synclink in order to + // generate a globally unique name for + // SyncManager::registerLink(). + uintptr_t ptr = reinterpret_cast(s->pair_link); + ser& ptr; + + ser & s->pair_link->type; + ser & s->pair_link->mode; + ser & s->pair_link->tag; + + // No need to store queue info, it will be reintialized by + // the registerLink() call on restart + + // Just put in the delivery_info directly, this is the + // pointer to the link on the other parition + ser & s->delivery_info; + + // Need to store my rank info and the rank info for + // the other partition. This information will be used + // to create a unique name for connecting things on + // restart + RankInfo ri = Simulation_impl::getSimulation()->getRank(); + ser& ri; + + // Get the remote rank from my pairs send queue (which + // is a sync queue) + SyncQueue* q = dynamic_cast(s->send_queue); + ri = q->getToRank(); + ser& ri; + + ser & s->pair_link->defaultTimeBase; + ser & s->pair_link->latency; + } + else { + // Regular link + type = 1; + ser& type; + + // Need to put a uintptr_t in for my pointer and my pair's + // pointer. This will be used to identify link + // connections on restart + uintptr_t ptr = reinterpret_cast(s); ser& ptr; ptr = reinterpret_cast(s->pair_link); @@ -118,33 +197,24 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: ser& queue; } - // My delivery_info is stored in my pair_link. - // pair_link->delivery_info is an Event::Handler if the - // link type is Handler, and is a pointer to the remote - // link if it's SYNC - if ( s->type == Link::SYNC ) { - // Just put in the delivery_info directly - ser & s->pair_link->delivery_info; - } - else { - // My handler is stored in my pair_link + // My handler is stored in pair_link->delivery_info - // First serialize the pointer tag so we can fix - // things up after restart + // First serialize the pointer tag so we can fix + // things up after restart - // Now serialize the handler - Event::HandlerBase* handler = reinterpret_cast(s->pair_link->delivery_info); + // Now serialize the handler + Event::HandlerBase* handler = reinterpret_cast(s->pair_link->delivery_info); - // Need to serialize both the uintptr_t - // (delivery_info) and the pointer because we'll need - // the numerical value of the pointer as a tag when - // restarting. - ser & s->pair_link->delivery_info; - ser& handler; - } + // Need to serialize both the uintptr_t + // (delivery_info) and the pointer because we'll need + // the numerical value of the pointer as a tag when + // restarting. + ser & s->pair_link->delivery_info; + ser& handler; ser & s->defaultTimeBase; ser & s->latency; + // s->pair_link - tag stored above // s->current_time is automatically set on construction so // no need to serialize @@ -192,6 +262,78 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: s->send_queue = Simulation_impl::getSimulation()->getTimeVortex(); } + else if ( type == 3 ) { + // Sync link + + // Need to create both links in the pair + s = new Link(); + ser.report_new_pointer(reinterpret_cast(s)); + + Link* pair_link = new Link(); + s->pair_link = pair_link; + pair_link->pair_link = s; + + // Get data for non-sync link + ser & s->type; + ser & s->mode; + ser & s->tag; + + // Get the send_queue. It goes in my pair_link + if ( s->type == Link::POLL ) { + // If I'm a polling link, need to deserialize my + // pair's send_queue. For now, I will store it in my + // own send_queue variable and swap once we have both + // links. + PollingLinkQueue* queue; + ser& queue; + pair_link->send_queue = queue; + } + else { + pair_link->send_queue = Simulation_impl::getSimulation()->getTimeVortex(); + } + + // Get delivery_info (handler). It goes in my pair link + uintptr_t delivery_info; + ser& delivery_info; + + Event::HandlerBase* handler; + ser& handler; + pair_link->delivery_info = reinterpret_cast(handler); + + Simulation_impl::getSimulation()->event_handler_restart_tracking[delivery_info] = pair_link->delivery_info; + + // Get defaultTimeBase and latency + ser & s->defaultTimeBase; + ser & s->latency; + + // Now get data from the sync side of the link + uintptr_t my_tag; + ser& my_tag; + + ser & pair_link->type; + ser & pair_link->mode; + ser & pair_link->tag; + + // Get the delivery info for the sync. This is the + // pointer to the link on the remote partition + ser& delivery_info; + + + RankInfo local_rank; + RankInfo remote_rank; + ser& local_rank; + ser& remote_rank; + + // Need to reregister with the SyncManager, but first need to create a unique name + std::string uname = s->createUniqueGlobalLinkName(local_rank, my_tag, remote_rank, delivery_info); + ActivityQueue* sync_q = + Simulation_impl::getSimulation()->syncManager->registerLink(remote_rank, local_rank, uname, pair_link); + // SyncQueue goes in the non-sync link + s->send_queue = sync_q; + + ser & pair_link->defaultTimeBase; + ser & pair_link->latency; + } else { // Regular link @@ -240,17 +382,15 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: s->send_queue = Simulation_impl::getSimulation()->getTimeVortex(); } - if ( s->type == Link::SYNC ) { ser & s->delivery_info; } - else { - uintptr_t delivery_info; - ser& delivery_info; + uintptr_t delivery_info; + ser& delivery_info; - Event::HandlerBase* handler; - ser& handler; - s->delivery_info = reinterpret_cast(handler); + Event::HandlerBase* handler; + ser& handler; + s->delivery_info = reinterpret_cast(handler); + + Simulation_impl::getSimulation()->event_handler_restart_tracking[delivery_info] = s->delivery_info; - Simulation_impl::getSimulation()->event_handler_restart_tracking[delivery_info] = s->delivery_info; - } // If we have a pair link already, swap delivery_info and // send_queue @@ -268,10 +408,6 @@ SST::Core::Serialization::serialize_impl::operator()(Link*& s, SST::Core: ser & s->defaultTimeBase; ser & s->latency; - // s->pair_link taken care of above - // s->current_time is automatically set on construction so - // no need to serialize - // Profile tools not yet supported // ser & s->profile_tools; } @@ -600,6 +736,49 @@ Link::getDefaultTimeBase() const return Simulation_impl::getSimulation()->getTimeLord()->getTimeConverter(defaultTimeBase); } +std::string +Link::createUniqueGlobalLinkName(RankInfo local_rank, uintptr_t local_ptr, RankInfo remote_rank, uintptr_t remote_ptr) +{ + std::stringstream ss; + + uint32_t high_rank; + uint32_t low_rank; + uintptr_t high_ptr; + uintptr_t low_ptr; + if ( local_rank.rank > remote_rank.rank ) { + high_rank = local_rank.rank; + high_ptr = local_ptr; + low_rank = remote_rank.rank; + low_ptr = remote_ptr; + } + else if ( remote_rank.rank > local_rank.rank ) { + high_rank = remote_rank.rank; + high_ptr = remote_ptr; + low_rank = local_rank.rank; + low_ptr = local_ptr; + } + else { // Ranks are the same + high_rank = remote_rank.rank; + low_rank = high_rank; + if ( local_ptr > remote_ptr ) { + high_ptr = local_ptr; + low_ptr = remote_ptr; + } + else { + high_ptr = remote_ptr; + low_ptr = local_ptr; + } + } + + // Convert each parameter to hexadecimal and concatenate + ss << std::hex << std::setw(8) << std::setfill('0') << low_rank << "-" << std::hex + << std::setw(sizeof(uintptr_t) * 2) << std::setfill('0') << low_ptr << "-" << std::hex << std::setw(8) + << std::setfill('0') << high_rank << "-" << std::hex << std::setw(sizeof(uintptr_t) * 2) << std::setfill('0') + << high_ptr; + + return ss.str(); +} + void Link::addProfileTool(SST::Profile::EventHandlerProfileTool* tool, const EventHandlerMetaData& mdata) { diff --git a/src/sst/core/link.h b/src/sst/core/link.h index 9c0ea0530..70937d5a8 100644 --- a/src/sst/core/link.h +++ b/src/sst/core/link.h @@ -280,6 +280,10 @@ class alignas(64) Link void finalizeConfiguration(); void prepareForComplete(); + std::string + createUniqueGlobalLinkName(RankInfo local_rank, uintptr_t local_ptr, RankInfo remote_rank, uintptr_t remote_ptr); + + void addProfileTool(SST::Profile::EventHandlerProfileTool* tool, const EventHandlerMetaData& mdata); diff --git a/src/sst/core/main.cc b/src/sst/core/main.cc index fe93a04ea..cb3aadf7e 100644 --- a/src/sst/core/main.cc +++ b/src/sst/core/main.cc @@ -29,6 +29,7 @@ REENABLE_WARNING #endif #include "sst/core/activity.h" +#include "sst/core/checkpointAction.h" #include "sst/core/config.h" #include "sst/core/configGraph.h" #include "sst/core/cputimer.h" @@ -445,6 +446,26 @@ start_simulation(uint32_t tid, SimThreadInfo_t& info, Core::ThreadSafe::Barrier& SST::Simulation_impl* sim = Simulation_impl::createSimulation(info.config, info.myRank, info.world_size, restart); double start_run = 0.0; + // Setup the real time actions (all of these have to be defined on + // the command-line or SDL file, they will not be checkpointed and + // restored + sim->setupSimActions(info.config); + + // Thread zero needs to initialize the checkpoint data structures + // if any checkpointing options were turned on. This will return + // an empty string if checkpointing was not enabled. + + if ( tid == 0 ) { + sim->checkpoint_directory_ = Checkpointing::initializeCheckpointInfrastructure( + info.config, sim->real_time_->canInitiateCheckpoint(), info.myRank.rank); + + if ( sim->checkpoint_directory_ != "" ) { + // Write out any data structures needed for all checkpoints + } + } + // Wait for all checkpointing files to be initialzed + barrier.wait(); + if ( !restart ) { double start_build = sst_get_cpu_time(); @@ -520,8 +541,6 @@ start_simulation(uint32_t tid, SimThreadInfo_t& info, Core::ThreadSafe::Barrier& CALL_INFO, 1, 0, "# Start time: %04u/%02u/%02u at: %02u:%02u:%02u\n", (now->tm_year + 1900), (now->tm_mon + 1), now->tm_mday, now->tm_hour, now->tm_min, now->tm_sec); } - // g_output.output("info.config.stopAtCycle = %s\n",info.config->stopAtCycle.c_str()); - sim->setupSimActions(info.config); if ( tid == 0 && info.world_size.rank > 1 ) { // If we are a MPI_parallel job, need to makes sure that all used @@ -549,6 +568,7 @@ start_simulation(uint32_t tid, SimThreadInfo_t& info, Core::ThreadSafe::Barrier& Comms::broadcast(lib_names, 0); Factory::getFactory()->loadUnloadedLibraries(lib_names); + #endif } barrier.wait(); @@ -573,6 +593,12 @@ start_simulation(uint32_t tid, SimThreadInfo_t& info, Core::ThreadSafe::Barrier& // Finish parsing checkpoint for restart sim->restart(info.config); + barrier.wait(); + + if ( info.myRank.thread == 0 ) { sim->exchangeLinkInfo(); } + + barrier.wait(); + start_run = sst_get_cpu_time(); info.build_time = start_run - start_build; } @@ -694,14 +720,10 @@ main(int argc, char* argv[]) // If restarting, update config from checkpoint uint32_t cpt_num_threads, cpt_num_ranks; if ( restart ) { - size_t size; - char* buffer; - + // Need to open the registry file if ( cfg.checkConfigFile() == false ) { return -1; /* checkConfigFile provides error message */ } - SST::Core::Serialization::serializer ser; - ser.enable_pointer_tracking(); - std::ifstream fs(cfg.configFile(), std::ios::binary); + std::ifstream fs(cfg.configFile()); if ( !fs.is_open() ) { if ( fs.bad() ) { fprintf(stderr, "Unable to open checkpoint file [%s]: badbit set\n", cfg.configFile().c_str()); @@ -715,9 +737,48 @@ main(int argc, char* argv[]) return -1; } - fs.read(reinterpret_cast(&size), sizeof(size)); + std::string line; + + // Look for the line that has the global data file + std::string globals_filename; + std::string search_str("** (globals): "); + while ( std::getline(fs, line) ) { + // Look for lines starting with "** (globals):", then get the filename. + size_t pos = line.find(search_str); + if ( pos == 0 ) { + // Get the file name + globals_filename = line.substr(search_str.length()); + break; + } + } + fs.close(); + + // Need to open the globals file + std::ifstream fs_globals(globals_filename); + if ( !fs_globals.is_open() ) { + if ( fs_globals.bad() ) { + fprintf(stderr, "Unable to open checkpoint globals file [%s]: badbit set\n", globals_filename.c_str()); + return -1; + } + if ( fs_globals.fail() ) { + fprintf( + stderr, "Unable to open checkpoint globals file [%s]: %s\n", globals_filename.c_str(), + strerror(errno)); + return -1; + } + fprintf(stderr, "Unable to open checkpoint globals file [%s]: unknown error\n", globals_filename.c_str()); + return -1; + } + + size_t size; + char* buffer; + + SST::Core::Serialization::serializer ser; + ser.enable_pointer_tracking(); + + fs_globals.read(reinterpret_cast(&size), sizeof(size)); buffer = new char[size]; - fs.read(buffer, size); + fs_globals.read(buffer, size); std::string cpt_lib_path; std::string cpt_timebase; @@ -744,7 +805,7 @@ main(int argc, char* argv[]) ser& cpt_params_key_map_reverse; ser& cpt_params_next_key_id; - fs.close(); + fs_globals.close(); delete[] buffer; // Error check that ranks & threads match after output is created @@ -1019,6 +1080,7 @@ main(int argc, char* argv[]) Simulation_impl::factory = factory; Simulation_impl::sim_output = g_output; Simulation_impl::resizeBarriers(world_size.thread); + CheckpointAction::barrier.resize(world_size.thread); #ifdef USE_MEMPOOL MemPoolAccessor::initializeGlobalData(world_size.thread, cfg.cache_align_mempools()); #endif diff --git a/src/sst/core/oneshot.h b/src/sst/core/oneshot.h index 2a6f6e22f..1537274cb 100644 --- a/src/sst/core/oneshot.h +++ b/src/sst/core/oneshot.h @@ -80,7 +80,8 @@ class OneShot : public Action NotSerializable(SST::OneShot) - private : typedef std::vector HandlerList_t; +private: + typedef std::vector HandlerList_t; // Since this only gets fixed latency events, the times will fire // in order of arrival. No need to use a full map, a double ended diff --git a/src/sst/core/realtime.cc b/src/sst/core/realtime.cc index e803663d9..0e311cba4 100644 --- a/src/sst/core/realtime.cc +++ b/src/sst/core/realtime.cc @@ -537,6 +537,7 @@ void RealTimeManager::registerSignal(RealTimeAction* action, int signum) { signal_actions_.insert(std::make_pair(signum, action)); + if ( action->canInitiateCheckpoint() ) can_checkpoint_ = true; } void @@ -548,6 +549,7 @@ RealTimeManager::registerInterval(uint32_t interval, RealTimeAction* action) } static_cast(signal_actions_[SIGALRM])->addIntervalAction(interval, action); + if ( action->canInitiateCheckpoint() ) can_checkpoint_ = true; } void diff --git a/src/sst/core/realtime.h b/src/sst/core/realtime.h index afc69a8a0..df5d8e180 100644 --- a/src/sst/core/realtime.h +++ b/src/sst/core/realtime.h @@ -35,7 +35,7 @@ class ExitCleanRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - ExitCleanRealTimeAction, "sst", "exit.clean", SST_ELI_ELEMENT_VERSION(0, 1, 0), + ExitCleanRealTimeAction, "sst", "rt.exit.clean", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes an immediate, but non-emergency shutdown. This is the default action for the " "'--exit-after' option.") @@ -49,8 +49,9 @@ class ExitEmergencyRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - ExitEmergencyRealTimeAction, "sst", "exit.emergency", SST_ELI_ELEMENT_VERSION(0, 1, 0), + ExitEmergencyRealTimeAction, "sst", "rt.exit.emergency", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes an emergency shutdown. This is the default action for SIGTERM and SIGINT.") + ExitEmergencyRealTimeAction(); virtual void execute() override; }; @@ -60,8 +61,9 @@ class CoreStatusRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - CoreStatusRealTimeAction, "sst", "status.core", SST_ELI_ELEMENT_VERSION(0, 1, 0), + CoreStatusRealTimeAction, "sst", "rt.status.core", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes SST-Core to print its status. This is the default action for SIGUSR1.") + CoreStatusRealTimeAction(); void execute() override; }; @@ -71,9 +73,10 @@ class ComponentStatusRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - ComponentStatusRealTimeAction, "sst", "status.all", SST_ELI_ELEMENT_VERSION(0, 1, 0), + ComponentStatusRealTimeAction, "sst", "rt.status.all", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes SST-Core to print its status along with component status. This is the default " "action for SIGUSR2.") + ComponentStatusRealTimeAction(); void execute() override; }; @@ -83,12 +86,15 @@ class CheckpointRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - CheckpointRealTimeAction, "sst", "checkpoint", SST_ELI_ELEMENT_VERSION(0, 1, 0), + CheckpointRealTimeAction, "sst", "rt.checkpoint", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes SST to generate a checkpoint. This is the default action for the " "'--checkpoint-wall-period' option.") + CheckpointRealTimeAction(); virtual void execute() override; virtual void begin(time_t scheduled_time) override; + + bool canInitiateCheckpoint() override { return true; } }; /* Action to generate a heartbeat message */ @@ -96,9 +102,10 @@ class HeartbeatRealTimeAction : public RealTimeAction { public: SST_ELI_REGISTER_REALTIMEACTION( - HeartbeatRealTimeAction, "sst", "heartbeat", SST_ELI_ELEMENT_VERSION(0, 1, 0), + HeartbeatRealTimeAction, "sst", "rt.heartbeat", SST_ELI_ELEMENT_VERSION(0, 1, 0), "Signal handler that causes SST to generate a heartbeat message (status and some resource usage information). " "This is the default action for the '--heartbeat-wall-period' option.") + HeartbeatRealTimeAction(); virtual void execute() override; virtual void begin(time_t scheduled_time) override; @@ -179,11 +186,18 @@ class RealTimeManager : public SST::Core::Serialization::serializable /* SyncManager request to get signals. Also clears local signals */ bool getSignals(int& sig_end, int& sig_usr, int& sig_alrm); + /** + Check whether or not any of the Actions registered with the + manager can initiate a checkpoint. + */ + bool canInitiateCheckpoint() { return can_checkpoint_; } + void serialize_order(SST::Core::Serialization::serializer& ser) override; ImplementSerializable(SST::RealTimeManager) private: - bool serial_exec_; // Whether execution is serial or parallel + bool serial_exec_; // Whether execution is serial or parallel + bool can_checkpoint_ = false; // Set to true if any Actions can trigger checkpoint /* The set of signal handlers for all signals */ std::map signal_actions_; diff --git a/src/sst/core/realtimeAction.h b/src/sst/core/realtimeAction.h index 7660ed890..5ded89cc8 100644 --- a/src/sst/core/realtimeAction.h +++ b/src/sst/core/realtimeAction.h @@ -35,11 +35,21 @@ class RealTimeAction RealTimeAction(); - /* Optional function called just before run loop starts. Passes in the next scheduled time of the event or 0 if the - * event is not scheduled */ + /* Optional function called just before run loop starts. Passes in + * the next scheduled time of the event or 0 if the event is not + * scheduled */ virtual void begin(time_t UNUSED(scheduled_time)) {} virtual void execute() = 0; + /* Attribute functions that let the core know when certain actions + * need to be planned for */ + + /** + Let's the core know if this action may trigger a checkpoint so + that all the checkpoint infrastructure can be initialized. + */ + virtual bool canInitiateCheckpoint() { return false; } + /* Accessors for core state that signal handlers may need * These accessors return per-thread information unless noted in a comment */ diff --git a/src/sst/core/simulation.cc b/src/sst/core/simulation.cc index 1ad5b4861..a3d14c7a4 100644 --- a/src/sst/core/simulation.cc +++ b/src/sst/core/simulation.cc @@ -42,8 +42,11 @@ #include #include +#include +#include #include + #define SST_SIMTIME_MAX 0xffffffffffffffff using namespace SST::Statistics; @@ -98,12 +101,15 @@ Simulation_impl::~Simulation_impl() // If checkpoint_action is triggered on sim time then it will // be deleted when the timeVortex is deleted - if ( checkpoint_action_->getNextCheckpointSimTime() == 0 ) delete checkpoint_action_; + if ( checkpoint_action_->getNextCheckpointSimTime() == MAX_SIMTIME_T ) delete checkpoint_action_; // Delete the timeVortex first. This will delete all events left // in the queue, as well as the Sync, Exit and Clock objects. delete timeVortex; + // For serial runs, the sync object is not in the timevortex + if ( num_ranks.rank == 1 && num_ranks.thread == 1 ) delete syncManager; + // Delete all the components // for ( CompMap_t::iterator it = compMap.begin(); it != compMap.end(); ++it ) { // delete it->second; @@ -222,9 +228,12 @@ Simulation_impl::Simulation_impl(Config* cfg, RankInfo my_rank, RankInfo num_ran else { checkpoint_action_ = new CheckpointAction(cfg, my_rank, this, nullptr); } - - real_time_ = new RealTimeManager(num_ranks); } + else { + // Direct interthread links not yet supported with checkpointing + direct_interthread = false; + } + real_time_ = new RealTimeManager(num_ranks); } void @@ -241,21 +250,22 @@ Simulation_impl::setupSimActions(Config* cfg) // Signal handling - default if ( !cfg->enable_sig_handling() ) return; - real_time_->registerSignal(factory->Create("sst.exit.emergency"), SIGINT); - real_time_->registerSignal(factory->Create("sst.exit.emergency"), SIGTERM); - real_time_->registerSignal(factory->Create("sst.status.core"), SIGUSR1); - real_time_->registerSignal(factory->Create("sst.status.all"), SIGUSR2); + real_time_->registerSignal(factory->Create("sst.rt.exit.emergency"), SIGINT); + real_time_->registerSignal(factory->Create("sst.rt.exit.emergency"), SIGTERM); + real_time_->registerSignal(factory->Create("sst.rt.status.core"), SIGUSR1); + real_time_->registerSignal(factory->Create("sst.rt.status.all"), SIGUSR2); if ( cfg->exit_after() != 0 ) { - real_time_->registerInterval(cfg->exit_after(), factory->Create("sst.exit.clean")); + real_time_->registerInterval(cfg->exit_after(), factory->Create("sst.rt.exit.clean")); } if ( cfg->checkpoint_wall_period() != 0 ) { - real_time_->registerInterval(cfg->checkpoint_wall_period(), factory->Create("sst.checkpoint")); + real_time_->registerInterval( + cfg->checkpoint_wall_period(), factory->Create("sst.rt.checkpoint")); } if ( cfg->heartbeat_wall_period() != 0 ) { - real_time_->registerInterval(cfg->heartbeat_wall_period(), factory->Create("sst.heartbeat")); + real_time_->registerInterval(cfg->heartbeat_wall_period(), factory->Create("sst.rt.heartbeat")); } } @@ -347,8 +357,8 @@ Simulation_impl::processGraphInfo(ConfigGraph& graph, const RankInfo& UNUSED(myR // Create the SyncManager for this rank. It gets created even if // we are single rank/single thread because it also manages the // Exit and Heartbeat actions. - syncManager = new SyncManager( - my_rank, num_ranks, minPartTC = minPartToTC(min_part), min_part, interThreadLatencies, real_time_); + minPartTC = minPartToTC(min_part); + syncManager = new SyncManager(my_rank, num_ranks, min_part, interThreadLatencies, real_time_); // Check to see if the SyncManager profile tool is installed auto tools = getProfileTool("sync"); @@ -1314,15 +1324,14 @@ Simulation_impl::scheduleCheckpoint() checkpoint_action_->setCheckpoint(); } + void -Simulation_impl::checkpoint() +Simulation_impl::checkpoint_write_globals( + int checkpoint_id, const std::string& registry_filename, const std::string& globals_filename) { - std::string checkpoint_filename = - std::to_string(currentSimCycle) + "_" + std::to_string(checkpoint_id_) + ".sstcpt"; - if ( checkpoint_prefix_ != "" ) checkpoint_filename = checkpoint_prefix_ + "_" + checkpoint_filename; - checkpoint_id_++; + std::ofstream fs(globals_filename, std::ios::out | std::ios::binary); - std::ofstream fs(checkpoint_filename, std::ios::out | std::ios::binary); + // TODO: Add error checking for file open SST::Core::Serialization::serializer ser; ser.enable_pointer_tracking(); @@ -1368,6 +1377,71 @@ Simulation_impl::checkpoint() fs.write(reinterpret_cast(&size), sizeof(size)); fs.write(buffer, size); + fs.close(); + + std::ofstream fs_reg(registry_filename, std::ios::out); + + /* Section 1: Checkpoint info */ + fs_reg << "## Checkpoint #" << checkpoint_id << " at time " << currentSimCycle << "\n" << std::endl; + + /* Section 2: Config options */ + fs_reg << "## Configuaration Options" << std::endl; + fs_reg << "## Note: Values in this section are for information only, editing values will not affect restart" + << std::endl; +#define WR(var) fs_reg << #var << " = " << var << std::endl; + WR(num_ranks.rank); + WR(num_ranks.thread); + WR(libpath); + WR(timeLord.timeBaseString); + WR(output_directory); + std::string output_prefix = sim_output.getPrefix(); + WR(output_prefix); + uint32_t output_verbose = sim_output.getVerboseLevel(); + WR(output_verbose); + WR(globalOutputFileName); + std::string checkpoint_prefix = checkpoint_prefix_; + WR(checkpoint_prefix); + fs_reg << std::endl; +#undef WR + + fs_reg << "** (globals): " << globals_filename << std::endl; + + fs_reg.close(); +} + +void +Simulation_impl::checkpoint_append_registry(const std::string& registry_name, const std::string& blob_name) +{ + // The top level registry file for the checkpoint will be a text + // file and will include global data first, then a registery of + // where each component data blob is located (file + offset). + + // Rank 0, thread 0 will write out the global data, then after all + // the binary checkpoint files are written, each rank/thread will + // take turns writing their registry data to the file. + + std::ofstream fs(registry_name, std::ios::out | std::ios::app); + + // Write out the component offsets + fs << "\n** (" << my_rank.rank << ":" << my_rank.thread << "): " << blob_name << std::endl; + for ( auto x : component_blob_offsets_ ) { + fs << x.first << " : " << x.second << std::endl; + } + fs.close(); +} + +void +Simulation_impl::checkpoint(const std::string& checkpoint_filename) +{ + std::ofstream fs(checkpoint_filename, std::ios::out | std::ios::binary); + // TODO: Add error checking for file open + uint64_t offset = 0; + + SST::Core::Serialization::serializer ser; + ser.enable_pointer_tracking(); + + size_t size, buffer_size; + char* buffer; /* Section 2: Loaded libraries */ ser.start_sizing(); @@ -1375,18 +1449,16 @@ Simulation_impl::checkpoint() factory->getLoadedLibraryNames(libnames); ser& libnames; - size = ser.size(); - if ( size > buffer_size ) { - buffer_size = size; - delete[] buffer; - buffer = new char[buffer_size]; - } + size = ser.size(); + buffer_size = size; + buffer = new char[buffer_size]; ser.start_packing(buffer, size); ser& libnames; fs.write(reinterpret_cast(&size), sizeof(size)); fs.write(buffer, size); + offset += (sizeof(size) + size); /* Section 3: Simulation_impl */ @@ -1409,17 +1481,16 @@ Simulation_impl::checkpoint() ser& output_directory; // Actions that may also be in TV ser& real_time_; - ser& m_exit; - ser& syncManager; + if ( my_rank.thread == 0 ) { ser& m_exit; } ser& m_heartbeat; // Add statistics engine and associated state // Individual statistics are checkpointing with component - ser& StatisticProcessingEngine::m_statOutputs; + if ( my_rank.thread == 0 ) { ser& StatisticProcessingEngine::m_statOutputs; } ser& stat_engine; // Add shared regions - ser& SharedObject::manager; + if ( my_rank.thread == 0 ) { ser& SharedObject::manager; } // Serialize the clockmap ser& clockMap; @@ -1454,18 +1525,18 @@ Simulation_impl::checkpoint() ser& output_directory; // Actions that may also be in TV ser& real_time_; - ser& m_exit; - ser& syncManager; + if ( my_rank.thread == 0 ) { ser& m_exit; } + initBarrier.wait(); ser& m_heartbeat; // Add shared StatisticOutput vector // Add statistic engine - ser& StatisticProcessingEngine::m_statOutputs; + if ( my_rank.thread == 0 ) { ser& StatisticProcessingEngine::m_statOutputs; } ser& stat_engine; // Add shared regions - ser& SharedObject::manager; + if ( my_rank.thread == 0 ) { ser& SharedObject::manager; } ser& clockMap; @@ -1475,9 +1546,14 @@ Simulation_impl::checkpoint() // Write buffer to file fs.write(reinterpret_cast(&size), sizeof(size)); fs.write(buffer, size); + offset += (sizeof(size) + size); size = compInfoMap.size(); fs.write(reinterpret_cast(&size), sizeof(size)); + offset += size; + + // Clear the offsets vector to start this round + component_blob_offsets_.clear(); // Serialize component blobs individually for ( auto comp = compInfoMap.begin(); comp != compInfoMap.end(); comp++ ) { @@ -1495,8 +1571,10 @@ Simulation_impl::checkpoint() ser.start_packing(buffer, size); ser& compinfo; + component_blob_offsets_.emplace_back(compinfo->id, offset); fs.write(reinterpret_cast(&size), sizeof(size)); fs.write(buffer, size); + offset += (sizeof(size) + size); } fs.close(); @@ -1515,22 +1593,36 @@ Simulation_impl::checkpoint() void Simulation_impl::restart(Config* cfg) { + std::ifstream fs(cfg->configFile()); + + std::string line; + + // Look for the line that has my rank's file info + std::string blob_filename; + std::string search_str("** ("); + search_str = search_str + std::to_string(my_rank.rank) + ":" + std::to_string(my_rank.thread) + "): "; + while ( std::getline(fs, line) ) { + size_t pos = line.find(search_str); + if ( pos == 0 ) { + // Get the file name + blob_filename = line.substr(search_str.length()); + break; + } + } + fs.close(); + size_t size, buffer_size; char* buffer; SST::Core::Serialization::serializer ser; ser.enable_pointer_tracking(); - std::ifstream fs(cfg->configFile(), std::ios::binary); - - /* Skip config section, main did that already */ - fs.read(reinterpret_cast(&size), sizeof(size)); - fs.seekg(size, std::ios::cur); + std::ifstream fs_blob(blob_filename, std::ios::binary); /* Begin deserialization, libraries */ - fs.read(reinterpret_cast(&size), sizeof(size)); + fs_blob.read(reinterpret_cast(&size), sizeof(size)); buffer_size = size; buffer = new char[buffer_size]; - fs.read(buffer, size); + fs_blob.read(buffer, size); ser.start_unpacking(buffer, size); std::set libnames; @@ -1540,13 +1632,13 @@ Simulation_impl::restart(Config* cfg) factory->loadUnloadedLibraries(libnames); /* Now get the global blob */ - fs.read(reinterpret_cast(&size), sizeof(size)); + fs_blob.read(reinterpret_cast(&size), sizeof(size)); if ( size > buffer_size ) { delete[] buffer; buffer_size = size; buffer = new char[buffer_size]; } - fs.read(buffer, size); + fs_blob.read(buffer, size); ser.start_unpacking(buffer, size); @@ -1568,45 +1660,72 @@ Simulation_impl::restart(Config* cfg) ser& output_directory; // Actions that may also be in TV ser& real_time_; - ser& m_exit; - ser& syncManager; + if ( my_rank.thread == 0 ) { ser& m_exit; } + + // Create new checkpoint object. Needs to be done before SyncManager is reinitialized + if ( cfg->checkpoint_sim_period() != "" ) { + sim_output.output( + "# Creating simulation checkpoint at simulated time period of %s.\n", cfg->checkpoint_sim_period().c_str()); + checkpoint_action_ = + new CheckpointAction(cfg, my_rank, this, timeLord.getTimeConverter(cfg->checkpoint_sim_period())); + } + else { + checkpoint_action_ = new CheckpointAction(cfg, my_rank, this, nullptr); + } + + // Set up the syncManager + syncManager = new SyncManager(my_rank, num_ranks, minPart, interThreadLatencies, real_time_); + // Look at simulation.cc line 365 on setting up profile tools + ser& m_heartbeat; // Get statistics engine - ser& StatisticProcessingEngine::m_statOutputs; + if ( my_rank.thread == 0 ) { ser& StatisticProcessingEngine::m_statOutputs; } + completeBarrier.wait(); ser& stat_engine; /* Initial fix up of stat engine, the rest is after components re-register statistics */ stat_engine.restart(this); // Add shared regions - ser& SharedObject::manager; + if ( my_rank.thread == 0 ) { ser& SharedObject::manager; } ser& clockMap; // Last, get the timevortex ser& timeVortex; + /* Extract components */ size_t compCount; - fs.read(reinterpret_cast(&compCount), sizeof(compCount)); + fs_blob.read(reinterpret_cast(&compCount), sizeof(compCount)); // Deserialize component blobs individually for ( size_t comp = 0; comp < compCount; comp++ ) { - fs.read(reinterpret_cast(&size), sizeof(size)); + fs_blob.read(reinterpret_cast(&size), sizeof(size)); if ( size > buffer_size ) { delete[] buffer; buffer_size = size; buffer = new char[buffer_size]; } - fs.read(buffer, size); + fs_blob.read(buffer, size); ser.start_unpacking(buffer, size); ComponentInfo* compInfo = new ComponentInfo(); ser& compInfo; compInfoMap.insert(compInfo); } - fs.close(); + fs_blob.close(); delete[] buffer; + // If we are a parallel job, need to call + // finalizeLinkConfigurations() in order to finish setting up all + // the sync datastrctures. This will also cause the SyncManager to + // compute its next fire time. + if ( num_ranks.rank > 1 || num_ranks.thread > 1 ) { + syncManager->setRestartTime(currentSimCycle); + setupBarrier.wait(); + syncManager->finalizeLinkConfigurations(); + } + // Need to clean up the handlers in the TimeVortex timeVortex->fixup_handlers(); @@ -1615,17 +1734,6 @@ Simulation_impl::restart(Config* cfg) if ( my_rank.thread == 0 ) { StatisticProcessingEngine::stat_outputs_simulation_start(); } stat_engine.startOfSimulation(); - // Create new checkpoint object - if ( cfg->checkpoint_sim_period() != "" ) { - sim_output.output( - "# Creating simulation checkpoint at simulated time period of %s.\n", cfg->checkpoint_sim_period().c_str()); - checkpoint_action_ = - new CheckpointAction(cfg, my_rank, this, timeLord.getTimeConverter(cfg->checkpoint_sim_period())); - } - else { - checkpoint_action_ = new CheckpointAction(cfg, my_rank, this, nullptr); - } - // Resolve heartbeat -> overwrite with command line if present if ( cfg->heartbeat_sim_period() != "" && my_rank.thread == 0 ) { if ( m_heartbeat ) delete m_heartbeat; @@ -1812,6 +1920,8 @@ std::mutex Simulation_impl::simulationMutex; Core::ThreadSafe::Spinlock Simulation_impl::cross_thread_lock; TimeConverter* Simulation_impl::minPartTC = nullptr; SimTime_t Simulation_impl::minPart; +std::string Simulation_impl::checkpoint_directory_ = ""; + /* Define statics (Simulation) */ std::unordered_map Simulation_impl::instanceMap; diff --git a/src/sst/core/simulation_impl.h b/src/sst/core/simulation_impl.h index 378c8f977..a4da86cc4 100644 --- a/src/sst/core/simulation_impl.h +++ b/src/sst/core/simulation_impl.h @@ -328,8 +328,25 @@ class Simulation_impl */ TimeConverter* minPartToTC(SimTime_t cycles) const; - void scheduleCheckpoint(); - void checkpoint(); + std::string initializeCheckpointInfrastructure(const std::string& prefix); + void scheduleCheckpoint(); + + /** + Write the partition specific checkpoint data + */ + void checkpoint(const std::string& checkpoint_filename); + + /** + Append partitions registry information + */ + void checkpoint_append_registry(const std::string& registry_name, const std::string& blob_name); + + /** + Write the global data to a binary file and create the registry + and write the header info + */ + void checkpoint_write_globals( + int checkpoint_id, const std::string& registry_filename, const std::string& globals_filename); void restart(Config* config); /** Factory used to generate the simulation components */ @@ -394,6 +411,7 @@ class Simulation_impl static Exit* m_exit; SimulatorHeartbeat* m_heartbeat = nullptr; CheckpointAction* checkpoint_action_; + static std::string checkpoint_directory_; bool endSim; bool independent; // true if no links leave thread (i.e. no syncs required) static std::atomic untimed_msg_count; @@ -403,6 +421,11 @@ class Simulation_impl bool wireUpFinished_; RealTimeManager* real_time_; + /** + vector to hold offsets of component blobs in checkpoint files + */ + std::vector> component_blob_offsets_; + /** TimeLord of the simulation */ static TimeLord timeLord; /** Output */ diff --git a/src/sst/core/statapi/statoutput.cc b/src/sst/core/statapi/statoutput.cc index e8b605184..69a6f38b1 100644 --- a/src/sst/core/statapi/statoutput.cc +++ b/src/sst/core/statapi/statoutput.cc @@ -239,9 +239,11 @@ StatisticFieldsOutput::getFieldTypeShortName(fieldType_t type) void StatisticFieldsOutput::registerStatistic(StatisticBase* stat) { + this->lock(); startRegisterFields(stat); stat->registerOutputFields(this); stopRegisterFields(); + this->unlock(); } // Start / Stop of register diff --git a/src/sst/core/sync/Makefile.inc b/src/sst/core/sync/Makefile.inc index 93dd84ff6..84906adf1 100644 --- a/src/sst/core/sync/Makefile.inc +++ b/src/sst/core/sync/Makefile.inc @@ -14,5 +14,4 @@ sst_core_sources += \ sync/threadSyncDirectSkip.h \ sync/threadSyncDirectSkip.cc \ sync/threadSyncSimpleSkip.h \ - sync/threadSyncSimpleSkip.cc \ - sync/threadSyncQueue.h + sync/threadSyncSimpleSkip.cc diff --git a/src/sst/core/sync/rankSyncParallelSkip.cc b/src/sst/core/sync/rankSyncParallelSkip.cc index 3334f1d24..fd0b69f5a 100644 --- a/src/sst/core/sync/rankSyncParallelSkip.cc +++ b/src/sst/core/sync/rankSyncParallelSkip.cc @@ -49,7 +49,7 @@ SimTime_t RankSyncParallelSkip::myNextSyncTime = 0; ///// RankSyncParallelSkip class ///// -RankSyncParallelSkip::RankSyncParallelSkip(RankInfo num_ranks, TimeConverter* UNUSED(minPartTC)) : +RankSyncParallelSkip::RankSyncParallelSkip(RankInfo num_ranks) : RankSync(num_ranks), mpiWaitTime(0.0), deserializeTime(0.0), @@ -95,11 +95,11 @@ RankSyncParallelSkip::registerLink( std::lock_guard slock(lock); // For sends, we track the remote rank and thread ID - SyncQueue* queue; + RankSyncQueue* queue; if ( comm_send_map.count(to_rank) == 0 ) { send_count++; comm_send_map[to_rank].to_rank = to_rank; - queue = comm_send_map[to_rank].squeue = new SyncQueue(); + queue = comm_send_map[to_rank].squeue = new RankSyncQueue(to_rank); comm_send_map[to_rank].remote_size = 4096; } else { @@ -125,6 +125,12 @@ RankSyncParallelSkip::registerLink( return queue; } +void +RankSyncParallelSkip::setRestartTime(SimTime_t time) +{ + if ( Simulation_impl::getSimulation()->getRank().thread == 0 ) { myNextSyncTime = time; } +} + void RankSyncParallelSkip::finalizeLinkConfigurations() { @@ -250,7 +256,6 @@ RankSyncParallelSkip::exchange_master(int UNUSED(thread)) // of ranks I communicate with (1 recv, 2 sends per rank) MPI_Request sreqs[2 * comm_send_map.size()]; int sreq_count = 0; - // First thing to do is fill the serialize_queue. for ( auto i = comm_send_map.begin(); i != comm_send_map.end(); ++i ) { serialize_queue.try_insert(&(i->second)); @@ -281,17 +286,17 @@ RankSyncParallelSkip::exchange_master(int UNUSED(thread)) if ( send_queue.try_remove(send) ) { my_send_count--; - char* send_buffer = send->sbuf; + char* send_buffer = send->sbuf; // Cast to Header so we can get/fill in data - SyncQueue::Header* hdr = reinterpret_cast(send_buffer); - int tag = 2 * send->to_rank.thread; + RankSyncQueue::Header* hdr = reinterpret_cast(send_buffer); + int tag = 2 * send->to_rank.thread; // Check to see if remote queue is big enough for data if ( send->remote_size < hdr->buffer_size ) { // not big enough, send message that will tell remote side to get larger buffer hdr->mode = 1; MPI_Isend( - send_buffer, sizeof(SyncQueue::Header), MPI_BYTE, send->to_rank.rank /*dest*/, tag, MPI_COMM_WORLD, - &sreqs[sreq_count++]); + send_buffer, sizeof(RankSyncQueue::Header), MPI_BYTE, send->to_rank.rank /*dest*/, tag, + MPI_COMM_WORLD, &sreqs[sreq_count++]); send->remote_size = hdr->buffer_size; tag = 2 * send->to_rank.thread + 1; } @@ -330,9 +335,9 @@ RankSyncParallelSkip::exchange_master(int UNUSED(thread)) // Get the buffer and deserialize all the events char* buffer = i->second.rbuf; - SyncQueue::Header* hdr = reinterpret_cast(buffer); - unsigned int size = hdr->buffer_size; - int mode = hdr->mode; + RankSyncQueue::Header* hdr = reinterpret_cast(buffer); + unsigned int size = hdr->buffer_size; + int mode = hdr->mode; if ( mode == 1 ) { // May need to resize the buffer @@ -356,7 +361,7 @@ RankSyncParallelSkip::exchange_master(int UNUSED(thread)) // For now simply call exchange_slave() to deliver events exchange_slave(0); /* Barriers at end */ - // Clear the SyncQueues used to send the data after all the sends have completed + // Clear the RankSyncQueues used to send the data after all the sends have completed // waitStart = SST::Core::Profile::now(); MPI_Waitall(sreq_count, sreqs, MPI_STATUSES_IGNORE); // mpiWaitTime += SST::Core::Profile::getElapsed(waitStart); @@ -416,15 +421,15 @@ RankSyncParallelSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::at char* send_buffer = i->second.squeue->getData(); // Cast to Header so we can get/fill in data - SyncQueue::Header* hdr = reinterpret_cast(send_buffer); - int tag = 2 * i->second.to_rank.thread; + RankSyncQueue::Header* hdr = reinterpret_cast(send_buffer); + int tag = 2 * i->second.to_rank.thread; // Check to see if remote queue is big enough for data if ( i->second.remote_size < hdr->buffer_size ) { // not big enough, send message that will tell remote side to get larger buffer hdr->mode = 1; MPI_Isend( - send_buffer, sizeof(SyncQueue::Header), MPI_BYTE, i->second.to_rank.rank /*dest*/, tag, MPI_COMM_WORLD, - &sreqs[sreq_count++]); + send_buffer, sizeof(RankSyncQueue::Header), MPI_BYTE, i->second.to_rank.rank /*dest*/, tag, + MPI_COMM_WORLD, &sreqs[sreq_count++]); i->second.remote_size = hdr->buffer_size; tag = 2 * i->second.to_rank.thread + 1; } @@ -444,9 +449,9 @@ RankSyncParallelSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::at // Get the buffer and deserialize all the events char* buffer = i->second.rbuf; - SyncQueue::Header* hdr = reinterpret_cast(buffer); - unsigned int size = hdr->buffer_size; - int mode = hdr->mode; + RankSyncQueue::Header* hdr = reinterpret_cast(buffer); + unsigned int size = hdr->buffer_size; + int mode = hdr->mode; if ( mode == 1 ) { // May need to resize the buffer @@ -462,7 +467,7 @@ RankSyncParallelSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::at } SST::Core::Serialization::serializer ser; - ser.start_unpacking(&buffer[sizeof(SyncQueue::Header)], size - sizeof(SyncQueue::Header)); + ser.start_unpacking(&buffer[sizeof(RankSyncQueue::Header)], size - sizeof(RankSyncQueue::Header)); std::vector activities; ser& activities; @@ -474,7 +479,7 @@ RankSyncParallelSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::at } } - // Clear the SyncQueues used to send the data after all the sends have completed + // Clear the RankSyncQueues used to send the data after all the sends have completed MPI_Waitall(sreq_count, sreqs, MPI_STATUSES_IGNORE); for ( auto i = comm_send_map.begin(); i != comm_send_map.end(); ++i ) { @@ -493,44 +498,20 @@ RankSyncParallelSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::at void RankSyncParallelSkip::deserializeMessage(comm_recv_pair* msg) { - char* buffer = msg->rbuf; - SyncQueue::Header* hdr = reinterpret_cast(buffer); - unsigned int size = hdr->buffer_size; + char* buffer = msg->rbuf; + RankSyncQueue::Header* hdr = reinterpret_cast(buffer); + unsigned int size = hdr->buffer_size; auto deserialStart = SST::Core::Profile::now(); SST::Core::Serialization::serializer ser; - ser.start_unpacking(&buffer[sizeof(SyncQueue::Header)], size - sizeof(SyncQueue::Header)); + ser.start_unpacking(&buffer[sizeof(RankSyncQueue::Header)], size - sizeof(RankSyncQueue::Header)); ser & msg->activity_vec; deserializeTime += SST::Core::Profile::getElapsed(deserialStart); } -void -RankSyncParallelSkip::serialize_order(SST::Core::Serialization::serializer& ser) -{ - RankSync::serialize_order(ser); - ser& myNextSyncTime; - ser& mpiWaitTime; - ser& deserializeTime; - ser& send_count; - for ( uint32_t i = 0; i < num_ranks_.thread; i++ ) - ser& recv_count[i]; - - ser& comm_send_map; - ser& comm_recv_map; - - // Unused - // link_map - - // No need to serialize - // remaining_deser - // queues (deserialize_queue, link_send_queue, serialize_queue, send_queue) - // barriers (serializeReadyBarrier, slaveExchangeDoneBarrier, allDoneBarrier) - // lock -} - int RankSyncParallelSkip::sig_end_(0); int RankSyncParallelSkip::sig_usr_(0); int RankSyncParallelSkip::sig_alrm_(0); diff --git a/src/sst/core/sync/rankSyncParallelSkip.h b/src/sst/core/sync/rankSyncParallelSkip.h index 697305309..c0b971b54 100644 --- a/src/sst/core/sync/rankSyncParallelSkip.h +++ b/src/sst/core/sync/rankSyncParallelSkip.h @@ -27,14 +27,14 @@ REENABLE_WARNING namespace SST { -class SyncQueue; +class RankSyncQueue; class TimeConverter; class RankSyncParallelSkip : public RankSync { public: /** Create a new Sync object which fires with a specified period */ - RankSyncParallelSkip(RankInfo num_ranks, TimeConverter* minPartTC); + RankSyncParallelSkip(RankInfo num_ranks); RankSyncParallelSkip() {} // For serialization virtual ~RankSyncParallelSkip(); @@ -57,10 +57,9 @@ class RankSyncParallelSkip : public RankSync SimTime_t getNextSyncTime() override { return myNextSyncTime; } - uint64_t getDataSize() const override; + void setRestartTime(SimTime_t time) override; - void serialize_order(SST::Core::Serialization::serializer& ser) override; - ImplementSerializable(SST::RankSyncParallelSkip) + uint64_t getDataSize() const override; private: static SimTime_t myNextSyncTime; @@ -71,10 +70,10 @@ class RankSyncParallelSkip : public RankSync struct comm_send_pair : public SST::Core::Serialization::serializable { - RankInfo to_rank; - SyncQueue* squeue; // SyncQueue - char* sbuf; - uint32_t remote_size; + RankInfo to_rank; + RankSyncQueue* squeue; // RankSyncQueue + char* sbuf; + uint32_t remote_size; void serialize_order(SST::Core::Serialization::serializer& ser) override { diff --git a/src/sst/core/sync/rankSyncSerialSkip.cc b/src/sst/core/sync/rankSyncSerialSkip.cc index c4063088a..d4b4fe75f 100644 --- a/src/sst/core/sync/rankSyncSerialSkip.cc +++ b/src/sst/core/sync/rankSyncSerialSkip.cc @@ -51,10 +51,7 @@ namespace SST { // Static Data Members SimTime_t RankSyncSerialSkip::myNextSyncTime = 0; -RankSyncSerialSkip::RankSyncSerialSkip(RankInfo num_ranks, TimeConverter* UNUSED(minPartTC)) : - RankSync(num_ranks), - mpiWaitTime(0.0), - deserializeTime(0.0) +RankSyncSerialSkip::RankSyncSerialSkip(RankInfo num_ranks) : RankSync(num_ranks), mpiWaitTime(0.0), deserializeTime(0.0) { max_period = Simulation_impl::getSimulation()->getMinPartTC(); myNextSyncTime = max_period->getFactor(); @@ -79,9 +76,9 @@ RankSyncSerialSkip::registerLink( { std::lock_guard slock(lock); - SyncQueue* queue; + RankSyncQueue* queue; if ( comm_map.count(to_rank.rank) == 0 ) { - queue = comm_map[to_rank.rank].squeue = new SyncQueue(); + queue = comm_map[to_rank.rank].squeue = new RankSyncQueue(to_rank); comm_map[to_rank.rank].rbuf = new char[4096]; comm_map[to_rank.rank].local_size = 4096; comm_map[to_rank.rank].remote_size = 4096; @@ -97,6 +94,12 @@ RankSyncSerialSkip::registerLink( return queue; } +void +RankSyncSerialSkip::setRestartTime(SimTime_t time) +{ + if ( Simulation_impl::getSimulation()->getRank().thread == 0 ) { myNextSyncTime = time; } +} + void RankSyncSerialSkip::finalizeLinkConfigurations() {} @@ -162,15 +165,15 @@ RankSyncSerialSkip::exchange(void) SST_EVENT_PROFILE_STOP // Cast to Header so we can get/fill in data - SyncQueue::Header* hdr = reinterpret_cast(send_buffer); + RankSyncQueue::Header* hdr = reinterpret_cast(send_buffer); // Simulation_impl::getSimulation()->getSimulationOutput().output("Data size = %d\n", hdr->buffer_size); - int tag = 1; + int tag = 1; // Check to see if remote queue is big enough for data if ( i->second.remote_size < hdr->buffer_size ) { // not big enough, send message that will tell remote side to get larger buffer hdr->mode = 1; MPI_Isend( - send_buffer, sizeof(SyncQueue::Header), MPI_BYTE, i->first /*dest*/, tag, MPI_COMM_WORLD, + send_buffer, sizeof(RankSyncQueue::Header), MPI_BYTE, i->first /*dest*/, tag, MPI_COMM_WORLD, &sreqs[sreq_count++]); i->second.remote_size = hdr->buffer_size; tag = 2; @@ -196,9 +199,9 @@ RankSyncSerialSkip::exchange(void) // Get the buffer and deserialize all the events char* buffer = i->second.rbuf; - SyncQueue::Header* hdr = reinterpret_cast(buffer); - unsigned int size = hdr->buffer_size; - int mode = hdr->mode; + RankSyncQueue::Header* hdr = reinterpret_cast(buffer); + unsigned int size = hdr->buffer_size; + int mode = hdr->mode; if ( mode == 1 ) { // May need to resize the buffer @@ -214,7 +217,7 @@ RankSyncSerialSkip::exchange(void) auto deserialStart = SST::Core::Profile::now(); SST::Core::Serialization::serializer ser; - ser.start_unpacking(&buffer[sizeof(SyncQueue::Header)], size - sizeof(SyncQueue::Header)); + ser.start_unpacking(&buffer[sizeof(RankSyncQueue::Header)], size - sizeof(RankSyncQueue::Header)); std::vector activities; activities.clear(); @@ -232,7 +235,7 @@ RankSyncSerialSkip::exchange(void) activities.clear(); } - // Clear the SyncQueues used to send the data after all the sends have completed + // Clear the RankSyncQueues used to send the data after all the sends have completed waitStart = SST::Core::Profile::now(); MPI_Waitall(sreq_count, sreqs, MPI_STATUSES_IGNORE); mpiWaitTime += SST::Core::Profile::getElapsed(waitStart); @@ -282,16 +285,16 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom // Do all the sends // Get the buffer from the syncQueue - char* send_buffer = i->second.squeue->getData(); + char* send_buffer = i->second.squeue->getData(); // Cast to Header so we can get/fill in data - SyncQueue::Header* hdr = reinterpret_cast(send_buffer); - int tag = 1; + RankSyncQueue::Header* hdr = reinterpret_cast(send_buffer); + int tag = 1; // Check to see if remote queue is big enough for data if ( i->second.remote_size < hdr->buffer_size ) { // not big enough, send message that will tell remote side to get larger buffer hdr->mode = 1; MPI_Isend( - send_buffer, sizeof(SyncQueue::Header), MPI_BYTE, i->first /*dest*/, tag, MPI_COMM_WORLD, + send_buffer, sizeof(RankSyncQueue::Header), MPI_BYTE, i->first /*dest*/, tag, MPI_COMM_WORLD, &sreqs[sreq_count++]); i->second.remote_size = hdr->buffer_size; tag = 2; @@ -314,9 +317,9 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom // Get the buffer and deserialize all the events char* buffer = i->second.rbuf; - SyncQueue::Header* hdr = reinterpret_cast(buffer); - unsigned int size = hdr->buffer_size; - int mode = hdr->mode; + RankSyncQueue::Header* hdr = reinterpret_cast(buffer); + unsigned int size = hdr->buffer_size; + int mode = hdr->mode; if ( mode == 1 ) { // May need to resize the buffer @@ -330,7 +333,7 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom } SST::Core::Serialization::serializer ser; - ser.start_unpacking(&buffer[sizeof(SyncQueue::Header)], size - sizeof(SyncQueue::Header)); + ser.start_unpacking(&buffer[sizeof(RankSyncQueue::Header)], size - sizeof(RankSyncQueue::Header)); std::vector activities; ser& activities; @@ -341,7 +344,7 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom } } - // Clear the SyncQueues used to send the data after all the sends have completed + // Clear the RankSyncQueues used to send the data after all the sends have completed MPI_Waitall(sreq_count, sreqs, MPI_STATUSES_IGNORE); for ( comm_map_t::iterator i = comm_map.begin(); i != comm_map.end(); ++i ) { @@ -357,16 +360,6 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom #endif } -void -RankSyncSerialSkip::serialize_order(SST::Core::Serialization::serializer& ser) -{ - RankSync::serialize_order(ser); - // TODO Do we need to keep anything from comm_map? Certainly don't need the struct - // ser& comm_map; - ser& mpiWaitTime; - ser& deserializeTime; -} - int RankSyncSerialSkip::sig_end_(0); int RankSyncSerialSkip::sig_usr_(0); int RankSyncSerialSkip::sig_alrm_(0); diff --git a/src/sst/core/sync/rankSyncSerialSkip.h b/src/sst/core/sync/rankSyncSerialSkip.h index bd4b1d06f..f785a17c1 100644 --- a/src/sst/core/sync/rankSyncSerialSkip.h +++ b/src/sst/core/sync/rankSyncSerialSkip.h @@ -20,14 +20,14 @@ namespace SST { -class SyncQueue; +class RankSyncQueue; class TimeConverter; class RankSyncSerialSkip : public RankSync { public: /** Create a new Sync object which fires with a specified period */ - RankSyncSerialSkip(RankInfo num_ranks, TimeConverter* minPartTC); + RankSyncSerialSkip(RankInfo num_ranks); RankSyncSerialSkip() {} // For serialization virtual ~RankSyncSerialSkip(); @@ -50,10 +50,9 @@ class RankSyncSerialSkip : public RankSync SimTime_t getNextSyncTime() override { return myNextSyncTime; } - uint64_t getDataSize() const override; + void setRestartTime(SimTime_t time) override; - void serialize_order(SST::Core::Serialization::serializer& ser) override; - ImplementSerializable(SST::RankSyncSerialSkip) + uint64_t getDataSize() const override; private: static SimTime_t myNextSyncTime; @@ -63,10 +62,10 @@ class RankSyncSerialSkip : public RankSync struct comm_pair : public SST::Core::Serialization::serializable { - SyncQueue* squeue; // SyncQueue - char* rbuf; // receive buffer - uint32_t local_size; - uint32_t remote_size; + RankSyncQueue* squeue; // RankSyncQueue + char* rbuf; // receive buffer + uint32_t local_size; + uint32_t remote_size; void serialize_order(SST::Core::Serialization::serializer& UNUSED(ser)) override {} ImplementSerializable(comm_pair) diff --git a/src/sst/core/sync/syncManager.cc b/src/sst/core/sync/syncManager.cc index 1c4f2e7b7..0975beb04 100644 --- a/src/sst/core/sync/syncManager.cc +++ b/src/sst/core/sync/syncManager.cc @@ -21,8 +21,8 @@ #include "sst/core/simulation_impl.h" #include "sst/core/sync/rankSyncParallelSkip.h" #include "sst/core/sync/rankSyncSerialSkip.h" +#include "sst/core/sync/syncQueue.h" #include "sst/core/sync/threadSyncDirectSkip.h" -#include "sst/core/sync/threadSyncQueue.h" #include "sst/core/sync/threadSyncSimpleSkip.h" #include "sst/core/timeConverter.h" #include "sst/core/warnmacros.h" @@ -42,7 +42,7 @@ namespace SST { // Static data members RankSync* SyncManager::rankSync_ = nullptr; -Core::ThreadSafe::Barrier SyncManager::RankExecBarrier_[6]; +Core::ThreadSafe::Barrier SyncManager::RankExecBarrier_[5]; Core::ThreadSafe::Barrier SyncManager::LinkUntimedBarrier_[3]; SimTime_t SyncManager::next_rankSync_ = MAX_SIMTIME_T; @@ -133,8 +133,8 @@ class EmptyRankSync : public RankSync uint64_t getDataSize() const override { return 0; } - void serialize_order(SST::Core::Serialization::serializer& ser) override { RankSync::serialize_order(ser); } - ImplementSerializable(SST::EmptyRankSync) + // Don't want to reset time for Empty Sync + void setRestartTime(SimTime_t UNUSED(time)) override {} }; class EmptyThreadSync : public ThreadSync @@ -171,9 +171,10 @@ class EmptyThreadSync : public ThreadSync return nullptr; } + // Don't want to reset time for Empty Sync + void setRestartTime(SimTime_t UNUSED(time)) override {} + /** Serialization for checkpoint support */ - void serialize_order(SST::Core::Serialization::serializer& ser) override { ThreadSync::serialize_order(ser); } - ImplementSerializable(EmptyThreadSync) }; void @@ -266,19 +267,9 @@ class SyncProfileToolList std::vector tools; }; - -SyncManager::SyncManager( - const RankInfo& rank, const RankInfo& num_ranks, TimeConverter* minPartTC, SimTime_t min_part, - const std::vector& UNUSED(interThreadLatencies), RealTimeManager* real_time) : - Action(), - rank_(rank), - num_ranks_(num_ranks), - threadSync_(nullptr), - min_part_(min_part), - real_time_(real_time) +void +SyncManager::setupSyncObjects() { - sim_ = Simulation_impl::getSimulation(); - if ( rank_.thread == 0 ) { for ( auto& b : RankExecBarrier_ ) { b.resize(num_ranks_.thread); @@ -287,9 +278,9 @@ SyncManager::SyncManager( b.resize(num_ranks_.thread); } if ( min_part_ != MAX_SIMTIME_T ) { - if ( num_ranks_.thread == 1 ) { rankSync_ = new RankSyncSerialSkip(num_ranks_, minPartTC); } + if ( num_ranks_.thread == 1 ) { rankSync_ = new RankSyncSerialSkip(num_ranks_); } else { - rankSync_ = new RankSyncParallelSkip(num_ranks_, minPartTC); + rankSync_ = new RankSyncParallelSkip(num_ranks_); } } else { @@ -312,6 +303,21 @@ SyncManager::SyncManager( else { threadSync_ = new EmptyThreadSync(Simulation_impl::getSimulation()); } +} + +SyncManager::SyncManager( + const RankInfo& rank, const RankInfo& num_ranks, SimTime_t min_part, + const std::vector& UNUSED(interThreadLatencies), RealTimeManager* real_time) : + Action(), + rank_(rank), + num_ranks_(num_ranks), + threadSync_(nullptr), + min_part_(min_part), + real_time_(real_time) +{ + sim_ = Simulation_impl::getSimulation(); + + setupSyncObjects(); exit_ = sim_->getExit(); checkpoint_ = sim_->getCheckpointAction(); @@ -322,8 +328,6 @@ SyncManager::SyncManager( SyncManager::SyncManager() { sim_ = Simulation_impl::getSimulation(); - // threadSync_ = new EmptyThreadSync(Simulation_impl::getSimulation()); - // rankSync_ = new EmptyRankSync(num_ranks_); } SyncManager::~SyncManager() {} @@ -372,6 +376,8 @@ SyncManager::execute(void) int sig_usr; int sig_alrm; + SimTime_t next_checkpoint_time = MAX_SIMTIME_T; + switch ( next_sync_type_ ) { case RANK: // Need to make sure all threads have reached the sync to @@ -394,7 +400,9 @@ SyncManager::execute(void) real_time_->getSignals(sig_end, sig_usr, sig_alrm); rankSync_->setSignals(sig_end, sig_usr, sig_alrm); } - // Now call the actual RankSync + // Now call the actual RankSync. No barrier needed here + // because all threads will wait on thread 0 before doing + // anything rankSync_->execute(rank_.thread); // Once out of rankSync, signals have been exchanged @@ -406,8 +414,6 @@ SyncManager::execute(void) // Handle signals signals_received = rankSync_->getSignals(sig_end, sig_usr, sig_alrm); - RankExecBarrier_[3].wait(); - // Handle any signals if ( sig_end ) real_time_->performSignal(sig_end); @@ -417,11 +423,16 @@ SyncManager::execute(void) } // Generate checkpoint if needed - checkpoint_->check(); + next_checkpoint_time = checkpoint_->check(getDeliveryTime()); + + // No barrier needed. Either the check failed and no + // checkpoint happened, so no global activity, or the + // checkpoint happened and the last thing that happens in the + // checkpoint code is a barrier. if ( exit_ != nullptr && rank_.thread == 0 ) exit_->check(); - RankExecBarrier_[4].wait(); + RankExecBarrier_[3].wait(); if ( exit_->getGlobalCount() == 0 ) { endSimulation(exit_->getEndTime()); } @@ -442,7 +453,7 @@ SyncManager::execute(void) if ( sig_usr ) real_time_->performSignal(sig_usr); if ( sig_alrm ) real_time_->performSignal(sig_alrm); } - checkpoint_->check(); + next_checkpoint_time = checkpoint_->check(getDeliveryTime()); } if ( /*num_ranks_+.rank == 1*/ min_part_ == MAX_SIMTIME_T ) { @@ -454,8 +465,8 @@ SyncManager::execute(void) default: break; } - computeNextInsert(); - RankExecBarrier_[5].wait(); + computeNextInsert(next_checkpoint_time); + RankExecBarrier_[4].wait(); if ( profile_tools_ ) profile_tools_->syncManagerEnd(); @@ -483,6 +494,7 @@ SyncManager::finalizeLinkConfigurations() // Need to figure out what sync comes first and insert object into // TimeVortex + if ( num_ranks_.rank == 1 && num_ranks_.thread == 1 ) return; computeNextInsert(); } @@ -496,16 +508,32 @@ SyncManager::prepareForComplete() } void -SyncManager::computeNextInsert() +SyncManager::computeNextInsert(SimTime_t next_checkpoint_time) { - if ( rankSync_->getNextSyncTime() <= threadSync_->getNextSyncTime() ) { + SimTime_t next_rank_sync = rankSync_->getNextSyncTime(); + SimTime_t next_thread_sync = threadSync_->getNextSyncTime(); + + SimTime_t next_sync_time = next_thread_sync; + next_sync_type_ = THREAD; + + if ( next_rank_sync <= next_thread_sync ) { next_sync_type_ = RANK; - sim_->insertActivity(rankSync_->getNextSyncTime(), this); + next_sync_time = next_rank_sync; } - else { - next_sync_type_ = THREAD; - sim_->insertActivity(threadSync_->getNextSyncTime(), this); + + if ( next_checkpoint_time < next_sync_time ) { + next_sync_time = next_checkpoint_time; + if ( next_rank_sync == MAX_SIMTIME_T ) { + // Single rank job + next_sync_type_ = THREAD; + } + else { + // If using multiple ranks, must do a full rank sync + next_sync_type_ = RANK; + } } + + sim_->insertActivity(next_sync_time, this); } void @@ -529,32 +557,4 @@ SyncManager::addProfileTool(Profile::SyncProfileTool* tool) profile_tools_->addProfileTool(tool); } -void -SyncManager::serialize_order(SST::Core::Serialization::serializer& ser) -{ - Action::serialize_order(ser); - - // AHHHHHHHHHHHHHHHHH - ser& rank_; // const causes problems - ser& num_ranks_; // const again - - ser& next_rankSync_; - ser& threadSync_; - - ser& next_sync_type_; - ser& min_part_; - - // FIXME: Need to actually figure out how to handle the static - // RankSync object. - if ( ser.mode() == SST::Core::Serialization::serializer::UNPACK ) { rankSync_ = new EmptyRankSync(num_ranks_); } - - // No need to serialize - // RankExecBarrier_ - // LinkUntimedBarrier_ - // sim_ - // exit_ - // profile_tools_ - - // static RankSync* rankSync_; -} } // namespace SST diff --git a/src/sst/core/sync/syncManager.h b/src/sst/core/sync/syncManager.h index 94a3bf41e..a3b08f850 100644 --- a/src/sst/core/sync/syncManager.h +++ b/src/sst/core/sync/syncManager.h @@ -36,7 +36,7 @@ namespace Profile { class SyncProfileTool; } -class RankSync : public SST::Core::Serialization::serializable +class RankSync { public: RankSync(RankInfo num_ranks) : num_ranks_(num_ranks) { link_maps.resize(num_ranks_.rank); } @@ -60,19 +60,14 @@ class RankSync : public SST::Core::Serialization::serializable virtual SimTime_t getNextSyncTime() { return nextSyncTime; } - // void setMaxPeriod(TimeConverter* period) {max_period = period;} + virtual void setRestartTime(SimTime_t time) { nextSyncTime = time; } + TimeConverter* getMaxPeriod() { return max_period; } virtual uint64_t getDataSize() const = 0; - void serialize_order(SST::Core::Serialization::serializer& ser) override - { - ser& nextSyncTime; - ser& max_period; // Unused - // ser& num_ranks; // const so a pain to serialize but don't need it - ser& link_maps; - } - ImplementVirtualSerializable(SST::RankSync) protected : SimTime_t nextSyncTime; +protected: + SimTime_t nextSyncTime; TimeConverter* max_period; const RankInfo num_ranks_; @@ -87,11 +82,9 @@ class RankSync : public SST::Core::Serialization::serializable inline void setLinkDeliveryInfo(Link* link, uintptr_t info) { link->pair_link->setDeliveryInfo(info); } inline Link* getDeliveryLink(Event* ev) { return ev->getDeliveryLink(); } - -private: }; -class ThreadSync : public SST::Core::Serialization::serializable +class ThreadSync { public: ThreadSync() : max_period(nullptr) {} @@ -110,6 +103,7 @@ class ThreadSync : public SST::Core::Serialization::serializable virtual bool getSignals(int& end, int& usr, int& alrm) = 0; virtual SimTime_t getNextSyncTime() { return nextSyncTime; } + virtual void setRestartTime(SimTime_t time) { nextSyncTime = time; } void setMaxPeriod(TimeConverter* period) { max_period = period; } TimeConverter* getMaxPeriod() { return max_period; } @@ -118,16 +112,8 @@ class ThreadSync : public SST::Core::Serialization::serializable virtual void registerLink(const std::string& name, Link* link) = 0; virtual ActivityQueue* registerRemoteLink(int tid, const std::string& name, Link* link) = 0; - void serialize_order(SST::Core::Serialization::serializer& ser) override - { - ser& nextSyncTime; - ser& max_period; // Unused - } - ImplementVirtualSerializable(SST::ThreadSync) - - protected : - - SimTime_t nextSyncTime; +protected: + SimTime_t nextSyncTime; TimeConverter* max_period; void finalizeConfiguration(Link* link) { link->finalizeConfiguration(); } @@ -139,15 +125,13 @@ class ThreadSync : public SST::Core::Serialization::serializable inline void setLinkDeliveryInfo(Link* link, uintptr_t info) { link->pair_link->setDeliveryInfo(info); } inline Link* getDeliveryLink(Event* ev) { return ev->getDeliveryLink(); } - -private: }; class SyncManager : public Action { public: SyncManager( - const RankInfo& rank, const RankInfo& num_ranks, TimeConverter* minPartTC, SimTime_t min_part, + const RankInfo& rank, const RankInfo& num_ranks, SimTime_t min_part, const std::vector& interThreadLatencies, RealTimeManager* real_time); SyncManager(); // For serialization only virtual ~SyncManager(); @@ -168,19 +152,24 @@ class SyncManager : public Action uint64_t getDataSize() const; + void setRestartTime(SimTime_t time) + { + rankSync_->setRestartTime(time); + threadSync_->setRestartTime(time); + } + void addProfileTool(Profile::SyncProfileTool* tool); - void serialize_order(SST::Core::Serialization::serializer& ser) override; - ImplementSerializable(SST::SyncManager) + NotSerializable(SST::SyncManager) + private: + // Enum to track the next sync type enum sync_type_t { RANK, THREAD }; RankInfo rank_; RankInfo num_ranks_; - static Core::ThreadSafe::Barrier RankExecBarrier_[6]; + static Core::ThreadSafe::Barrier RankExecBarrier_[5]; static Core::ThreadSafe::Barrier LinkUntimedBarrier_[3]; - // static SimTime_t min_next_time; - // static int min_count; static RankSync* rankSync_; static SimTime_t next_rankSync_; @@ -196,7 +185,8 @@ class SyncManager : public Action SyncProfileToolList* profile_tools_ = nullptr; - void computeNextInsert(); + void computeNextInsert(SimTime_t next_checkpoint_time = MAX_SIMTIME_T); + void setupSyncObjects(); }; } // namespace SST diff --git a/src/sst/core/sync/syncQueue.cc b/src/sst/core/sync/syncQueue.cc index 158ec4318..8a9573ba8 100644 --- a/src/sst/core/sync/syncQueue.cc +++ b/src/sst/core/sync/syncQueue.cc @@ -32,33 +32,33 @@ namespace SST { using namespace Core::ThreadSafe; using namespace Core::Serialization; -SyncQueue::SyncQueue() : ActivityQueue(), buffer(nullptr), buf_size(0) {} +RankSyncQueue::RankSyncQueue(RankInfo to_rank) : SyncQueue(to_rank), buffer(nullptr), buf_size(0) {} -SyncQueue::~SyncQueue() {} +RankSyncQueue::~RankSyncQueue() {} bool -SyncQueue::empty() +RankSyncQueue::empty() { std::lock_guard lock(slock); return activities.empty(); } int -SyncQueue::size() +RankSyncQueue::size() { std::lock_guard lock(slock); return activities.size(); } void -SyncQueue::insert(Activity* activity) +RankSyncQueue::insert(Activity* activity) { std::lock_guard lock(slock); activities.push_back(activity); } Activity* -SyncQueue::pop() +RankSyncQueue::pop() { // NEED TO FATAL // if ( data.size() == 0 ) return nullptr; @@ -70,21 +70,21 @@ SyncQueue::pop() } Activity* -SyncQueue::front() +RankSyncQueue::front() { // NEED TO FATAL return nullptr; } void -SyncQueue::clear() +RankSyncQueue::clear() { std::lock_guard lock(slock); activities.clear(); } char* -SyncQueue::getData() +RankSyncQueue::getData() { std::lock_guard lock(slock); @@ -98,14 +98,14 @@ SyncQueue::getData() SST_EVENT_PROFILE_SIZE(activities.size(), size) - if ( buf_size < (size + sizeof(SyncQueue::Header)) ) { + if ( buf_size < (size + sizeof(RankSyncQueue::Header)) ) { if ( buffer != nullptr ) { delete[] buffer; } - buf_size = size + sizeof(SyncQueue::Header); + buf_size = size + sizeof(RankSyncQueue::Header); buffer = new char[buf_size]; } - ser.start_packing(buffer + sizeof(SyncQueue::Header), size); + ser.start_packing(buffer + sizeof(RankSyncQueue::Header), size); ser& activities; @@ -116,7 +116,7 @@ SyncQueue::getData() activities.clear(); // Set the size field in the header - static_cast(static_cast(buffer))->buffer_size = size + sizeof(SyncQueue::Header); + static_cast(static_cast(buffer))->buffer_size = size + sizeof(RankSyncQueue::Header); return buffer; } diff --git a/src/sst/core/sync/syncQueue.h b/src/sst/core/sync/syncQueue.h index b77f8b845..d286dffe2 100644 --- a/src/sst/core/sync/syncQueue.h +++ b/src/sst/core/sync/syncQueue.h @@ -13,12 +13,33 @@ #define SST_CORE_SYNC_SYNCQUEUE_H #include "sst/core/activityQueue.h" +#include "sst/core/rankInfo.h" #include "sst/core/threadsafe.h" #include namespace SST { +/** + \class SyncQueue + + Internal API + + Base class for all Sync Queues +*/ +class SyncQueue : public ActivityQueue +{ +public: + SyncQueue(RankInfo to_rank) : ActivityQueue(), to_rank(to_rank) {} + ~SyncQueue() {} + + /** Accessor method to get to_rank */ + RankInfo getToRank() { return to_rank; } + +private: + RankInfo to_rank; +}; + /** * \class SyncQueue * @@ -26,7 +47,7 @@ namespace SST { * * Activity Queue for use by Sync Objects */ -class SyncQueue : public ActivityQueue +class RankSyncQueue : public SyncQueue { public: struct Header @@ -36,8 +57,8 @@ class SyncQueue : public ActivityQueue uint32_t buffer_size; }; - SyncQueue(); - ~SyncQueue(); + RankSyncQueue(RankInfo to_rank); + ~RankSyncQueue(); bool empty() override; int size() override; @@ -61,6 +82,43 @@ class SyncQueue : public ActivityQueue Core::ThreadSafe::Spinlock slock; }; +class ThreadSyncQueue : public SyncQueue +{ +public: + ThreadSyncQueue(RankInfo to_rank) : SyncQueue(to_rank) {} + ~ThreadSyncQueue() {} + + /** Returns true if the queue is empty */ + bool empty() override { return activities.empty(); } + + /** Returns the number of activities in the queue */ + int size() override { return activities.size(); } + + /** Not supported */ + Activity* pop() override + { + // Need to fatal + return nullptr; + } + + /** Insert a new activity into the queue */ + void insert(Activity* activity) override { activities.push_back(activity); } + + /** Not supported */ + Activity* front() override + { + // Need to fatal + return nullptr; + } + + void clear() { activities.clear(); } + + std::vector& getVector() { return activities; } + +private: + std::vector activities; +}; + } // namespace SST #endif // SST_CORE_SYNC_SYNCQUEUE_H diff --git a/src/sst/core/sync/threadSyncDirectSkip.cc b/src/sst/core/sync/threadSyncDirectSkip.cc index df12d203d..5ea0fc791 100644 --- a/src/sst/core/sync/threadSyncDirectSkip.cc +++ b/src/sst/core/sync/threadSyncDirectSkip.cc @@ -102,23 +102,6 @@ ThreadSyncDirectSkip::getSignals(int& end, int& usr, int& alrm) } -void -ThreadSyncDirectSkip::serialize_order(SST::Core::Serialization::serializer& ser) -{ - ThreadSync::serialize_order(ser); - ser& my_max_period; - ser& num_threads; - ser& thread; - ser& localMinimumNextActivityTime; - ser& totalWaitTime; - ser& single_rank; - - // No need to serialize - // sim - // barrier -} - - Core::ThreadSafe::Barrier ThreadSyncDirectSkip::barrier[3]; int ThreadSyncDirectSkip::sig_end_(0); int ThreadSyncDirectSkip::sig_usr_(0); diff --git a/src/sst/core/sync/threadSyncDirectSkip.h b/src/sst/core/sync/threadSyncDirectSkip.h index 0b4c2b1f0..b897e3895 100644 --- a/src/sst/core/sync/threadSyncDirectSkip.h +++ b/src/sst/core/sync/threadSyncDirectSkip.h @@ -15,7 +15,7 @@ #include "sst/core/action.h" #include "sst/core/sst_types.h" #include "sst/core/sync/syncManager.h" -#include "sst/core/sync/threadSyncQueue.h" +#include "sst/core/sync/syncQueue.h" #include @@ -65,9 +65,6 @@ class ThreadSyncDirectSkip : public ThreadSync uint64_t getDataSize() const; - void serialize_order(SST::Core::Serialization::serializer& ser) override; - ImplementSerializable(SST::ThreadSyncDirectSkip) - private: SimTime_t my_max_period; int num_threads; diff --git a/src/sst/core/sync/threadSyncQueue.h b/src/sst/core/sync/threadSyncQueue.h deleted file mode 100644 index e07dadade..000000000 --- a/src/sst/core/sync/threadSyncQueue.h +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2009-2024 NTESS. Under the terms -// of Contract DE-NA0003525 with NTESS, the U.S. -// Government retains certain rights in this software. -// -// Copyright (c) 2009-2024, NTESS -// All rights reserved. -// -// This file is part of the SST software package. For license -// information, see the LICENSE file in the top level directory of the -// distribution. - -#ifndef SST_CORE_SYNC_THREADSYNCQUEUE_H -#define SST_CORE_SYNC_THREADSYNCQUEUE_H - -#include "sst/core/activityQueue.h" - -namespace SST { - -/** Base Class for a queue of Activities - */ -class ThreadSyncQueue : public ActivityQueue -{ -public: - ThreadSyncQueue() : ActivityQueue() {} - ~ThreadSyncQueue() {} - - /** Returns true if the queue is empty */ - bool empty() override { return activities.empty(); } - - /** Returns the number of activities in the queue */ - int size() override { return activities.size(); } - - /** Not supported */ - Activity* pop() override - { - // Need to fatal - return nullptr; - } - - /** Insert a new activity into the queue */ - void insert(Activity* activity) override { activities.push_back(activity); } - - /** Not supported */ - Activity* front() override - { - // Need to fatal - return nullptr; - } - - void clear() { activities.clear(); } - - std::vector& getVector() { return activities; } - -private: - std::vector activities; -}; - -} // namespace SST - -#endif // SST_CORE_SYNC_THREADSYNCQUEUE_H diff --git a/src/sst/core/sync/threadSyncSimpleSkip.cc b/src/sst/core/sync/threadSyncSimpleSkip.cc index e1c6f3f77..d8677732d 100644 --- a/src/sst/core/sync/threadSyncSimpleSkip.cc +++ b/src/sst/core/sync/threadSyncSimpleSkip.cc @@ -31,8 +31,9 @@ ThreadSyncSimpleSkip::ThreadSyncSimpleSkip(int num_threads, int thread, Simulati sim(sim), totalWaitTime(0.0) { + RankInfo rank = sim->getRank(); for ( int i = 0; i < num_threads; i++ ) { - queues.push_back(new ThreadSyncQueue()); + queues.push_back(new ThreadSyncQueue(rank)); } if ( sim->getRank().thread == 0 ) { @@ -194,25 +195,6 @@ ThreadSyncSimpleSkip::getSignals(int& end, int& usr, int& alrm) return sig_end_ || sig_usr_ || sig_alrm_; } -void -ThreadSyncSimpleSkip::serialize_order(SST::Core::Serialization::serializer& ser) -{ - ThreadSync::serialize_order(ser); - ser& my_max_period; - ser& num_threads; - ser& thread; - ser& localMinimumNextActivityTime; - ser& totalWaitTime; - ser& single_rank; - - // No need to serialize - // link_map - unused after construction - // sim - regenerate - // barrier - regenerate & guarantee empty during checkpoint - // lock - regenerate - // queues - empty -} - Core::ThreadSafe::Barrier ThreadSyncSimpleSkip::barrier[3]; int ThreadSyncSimpleSkip::sig_end_(0); int ThreadSyncSimpleSkip::sig_usr_(0); diff --git a/src/sst/core/sync/threadSyncSimpleSkip.h b/src/sst/core/sync/threadSyncSimpleSkip.h index 866d72300..56d177831 100644 --- a/src/sst/core/sync/threadSyncSimpleSkip.h +++ b/src/sst/core/sync/threadSyncSimpleSkip.h @@ -15,7 +15,7 @@ #include "sst/core/action.h" #include "sst/core/sst_types.h" #include "sst/core/sync/syncManager.h" -#include "sst/core/sync/threadSyncQueue.h" +#include "sst/core/sync/syncQueue.h" #include "sst/core/threadsafe.h" #include @@ -64,9 +64,6 @@ class ThreadSyncSimpleSkip : public ThreadSync // static void disable() { disabled = true; barrier.disable(); } - void serialize_order(SST::Core::Serialization::serializer& ser) override; - ImplementSerializable(SST::ThreadSyncSimpleSkip) - private: // Stores the links until they can be intialized with the right // remote data. It will hold whichever thread registers the link diff --git a/src/sst/core/testElements/coreTest_SharedObjectComponent.h b/src/sst/core/testElements/coreTest_SharedObjectComponent.h index 035e7380c..8bc8acdef 100644 --- a/src/sst/core/testElements/coreTest_SharedObjectComponent.h +++ b/src/sst/core/testElements/coreTest_SharedObjectComponent.h @@ -84,6 +84,9 @@ class coreTestSharedObjectsComponent : public SST::Component // Optional since there is nothing to document SST_ELI_DOCUMENT_PORTS( + {"left", "Link to the component to the left", { "" } }, + {"right", "Link to the component to the right", { "" } } + ) // Optional since there is nothing to document diff --git a/tests/subcomponent_tests/test_sc_2a.py b/tests/subcomponent_tests/test_sc_2a.py index bd336e7ec..45252b6a9 100644 --- a/tests/subcomponent_tests/test_sc_2a.py +++ b/tests/subcomponent_tests/test_sc_2a.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner", "self") verbose = 0 if len(sys.argv) > 1: @@ -41,5 +42,17 @@ link1 = sst.Link("myLink1") link1.connect((loader0, "port1", "5ns"), (loader1, "port1", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + #sst.enableAllStatisticsForAllComponents() sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_2u.py b/tests/subcomponent_tests/test_sc_2u.py index 63d3ef516..443b80715 100644 --- a/tests/subcomponent_tests/test_sc_2u.py +++ b/tests/subcomponent_tests/test_sc_2u.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner", "self") verbose = 0 if len(sys.argv) > 1: @@ -55,4 +56,16 @@ link1 = sst.Link("myLink1") link1.connect((sub0_1, "sendPort", "5ns"), (sub1_1, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_2u2a.py b/tests/subcomponent_tests/test_sc_2u2a.py index 248a5cfcf..928715f3d 100644 --- a/tests/subcomponent_tests/test_sc_2u2a.py +++ b/tests/subcomponent_tests/test_sc_2u2a.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -70,4 +71,16 @@ link1_1.connect((sub0_1, "slot_port1", "5ns"), (sub1_1, "slot_port1", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_2u2u.py b/tests/subcomponent_tests/test_sc_2u2u.py index 7d94c2317..aff605738 100644 --- a/tests/subcomponent_tests/test_sc_2u2u.py +++ b/tests/subcomponent_tests/test_sc_2u2u.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner", "self") verbose = 0 if len(sys.argv) > 1: @@ -20,7 +21,7 @@ # Set up senders using slots and user subcomponents loader0 = sst.Component("Loader0", "coreTestElement.SubComponentLoader") -loader0.addParam("clock", "1.5GHz") +loader0.addParam("clock", "0.15GHz") loader0.addParam("verbose", verbose) loader0.enableAllStatistics() @@ -88,5 +89,16 @@ link1_1 = sst.Link("myLink1_1") link1_1.connect((sub0_1_1, "sendPort", "5ns"), (sub1_1_1, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_2ua.py b/tests/subcomponent_tests/test_sc_2ua.py index dec1f92b1..c362a77a7 100644 --- a/tests/subcomponent_tests/test_sc_2ua.py +++ b/tests/subcomponent_tests/test_sc_2ua.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -60,4 +61,16 @@ link1.connect((sub0_1, "slot_port0", "5ns"), (sub1_1, "slot_port0", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_2uu.py b/tests/subcomponent_tests/test_sc_2uu.py index 03af36aa1..06f211d06 100644 --- a/tests/subcomponent_tests/test_sc_2uu.py +++ b/tests/subcomponent_tests/test_sc_2uu.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -66,4 +67,16 @@ link1.connect((sub0_1_0, "sendPort", "5ns"), (sub1_1_0, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_a.py b/tests/subcomponent_tests/test_sc_a.py index 985909fe4..e5b005ef1 100644 --- a/tests/subcomponent_tests/test_sc_a.py +++ b/tests/subcomponent_tests/test_sc_a.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -37,4 +38,16 @@ link = sst.Link("myLink") link.connect((loader0, "port0", "5ns"), (loader1, "port0", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_u.py b/tests/subcomponent_tests/test_sc_u.py index 60118a33b..391312481 100644 --- a/tests/subcomponent_tests/test_sc_u.py +++ b/tests/subcomponent_tests/test_sc_u.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -42,4 +43,16 @@ link0 = sst.Link("myLink0") link0.connect((sub0, "sendPort", "5ns"), (sub1, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_u2a.py b/tests/subcomponent_tests/test_sc_u2a.py index cfdb558ba..4b41dd897 100644 --- a/tests/subcomponent_tests/test_sc_u2a.py +++ b/tests/subcomponent_tests/test_sc_u2a.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -51,4 +52,16 @@ link1.connect((sub0, "slot_port1", "5ns"), (sub1, "slot_port1", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_u2u.py b/tests/subcomponent_tests/test_sc_u2u.py index de9958585..a878d6f43 100644 --- a/tests/subcomponent_tests/test_sc_u2u.py +++ b/tests/subcomponent_tests/test_sc_u2u.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -60,4 +61,16 @@ link1 = sst.Link("myLink1") link1.connect((sub0_1, "sendPort", "5ns"), (sub1_1, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_ua.py b/tests/subcomponent_tests/test_sc_ua.py index 6132f6fec..34654958a 100644 --- a/tests/subcomponent_tests/test_sc_ua.py +++ b/tests/subcomponent_tests/test_sc_ua.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -46,4 +47,16 @@ link.connect((sub0, "slot_port0", "5ns"), (sub1, "slot_port0", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/subcomponent_tests/test_sc_uu.py b/tests/subcomponent_tests/test_sc_uu.py index d776aaf9b..8b6ecae1f 100644 --- a/tests/subcomponent_tests/test_sc_uu.py +++ b/tests/subcomponent_tests/test_sc_uu.py @@ -13,6 +13,7 @@ # Define SST core options sst.setProgramOption("stop-at", "10us") +sst.setProgramOption("partitioner","self") verbose = 0 if len(sys.argv) > 1: @@ -47,4 +48,16 @@ link.connect((sub0_0, "sendPort", "5ns"), (sub1_0, "recvPort", "5ns")) +# Do the paritioning +num_ranks = sst.getMPIRankCount() +num_threads = sst.getThreadCount() + +loader0.setRank(0,0) +if num_ranks >= 2: + loader1.setRank(1,0) +elif num_threads > 1: + loader1.setRank(0,1) +else: + loader1.setRank(0,0) + sst.setStatisticLoadLevel(1) diff --git a/tests/test_Checkpoint.py b/tests/test_Checkpoint.py index fec509b7a..747ca14f3 100644 --- a/tests/test_Checkpoint.py +++ b/tests/test_Checkpoint.py @@ -49,3 +49,14 @@ }) #sst.enableAllStatisticsForAllComponents() sst.enableAllStatisticsForAllComponents({ "rate" : "100us" }) + +# Ensure that this will work for all parallelism. The only case we +# need to worry about is if comp_count <= num_threads and we have more +# than one rank. +num_threads = sst.getThreadCount() +num_ranks = sst.getMPIRankCount() +if num_threads >= comp_count and num_ranks > 1: + sst.setProgramOption("partitioner","self") + for x,comp in enumerate(comps): + comp.setRank(x % num_ranks, x // num_ranks) + diff --git a/tests/test_SharedObject.py b/tests/test_SharedObject.py index 5559ff65c..424bb1a54 100644 --- a/tests/test_SharedObject.py +++ b/tests/test_SharedObject.py @@ -29,7 +29,13 @@ num_entities = int(params["num_entities"]) + link = sst.Link("left0") + last_link = link for x in range(num_entities): comp = sst.Component("obj%d"%x, "coreTestElement.coreTestSharedObjectsComponent") comp.addParams(params) comp.addParam("myid",x) + comp.addLink(last_link, "left", "4ns") + if x == (num_entities - 1): last_link = link + else: last_link = sst.Link("left{0}".format((x+1))) + comp.addLink(last_link, "right", "4ns") diff --git a/tests/testsuite_default_Checkpoint.py b/tests/testsuite_default_Checkpoint.py index c869c4032..ab7b52874 100644 --- a/tests/testsuite_default_Checkpoint.py +++ b/tests/testsuite_default_Checkpoint.py @@ -31,95 +31,61 @@ def tearDown(self): ##### parallelerr = "Test only suports serial execution" - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint(self): - self.checkpoint_test_template("Checkpoint", "500us", "500000000_0") - - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) + self.checkpoint_test_template("Checkpoint", "500us", "0_500000000") + def test_Checkpoint_SubComponent_sc_2a(self): - self.checkpoint_test_template("sc_2a", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2a", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_SubComponent_sc_2u2u(self): - self.checkpoint_test_template("sc_2u2u", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2u2u", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_2u(self): - self.checkpoint_test_template("sc_2u", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2u", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_a(self): - self.checkpoint_test_template("sc_a", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_a", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_u2u(self): - self.checkpoint_test_template("sc_u2u", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_u2u", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_u(self): - self.checkpoint_test_template("sc_u", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_u", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_2u2a(self): - self.checkpoint_test_template("sc_2u2a", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2u2a", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_2ua(self): - self.checkpoint_test_template("sc_2ua", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2ua", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_2uu(self): - self.checkpoint_test_template("sc_2uu", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_2uu", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_u2a(self): - self.checkpoint_test_template("sc_u2a", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_u2a", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_ua(self): - self.checkpoint_test_template("sc_ua", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_ua", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_sc_uu(self): - self.checkpoint_test_template("sc_uu", "2500ns", "2500000_0", subcomp=True, modelparams="1") + self.checkpoint_test_template("sc_uu", "2500ns", "0_2500000", subcomp=True, modelparams="1") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_SharedObject_array(self): - self.checkpoint_test_template("SharedObject", "6ns", "6000_0", modelparams="--param=object_type:array --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_array") - - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) + self.checkpoint_test_template("SharedObject", "6ns", "0_6000", modelparams="--param=object_type:array --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_array") + def test_Checkpoint_SharedObject_bool_array(self): - self.checkpoint_test_template("SharedObject", "6ns", "6000_0", modelparams="--param=object_type:bool_array --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_bool_array") + self.checkpoint_test_template("SharedObject", "6ns", "0_6000", modelparams="--param=object_type:bool_array --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_bool_array") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_SharedObject_map(self): - self.checkpoint_test_template("SharedObject", "6ns", "6000_0", modelparams="--param=object_type:map --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_map") + self.checkpoint_test_template("SharedObject", "6ns", "0_6000", modelparams="--param=object_type:map --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_map") - @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) - @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_SharedObject_set(self): - self.checkpoint_test_template("SharedObject", "6ns", "6000_0", modelparams="--param=object_type:set --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_set") + self.checkpoint_test_template("SharedObject", "6ns", "0_6000", modelparams="--param=object_type:set --param=num_entities:12 --param=full_initialization:true --param=checkpoint:true", outstr = "SharedObject_set") @unittest.skipIf(testing_check_get_num_ranks() > 1, parallelerr) @unittest.skipIf(testing_check_get_num_threads() > 1, parallelerr) def test_Checkpoint_Module(self): - self.checkpoint_test_template("Module", "25us", "50000000_1") + self.checkpoint_test_template("Module", "25us", "1_50000000") ##### # testtype: which test to run @@ -134,7 +100,7 @@ def checkpoint_test_template(self, testtype, cptfreq, cptrestart, subcomp=False, testsuitedir = self.get_testsuite_dir() outdir = test_output_get_run_dir() - + if outstr != "": teststr = outstr else: @@ -150,7 +116,7 @@ def checkpoint_test_template(self, testtype, cptfreq, cptrestart, subcomp=False, self.run_sst(sdlfile_generate, outfile_generate, other_args=options_checkpoint) # Run from restart - sdlfile_restart = "{0}/{1}_{2}.sstcpt".format(outdir,teststr,cptrestart) + sdlfile_restart = "{0}/{1}/{1}_{2}/{1}_{2}.sstcpt".format(outdir,teststr,cptrestart) outfile_restart = "{0}/test_Checkpoint_{1}_restart.out".format(outdir, teststr) options_restart = "--load-checkpoint" self.run_sst(sdlfile_restart, outfile_restart, other_args=options_restart)