diff --git a/fdbclient/ActorLineageProfiler.cpp b/fdbclient/ActorLineageProfiler.cpp index e0a2e1bdf52..d596d306160 100644 --- a/fdbclient/ActorLineageProfiler.cpp +++ b/fdbclient/ActorLineageProfiler.cpp @@ -25,6 +25,7 @@ #include #include #include +#include using namespace std::literals; @@ -211,6 +212,7 @@ void SampleCollection_t::refresh() { oldest = data.front()->time; } } + //config->ingest(sample); } std::vector> SampleCollection_t::get(double from /*= 0.0*/, @@ -227,7 +229,45 @@ std::vector> SampleCollection_t::get(double from /*= 0.0 return res; } -ActorLineageProfilerT::ActorLineageProfilerT() { +struct ProfilerImpl { + boost::asio::io_context context; + boost::asio::executor_work_guard workGuard; + boost::asio::steady_timer timer; + std::thread mainThread; + unsigned frequency; + + SampleCollection collection; + + ProfilerImpl() : workGuard(context.get_executor()), timer(context) { + mainThread = std::thread([this]() { context.run(); }); + } + ~ProfilerImpl() { + setFrequency(0); + workGuard.reset(); + mainThread.join(); + } + + void profileHandler(boost::system::error_code const& ec) { + if (ec) { + return; + } + collection->refresh(); + timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency)); + timer.async_wait([this](auto const& ec) { profileHandler(ec); }); + } + + void setFrequency(unsigned frequency) { + boost::asio::post(context, [this, frequency]() { + this->frequency = frequency; + timer.cancel(); + if (frequency > 0) { + profileHandler(boost::system::error_code{}); + } + }); + } +}; + +ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) { collection->collector()->addGetter(WaitState::Network, std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet()))); collection->collector()->addGetter( @@ -240,52 +280,19 @@ ActorLineageProfilerT::ActorLineageProfilerT() { } ActorLineageProfilerT::~ActorLineageProfilerT() { - stop(); -} - -void ActorLineageProfilerT::stop() { - setFrequency(0); + delete impl; } void ActorLineageProfilerT::setFrequency(unsigned frequency) { - unsigned oldFrequency = this->frequency; - bool change = this->frequency != frequency; - this->frequency = frequency; - - if (change) { - // Profiler thread will automatically switch to new frequency after - // being triggered by the the condition variable. Only need to start a - // new profiler thread if the old one has been stopped due to the - // profiler thread returning (frequency set to 0). - if (oldFrequency == 0 && frequency != 0) { - std::thread(&ActorLineageProfilerT::profile, this).detach(); - } - cond.notify_all(); - } + impl->setFrequency(frequency); } -void ActorLineageProfilerT::profile() { - static std::atomic_int profileThreadCount = 0; - ASSERT(++profileThreadCount == 1); - - for (;;) { - collection->refresh(); - if (frequency == 0) { - profileThreadCount--; - return; - } - { - std::unique_lock lock{ mutex }; - cond.wait_for(lock, std::chrono::microseconds(1000000 / frequency)); - // cond.wait_until(lock, lastSample + std::chrono::milliseconds) - } - if (frequency == 0) { - profileThreadCount--; - return; - } - } +boost::asio::io_context& ActorLineageProfilerT::context() { + return impl->context; } +SampleIngestor::~SampleIngestor() {} + // Callback used to update the sampling profilers run frequency whenever the // frequency changes. void samplingProfilerUpdateFrequency(std::optional freq) { @@ -297,6 +304,69 @@ void samplingProfilerUpdateFrequency(std::optional freq) { ActorLineageProfiler::instance().setFrequency(frequency); } +void ProfilerConfigT::reset(std::map const& config) { + bool expectNoMore = false, useFluentD = false, useTCP = false; + std::string endpoint; + ConfigError err; + for (auto& kv : config) { + if (expectNoMore) { + err.description = format("Unexpected option %s", kv.first.c_str()); + throw err; + } + if (kv.first == "collector") { + std::string val = kv.second; + std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); }); + if (val == "none") { + setBackend(std::make_shared()); + } else if (val == "fluentd") { + useFluentD = true; + } else { + err.description = format("Unsupported collector: %s", val.c_str()); + throw err; + } + } else if (kv.first == "collector_endpoint") { + endpoint = kv.second; + } else if (kv.first == "collector_protocol") { + auto val = kv.second; + std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); }); + if (val == "tcp") { + useTCP = true; + } else if (val == "udp") { + useTCP = false; + } else { + err.description = format("Unsupported protocol for fluentd: %s", kv.second.c_str()); + throw err; + } + } else { + err.description = format("Unknown option %s", kv.first.c_str()); + throw err; + } + } + if (useFluentD) { + if (endpoint.empty()) { + err.description = "Endpoint is required for fluentd ingestor"; + throw err; + } + NetworkAddress address; + try { + address = NetworkAddress::parse(endpoint); + } catch (Error& e) { + err.description = format("Can't parse address %s", endpoint.c_str()); + throw err; + } + setBackend(std::make_shared( + useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::TCP, address)); + } +} + +std::map ProfilerConfigT::getConfig() const { + std::map res; + if (ingestor) { + ingestor->getConfig(res); + } + return res; +} + // Callback used to update the sample collector window size. void samplingProfilerUpdateWindow(std::optional window) { double duration = 0; diff --git a/fdbclient/ActorLineageProfiler.h b/fdbclient/ActorLineageProfiler.h index 67c6c83ff36..8fc29ef0b2e 100644 --- a/fdbclient/ActorLineageProfiler.h +++ b/fdbclient/ActorLineageProfiler.h @@ -47,6 +47,8 @@ struct IALPCollector : IALPCollectorBase { struct Sample : std::enable_shared_from_this { double time = 0.0; + Sample(Sample const&) = delete; + Sample& operator=(Sample const&) = delete; std::unordered_map> data; ~Sample() { std::for_each(data.begin(), data.end(), [](std::pair> entry) { @@ -55,6 +57,62 @@ struct Sample : std::enable_shared_from_this { } }; +class SampleIngestor : std::enable_shared_from_this { +public: + virtual ~SampleIngestor(); + virtual void ingest(std::shared_ptr const& sample) = 0; + virtual void getConfig(std::map&) const = 0; +}; + +class NoneIngestor : public SampleIngestor { +public: + void ingest(std::shared_ptr const& sample) override {} + void getConfig(std::map& res) const override { res["ingestor"] = "none"; } +}; + +// The FluentD ingestor uses the pimpl idiom. This is to make compilation less heavy weight as this implementation has +// dependencies to boost::asio +struct FluentDIngestorImpl; + +class FluentDIngestor : public SampleIngestor { +public: // Public Types + enum class Protocol { TCP, UDP }; + +private: // members + FluentDIngestorImpl* impl; + +public: // interface + void ingest(std::shared_ptr const& sample) override; + FluentDIngestor(Protocol protocol, NetworkAddress& endpoint); + void getConfig(std::map& res) const override; + ~FluentDIngestor(); +}; + +struct ConfigError { + std::string description; +}; + +class ProfilerConfigT { +private: // private types + using Lock = std::unique_lock; + friend class crossbow::create_static; + +private: // members + std::shared_ptr ingestor = std::make_shared(); + +private: // construction + ProfilerConfigT() {} + ProfilerConfigT(ProfilerConfigT const&) = delete; + ProfilerConfigT& operator=(ProfilerConfigT const&) = delete; + void setBackend(std::shared_ptr ingestor) { this->ingestor = ingestor; } + +public: + void reset(std::map const& config); + std::map getConfig() const; +}; + +using ProfilerConfig = crossbow::singleton; + class SampleCollectorT { public: // Types friend struct crossbow::create_static; @@ -83,6 +141,7 @@ class SampleCollection_t { mutable std::mutex mutex; std::atomic windowSize = 0.0; std::deque> data; + ProfilerConfig config; public: /** @@ -108,20 +167,25 @@ class SampleCollection_t { using SampleCollection = crossbow::singleton; +struct ProfilerImpl; + +namespace boost { +namespace asio { +// forward declare io_context because including boost asio is super expensive +class io_context; +} // namespace asio +} // namespace boost + class ActorLineageProfilerT { friend struct crossbow::create_static; - ActorLineageProfilerT(); + ProfilerImpl* impl; SampleCollection collection; - std::thread profilerThread; - std::atomic frequency = 0; - std::mutex mutex; - std::condition_variable cond; - void profile(); + ActorLineageProfilerT(); public: ~ActorLineageProfilerT(); void setFrequency(unsigned frequency); - void stop(); + boost::asio::io_context& context(); }; using ActorLineageProfiler = crossbow::singleton; diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index ee87d086461..e9d3d3716b5 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -30,6 +30,7 @@ set(FDBCLIENT_SRCS EventTypes.actor.h FDBOptions.h FDBTypes.h + FluentDSampleIngestor.cpp FileBackupAgent.actor.cpp GlobalConfig.h GlobalConfig.actor.h diff --git a/fdbclient/FluentDSampleIngestor.cpp b/fdbclient/FluentDSampleIngestor.cpp new file mode 100644 index 00000000000..08d5bfe55fb --- /dev/null +++ b/fdbclient/FluentDSampleIngestor.cpp @@ -0,0 +1,257 @@ +/* + * FluentDSampleIngestor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/ActorLineageProfiler.h" +#include +#include +#include + +namespace { + +boost::asio::ip::address ipAddress(IPAddress const& n) { + if (n.isV6()) { + return boost::asio::ip::address_v6(n.toV6()); + } else { + return boost::asio::ip::address_v4(n.toV4()); + } +} + +template +boost::asio::ip::basic_endpoint toEndpoint(NetworkAddress const n) { + return boost::asio::ip::basic_endpoint(ipAddress(n.ip), n.port); +} + +struct FluentDSocket { + virtual ~FluentDSocket() {} + virtual void connect(NetworkAddress const& endpoint) = 0; + virtual void send(std::shared_ptr const& sample) = 0; + virtual const boost::system::error_code& failed() const = 0; +}; + +template +class SampleSender : public std::enable_shared_from_this> { + using Socket = typename Protocol::socket; + using Iter = typename decltype(Sample::data)::iterator; + Socket& socket; + Callback callback; + Iter iter, end; + + struct Buf { + const char* data; + const unsigned size; + Buf(const char* data, unsigned size) : data(data), size(size) {} + Buf(Buf const&) = delete; + Buf& operator=(Buf const&) = delete; + ~Buf() { delete[] data; } + }; + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + callback(ec); + } else { + ++iter; + sendNext(); + } + } + + void send(boost::asio::ip::tcp::socket& socket, std::shared_ptr const& buf) { + boost::asio::async_write( + socket, + boost::asio::const_buffer(buf->data, buf->size), + [buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); }); + } + void send(boost::asio::ip::udp::socket& socket, std::shared_ptr const& buf) { + socket.async_send( + boost::asio::const_buffer(buf->data, buf->size), + [buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); }); + } + + void sendNext() { + if (iter == end) { + callback(boost::system::error_code()); + } + // 1. calculate size of buffer + unsigned size = 1; // 1 for fixmap identifier byte + auto waitState = to_string(iter->first); + if (waitState.size() < 32) { + size = waitState.size() + 1; + } else { + size = waitState.size() + 2; + } + size += iter->second.second; + // 2. allocate the buffer + std::unique_ptr buf(new char[size]); + unsigned off = 0; + // 3. serialize fixmap + buf[off++] = 0x81; // map of size 1 + // 3.1 serialize key + if (waitState.size() < 32) { + buf[off++] = 0xa0 + waitState.size(); // fixstr + } else { + buf[off++] = 0xd9; + buf[off++] = char(waitState.size()); + } + memcpy(buf.get() + off, waitState.data(), waitState.size()); + off += waitState.size(); + // 3.2 append serialized value + memcpy(buf.get() + off, iter->second.first, iter->second.second); + // 4. send the result to fluentd + send(socket, std::make_shared(buf.release(), size)); + } + +public: + SampleSender(Socket& socket, Callback const& callback, std::shared_ptr const& sample) + : socket(socket), callback(callback), iter(sample->data.begin()), end(sample->data.end()) { + sendNext(); + } +}; + +// Sample function to make instanciation of SampleSender easier +template +std::shared_ptr> makeSampleSender(typename Protocol::socket& socket, Callback const& callback, std::shared_ptr const& sample) { + return std::make_shared>(socket, callback, sample); +} + +template +struct FluentDSocketImpl : FluentDSocket, std::enable_shared_from_this> { + static constexpr unsigned MAX_QUEUE_SIZE = 100; + boost::asio::io_context& context; + typename Protocol::socket socket; + FluentDSocketImpl(boost::asio::io_context& context) : context(context), socket(context) {} + bool ready = false; + std::deque> queue; + boost::system::error_code _failed; + + const boost::system::error_code& failed() const override { return _failed; } + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + // TODO: trace error + _failed = ec; + return; + } + if (queue.empty()) { + ready = true; + } else { + auto sample = queue.front(); + queue.pop_front(); + sendImpl(sample); + } + } + + + void sendImpl(std::shared_ptr const& sample) { + makeSampleSender(socket, [self = this->shared_from_this()](boost::system::error_code const& ec){ + self->sendCompletionHandler(ec); + }, sample); + } + + void send(std::shared_ptr const& sample) override { + if (_failed) { + return; + } + if (ready) { + ready = false; + sendImpl(sample); + } else { + if (queue.size() < MAX_QUEUE_SIZE) { + queue.push_back(sample); + } // TODO: else trace a warning + } + } + + void connect(NetworkAddress const& endpoint) override { + auto to = toEndpoint(endpoint); + socket.async_connect(to, [self = this->shared_from_this()](boost::system::error_code const& ec) { + if (ec) { + // TODO: error handling + self->_failed = ec; + return; + } + self->ready = true; + }); + } +}; + +} // namespace + +struct FluentDIngestorImpl { + using Protocol = FluentDIngestor::Protocol; + Protocol protocol; + NetworkAddress endpoint; + boost::asio::io_context& io_context; + std::unique_ptr socket; + boost::asio::steady_timer retryTimer; + FluentDIngestorImpl(Protocol protocol, NetworkAddress const& endpoint) + : protocol(protocol), endpoint(endpoint), io_context(ActorLineageProfiler::instance().context()), + retryTimer(io_context) { + connect(); + } + + ~FluentDIngestorImpl() { retryTimer.cancel(); } + + void connect() { + switch (protocol) { + case Protocol::TCP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + case Protocol::UDP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + } + socket->connect(endpoint); + } + + void retry() { + retryTimer = boost::asio::steady_timer(io_context, std::chrono::seconds(1)); + retryTimer.async_wait([this](auto const& ec) { + if (ec) { + return; + } + connect(); + }); + socket.reset(); + } +}; + +FluentDIngestor::~FluentDIngestor() { + delete impl; +} + +FluentDIngestor::FluentDIngestor(Protocol protocol, NetworkAddress& endpoint) + : impl(new FluentDIngestorImpl(protocol, endpoint)) {} + +void FluentDIngestor::ingest(const std::shared_ptr& sample) { + if (!impl->socket) { + // the connection failed in the past and we wait for a timeout before we retry + return; + } else if (impl->socket->failed()) { + impl->retry(); + return; + } else { + impl->socket->send(sample); + } +} + +void FluentDIngestor::getConfig(std::map& res) const { + res["ingestor"] = "fluentd"; + res["collector_endpoint"] = impl->endpoint.toString(); + res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp"; +} diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 099ee4df278..e3ac5dbab0b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1074,6 +1074,10 @@ DatabaseContext::DatabaseContext(Reference(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE))); + registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index cbd62b72c78..c866354759c 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -26,6 +26,7 @@ #include +#include "fdbclient/ActorLineageProfiler.h" #include "fdbclient/Knobs.h" #include "fdbclient/ProcessInterface.h" #include "fdbclient/GlobalConfig.actor.h" @@ -76,7 +77,10 @@ std::unordered_map SpecialKeySpace::moduleToB { SpecialKeySpace::MODULE::TRACING, KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }, { SpecialKeySpace::MODULE::ACTORLINEAGE, - KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) } + KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }, + { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"), + LiteralStringRef("\xff\xff/actor_profiler_conf0")) } }; std::unordered_map SpecialKeySpace::managementApiCommandToRange = { @@ -2051,8 +2055,8 @@ ACTOR static Future> actorLineageGetRangeActor(ReadYo throw special_keys_api_failure(); } + state NetworkAddress endRangeHost; try { - state NetworkAddress endRangeHost; if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { // For the range \xff\xff/actor_lineage/state/ip:port/wait-state/time/seq parse(beginValues.begin() + 1, beginValues.end(), host, waitStateStart, timeStart, seqStart); @@ -2146,6 +2150,70 @@ Future> ActorLineageImpl::getRange(ReadYourWritesTran return actorLineageGetRangeActor(ryw, getKeyRange().begin, kr); } +namespace { +std::string_view to_string_view(StringRef sr) { + return std::string_view(reinterpret_cast(sr.begin()), sr.size()); +} +} // namespace + +ActorProfilerConf::ActorProfilerConf(KeyRangeRef kr) + : SpecialKeyRangeRWImpl(kr), config(ProfilerConfig::instance().getConfig()) {} + +Future> ActorProfilerConf::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + Standalone res; + std::string_view begin(to_string_view(kr.begin.removePrefix(range.begin))), + end(to_string_view(kr.end.removePrefix(range.begin))); + for (auto& p : config) { + if (p.first > end) { + break; + } else if (p.first > begin) { + KeyValueRef kv; + kv.key = StringRef(res.arena(), p.first); + kv.value = StringRef(res.arena(), p.second); + res.push_back(res.arena(), kv); + } + } + return res; +} + +void ActorProfilerConf::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + config[key.removePrefix(range.begin).toString()] = value.toString(); + didWrite = true; +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& kr) { + std::string begin(kr.begin.removePrefix(range.begin).toString()), end(kr.end.removePrefix(range.begin).toString()); + auto first = config.lower_bound(begin); + if (first == config.end()) { + // nothing to clear + return; + } + didWrite = true; + auto last = config.upper_bound(end); + config.erase(first, last); +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + std::string k = key.removePrefix(range.begin).toString(); + auto iter = config.find(k); + if (iter != config.end()) { + config.erase(iter); + } + didWrite = true; +} + +Future> ActorProfilerConf::commit(ReadYourWritesTransaction* ryw) { + Optional res{}; + try { + if (didWrite) { + ProfilerConfig::instance().reset(config); + } + return res; + } catch (ConfigError& err) { + return Optional{ err.description }; + } +} + MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} // Used to read the healthZoneKey diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 670c66f957a..ca6676a54ba 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -143,6 +143,7 @@ class SpecialKeySpace { public: enum class MODULE { ACTORLINEAGE, // Sampling data + ACTOR_PROFILER_CONF, // profiler configuration CLUSTERFILEPATH, CONFIGURATION, // Configuration of the cluster CONNECTIONSTRING, @@ -402,6 +403,19 @@ class ActorLineageImpl : public SpecialKeyRangeReadImpl { Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; }; +class ActorProfilerConf : public SpecialKeyRangeRWImpl { + bool didWrite = false; + std::map config; + +public: + explicit ActorProfilerConf(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; + class MaintenanceImpl : public SpecialKeyRangeRWImpl { public: explicit MaintenanceImpl(KeyRangeRef kr); diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 136cd90c3de..3b0fc8b180f 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -68,6 +68,7 @@ #include "flow/Tracing.h" #include "flow/WriteOnlySet.h" #include "flow/UnitTest.h" +#include "fdbclient/ActorLineageProfiler.h" #if defined(__linux__) || defined(__FreeBSD__) #include @@ -85,6 +86,8 @@ #include "flow/actorcompiler.h" // This must be the last #include. +using namespace std::literals; + // clang-format off enum { OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE, @@ -92,7 +95,7 @@ enum { OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE, OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE, - OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE + OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_PROFILER }; CSimpleOpt::SOption g_rgOptions[] = { @@ -172,9 +175,10 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_METRICSPREFIX, "--metrics_prefix", SO_REQ_SEP }, { OPT_IO_TRUST_SECONDS, "--io_trust_seconds", SO_REQ_SEP }, { OPT_IO_TRUST_WARN_ONLY, "--io_trust_warn_only", SO_NONE }, - { OPT_TRACE_FORMAT , "--trace_format", SO_REQ_SEP }, + { OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP }, { OPT_WHITELIST_BINPATH, "--whitelist_binpath", SO_REQ_SEP }, { OPT_BLOB_CREDENTIAL_FILE, "--blob_credential_file", SO_REQ_SEP }, + { OPT_PROFILER, "--profiler_", SO_REQ_SEP}, #ifndef TLS_DISABLED TLS_OPTION_FLAGS @@ -618,6 +622,11 @@ static void printUsage(const char* name, bool devhelp) { " Machine class (valid options are storage, transaction," " resolution, grv_proxy, commit_proxy, master, test, unset, stateless, log, router," " and cluster_controller)."); + printOptionUsage("--profiler_", + "Set an actor profiler option. Supported options are:\n" + " collector -- None or FluentD (FluentD requires collector_endpoint to be set)\n" + " collector_endpoint -- IP:PORT of the fluentd server\n" + " collector_protocol -- UDP or TCP (default is UDP)"); #ifndef TLS_DISABLED printf(TLS_HELP); #endif @@ -981,6 +990,8 @@ struct CLIOptions { Standalone machineId; UnitTestParameters testParams; + std::map profilerConfig; + static CLIOptions parseArgs(int argc, char* argv[]) { CLIOptions opts; opts.parseArgsInternal(argc, argv); @@ -1054,6 +1065,18 @@ struct CLIOptions { knobs.push_back(std::make_pair(syn, args.OptionArg())); break; } + case OPT_PROFILER: { + std::string syn = args.OptionSyntax(); + std::string_view key = syn; + auto prefix = "--profiler_"sv; + if (key.find(prefix) != 0) { + fprintf(stderr, "ERROR: unable to parse profiler option '%s'\n", syn.c_str()); + flushAndExit(FDB_EXIT_ERROR); + } + key.remove_prefix(prefix.size()); + profilerConfig.emplace(key, args.OptionArg()); + break; + }; case OPT_UNITTESTPARAM: { std::string syn = args.OptionSyntax(); if (!StringRef(syn).startsWith(LiteralStringRef("--test_"))) { @@ -1454,6 +1477,13 @@ struct CLIOptions { } } + try { + ProfilerConfig::instance().reset(profilerConfig); + } catch (ConfigError& e) { + printf("Error seting up profiler: %s", e.description.c_str()); + flushAndExit(FDB_EXIT_ERROR); + } + if (seedConnString.length() && seedConnFile.length()) { fprintf( stderr, "%s\n", "--seed_cluster_file and --seed_connection_string may not both be specified at once.");