[alpaka] Static splitting of event streams across multiple backends
Each event stream is associated with one of the backends, according to the
optional weight specified on the command line.
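
For example (illustrative numbers, not part of the commit message): with
--numberOfStreams 8 --serial 1 --cuda 3 the total weight is 4, so the first
round(1 * 8 / 4) = 2 streams run on the serial backend and the remaining 6 streams
run on the CUDA backend.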
fwyzard committed May 1, 2022
1 parent 472c9da commit 50570e8
Showing 4 changed files with 160 additions and 64 deletions.
11 changes: 11 additions & 0 deletions src/alpaka/AlpakaCore/backend.h
@@ -3,4 +3,15 @@

enum class Backend { SERIAL, TBB, CUDA, HIP };

inline std::string const& name(Backend backend) {
static const std::string names[] = {"serial_sync", "tbb_async", "cuda_async", "rocm_async"};
return names[static_cast<int>(backend)];
}

template <typename T>
inline T& operator<<(T& out, Backend backend) {
out << name(backend);
return out;
}

#endif // AlpakaCore_backend_h
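
A minimal usage sketch of the new name() helper and streaming operator (illustrative, assuming the include guard and the <string> include from the unchanged top of backend.h):

#include <iostream>
#include "AlpakaCore/backend.h"

int main() {
  Backend backend = Backend::CUDA;
  // name(backend) returns "cuda_async"; the templated operator<< forwards it to std::cout
  std::cout << "running on the " << backend << " backend" << std::endl;
  return 0;
}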
21 changes: 18 additions & 3 deletions src/alpaka/bin/EventProcessor.cc
@@ -1,3 +1,4 @@
#include <cmath>
#include <exception>
#include <filesystem>
#include <string>
@@ -13,7 +14,7 @@ namespace edm {
EventProcessor::EventProcessor(int maxEvents,
int runForMinutes,
int numberOfStreams,
std::vector<std::string> const& path,
Alternatives alternatives,
std::vector<std::string> const& esproducers,
std::filesystem::path const& datadir,
bool validation)
@@ -24,9 +25,23 @@
esp->produce(eventSetup_);
}

// normalise the total weight to the number of streams
float total = 0.;
for (auto const& alternative : alternatives) {
total += alternative.weight;
}
//schedules_.reserve(numberOfStreams);
for (int i = 0; i < numberOfStreams; ++i) {
schedules_.emplace_back(registry_, pluginManager_, &source_, &eventSetup_, i, path);
float cumulative = 0.;
int lower_range = 0;
int upper_range = 0;
for (auto& alternative : alternatives) {
cumulative += alternative.weight;
lower_range = upper_range;
upper_range = static_cast<int>(std::round(cumulative * numberOfStreams / total));
for (int i = lower_range; i < upper_range; ++i) {
schedules_.emplace_back(registry_, pluginManager_, &source_, &eventSetup_, i, alternative.path);
}
streamsPerBackend_.emplace_back(alternative.backend, upper_range - lower_range);
}
}
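
A worked trace of the splitting loop above (illustrative weights, not from the commit), with alternatives {SERIAL: 1, CUDA: 3} and numberOfStreams = 8:

// total = 1 + 3 = 4
// SERIAL: cumulative = 1 -> upper_range = round(1 * 8 / 4) = 2 -> schedules 0..1 use the serial path
// CUDA:   cumulative = 4 -> upper_range = round(4 * 8 / 4) = 8 -> schedules 2..7 use the CUDA path
// streamsPerBackend_ ends up as { {SERIAL, 2}, {CUDA, 6} }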

17 changes: 16 additions & 1 deletion src/alpaka/bin/EventProcessor.h
@@ -5,25 +5,39 @@
#include <string>
#include <vector>

#include "AlpakaCore/backend.h"
#include "Framework/EventSetup.h"

#include "PluginManager.h"
#include "StreamSchedule.h"
#include "Source.h"

namespace edm {
struct Alternative {
Alternative() = default;
Alternative(Backend backend, float weight, std::vector<std::string> path)
: backend{backend}, weight{weight}, path{std::move(path)} {}

Backend backend;
float weight;
std::vector<std::string> path;
};

using Alternatives = std::vector<Alternative>;

class EventProcessor {
public:
explicit EventProcessor(int maxEvents,
int runForMinutes,
int numberOfStreams,
std::vector<std::string> const& path,
Alternatives alternatives,
std::vector<std::string> const& esproducers,
std::filesystem::path const& datadir,
bool validation);

int maxEvents() const { return source_.maxEvents(); }
int processedEvents() const { return source_.processedEvents(); }
std::vector<std::pair<Backend, int>> const& backends() const { return streamsPerBackend_; }

void runToCompletion();

@@ -35,6 +49,7 @@ namespace edm {
Source source_;
EventSetup eventSetup_;
std::vector<StreamSchedule> schedules_;
std::vector<std::pair<Backend, int>> streamsPerBackend_;
};
} // namespace edm
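
A minimal sketch (illustrative; the module name is taken from main.cc below) of filling the new Alternatives container that the EventProcessor constructor now takes:

#include <string>
#include <vector>
#include "EventProcessor.h"

edm::Alternatives makeAlternatives() {
  edm::Alternatives alternatives;
  // one quarter of the streams on the serial CPU backend ...
  alternatives.emplace_back(Backend::SERIAL, 1.f, std::vector<std::string>{"alpaka_serial_sync::SiPixelRawToCluster"});
  // ... and three quarters on the CUDA backend
  alternatives.emplace_back(Backend::CUDA, 3.f, std::vector<std::string>{"alpaka_cuda_async::SiPixelRawToCluster"});
  return alternatives;
}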

175 changes: 115 additions & 60 deletions src/alpaka/bin/main.cc
@@ -6,6 +6,7 @@
#include <iomanip>
#include <ios>
#include <iostream>
#include <unordered_map>
#include <stdexcept>
#include <string>
#include <utility>
@@ -38,7 +39,7 @@ namespace {
<< "[--hip] "
#endif
<< "[--numberOfThreads NT] [--numberOfStreams NS] [--maxEvents ME] [--data PATH] "
"[--transfer] [--validation]\n\n"
"[--transfer] [--validation] [--histogram]\n\n"
<< "Options\n"
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_PRESENT
<< " --serial Use CPU Serial backend\n"
@@ -64,13 +65,62 @@
<< " --empty Ignore all producers (for testing only)\n"
<< std::endl;
}

} // namespace

bool getOptionalArgument(std::vector<std::string> const& args, std::vector<std::string>::iterator& i, int& value) {
auto it = i;
++it;
if (it == args.end()) {
return false;
}
try {
value = std::stoi(*it);
++i;
return true;
} catch (...) {
return false;
}
}

bool getOptionalArgument(std::vector<std::string> const& args, std::vector<std::string>::iterator& i, float& value) {
auto it = i;
++it;
if (it == args.end()) {
return false;
}
try {
value = std::stof(*it);
++i;
return true;
} catch (...) {
return false;
}
}

bool getOptionalArgument(std::vector<std::string> const& args,
std::vector<std::string>::iterator& i,
std::filesystem::path& value) {
auto it = i;
++it;
if (it == args.end()) {
return false;
}
value = *it;
return true;
}

template <typename T>
void getArgument(std::vector<std::string> const& args, std::vector<std::string>::iterator& i, T& value) {
if (not getOptionalArgument(args, i, value)) {
std::cerr << "error: " << *i << " expects an argument" << std::endl;
exit(EXIT_FAILURE);
}
}
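
// Illustrative behaviour of the helpers above (not part of the diff): with
// args = {"prog", "--serial", "--cuda", "2.5"} and i pointing at "--serial",
// getOptionalArgument(args, i, weight) returns false because "--cuda" does not
// parse as a float, so --serial keeps its default weight of 1; with i pointing
// at "--cuda" it parses 2.5, advances i past the value and returns true.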

int main(int argc, char** argv) {
// Parse command line arguments
std::vector<std::string> args(argv, argv + argc);
std::vector<Backend> backends;
std::unordered_map<Backend, float> backends;
int numberOfThreads = 1;
int numberOfStreams = 0;
int maxEvents = -1;
@@ -86,35 +136,38 @@ int main(int argc, char** argv) {
return EXIT_SUCCESS;
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_PRESENT
} else if (*i == "--serial") {
backends.emplace_back(Backend::SERIAL);
float weight = 1.;
getOptionalArgument(args, i, weight);
backends.insert_or_assign(Backend::SERIAL, weight);
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_PRESENT
} else if (*i == "--tbb") {
backends.emplace_back(Backend::TBB);
float weight = 1.;
getOptionalArgument(args, i, weight);
backends.insert_or_assign(Backend::TBB, weight);
#endif
#ifdef ALPAKA_ACC_GPU_CUDA_PRESENT
} else if (*i == "--cuda") {
backends.emplace_back(Backend::CUDA);
float weight = 1.;
getOptionalArgument(args, i, weight);
backends.insert_or_assign(Backend::CUDA, weight);
#endif
#ifdef ALPAKA_ACC_GPU_HIP_PRESENT
} else if (*i == "--hip") {
backends.emplace_back(Backend::HIP);
float weight = 1.;
getOptionalArgument(args, i, weight);
backends.insert_or_assign(Backend::HIP, weight);
#endif
} else if (*i == "--numberOfThreads") {
++i;
numberOfThreads = std::stoi(*i);
getArgument(args, i, numberOfThreads);
} else if (*i == "--numberOfStreams") {
++i;
numberOfStreams = std::stoi(*i);
getArgument(args, i, numberOfStreams);
} else if (*i == "--maxEvents") {
++i;
maxEvents = std::stoi(*i);
getArgument(args, i, maxEvents);
} else if (*i == "--runForMinutes") {
++i;
runForMinutes = std::stoi(*i);
getArgument(args, i, runForMinutes);
} else if (*i == "--data") {
++i;
datadir = *i;
getArgument(args, i, datadir);
} else if (*i == "--transfer") {
transfer = true;
} else if (*i == "--validation") {
@@ -151,75 +204,77 @@

// Initialise the selected backends
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_PRESENT
if (std::find(backends.begin(), backends.end(), Backend::SERIAL) != backends.end()) {
if (backends.find(Backend::SERIAL) != backends.end()) {
cms::alpakatools::initialise<alpaka_serial_sync::Platform>();
}
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_PRESENT
if (std::find(backends.begin(), backends.end(), Backend::TBB) != backends.end()) {
if (backends.find(Backend::TBB) != backends.end()) {
cms::alpakatools::initialise<alpaka_tbb_async::Platform>();
}
#endif
#ifdef ALPAKA_ACC_GPU_CUDA_PRESENT
if (std::find(backends.begin(), backends.end(), Backend::CUDA) != backends.end()) {
if (backends.find(Backend::CUDA) != backends.end()) {
cms::alpakatools::initialise<alpaka_cuda_async::Platform>();
}
#endif
#ifdef ALPAKA_ACC_GPU_HIP_PRESENT
if (std::find(backends.begin(), backends.end(), Backend::HIP) != backends.end()) {
if (backends.find(Backend::HIP) != backends.end()) {
cms::alpakatools::initialise<alpaka_rocm_async::Platform>();
}
#endif

// Initialize EventProcessor
std::vector<std::string> edmodules;
std::vector<std::string> esmodules;
edm::Alternatives alternatives;
if (not empty) {
// host-only ESModules
esmodules = {"BeamSpotESProducer", "SiPixelFedIdsESProducer"};
auto addModules = [&](std::string const& accelerator_namespace, Backend backend) {
if (std::find(backends.begin(), backends.end(), backend) != backends.end()) {
edmodules.emplace_back(accelerator_namespace + "::" + "BeamSpotToAlpaka");
edmodules.emplace_back(accelerator_namespace + "::" + "SiPixelRawToCluster");
edmodules.emplace_back(accelerator_namespace + "::" + "SiPixelRecHitAlpaka");
edmodules.emplace_back(accelerator_namespace + "::" + "CAHitNtupletAlpaka");
edmodules.emplace_back(accelerator_namespace + "::" + "PixelVertexProducerAlpaka");
if (transfer) {
edmodules.emplace_back(accelerator_namespace + "::" + "PixelTrackSoAFromAlpaka");
edmodules.emplace_back(accelerator_namespace + "::" + "PixelVertexSoAFromAlpaka");
}
if (validation) {
edmodules.emplace_back(accelerator_namespace + "::" + "CountValidator");
}
if (histogram) {
edmodules.emplace_back(accelerator_namespace + "::" + "HistoValidator");
}
esmodules.emplace_back(accelerator_namespace + "::" + "SiPixelFedCablingMapESProducer");
esmodules.emplace_back(accelerator_namespace + "::" + "SiPixelGainCalibrationForHLTESProducer");
esmodules.emplace_back(accelerator_namespace + "::" + "PixelCPEFastESProducer");
for (auto const& [backend, weight] : backends) {
std::string prefix = "alpaka_" + name(backend) + "::";
// "portable" ESModules
esmodules.emplace_back(prefix + "SiPixelFedCablingMapESProducer");
esmodules.emplace_back(prefix + "SiPixelGainCalibrationForHLTESProducer");
esmodules.emplace_back(prefix + "PixelCPEFastESProducer");
// "portable" EDModules
std::vector<std::string> edmodules;
edmodules.emplace_back(prefix + "BeamSpotToAlpaka");
edmodules.emplace_back(prefix + "SiPixelRawToCluster");
edmodules.emplace_back(prefix + "SiPixelRecHitAlpaka");
edmodules.emplace_back(prefix + "CAHitNtupletAlpaka");
edmodules.emplace_back(prefix + "PixelVertexProducerAlpaka");
if (transfer) {
edmodules.emplace_back(prefix + "PixelTrackSoAFromAlpaka");
edmodules.emplace_back(prefix + "PixelVertexSoAFromAlpaka");
}
};
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_PRESENT
addModules("alpaka_serial_sync", Backend::SERIAL);
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_PRESENT
addModules("alpaka_tbb_async", Backend::TBB);
#endif
#ifdef ALPAKA_ACC_GPU_CUDA_PRESENT
addModules("alpaka_cuda_async", Backend::CUDA);
#endif
#ifdef ALPAKA_ACC_GPU_HIP_PRESENT
addModules("alpaka_rocm_async", Backend::HIP);
#endif
if (validation) {
edmodules.emplace_back(prefix + "CountValidator");
}
if (histogram) {
edmodules.emplace_back(prefix + "HistoValidator");
}
alternatives.emplace_back(backend, weight, std::move(edmodules));
}
}
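// For example (illustrative): when --cuda is enabled, name(Backend::CUDA) is "cuda_async",
// so prefix is "alpaka_cuda_async::" and the alternative's path lists modules such as
// "alpaka_cuda_async::SiPixelRawToCluster".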
edm::EventProcessor processor(
maxEvents, runForMinutes, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation);
maxEvents, runForMinutes, numberOfStreams, std::move(alternatives), std::move(esmodules), datadir, validation);

if (runForMinutes < 0) {
std::cout << "Processing " << processor.maxEvents() << " events, of which " << numberOfStreams
<< " concurrently, with " << numberOfThreads << " threads." << std::endl;
std::cout << "Processing " << processor.maxEvents() << " events,";
} else {
std::cout << "Processing for about " << runForMinutes << " minutes with " << numberOfStreams
<< " concurrent events and " << numberOfThreads << " threads." << std::endl;
std::cout << "Processing for about " << runForMinutes << " minutes,";
}
{
std::cout << " with " << numberOfStreams << " concurrent events (";
bool need_comma = false;
for (auto const& [backend, streams] : processor.backends()) {
if (need_comma) {
std::cout << ", ";
}
std::cout << streams << " on " << backend;
need_comma = true;
}
std::cout << ") and " << numberOfThreads << " threads." << std::endl;
}
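// Example output (illustrative values): with 8 streams split 2/6 between the serial and
// CUDA backends, this prints something like
//   Processing 1000 events, with 8 concurrent events (2 on serial_sync, 6 on cuda_async) and 8 threads.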

// Initialize the TBB thread pool
