Skip to content

Commit

Permalink
Merge pull request #17 from sfc-gh-mpilman/features/actor-lineage-flu…
Browse files Browse the repository at this point in the history
…entd

FluentD ingest
  • Loading branch information
sfc-gh-ljoswiak authored Apr 28, 2021
2 parents 3400ab5 + 32ee206 commit 1c62a92
Show file tree
Hide file tree
Showing 8 changed files with 559 additions and 51 deletions.
150 changes: 110 additions & 40 deletions fdbclient/ActorLineageProfiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <msgpack.hpp>
#include <memory>
#include <boost/endian/conversion.hpp>
#include <boost/asio.hpp>

using namespace std::literals;

Expand Down Expand Up @@ -211,6 +212,7 @@ void SampleCollection_t::refresh() {
oldest = data.front()->time;
}
}
//config->ingest(sample);
}

std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0*/,
Expand All @@ -227,7 +229,45 @@ std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0
return res;
}

ActorLineageProfilerT::ActorLineageProfilerT() {
struct ProfilerImpl {
boost::asio::io_context context;
boost::asio::executor_work_guard<decltype(context.get_executor())> workGuard;
boost::asio::steady_timer timer;
std::thread mainThread;
unsigned frequency;

SampleCollection collection;

ProfilerImpl() : workGuard(context.get_executor()), timer(context) {
mainThread = std::thread([this]() { context.run(); });
}
~ProfilerImpl() {
setFrequency(0);
workGuard.reset();
mainThread.join();
}

void profileHandler(boost::system::error_code const& ec) {
if (ec) {
return;
}
collection->refresh();
timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency));
timer.async_wait([this](auto const& ec) { profileHandler(ec); });
}

void setFrequency(unsigned frequency) {
boost::asio::post(context, [this, frequency]() {
this->frequency = frequency;
timer.cancel();
if (frequency > 0) {
profileHandler(boost::system::error_code{});
}
});
}
};

ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) {
collection->collector()->addGetter(WaitState::Network,
std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet())));
collection->collector()->addGetter(
Expand All @@ -240,52 +280,19 @@ ActorLineageProfilerT::ActorLineageProfilerT() {
}

ActorLineageProfilerT::~ActorLineageProfilerT() {
stop();
}

void ActorLineageProfilerT::stop() {
setFrequency(0);
delete impl;
}

void ActorLineageProfilerT::setFrequency(unsigned frequency) {
unsigned oldFrequency = this->frequency;
bool change = this->frequency != frequency;
this->frequency = frequency;

if (change) {
// Profiler thread will automatically switch to new frequency after
// being triggered by the the condition variable. Only need to start a
// new profiler thread if the old one has been stopped due to the
// profiler thread returning (frequency set to 0).
if (oldFrequency == 0 && frequency != 0) {
std::thread(&ActorLineageProfilerT::profile, this).detach();
}
cond.notify_all();
}
impl->setFrequency(frequency);
}

void ActorLineageProfilerT::profile() {
static std::atomic_int profileThreadCount = 0;
ASSERT(++profileThreadCount == 1);

for (;;) {
collection->refresh();
if (frequency == 0) {
profileThreadCount--;
return;
}
{
std::unique_lock<std::mutex> lock{ mutex };
cond.wait_for(lock, std::chrono::microseconds(1000000 / frequency));
// cond.wait_until(lock, lastSample + std::chrono::milliseconds)
}
if (frequency == 0) {
profileThreadCount--;
return;
}
}
boost::asio::io_context& ActorLineageProfilerT::context() {
return impl->context;
}

SampleIngestor::~SampleIngestor() {}

// Callback used to update the sampling profilers run frequency whenever the
// frequency changes.
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
Expand All @@ -297,6 +304,69 @@ void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
ActorLineageProfiler::instance().setFrequency(frequency);
}

void ProfilerConfigT::reset(std::map<std::string, std::string> const& config) {
bool expectNoMore = false, useFluentD = false, useTCP = false;
std::string endpoint;
ConfigError err;
for (auto& kv : config) {
if (expectNoMore) {
err.description = format("Unexpected option %s", kv.first.c_str());
throw err;
}
if (kv.first == "collector") {
std::string val = kv.second;
std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); });
if (val == "none") {
setBackend(std::make_shared<NoneIngestor>());
} else if (val == "fluentd") {
useFluentD = true;
} else {
err.description = format("Unsupported collector: %s", val.c_str());
throw err;
}
} else if (kv.first == "collector_endpoint") {
endpoint = kv.second;
} else if (kv.first == "collector_protocol") {
auto val = kv.second;
std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); });
if (val == "tcp") {
useTCP = true;
} else if (val == "udp") {
useTCP = false;
} else {
err.description = format("Unsupported protocol for fluentd: %s", kv.second.c_str());
throw err;
}
} else {
err.description = format("Unknown option %s", kv.first.c_str());
throw err;
}
}
if (useFluentD) {
if (endpoint.empty()) {
err.description = "Endpoint is required for fluentd ingestor";
throw err;
}
NetworkAddress address;
try {
address = NetworkAddress::parse(endpoint);
} catch (Error& e) {
err.description = format("Can't parse address %s", endpoint.c_str());
throw err;
}
setBackend(std::make_shared<FluentDIngestor>(
useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::TCP, address));
}
}

