Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #793 from advancedtelematic/lt-ostree
Browse files Browse the repository at this point in the history
Add fetch from ostree
  • Loading branch information
pattivacek authored May 28, 2018
2 parents cfe59ab + 0c700a5 commit adc987f
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 42 deletions.
9 changes: 7 additions & 2 deletions src/load_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ set(LOAD_TESTS_SRC main.cc
executor.h
context.h context.cc
stats.cc stats.h
sslinit.h sslinit.cc)
sslinit.h sslinit.cc executor.cc)

set(LT_TREEHUB_SRC treehub.cc treehub.h)

if (BUILD_LOAD_TESTS)

add_executable(ota-load-tests ${LOAD_TESTS_SRC})

if (BUILD_OSTREE)
target_sources(ota-load-tests PUBLIC ${LT_TREEHUB_SRC})
endif (BUILD_OSTREE)

add_dependencies(ota-load-tests aktualizr hdr_histogram)

Expand Down Expand Up @@ -41,4 +46,4 @@ if (BUILD_LOAD_TESTS)
RUNTIME DESTINATION bin)
endif (BUILD_LOAD_TESTS)

aktualizr_source_file_checks(${LOAD_TESTS_SRC})
aktualizr_source_file_checks(${LOAD_TESTS_SRC} ${LT_TREEHUB_SRC})
13 changes: 9 additions & 4 deletions src/load_tests/check.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,16 @@ class CheckForUpdateTasks {
CheckForUpdate nextTask() { return CheckForUpdate{configs[gen(rng)]}; }
};

void checkForUpdates(const boost::filesystem::path &baseDir, const uint rate, const uint nr, const uint parallelism) {
void checkForUpdates(const boost::filesystem::path &baseDir, const unsigned int rate, const unsigned int nr,
const unsigned int parallelism) {
LOG_INFO << "Target rate: " << rate << "op/s, operations: " << nr << ", workers: " << parallelism;
auto configs = loadDeviceConfigurations(baseDir);
std::vector<CheckForUpdateTasks> feeds(parallelism, CheckForUpdateTasks{baseDir});
FixedExecutionController execController{nr};
Executor<CheckForUpdateTasks> exec{feeds, rate, execController};
std::unique_ptr<ExecutionController> execController;
if (nr == 0) {
execController = std_::make_unique<InterruptableExecutionController>();
} else {
execController = std_::make_unique<FixedExecutionController>(nr);
}
Executor<CheckForUpdateTasks> exec{feeds, rate, std::move(execController)};
exec.run();
}
15 changes: 14 additions & 1 deletion src/load_tests/context.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
#include "context.h"
#include <boost/program_options.hpp>
#include "logging/logging.h"

using namespace boost::filesystem;
namespace po = boost::program_options;

Config configure(const path& cfgFile, const int logLevel) {
po::variables_map vm;
vm.insert(std::make_pair("loglevel", po::variable_value(logLevel, false)));
const std::vector<path> configDirs{cfgFile};
vm.insert(std::make_pair("config", po::variable_value(configDirs, false)));
po::notify(vm);
return Config{vm};
}

std::vector<Config> loadDeviceConfigurations(const path& baseDir) {
const int severity = loggerGetSeverity();
std::vector<Config> configs;
for (directory_entry& x : directory_iterator(baseDir)) {
const path sotaToml = x / "sota.toml";
configs.push_back(Config{sotaToml});
configs.push_back(configure(sotaToml, severity));
}
return configs;
}
2 changes: 2 additions & 0 deletions src/load_tests/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <config/config.h>
#include <vector>

Config configure(const boost::filesystem::path& cfgFile, const int logLevel);

std::vector<Config> loadDeviceConfigurations(const boost::filesystem::path& baseDir);

#endif
3 changes: 3 additions & 0 deletions src/load_tests/executor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "executor.h"

std::atomic_bool InterruptableExecutionController::interrupted{false};
42 changes: 26 additions & 16 deletions src/load_tests/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <atomic>
#include <boost/thread/latch.hpp>
#include <chrono>
#include <csignal>
#include <functional>
#include <thread>
#include <vector>
Expand All @@ -28,7 +29,7 @@ class UnboundedExecutionController : ExecutionController {

void stop() override { stopped = true; }

bool claim() override { return stopped; }
bool claim() override { return !stopped; }
};

class FixedExecutionController : public ExecutionController {
Expand All @@ -52,18 +53,23 @@ class FixedExecutionController : public ExecutionController {
}
};

class ThreadJoiner {
std::vector<std::thread> &threads;
class InterruptableExecutionController : public ExecutionController {
private:
std::atomic_bool stopped;
static std::atomic_bool interrupted;
static void handleSignal(int) {
LOG_INFO << "SIGINT received";
interrupted = true;
}

public:
explicit ThreadJoiner(std::vector<std::thread> &threads_) : threads(threads_) {}
~ThreadJoiner() {
for (size_t i = 0; i < threads.size(); i++) {
if (threads[i].joinable()) {
threads[i].join();
}
}
}
InterruptableExecutionController() : stopped{false} {
std::signal(SIGINT, InterruptableExecutionController::handleSignal);
};

bool claim() override { return !(interrupted || stopped); }

void stop() override { stopped = true; }
};

typedef timer::steady_clock::time_point TimePoint;
Expand All @@ -85,7 +91,7 @@ class TaskStartTimeCalculator {

template <typename TaskStream>
class Executor {
ExecutionController &controller;
std::unique_ptr<ExecutionController> controller;
std::vector<std::thread> workers;
std::vector<Statistics> statistics;
TaskStartTimeCalculator calculateTaskStartTime;
Expand All @@ -96,8 +102,10 @@ class Executor {
using clock = std::chrono::steady_clock;
LOG_DEBUG << "Worker created: " << std::this_thread::get_id();
threadCountDown.count_down();
LOG_INFO << "Ready to go...";
starter.wait();
while (controller.claim()) {
LOG_INFO << "Go!";
while (controller->claim()) {
auto task = tasks.nextTask();
const auto intendedStartTime = calculateTaskStartTime();
if (timer::steady_clock::now() < intendedStartTime) {
Expand All @@ -113,8 +121,8 @@ class Executor {
}

public:
Executor(std::vector<TaskStream> &feeds, const unsigned rate, ExecutionController &ctrl)
: controller{ctrl},
Executor(std::vector<TaskStream> &feeds, const unsigned rate, std::unique_ptr<ExecutionController> ctrl)
: controller{std::move(ctrl)},
workers{},
statistics(feeds.size()),
calculateTaskStartTime{rate},
Expand All @@ -126,17 +134,19 @@ class Executor {
workers.push_back(std::thread(&Executor::runWorker, this, std::ref(feeds[i]), std::ref(statistics[i])));
}
} catch (...) {
controller.stop();
controller->stop();
throw;
}
};

Statistics run() {
Statistics summary{};
// wait till all threads are crerated and ready to go
LOG_INFO << "Waiting for threads to start";
threadCountDown.wait();
calculateTaskStartTime.start();
summary.start();
LOG_INFO << "Starting tests";
// start execution
starter.count_down();
// wait till all threads finished execution
Expand Down
57 changes: 46 additions & 11 deletions src/load_tests/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,57 @@
#include "check.h"
#include "provision.h"
#include "sslinit.h"
#ifdef BUILD_OSTREE
#include "treehub.h"
#endif

namespace bpo = boost::program_options;

#ifdef BUILD_OSTREE
void fetchRemoteCmd(const std::vector<std::string> &opts) {
std::string inputDir;
std::string outDir;
std::string branchName;
std::string remoteUrl;
unsigned int opsPerSec;
unsigned int opsNr;
unsigned int parallelism;
bpo::options_description description("Fetch from ostree");
// clang-format off
description.add_options()
("inputdir,i", bpo::value<std::string>(&inputDir)->required(), "Directory containig provisioned devices.")
("outputdir,o", bpo::value<std::string>(&outDir)->required(), "Directory where repos will be created")
("branch,b", bpo::value<std::string>(&branchName)->required(), "Name of a branch to pull")
("url,u", bpo::value<std::string>(&remoteUrl)->required(), "Url of the repository")
("number,n", bpo::value<unsigned int>(&opsNr)->default_value(100), "number of operation to execute")
("threads,t", bpo::value<unsigned int>(&parallelism)->default_value(std::thread::hardware_concurrency()), "number of worker threads")
("rate,r", bpo::value<unsigned int>(&opsPerSec)->default_value(50), "repo pulls per second");
// clang-format on

bpo::variables_map vm;
bpo::store(bpo::command_line_parser(opts).options(description).run(), vm);
bpo::notify(vm);

const boost::filesystem::path outputPath(outDir);
fetchFromOstree(inputDir, outputPath, branchName, remoteUrl, opsPerSec, opsNr, parallelism);
}
#endif

void checkForUpdatesCmd(const std::vector<std::string> &opts) {
namespace bpo = boost::program_options;
namespace fs = boost::filesystem;
uint devicesPerSec;
uint opsNr;
uint parallelism;
unsigned int devicesPerSec;
unsigned int opsNr;
unsigned int parallelism;
std::string inputDir;
std::string outDir;
bpo::options_description description("Check for update options");
// clang-format off
description.add_options()
("inputdir,i", bpo::value<std::string>(&inputDir)->required(), "path to the input data")
("rate,r", bpo::value<uint>(&devicesPerSec)->default_value(5), "devices/sec")
("number,n", bpo::value<uint>(&opsNr)->default_value(100), "number of operation to execute")
("threads,t", bpo::value<uint>(&parallelism)->default_value(std::thread::hardware_concurrency()), "number of worker threads");
("rate,r", bpo::value<unsigned int>(&devicesPerSec)->default_value(5), "devices/sec")
("number,n", bpo::value<unsigned int>(&opsNr)->default_value(100), "number of operation to execute")
("threads,t", bpo::value<unsigned int>(&parallelism)->default_value(std::thread::hardware_concurrency()), "number of worker threads");
// clang-format on

bpo::variables_map vm;
Expand Down Expand Up @@ -86,11 +119,13 @@ void setLogLevel(const bpo::variables_map &vm) {
int main(int argc, char *argv[]) {
std::srand(std::time(0));

std::map<std::string, std::function<void(std::vector<std::string>)>> commands{
// {"sla", slaCheckCmd},
{"provision", provisionDevicesCmd},
{"check", checkForUpdatesCmd}
// {"pull", pullOSTreeCmd}
std::map<std::string, std::function<void(std::vector<std::string>)>> commands{// {"sla", slaCheckCmd},
{"provision", provisionDevicesCmd},
{"check", checkForUpdatesCmd}
#ifdef BUILD_OSTREE
,
{"fetch", fetchRemoteCmd}
#endif
};

std::stringstream acc;
Expand Down
22 changes: 14 additions & 8 deletions src/load_tests/provision.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include <boost/property_tree/ini_parser.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "context.h"
#include "executor.h"
#include "logging/logging.h"

using namespace boost::filesystem;
using ptree = boost::property_tree::ptree;
Expand Down Expand Up @@ -37,8 +39,9 @@ class ProvisionDeviceTask {
HttpClient httpClient;

public:
ProvisionDeviceTask(const path configFile)
: config{configFile}, storage{INvStorage::newStorage(config.storage)}, httpClient{} {}
ProvisionDeviceTask(const Config cfg) : config{cfg}, storage{INvStorage::newStorage(config.storage)}, httpClient{} {
logger_set_threshold(boost::log::trivial::severity_level::trace);
}

void operator()() {
Uptane::Repository repo{config, storage, httpClient};
Expand All @@ -62,21 +65,24 @@ class ProvisionDeviceTaskStream {
boost::uuids::basic_random_generator<boost::random::mt19937> gen;
const path &dstDir;
const ptree &cfgTemplate;
const int logLevel;

public:
ProvisionDeviceTaskStream(const path &dstDir_, const ptree &ct) : gen{}, dstDir{dstDir_}, cfgTemplate{ct} {}
ProvisionDeviceTaskStream(const path &dstDir_, const ptree &ct, const int ll)
: gen{}, dstDir{dstDir_}, cfgTemplate{ct}, logLevel{ll} {}

ProvisionDeviceTask nextTask() {
LOG_INFO << "Creating provision device task";
const boost::uuids::uuid deviceId = gen();
const path deviceBaseDir = mkDeviceBaseDir(deviceId, dstDir);
const path deviceCfgPath = writeDeviceConfig(cfgTemplate, deviceBaseDir, deviceId);
return ProvisionDeviceTask(deviceCfgPath);
return ProvisionDeviceTask(configure(deviceCfgPath, logLevel));
}
};

void mkDevices(const path &dstDir, const path bootstrapCredentials, const std::string gw_uri, const size_t parallelism,
const uint nr, const uint rate) {
const unsigned int nr, const uint rate) {
const int severity = loggerGetSeverity();
ptree cfgTemplate{};
cfgTemplate.put_child("tls.server", ptree("\"https://" + gw_uri + "\""));
cfgTemplate.put_child("provision.server", ptree("\"https://" + gw_uri + "\""));
Expand All @@ -85,9 +91,9 @@ void mkDevices(const path &dstDir, const path bootstrapCredentials, const std::s
cfgTemplate.put_child("pacman.type", ptree("\"none\""));
std::vector<ProvisionDeviceTaskStream> feeds;
for (size_t i = 0; i < parallelism; i++) {
feeds.push_back(ProvisionDeviceTaskStream{dstDir, cfgTemplate});
feeds.push_back(ProvisionDeviceTaskStream{dstDir, cfgTemplate, severity});
}
FixedExecutionController execController{nr};
Executor<ProvisionDeviceTaskStream> exec{feeds, rate, execController};
std::unique_ptr<ExecutionController> execController = std_::make_unique<FixedExecutionController>(nr);
Executor<ProvisionDeviceTaskStream> exec{feeds, rate, std::move(execController)};
exec.run();
}
Loading

0 comments on commit adc987f

Please sign in to comment.