std::map<std::string, std::string> ProfilerConfigT::getConfig() const {
std::map<std::string, std::string> res;
if (ingestor) {
ingestor->getConfig(res);
}
return res;
}

// Callback used to update the sample collector window size.
void samplingProfilerUpdateWindow(std::optional<std::any> window) {
double duration = 0;
Expand Down
78 changes: 71 additions & 7 deletions fdbclient/ActorLineageProfiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ struct IALPCollector : IALPCollectorBase {

struct Sample : std::enable_shared_from_this<Sample> {
double time = 0.0;
Sample(Sample const&) = delete;
Sample& operator=(Sample const&) = delete;
std::unordered_map<WaitState, std::pair<char*, unsigned>> data;
~Sample() {
std::for_each(data.begin(), data.end(), [](std::pair<WaitState, std::pair<char*, unsigned>> entry) {
Expand All @@ -55,6 +57,62 @@ struct Sample : std::enable_shared_from_this<Sample> {
}
};

class SampleIngestor : std::enable_shared_from_this<SampleIngestor> {
public:
virtual ~SampleIngestor();
virtual void ingest(std::shared_ptr<Sample> const& sample) = 0;
virtual void getConfig(std::map<std::string, std::string>&) const = 0;
};

class NoneIngestor : public SampleIngestor {
public:
void ingest(std::shared_ptr<Sample> const& sample) override {}
void getConfig(std::map<std::string, std::string>& res) const override { res["ingestor"] = "none"; }
};

// The FluentD ingestor uses the pimpl idiom. This is to make compilation less heavy weight as this implementation has
// dependencies to boost::asio
struct FluentDIngestorImpl;

class FluentDIngestor : public SampleIngestor {
public: // Public Types
enum class Protocol { TCP, UDP };

private: // members
FluentDIngestorImpl* impl;

public: // interface
void ingest(std::shared_ptr<Sample> const& sample) override;
FluentDIngestor(Protocol protocol, NetworkAddress& endpoint);
void getConfig(std::map<std::string, std::string>& res) const override;
~FluentDIngestor();
};

struct ConfigError {
std::string description;
};

class ProfilerConfigT {
private: // private types
using Lock = std::unique_lock<std::mutex>;
friend class crossbow::create_static<ProfilerConfigT>;

private: // members
std::shared_ptr<SampleIngestor> ingestor = std::make_shared<NoneIngestor>();

private: // construction
ProfilerConfigT() {}
ProfilerConfigT(ProfilerConfigT const&) = delete;
ProfilerConfigT& operator=(ProfilerConfigT const&) = delete;
void setBackend(std::shared_ptr<SampleIngestor> ingestor) { this->ingestor = ingestor; }

public:
void reset(std::map<std::string, std::string> const& config);
std::map<std::string, std::string> getConfig() const;
};

using ProfilerConfig = crossbow::singleton<ProfilerConfigT>;

class SampleCollectorT {
public: // Types
friend struct crossbow::create_static<SampleCollectorT>;
Expand Down Expand Up @@ -83,6 +141,7 @@ class SampleCollection_t {
mutable std::mutex mutex;
std::atomic<double> windowSize = 0.0;
std::deque<std::shared_ptr<Sample>> data;
ProfilerConfig config;

public:
/**
Expand All @@ -108,20 +167,25 @@ class SampleCollection_t {

using SampleCollection = crossbow::singleton<SampleCollection_t>;

struct ProfilerImpl;

namespace boost {
namespace asio {
// forward declare io_context because including boost asio is super expensive
class io_context;
} // namespace asio
} // namespace boost

class ActorLineageProfilerT {
friend struct crossbow::create_static<ActorLineageProfilerT>;
ActorLineageProfilerT();
ProfilerImpl* impl;
SampleCollection collection;
std::thread profilerThread;
std::atomic<unsigned> frequency = 0;
std::mutex mutex;
std::condition_variable cond;
void profile();
ActorLineageProfilerT();

public:
~ActorLineageProfilerT();
void setFrequency(unsigned frequency);
void stop();
boost::asio::io_context& context();
};

using ActorLineageProfiler = crossbow::singleton<ActorLineageProfilerT>;
1 change: 1 addition & 0 deletions fdbclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(FDBCLIENT_SRCS
EventTypes.actor.h
FDBOptions.h
FDBTypes.h
FluentDSampleIngestor.cpp
FileBackupAgent.actor.cpp
GlobalConfig.h
GlobalConfig.actor.h
Expand Down
Loading

0 comments on commit 1c62a92

Please sign in to comment.