diff --git a/bazel/external/libcircllhist.BUILD b/bazel/external/libcircllhist.BUILD new file mode 100644 index 000000000000..4e109f0b38d4 --- /dev/null +++ b/bazel/external/libcircllhist.BUILD @@ -0,0 +1,9 @@ +cc_library( + name = "libcircllhist", + srcs = ["src/circllhist.c"], + hdrs = [ + "src/circllhist.h", + ], + includes = ["src"], + visibility = ["//visibility:public"], +) diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index d43299656af3..5df5199048f0 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -226,6 +226,7 @@ def envoy_dependencies(path = "@envoy_deps//", skip_targets = []): _boringssl() _com_google_absl() _com_github_bombela_backward() + _com_github_circonus_labs_libcircllhist() _com_github_cyan4973_xxhash() _com_github_eile_tclap() _com_github_fmtlib_fmt() @@ -265,6 +266,16 @@ def _com_github_bombela_backward(): actual = "@com_github_bombela_backward//:backward", ) +def _com_github_circonus_labs_libcircllhist(): + _repository_impl( + name = "com_github_circonus_labs_libcircllhist", + build_file = "@envoy//bazel/external:libcircllhist.BUILD", + ) + native.bind( + name = "libcircllhist", + actual = "@com_github_circonus_labs_libcircllhist//:libcircllhist", + ) + def _com_github_cyan4973_xxhash(): _repository_impl( name = "com_github_cyan4973_xxhash", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index bd82a390a2c4..65a01fa7f2ad 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -12,6 +12,10 @@ REPOSITORY_LOCATIONS = dict( commit = "44ae9609e860e3428cd057f7052e505b4819eb84", # 2018-02-06 remote = "https://github.com/bombela/backward-cpp", ), + com_github_circonus_labs_libcircllhist = dict( + commit = "0c44450723e34c9d8768e69b11bf919be83fd2ed", # 2018-04-30 + remote = "https://github.com/circonus-labs/libcircllhist", + ), com_github_cyan4973_xxhash = dict( commit = "7cc9639699f64b750c0b82333dced9ea77e8436e", # v0.6.5 remote = "https://github.com/Cyan4973/xxHash", diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 361c53377314..bcf8360f7ad4 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -64,6 +64,7 @@ Version history `. * sockets: added `SO_KEEPALIVE` socket option for upstream connections :ref:`per cluster `. +* stats: added support for histograms. * tracing: the sampling decision is now delegated to the tracers, allowing the tracer to decide when and if to use it. For example, if the :ref:`x-b3-sampled ` header is supplied with the client request, its value will override any sampling decision made by the Envoy proxy. diff --git a/docs/root/operations/admin.rst b/docs/root/operations/admin.rst index 69ce90f25c59..fbe401729aa6 100644 --- a/docs/root/operations/admin.rst +++ b/docs/root/operations/admin.rst @@ -181,10 +181,12 @@ The fields are: .. http:get:: /stats - Outputs all statistics on demand. This includes only counters and gauges. Histograms are not - output as Envoy currently has no built in histogram support and relies on statsd for - aggregation. This command is very useful for local debugging. See :ref:`here ` - for more information. + Outputs all statistics on demand. This command is very useful for local debugging. + Histograms will output the computed quantiles i.e P0,P25,P50,P75,P90,P99,P99.9 and P100. + The output for each quantile will be in the form of (interval,cumulative) where interval value + represents the summary since last flush interval and cumulative value represents the + summary since the start of envoy instance. + See :ref:`here ` for more information. .. http:get:: /stats?format=json diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index b84e811047fa..f11045a6169d 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -114,6 +115,11 @@ class Metric { * Returns the name of the Metric with the portions designated as tags removed. */ virtual const std::string& tagExtractedName() const PURE; + + /** + * Indicates whether this metric has been updated since the server was started. + */ + virtual bool used() const PURE; }; /** @@ -128,7 +134,6 @@ class Counter : public virtual Metric { virtual void inc() PURE; virtual uint64_t latch() PURE; virtual void reset() PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; @@ -146,12 +151,34 @@ class Gauge : public virtual Metric { virtual void inc() PURE; virtual void set(uint64_t value) PURE; virtual void sub(uint64_t amount) PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; typedef std::shared_ptr GaugeSharedPtr; +/** + * Holds the computed statistics for a histogram. + */ +class HistogramStatistics { +public: + virtual ~HistogramStatistics() {} + + /** + * Returns summary representation of the histogram. + */ + virtual std::string summary() const PURE; + + /** + * Returns supported quantiles. + */ + virtual const std::vector& supportedQuantiles() const PURE; + + /** + * Returns computed quantile values during the period. + */ + virtual const std::vector& computedQuantiles() const PURE; +}; + /** * A histogram that records values one at a time. * Note: Histograms now incorporate what used to be timers because the only difference between the @@ -171,6 +198,32 @@ class Histogram : public virtual Metric { typedef std::shared_ptr HistogramSharedPtr; +/** + * A histogram that is stored in main thread and provides summary view of the histogram. + */ +class ParentHistogram : public virtual Histogram { +public: + virtual ~ParentHistogram() {} + + /** + * This method is called during the main stats flush process for each of the histograms and used + * to merge the histogram values. + */ + virtual void merge() PURE; + + /** + * Returns the interval histogram summary statistics for the flush interval. + */ + virtual const HistogramStatistics& intervalStatistics() const PURE; + + /** + * Returns the cumulative histogram summary statistics. + */ + virtual const HistogramStatistics& cumulativeStatistics() const PURE; +}; + +typedef std::shared_ptr ParentHistogramSharedPtr; + /** * A sink for stats. Each sink is responsible for writing stats to a backing store. */ @@ -194,6 +247,11 @@ class Sink { */ virtual void flushGauge(const Gauge& gauge, uint64_t value) PURE; + /** + * Flush a histogram. + */ + virtual void flushHistogram(const ParentHistogram& histogram) PURE; + /** * This will be called after beginFlush(), some number of flushCounter(), and some number of * flushGauge(). Sinks can use this to optimize writing if desired. @@ -263,10 +321,20 @@ class Store : public Scope { * @return a list of all known gauges. */ virtual std::list gauges() const PURE; + + /** + * @return a list of all known histograms. + */ + virtual std::list histograms() const PURE; }; typedef std::unique_ptr StorePtr; +/** + * Callback invoked when a store's mergeHistogram() runs. + */ +typedef std::function PostMergeCb; + /** * The root of the stat store. */ @@ -294,6 +362,15 @@ class StoreRoot : public Store { * down. */ virtual void shutdownThreading() PURE; + + /** + * Called during the flush process to merge all the thread local histograms. The passed in + * callback will be called on the main thread, but it will happen after the method returns + * which means that the actual flush process will happen on the main thread after this method + * returns. It is expected that only one merge runs at any time and concurrent calls to this + * method would be asserted. + */ + virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE; }; typedef std::unique_ptr StoreRootPtr; diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 1db262c95720..c6eb0b54bb4a 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -46,6 +46,15 @@ class Slot { */ virtual void runOnAllThreads(Event::PostCb cb) PURE; + /** + * Run a callback on all registered threads with a barrier. A shutdown initiated during the + * running of the PostCBs may prevent all_threads_complete_cb from being called. + * @param cb supplies the callback to run on each thread. + * @param all_threads_complete_cb supplies the callback to run on main thread after cb has + * been run on all registered threads. + */ + virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE; + /** * Set thread local data on all threads previously registered via registerThread(). * @param initializeCb supplies the functor that will be called *on each thread*. The functor diff --git a/source/common/common/logger.h b/source/common/common/logger.h index 3eae68bd69ad..f51dcb4344bf 100644 --- a/source/common/common/logger.h +++ b/source/common/common/logger.h @@ -41,7 +41,9 @@ namespace Logger { FUNCTION(testing) \ FUNCTION(tracing) \ FUNCTION(upstream) \ - FUNCTION(grpc) + FUNCTION(grpc) \ + FUNCTION(stats) + enum class Id { ALL_LOGGER_IDS(GENERATE_ENUM) diff --git a/source/common/stats/BUILD b/source/common/stats/BUILD index 1d6698e72c95..04ef67fafcf1 100644 --- a/source/common/stats/BUILD +++ b/source/common/stats/BUILD @@ -12,12 +12,17 @@ envoy_cc_library( name = "stats_lib", srcs = ["stats_impl.cc"], hdrs = ["stats_impl.h"], + external_deps = [ + "abseil_optional", + "libcircllhist", + ], deps = [ "//include/envoy/common:time_interface", "//include/envoy/server:options_interface", "//include/envoy/stats:stats_interface", "//source/common/common:assert_lib", "//source/common/common:hash_lib", + "//source/common/common:non_copyable", "//source/common/common:perf_annotation_lib", "//source/common/common:utility_lib", "//source/common/config:well_known_names", diff --git a/source/common/stats/stats_impl.cc b/source/common/stats/stats_impl.cc index 1d38e39e9125..6d3dace50956 100644 --- a/source/common/stats/stats_impl.cc +++ b/source/common/stats/stats_impl.cc @@ -277,5 +277,38 @@ void RawStatData::initialize(absl::string_view key) { name_[xfer_size] = '\0'; } +HistogramStatisticsImpl::HistogramStatisticsImpl(const histogram_t* histogram_ptr) + : computed_quantiles_(supportedQuantiles().size(), 0.0) { + hist_approx_quantile(histogram_ptr, supportedQuantiles().data(), supportedQuantiles().size(), + computed_quantiles_.data()); +} + +const std::vector& HistogramStatisticsImpl::supportedQuantiles() const { + static const std::vector supported_quantiles = {0, 0.25, 0.5, 0.75, 0.90, + 0.95, 0.99, 0.999, 1}; + return supported_quantiles; +} + +std::string HistogramStatisticsImpl::summary() const { + std::vector summary; + const std::vector& supported_quantiles_ref = supportedQuantiles(); + summary.reserve(supported_quantiles_ref.size()); + for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { + summary.push_back( + fmt::format("P{}: {}", 100 * supported_quantiles_ref[i], computed_quantiles_[i])); + } + return absl::StrJoin(summary, ", "); +} + +/** + * Clears the old computed values and refreshes it with values computed from passed histogram. + */ +void HistogramStatisticsImpl::refresh(const histogram_t* new_histogram_ptr) { + std::fill(computed_quantiles_.begin(), computed_quantiles_.end(), 0.0); + ASSERT(supportedQuantiles().size() == computed_quantiles_.size()); + hist_approx_quantile(new_histogram_ptr, supportedQuantiles().data(), supportedQuantiles().size(), + computed_quantiles_.data()); +} + } // namespace Stats } // namespace Envoy diff --git a/source/common/stats/stats_impl.h b/source/common/stats/stats_impl.h index f900a46c98e3..7168e00ba7b5 100644 --- a/source/common/stats/stats_impl.h +++ b/source/common/stats/stats_impl.h @@ -18,10 +18,13 @@ #include "common/common/assert.h" #include "common/common/hash.h" +#include "common/common/non_copyable.h" #include "common/common/utility.h" #include "common/protobuf/protobuf.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "circllhist.h" namespace Envoy { namespace Stats { @@ -167,9 +170,6 @@ class Utility { * RawStatData::size() instead. */ struct RawStatData { - struct Flags { - static const uint8_t Used = 0x1; - }; /** * Due to the flexible-array-length of name_, c-style allocation @@ -284,6 +284,14 @@ class MetricImpl : public virtual Metric { const std::string& tagExtractedName() const override { return tag_extracted_name_; } const std::vector& tags() const override { return tags_; } +protected: + /** + * Flags used by all stats types to figure out whether they have been used. + */ + struct Flags { + static const uint8_t Used = 0x1; + }; + private: const std::string name_; const std::string tag_extracted_name_; @@ -305,13 +313,13 @@ class CounterImpl : public Counter, public MetricImpl { void add(uint64_t amount) override { data_.value_ += amount; data_.pending_increment_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } void inc() override { add(1); } uint64_t latch() override { return data_.pending_increment_.exchange(0); } void reset() override { data_.value_ = 0; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } uint64_t value() const override { return data_.value_; } private: @@ -333,13 +341,13 @@ class GaugeImpl : public Gauge, public MetricImpl { // Stats::Gauge virtual void add(uint64_t amount) override { data_.value_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void dec() override { sub(1); } virtual void inc() override { add(1); } virtual void set(uint64_t value) override { data_.value_ = value; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void sub(uint64_t amount) override { ASSERT(data_.value_ >= amount); @@ -347,13 +355,37 @@ class GaugeImpl : public Gauge, public MetricImpl { data_.value_ -= amount; } virtual uint64_t value() const override { return data_.value_; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } private: RawStatData& data_; RawStatDataAllocator& alloc_; }; +/** + * Implementation of HistogramStatistics for circllhist. + */ +class HistogramStatisticsImpl : public HistogramStatistics, NonCopyable { +public: + HistogramStatisticsImpl() : computed_quantiles_(supportedQuantiles().size(), 0.0) {} + /** + * HistogramStatisticsImpl object is constructed using the passed in histogram. + * @param histogram_ptr pointer to the histogram for which stats will be calculated. This pointer + * will not be retained. + */ + HistogramStatisticsImpl(const histogram_t* histogram_ptr); + + void refresh(const histogram_t* new_histogram_ptr); + + // HistogramStatistics + std::string summary() const override; + const std::vector& supportedQuantiles() const override; + const std::vector& computedQuantiles() const override { return computed_quantiles_; } + +private: + std::vector computed_quantiles_; +}; + /** * Histogram implementation for the heap. */ @@ -366,6 +398,10 @@ class HistogramImpl : public Histogram, public MetricImpl { // Stats::Histogram void recordValue(uint64_t value) override { parent_.deliverHistogramToSinks(*this, value); } + bool used() const override { return true; } + +private: + // This is used for delivering the histogram data to sinks. Store& parent_; }; @@ -446,6 +482,9 @@ class IsolatedStoreImpl : public Store { // Stats::Store std::list counters() const override { return counters_.toList(); } std::list gauges() const override { return gauges_.toList(); } + std::list histograms() const override { + return std::list{}; + } private: struct ScopeImpl : public Scope { diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index f7f8939fddbb..407e3bec4340 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -61,6 +61,26 @@ std::list ThreadLocalStoreImpl::gauges() const { return ret; } +std::list ThreadLocalStoreImpl::histograms() const { + // Handle de-dup due to overlapping scopes. + std::list ret; + std::unordered_set names; + std::unique_lock lock(lock_); + // TODO(ramaraochavali): As histograms don't share storage, there is a chance of duplicate names + // here. We need to create global storage for histograms similar to how we have a central storage + // in shared memory for counters/gauges. In the interim, no de-dup is done here. This may result + // in histograms with duplicate names, but until shared storage is implementing it's ultimately + // less confusing for users who have such configs. + for (ScopeImpl* scope : scopes_) { + for (const auto& name_histogram_pair : scope->central_cache_.histograms_) { + const ParentHistogramSharedPtr& parent_hist = name_histogram_pair.second; + ret.push_back(parent_hist); + } + } + + return ret; +} + void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher, ThreadLocal::Instance& tls) { main_thread_dispatcher_ = &main_thread_dispatcher; @@ -75,6 +95,33 @@ void ThreadLocalStoreImpl::shutdownThreading() { shutting_down_ = true; } +void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { + ASSERT(!merge_in_progress_); + if (!shutting_down_) { + merge_in_progress_ = true; + tls_->runOnAllThreads( + [this]() -> void { + for (const auto& scopes : tls_->getTyped().scope_cache_) { + for (const auto& name_histogram_pair : scopes.second.histograms_) { + const TlsHistogramSharedPtr& tls_hist = name_histogram_pair.second; + tls_hist->beginMerge(); + } + } + }, + [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); }); + } +} + +void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) { + if (!shutting_down_) { + for (const ParentHistogramSharedPtr& histogram : histograms()) { + histogram->merge(); + } + merge_complete_cb(); + merge_in_progress_ = false; + } +} + void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { std::unique_lock lock(lock_); ASSERT(scopes_.count(scope) == 1); @@ -208,10 +255,12 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // See comments in counter(). There is no super clean way (via templates or otherwise) to // share this code so I'm leaving it largely duplicated for now. std::string final_name = prefix_ + name; - HistogramSharedPtr* tls_ref = nullptr; + ParentHistogramSharedPtr* tls_ref = nullptr; + if (!parent_.shutting_down_ && parent_.tls_) { - tls_ref = - &parent_.tls_->getTyped().scope_cache_[this->scope_id_].histograms_[final_name]; + tls_ref = &parent_.tls_->getTyped() + .scope_cache_[this->scope_id_] + .parent_histograms_[final_name]; } if (tls_ref && *tls_ref) { @@ -219,20 +268,130 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { } std::unique_lock lock(parent_.lock_); - HistogramSharedPtr& central_ref = central_cache_.histograms_[final_name]; + ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[final_name]; if (!central_ref) { std::vector tags; std::string tag_extracted_name = parent_.getTagsForName(final_name, tags); - central_ref.reset( - new HistogramImpl(final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + central_ref.reset(new ParentHistogramImpl(final_name, parent_, *this, + std::move(tag_extracted_name), std::move(tags))); } if (tls_ref) { *tls_ref = central_ref; } - return *central_ref; } +Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name, + ParentHistogramImpl& parent) { + // See comments in counter() which explains the logic here. + + // Here prefix will not be considered because, by the time ParentHistogram calls this method + // during recordValue, the prefix is already attached to the name. + TlsHistogramSharedPtr* tls_ref = nullptr; + if (!parent_.shutting_down_ && parent_.tls_) { + tls_ref = &parent_.tls_->getTyped().scope_cache_[this->scope_id_].histograms_[name]; + } + + if (tls_ref && *tls_ref) { + return **tls_ref; + } + + std::unique_lock lock(parent_.lock_); + std::vector tags; + std::string tag_extracted_name = parent_.getTagsForName(name, tags); + TlsHistogramSharedPtr hist_tls_ptr = std::make_shared( + name, std::move(tag_extracted_name), std::move(tags)); + + parent.addTlsHistogram(hist_tls_ptr); + + if (tls_ref) { + *tls_ref = hist_tls_ptr; + } + return *hist_tls_ptr; +} + +ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(const std::string& name, + std::string&& tag_extracted_name, + std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), current_active_(0), + flags_(0), created_thread_id_(std::this_thread::get_id()) { + histograms_[0] = hist_alloc(); + histograms_[1] = hist_alloc(); +} + +ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() { + hist_free(histograms_[0]); + hist_free(histograms_[1]); +} + +void ThreadLocalHistogramImpl::recordValue(uint64_t value) { + ASSERT(std::this_thread::get_id() == created_thread_id_); + hist_insert_intscale(histograms_[current_active_], value, 0, 1); + flags_ |= Flags::Used; +} + +void ThreadLocalHistogramImpl::merge(histogram_t* target) { + histogram_t** other_histogram = &histograms_[otherHistogramIndex()]; + hist_accumulate(target, other_histogram, 1); + hist_clear(*other_histogram); +} + +ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, + TlsScope& tls_scope, std::string&& tag_extracted_name, + std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), + tls_scope_(tls_scope), interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), + interval_statistics_(interval_histogram_), cumulative_statistics_(cumulative_histogram_) {} + +ParentHistogramImpl::~ParentHistogramImpl() { + hist_free(interval_histogram_); + hist_free(cumulative_histogram_); +} + +void ParentHistogramImpl::recordValue(uint64_t value) { + Histogram& tls_histogram = tls_scope_.tlsHistogram(name(), *this); + tls_histogram.recordValue(value); + parent_.deliverHistogramToSinks(*this, value); +} + +bool ParentHistogramImpl::used() const { + std::unique_lock lock(merge_lock_); + return usedLockHeld(); +} + +void ParentHistogramImpl::merge() { + std::unique_lock lock(merge_lock_); + if (usedLockHeld()) { + hist_clear(interval_histogram_); + // Here we could copy all the pointers to TLS histograms in the tls_histogram_ list, + // then release the lock before we do the actual merge. However it is not a big deal + // because the tls_histogram merge is not that expensive as it is a single histogram + // merge and adding TLS histograms is rare. + for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { + tls_histogram->merge(interval_histogram_); + } + // Since TLS merge is done, we can release the lock here. + lock.unlock(); + hist_accumulate(cumulative_histogram_, &interval_histogram_, 1); + cumulative_statistics_.refresh(cumulative_histogram_); + interval_statistics_.refresh(interval_histogram_); + } +} + +void ParentHistogramImpl::addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr) { + std::unique_lock lock(merge_lock_); + tls_histograms_.emplace_back(hist_ptr); +} + +bool ParentHistogramImpl::usedLockHeld() const { + for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { + if (tls_histogram->used()) { + return true; + } + } + return false; +} + } // namespace Stats } // namespace Envoy diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 89af903dab3e..68b40ea8c1b3 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -16,6 +16,101 @@ namespace Envoy { namespace Stats { +/** + * A histogram that is stored in TLS and used to record values per thread. This holds two + * histograms, one to collect the values and other as backup that is used for merge process. The + * swap happens during the merge process. + */ +class ThreadLocalHistogramImpl : public Histogram, public MetricImpl { +public: + ThreadLocalHistogramImpl(const std::string& name, std::string&& tag_extracted_name, + std::vector&& tags); + ~ThreadLocalHistogramImpl(); + + void merge(histogram_t* target); + + /** + * Called in the beginning of merge process. Swaps the histogram used for collection so that we do + * not have to lock the histogram in high throughput TLS writes. + */ + void beginMerge() { + // This switches the current_active_ between 1 and 0. + ASSERT(std::this_thread::get_id() == created_thread_id_); + current_active_ = otherHistogramIndex(); + } + + // Stats::Histogram + void recordValue(uint64_t value) override; + bool used() const override { return flags_ & Flags::Used; } + +private: + uint64_t otherHistogramIndex() const { return 1 - current_active_; } + uint64_t current_active_; + histogram_t* histograms_[2]; + std::atomic flags_; + std::thread::id created_thread_id_; +}; + +typedef std::shared_ptr TlsHistogramSharedPtr; + +class TlsScope; + +/** + * Log Linear Histogram implementation that is stored in the main thread. + */ +class ParentHistogramImpl : public ParentHistogram, public MetricImpl { +public: + ParentHistogramImpl(const std::string& name, Store& parent, TlsScope& tlsScope, + std::string&& tag_extracted_name, std::vector&& tags); + ~ParentHistogramImpl(); + + void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); + bool used() const override; + void recordValue(uint64_t value) override; + + /** + * This method is called during the main stats flush process for each of the histograms. It + * iterates through the TLS histograms and collects the histogram data of all of them + * in to "interval_histogram". Then the collected "interval_histogram" is merged to a + * "cumulative_histogram". + */ + void merge() override; + + const HistogramStatistics& intervalStatistics() const override { return interval_statistics_; } + const HistogramStatistics& cumulativeStatistics() const override { + return cumulative_statistics_; + } + +private: + bool usedLockHeld() const; + + Store& parent_; + TlsScope& tls_scope_; + histogram_t* interval_histogram_; + histogram_t* cumulative_histogram_; + HistogramStatisticsImpl interval_statistics_; + HistogramStatisticsImpl cumulative_statistics_; + mutable std::mutex merge_lock_; + std::list tls_histograms_; +}; + +typedef std::shared_ptr ParentHistogramImplSharedPtr; + +/** + * Class used to create ThreadLocalHistogram in the scope. + */ +class TlsScope : public Scope { +public: + virtual ~TlsScope() {} + + // TODO(ramaraochavali): Allow direct TLS access for the advanced consumers. + /** + * @return a ThreadLocalHistogram within the scope's namespace. + * @param name name of the histogram with scope prefix attached. + */ + virtual Histogram& tlsHistogram(const std::string& name, ParentHistogramImpl& parent) PURE; +}; + /** * Store implementation with thread local caching. This implementation supports the following * features: @@ -44,8 +139,28 @@ namespace Stats { * back to heap allocated stats if needed. NOTE: In this case, overlapping scopes will not share * the same backing store. This is to keep things simple, it could be done in the future if * needed. + * + * The threading model for managing histograms is as described below. + * Each Histogram implementation will have 2 parts. + * - "main" thread parent which is called "ParentHistogram". + * - "per-thread" collector which is called "ThreadLocalHistogram". + * Worker threads will write to ParentHistogram which checks whether a TLS histogram is available. + * If there is one it will write to it, otherwise creates new one and writes to it. + * During the flush process the following sequence is followed. + * - The main thread starts the flush process by posting a message to every worker which tells the + * worker to swap its "active" histogram with its "backup" histogram. This is acheived via a call + * to "beginMerge" method. + * - Each TLS histogram has 2 histograms it makes use of, swapping back and forth. It manages a + * current_active index via which it writes to the correct histogram. + * - When all workers have done, the main thread continues with the flush process where the + * "actual" merging happens. + * - As the active histograms are swapped in TLS histograms, on the main thread, we can be sure + * that no worker is writing into the "backup" histogram. + * - The main thread now goes through all histograms, collect them across each worker and + * accumulates in to "interval" histograms. + * - Finally the main "interval" histogram is merged to "cumulative" histogram. */ -class ThreadLocalStoreImpl : public StoreRoot { +class ThreadLocalStoreImpl : Logger::Loggable, public StoreRoot { public: ThreadLocalStoreImpl(RawStatDataAllocator& alloc); ~ThreadLocalStoreImpl(); @@ -62,8 +177,11 @@ class ThreadLocalStoreImpl : public StoreRoot { }; // Stats::Store + // TODO(ramaraochavali): Consider changing the implementation of these methods to use vectors and + // use std::sort, rather than inserting into a map and pulling it out for better performance. std::list counters() const override; std::list gauges() const override; + std::list histograms() const override; // Stats::StoreRoot void addSink(Sink& sink) override { timer_sinks_.push_back(sink); } @@ -74,14 +192,23 @@ class ThreadLocalStoreImpl : public StoreRoot { ThreadLocal::Instance& tls) override; void shutdownThreading() override; + void mergeHistograms(PostMergeCb mergeCb) override; + private: struct TlsCacheEntry { std::unordered_map counters_; std::unordered_map gauges_; - std::unordered_map histograms_; + std::unordered_map histograms_; + std::unordered_map parent_histograms_; + }; + + struct CentralCacheEntry { + std::unordered_map counters_; + std::unordered_map gauges_; + std::unordered_map histograms_; }; - struct ScopeImpl : public Scope { + struct ScopeImpl : public TlsScope { ScopeImpl(ThreadLocalStoreImpl& parent, const std::string& prefix) : scope_id_(next_scope_id_++), parent_(parent), prefix_(Utility::sanitizeStatsName(prefix)) {} @@ -95,13 +222,14 @@ class ThreadLocalStoreImpl : public StoreRoot { void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override; Gauge& gauge(const std::string& name) override; Histogram& histogram(const std::string& name) override; + Histogram& tlsHistogram(const std::string& name, ParentHistogramImpl& parent) override; static std::atomic next_scope_id_; const uint64_t scope_id_; ThreadLocalStoreImpl& parent_; const std::string prefix_; - TlsCacheEntry central_cache_; + CentralCacheEntry central_cache_; }; struct TlsCache : public ThreadLocal::ThreadLocalObject { @@ -124,6 +252,7 @@ class ThreadLocalStoreImpl : public StoreRoot { void clearScopeFromCaches(uint64_t scope_id); void releaseScopeCrossThread(ScopeImpl* scope); SafeAllocData safeAlloc(const std::string& name); + void mergeInternal(PostMergeCb mergeCb); RawStatDataAllocator& alloc_; Event::Dispatcher* main_thread_dispatcher_{}; @@ -134,6 +263,7 @@ class ThreadLocalStoreImpl : public StoreRoot { std::list> timer_sinks_; TagProducerPtr tag_producer_; std::atomic shutting_down_{}; + std::atomic merge_in_progress_{}; Counter& num_last_resort_stats_; HeapRawStatDataAllocator heap_allocator_; }; diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index e1805e3e85f3..b43737993ae3 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -91,6 +91,21 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) { cb(); } +void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) { + ASSERT(std::this_thread::get_id() == main_thread_id_); + ASSERT(!shutdown_); + std::shared_ptr> worker_count = + std::make_shared>(registered_threads_.size()); + for (Event::Dispatcher& dispatcher : registered_threads_) { + dispatcher.post([this, worker_count, cb, all_threads_complete_cb]() -> void { + cb(); + if (--*worker_count == 0) { + main_thread_dispatcher_->post(all_threads_complete_cb); + } + }); + } +} + void InstanceImpl::SlotImpl::set(InitializeCb cb) { ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); ASSERT(!parent_.shutdown_); diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 17a835ac5d0d..820cd1504a95 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -35,6 +35,9 @@ class InstanceImpl : Logger::Loggable, public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreads(cb, main_callback); + } void set(InitializeCb cb) override; InstanceImpl& parent_; @@ -48,6 +51,7 @@ class InstanceImpl : Logger::Loggable, public Instance { void removeSlot(SlotImpl& slot); void runOnAllThreads(Event::PostCb cb); + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object); static thread_local ThreadLocalData thread_local_data_; diff --git a/source/extensions/stat_sinks/common/statsd/statsd.h b/source/extensions/stat_sinks/common/statsd/statsd.h index f891cec85bed..43da3eeab4d8 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.h +++ b/source/extensions/stat_sinks/common/statsd/statsd.h @@ -55,6 +55,7 @@ class UdpStatsdSink : public Stats::Sink { void beginFlush() override {} void flushCounter(const Stats::Counter& counter, uint64_t delta) override; void flushGauge(const Stats::Gauge& gauge, uint64_t value) override; + void flushHistogram(const Stats::ParentHistogram&) override {} void endFlush() override {} void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override; @@ -94,6 +95,8 @@ class TcpStatsdSink : public Stats::Sink { tls_->getTyped().flushGauge(gauge.name(), value); } + void flushHistogram(const Stats::ParentHistogram&) override {} + void endFlush() override { tls_->getTyped().endFlush(true); } void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override { diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 5aa68b19a3b8..d4c4e9d07970 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -61,6 +61,40 @@ void GrpcMetricsStreamerImpl::ThreadLocalStreamer::send( MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer) : grpc_metrics_streamer_(grpc_metrics_streamer) {} +void MetricsServiceSink::flushCounter(const Stats::Counter& counter, uint64_t) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family->set_name(counter.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* counter_metric = metric->mutable_counter(); + counter_metric->set_value(counter.value()); +} + +void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, uint64_t value) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family->set_name(gauge.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* gauage_metric = metric->mutable_gauge(); + gauage_metric->set_value(value); +} +void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& histogram) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); + metrics_family->set_name(histogram.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* summary_metric = metric->mutable_summary(); + const Stats::HistogramStatistics& hist_stats = histogram.intervalStatistics(); + for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { + auto* quantile = summary_metric->add_quantile(); + quantile->set_quantile(hist_stats.supportedQuantiles()[i]); + quantile->set_value(hist_stats.computedQuantiles()[i]); + } +} + } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index cafaafc9ad03..084712cad7c8 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -112,25 +112,9 @@ class MetricsServiceSink : public Stats::Sink { void beginFlush() override { message_.clear_envoy_metrics(); } - void flushCounter(const Stats::Counter& counter, uint64_t) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* counter_metric = metric->mutable_counter(); - counter_metric->set_value(counter.value()); - } - - void flushGauge(const Stats::Gauge& gauge, uint64_t value) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* gauage_metric = metric->mutable_gauge(); - gauage_metric->set_value(value); - } + void flushCounter(const Stats::Counter& counter, uint64_t) override; + void flushGauge(const Stats::Gauge& gauge, uint64_t value) override; + void flushHistogram(const Stats::ParentHistogram& histogram) override; void endFlush() override { grpc_metrics_streamer_->send(message_); @@ -140,9 +124,7 @@ class MetricsServiceSink : public Stats::Sink { } } - void onHistogramComplete(const Stats::Histogram&, uint64_t) override { - // TODO : Need to figure out how to map existing histogram to Proto Model - } + void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} private: GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; diff --git a/source/server/http/admin.cc b/source/server/http/admin.cc index 63a9e0c22d56..ef08d0e08b11 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -36,6 +36,7 @@ #include "common/network/listen_socket_impl.h" #include "common/profiler/profiler.h" #include "common/router/config_impl.h" +#include "common/stats/stats_impl.h" #include "common/upstream/host_utility.h" #include "extensions/access_loggers/file/file_access_log_impl.h" @@ -388,8 +389,6 @@ Http::Code AdminImpl::handlerServerInfo(absl::string_view, Http::HeaderMap&, Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& response_headers, Buffer::Instance& response) { - // We currently don't support timers locally (only via statsd) so just group all the counters - // and gauges together, alpha sort them, and spit them out. Http::Code rc = Http::Code::OK; const Http::Utility::QueryParams params = Http::Utility::parseQueryString(url); std::map all_stats; @@ -401,18 +400,39 @@ Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& respo all_stats.emplace(gauge->name(), gauge->value()); } + // TOOD(ramaraochavali): See the comment in ThreadLocalStoreImpl::histograms() for why we use a + // multimap here. This makes sure that duplicate histograms get output. When shared storage is + // implemented this can be switched back to a normal map. + std::multimap all_histograms; + for (const Stats::ParentHistogramSharedPtr& histogram : server_.stats().histograms()) { + std::vector summary; + const std::vector& supported_quantiles_ref = + histogram->intervalStatistics().supportedQuantiles(); + summary.reserve(supported_quantiles_ref.size()); + for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { + summary.push_back(fmt::format("P{}({},{})", 100 * supported_quantiles_ref[i], + histogram->intervalStatistics().computedQuantiles()[i], + histogram->cumulativeStatistics().computedQuantiles()[i])); + } + + all_histograms.emplace(histogram->name(), absl::StrJoin(summary, " ")); + } + if (params.size() == 0) { // No Arguments so use the standard. for (auto stat : all_stats) { response.add(fmt::format("{}: {}\n", stat.first, stat.second)); } + for (auto histogram : all_histograms) { + response.add(fmt::format("{}: {}\n", histogram.first, histogram.second)); + } } else { const std::string format_key = params.begin()->first; const std::string format_value = params.begin()->second; if (format_key == "format" && format_value == "json") { response_headers.insertContentType().value().setReference( Http::Headers::get().ContentTypeValues.Json); - response.add(AdminImpl::statsAsJson(all_stats)); + response.add(AdminImpl::statsAsJson(all_stats, server_.stats().histograms())); } else if (format_key == "format" && format_value == "prometheus") { return handlerPrometheusStats(url, response_headers, response); } else { @@ -453,6 +473,7 @@ std::string PrometheusStatsFormatter::metricName(const std::string& extractedNam return fmt::format("envoy_{0}", sanitizeName(extractedName)); } +// TODO(ramaraochavali): Add summary histogram output for Prometheus. uint64_t PrometheusStatsFormatter::statsAsPrometheus(const std::list& counters, const std::list& gauges, @@ -480,7 +501,9 @@ PrometheusStatsFormatter::statsAsPrometheus(const std::list& all_stats) { +std::string +AdminImpl::statsAsJson(const std::map& all_stats, + const std::list& all_histograms) { rapidjson::Document document; document.SetObject(); rapidjson::Value stats_array(rapidjson::kArrayType); @@ -496,6 +519,40 @@ std::string AdminImpl::statsAsJson(const std::map& all_st stat_obj.AddMember("value", stat_value, allocator); stats_array.PushBack(stat_obj, allocator); } + + for (const Stats::ParentHistogramSharedPtr& histogram : all_histograms) { + Value histogram_obj; + histogram_obj.SetObject(); + Value histogram_name; + histogram_name.SetString(histogram->name().c_str(), allocator); + histogram_obj.AddMember("name", histogram_name, allocator); + + rapidjson::Value quantile_array(rapidjson::kArrayType); + + // TODO(ramaraochavali): consider optimizing the model here. Quantiles can be added once, + // followed by two arrays interval and cumulative. + for (size_t i = 0; i < histogram->intervalStatistics().supportedQuantiles().size(); ++i) { + Value quantile_obj; + quantile_obj.SetObject(); + Value quantile_type; + quantile_type.SetDouble(histogram->intervalStatistics().supportedQuantiles()[i] * 100); + quantile_obj.AddMember("quantile", quantile_type, allocator); + Value interval_value; + if (!std::isnan(histogram->intervalStatistics().computedQuantiles()[i])) { + interval_value.SetDouble(histogram->intervalStatistics().computedQuantiles()[i]); + } + quantile_obj.AddMember("interval_value", interval_value, allocator); + Value cumulative_value; + if (!std::isnan(histogram->cumulativeStatistics().computedQuantiles()[i])) { + cumulative_value.SetDouble(histogram->cumulativeStatistics().computedQuantiles()[i]); + } + quantile_obj.AddMember("cumulative_value", cumulative_value, allocator); + quantile_array.PushBack(quantile_obj, allocator); + } + histogram_obj.AddMember("quantiles", quantile_array, allocator); + stats_array.PushBack(histogram_obj, allocator); + } + document.AddMember("stats", stats_array, allocator); rapidjson::StringBuffer strbuf; rapidjson::PrettyWriter writer(strbuf); diff --git a/source/server/http/admin.h b/source/server/http/admin.h index cf1740432a6e..0196b2688d21 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -134,7 +134,8 @@ class AdminImpl : public Admin, void addOutlierInfo(const std::string& cluster_name, const Upstream::Outlier::Detector* outlier_detector, Buffer::Instance& response); - static std::string statsAsJson(const std::map& all_stats); + static std::string statsAsJson(const std::map& all_stats, + const std::list& all_histograms); static std::string runtimeAsJson(const std::vector>& entries); std::vector sortedHandlers() const; diff --git a/source/server/server.cc b/source/server/server.cc index e5d7aac0a976..971a299e6fe7 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -99,8 +99,8 @@ void InstanceImpl::failHealthcheck(bool fail) { server_stats_->live_.set(!fail); } -void InstanceUtil::flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store) { +void InstanceUtil::flushMetricsToSinks(const std::list& sinks, + Stats::Store& store) { for (const auto& sink : sinks) { sink->beginFlush(); } @@ -122,6 +122,14 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list } } + for (const Stats::ParentHistogramSharedPtr& histogram : store.histograms()) { + if (histogram->used()) { + for (const auto& sink : sinks) { + sink->flushHistogram(*histogram); + } + } + } + for (const auto& sink : sinks) { sink->endFlush(); } @@ -129,19 +137,23 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list void InstanceImpl::flushStats() { ENVOY_LOG(debug, "flushing stats"); - HotRestart::GetParentStatsInfo info; - restarter_.getParentStats(info); - server_stats_->uptime_.set(time(nullptr) - original_start_time_); - server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + - info.memory_allocated_); - server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); - server_stats_->parent_connections_.set(info.num_connections_); - server_stats_->total_connections_.set(numConnections() + info.num_connections_); - server_stats_->days_until_first_cert_expiring_.set( - sslContextManager().daysUntilFirstCertExpires()); - - InstanceUtil::flushCountersAndGaugesToSinks(config_->statsSinks(), stats_store_); - stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + // A shutdown initiated before this callback may prevent this from being called as per + // the semantics documented in ThreadLocal's runOnAllThreads method. + stats_store_.mergeHistograms([this]() -> void { + HotRestart::GetParentStatsInfo info; + restarter_.getParentStats(info); + server_stats_->uptime_.set(time(nullptr) - original_start_time_); + server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + + info.memory_allocated_); + server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); + server_stats_->parent_connections_.set(info.num_connections_); + server_stats_->total_connections_.set(numConnections() + info.num_connections_); + server_stats_->days_until_first_cert_expiring_.set( + sslContextManager().daysUntilFirstCertExpires()); + InstanceUtil::flushMetricsToSinks(config_->statsSinks(), stats_store_); + // TODO(ramaraochavali): consider adding different flush interval for histograms. + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + }); } void InstanceImpl::getParentStats(HotRestart::GetParentStatsInfo& info) { diff --git a/source/server/server.h b/source/server/server.h index 05af18cdfe56..2b1f31c243f5 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -83,13 +83,13 @@ class InstanceUtil : Logger::Loggable { static Runtime::LoaderPtr createRuntime(Instance& server, Server::Configuration::Initial& config); /** - * Helper for flushing counters and gauges to sinks. This takes care of calling beginFlush(), - * latching of counters and flushing, flushing of gauges, and calling endFlush(), on each sink. + * Helper for flushing counters, gauges and hisograms to sinks. This takes care of calling + * beginFlush(), latching of counters and flushing, flushing of gauges, and calling endFlush(), on + * each sink. * @param sinks supplies the list of sinks. * @param store supplies the store to flush. */ - static void flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store); + static void flushMetricsToSinks(const std::list& sinks, Stats::Store& store); /** * Load a bootstrap config from either v1 or v2 and perform validation. @@ -209,5 +209,5 @@ class InstanceImpl : Logger::Loggable, public Instance { std::unique_ptr file_logger_; }; -} // Server +} // namespace Server } // namespace Envoy diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 9b352c236d48..803cdd54d39a 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -11,6 +11,7 @@ #include "test/mocks/thread_local/mocks.h" #include "test/test_common/utility.h" +#include "absl/strings/str_split.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -90,6 +91,120 @@ class StatsThreadLocalStoreTest : public testing::Test, public RawStatDataAlloca std::unique_ptr store_; }; +class HistogramTest : public testing::Test, public RawStatDataAllocator { +public: + typedef std::map NameHistogramMap; + + void SetUp() override { + ON_CALL(*this, alloc(_)).WillByDefault(Invoke([this](const std::string& name) -> RawStatData* { + return alloc_.alloc(name); + })); + + ON_CALL(*this, free(_)).WillByDefault(Invoke([this](RawStatData& data) -> void { + return alloc_.free(data); + })); + + EXPECT_CALL(*this, alloc("stats.overflow")); + store_.reset(new ThreadLocalStoreImpl(*this)); + store_->addSink(sink_); + store_->initializeThreading(main_thread_dispatcher_, tls_); + } + + void TearDown() override { + store_->shutdownThreading(); + tls_.shutdownThread(); + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); + } + + NameHistogramMap makeHistogramMap(const std::list& hist_list) { + NameHistogramMap name_histogram_map; + for (const Stats::ParentHistogramSharedPtr& histogram : hist_list) { + // Exclude the scope part of the name. + const std::vector& split_vector = absl::StrSplit(histogram->name(), '.'); + name_histogram_map.insert(std::make_pair(split_vector.back(), histogram)); + } + return name_histogram_map; + } + + /** + * Validates that Histogram merge happens as desired and returns the processed histogram count + * that can be asserted later. + */ + uint64_t validateMerge() { + bool merge_called = false; + store_->mergeHistograms([&merge_called]() -> void { merge_called = true; }); + + EXPECT_TRUE(merge_called); + + std::list histogram_list = store_->histograms(); + + histogram_t* hist1_cumulative = makeHistogram(h1_cumulative_values_); + histogram_t* hist2_cumulative = makeHistogram(h2_cumulative_values_); + histogram_t* hist1_interval = makeHistogram(h1_interval_values_); + histogram_t* hist2_interval = makeHistogram(h2_interval_values_); + + HistogramStatisticsImpl h1_cumulative_statistics(hist1_cumulative); + HistogramStatisticsImpl h2_cumulative_statistics(hist2_cumulative); + HistogramStatisticsImpl h1_interval_statistics(hist1_interval); + HistogramStatisticsImpl h2_interval_statistics(hist2_interval); + + NameHistogramMap name_histogram_map = makeHistogramMap(histogram_list); + const Stats::ParentHistogramSharedPtr& h1 = name_histogram_map["h1"]; + EXPECT_EQ(h1->cumulativeStatistics().summary(), h1_cumulative_statistics.summary()); + EXPECT_EQ(h1->intervalStatistics().summary(), h1_interval_statistics.summary()); + + if (histogram_list.size() > 1) { + const Stats::ParentHistogramSharedPtr& h2 = name_histogram_map["h2"]; + EXPECT_EQ(h2->cumulativeStatistics().summary(), h2_cumulative_statistics.summary()); + EXPECT_EQ(h2->intervalStatistics().summary(), h2_interval_statistics.summary()); + } + + hist_free(hist1_cumulative); + hist_free(hist2_cumulative); + hist_free(hist1_interval); + hist_free(hist2_interval); + + h1_interval_values_.clear(); + h2_interval_values_.clear(); + + return histogram_list.size(); + } + + void expectCallAndAccumulate(Histogram& histogram, uint64_t record_value) { + EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), record_value)); + histogram.recordValue(record_value); + + if (histogram.name() == "h1") { + h1_cumulative_values_.push_back(record_value); + h1_interval_values_.push_back(record_value); + } else { + h2_cumulative_values_.push_back(record_value); + h2_interval_values_.push_back(record_value); + } + } + + histogram_t* makeHistogram(const std::vector& values) { + histogram_t* histogram = hist_alloc(); + for (uint64_t value : values) { + hist_insert_intscale(histogram, value, 0, 1); + } + return histogram; + } + + MOCK_METHOD1(alloc, RawStatData*(const std::string& name)); + MOCK_METHOD1(free, void(RawStatData& data)); + + NiceMock main_thread_dispatcher_; + NiceMock tls_; + TestAllocator alloc_; + MockSink sink_; + std::unique_ptr store_; + InSequence s; + std::vector h1_cumulative_values_, h2_cumulative_values_, h1_interval_values_, + h2_interval_values_; +}; + TEST_F(StatsThreadLocalStoreTest, NoTls) { InSequence s; EXPECT_CALL(*this, alloc(_)).Times(2); @@ -102,6 +217,7 @@ TEST_F(StatsThreadLocalStoreTest, NoTls) { Histogram& h1 = store_->histogram("h1"); EXPECT_EQ(&h1, &store_->histogram("h1")); + EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 200)); h1.recordValue(200); EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 100)); @@ -343,5 +459,146 @@ TEST_F(StatsThreadLocalStoreTest, ShuttingDown) { EXPECT_CALL(*this, free(_)).Times(5); } +// Histogram tests +TEST_F(HistogramTest, BasicSingleHistogramMerge) { + Histogram& h1 = store_->histogram("h1"); + EXPECT_EQ("h1", h1.name()); + + expectCallAndAccumulate(h1, 0); + expectCallAndAccumulate(h1, 43); + expectCallAndAccumulate(h1, 41); + expectCallAndAccumulate(h1, 415); + expectCallAndAccumulate(h1, 2201); + expectCallAndAccumulate(h1, 3201); + expectCallAndAccumulate(h1, 125); + expectCallAndAccumulate(h1, 13); + + EXPECT_EQ(1, validateMerge()); +} + +TEST_F(HistogramTest, BasicMultiHistogramMerge) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + expectCallAndAccumulate(h1, 1); + expectCallAndAccumulate(h2, 1); + expectCallAndAccumulate(h2, 2); + + EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, MultiHistogramMultipleMerges) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + // Insert one value in to one histogram and validate + expectCallAndAccumulate(h1, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert value into second histogram and validate that it is merged properly. + expectCallAndAccumulate(h2, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert more values into both the histograms and validate that it is merged properly. + expectCallAndAccumulate(h1, 2); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 3); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); + + // Do not insert any value and validate that intervalSummary is empty for both the histograms and + // cumulativeSummary has right values. + EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, BasicScopeHistogramMerge) { + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + expectCallAndAccumulate(h1, 2); + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, BasicHistogramSummaryValidate) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + + expectCallAndAccumulate(h1, 1); + + EXPECT_EQ(2, validateMerge()); + + const std::string h1_expected_summary = + "P0: 1, P25: 1.025, P50: 1.05, P75: 1.075, P90: 1.09, P95: 1.095, " + "P99: 1.099, P99.9: 1.0999, P100: 1.1"; + const std::string h2_expected_summary = + "P0: 0, P25: 25, P50: 50, P75: 75, P90: 90, P95: 95, P99: 99, P99.9: 99.9, P100: 100"; + + for (size_t i = 0; i < 100; ++i) { + expectCallAndAccumulate(h2, i); + } + + EXPECT_EQ(2, validateMerge()); + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_EQ(h1_expected_summary, name_histogram_map["h1"]->cumulativeStatistics().summary()); + EXPECT_EQ(h2_expected_summary, name_histogram_map["h2"]->cumulativeStatistics().summary()); +} + +// Validates the summary after known value merge in to same histogram. +TEST_F(HistogramTest, BasicHistogramMergeSummary) { + Histogram& h1 = store_->histogram("h1"); + + for (size_t i = 0; i < 50; ++i) { + expectCallAndAccumulate(h1, i); + } + EXPECT_EQ(1, validateMerge()); + + for (size_t i = 50; i < 100; ++i) { + expectCallAndAccumulate(h1, i); + } + EXPECT_EQ(1, validateMerge()); + + const std::string expected_summary = + "P0: 0, P25: 25, P50: 50, P75: 75, P90: 90, P95: 95, P99: 99, P99.9: 99.9, P100: 100"; + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_EQ(expected_summary, name_histogram_map["h1"]->cumulativeStatistics().summary()); +} + +TEST_F(HistogramTest, BasicHistogramUsed) { + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 1)); + h1.recordValue(1); + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_TRUE(name_histogram_map["h1"]->used()); + EXPECT_FALSE(name_histogram_map["h2"]->used()); + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h2), 2)); + h2.recordValue(2); + + for (const Stats::ParentHistogramSharedPtr& histogram : store_->histograms()) { + EXPECT_TRUE(histogram->used()); + } +} + } // namespace Stats } // namespace Envoy diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 8641bb6508b5..bb7fe0810ae8 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -82,6 +82,34 @@ TEST_F(ThreadLocalInstanceImplTest, All) { tls_.shutdownThread(); } +// TODO(ramaraochavali): Run this test with real threads. The current issue in the unit +// testing environment is, the post to main_dispatcher is not working as expected. + +// Validate ThreadLocal::runOnAllThreads behavior with all_thread_complete call back. +TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { + SlotPtr tlsptr = tls_.allocateSlot(); + + EXPECT_CALL(thread_dispatcher_, post(_)); + EXPECT_CALL(main_dispatcher_, post(_)); + + // Ensure that the thread local call back and all_thread_complete call back are called. + struct { + uint64_t thread_local_calls_{0}; + bool all_threads_complete_ = false; + } thread_status; + + tlsptr->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, + [&thread_status]() -> void { + EXPECT_EQ(thread_status.thread_local_calls_, 1); + thread_status.all_threads_complete_ = true; + }); + + EXPECT_TRUE(thread_status.all_threads_complete_); + + tls_.shutdownGlobalThreading(); + tls_.shutdownThread(); +} + // Validate ThreadLocal::InstanceImpl's dispatcher() behavior. TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) { InstanceImpl tls; diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index b8adeb1700f6..42dba8622621 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -110,6 +110,10 @@ TEST(MetricsServiceSinkTest, CheckSendCall) { NiceMock gauge; gauge.name_ = "test_gauge"; sink.flushGauge(gauge, 1); + + NiceMock histogram; + histogram.name_ = "test_histogram"; + sink.flushHistogram(histogram); EXPECT_CALL(*streamer_, send(_)); sink.endFlush(); diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index c81aea51577c..907f6213cc67 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -68,6 +68,7 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, request_msg.envoy_metrics(); bool known_counter_exists = false; bool known_gauge_exists = false; + bool known_histogram_exists = false; for (::io::prometheus::client::MetricFamily metrics_family : envoy_metrics) { if (metrics_family.name() == "cluster.cluster_0.membership_change" && metrics_family.type() == ::io::prometheus::client::MetricType::COUNTER) { @@ -79,13 +80,21 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, known_gauge_exists = true; EXPECT_EQ(1, metrics_family.metric(0).gauge().value()); } + if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && + metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { + known_histogram_exists = true; + Stats::HistogramStatisticsImpl empty_statistics; + EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), + empty_statistics.supportedQuantiles().size()); + } ASSERT(metrics_family.metric(0).has_timestamp_ms()); - if (known_counter_exists && known_gauge_exists) { + if (known_counter_exists && known_gauge_exists && known_histogram_exists) { break; } } EXPECT_TRUE(known_counter_exists); EXPECT_TRUE(known_gauge_exists); + EXPECT_TRUE(known_histogram_exists); } void cleanup() { @@ -102,18 +111,29 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, INSTANTIATE_TEST_CASE_P(IpVersionsClientType, MetricsServiceIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); -// Test a basic full access logging flow. +// Test a basic metric service flow. TEST_P(MetricsServiceIntegrationTest, BasicFlow) { initialize(); + // Send an empty request so that histogram values merged for cluster_0. + codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http"))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-lyft-user-id", "123"}}; + sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); + waitForMetricsServiceConnection(); waitForMetricsStream(); waitForMetricsRequest(); + // Send an empty response and end the stream. This should never happen but make sure nothing // breaks and we make a new stream on a follow up request. metrics_service_request_->startGrpcStream(); envoy::service::metrics::v2::StreamMetricsResponse response_msg; metrics_service_request_->sendGrpcMessage(response_msg); metrics_service_request_->finishGrpcStream(Grpc::Status::Ok); + switch (clientType()) { case Grpc::ClientType::EnvoyGrpc: test_server_->waitForGaugeEq("cluster.metrics_service.upstream_rq_active", 0); diff --git a/test/integration/server.h b/test/integration/server.h index 15591dcd213c..117dd6cae051 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -176,11 +176,17 @@ class TestIsolatedStoreImpl : public StoreRoot { return store_.gauges(); } + std::list histograms() const override { + std::unique_lock lock(lock_); + return store_.histograms(); + } + // Stats::StoreRoot void addSink(Sink&) override {} void setTagProducer(TagProducerPtr&&) override {} void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {} void shutdownThreading() override {} + void mergeHistograms(PostMergeCb) override {} private: mutable std::mutex lock_; diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index cb107c441ce3..7aea6e68deb7 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -5,6 +5,7 @@ using testing::Invoke; using testing::NiceMock; +using testing::Return; using testing::ReturnRef; using testing::_; @@ -34,8 +35,23 @@ MockHistogram::MockHistogram() { ON_CALL(*this, tagExtractedName()).WillByDefault(ReturnRef(name_)); ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); } + MockHistogram::~MockHistogram() {} +MockParentHistogram::MockParentHistogram() { + ON_CALL(*this, recordValue(_)).WillByDefault(Invoke([this](uint64_t value) { + if (store_ != nullptr) { + store_->deliverHistogramToSinks(*this, value); + } + })); + ON_CALL(*this, tagExtractedName()).WillByDefault(ReturnRef(name_)); + ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); + ON_CALL(*this, intervalStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); + ON_CALL(*this, cumulativeStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); +} + +MockParentHistogram::~MockParentHistogram() {} + MockSink::MockSink() {} MockSink::~MockSink() {} diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index d70478d5b6a3..9df1ae1014a0 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -68,10 +68,35 @@ class MockHistogram : public Histogram { MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); MOCK_CONST_METHOD0(tags, const std::vector&()); MOCK_METHOD1(recordValue, void(uint64_t value)); + MOCK_CONST_METHOD0(used, bool()); + + std::string name_; + std::vector tags_; + Store* store_; +}; + +class MockParentHistogram : public ParentHistogram { +public: + MockParentHistogram(); + ~MockParentHistogram(); + + // Note: cannot be mocked because it is accessed as a Property in a gmock EXPECT_CALL. This + // creates a deadlock in gmock and is an unintended use of mock functions. + const std::string& name() const override { return name_; }; + void merge() override {} + + MOCK_CONST_METHOD0(used, bool()); + MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); + MOCK_CONST_METHOD0(tags, const std::vector&()); + MOCK_METHOD1(recordValue, void(uint64_t value)); + MOCK_CONST_METHOD0(cumulativeStatistics, const HistogramStatistics&()); + MOCK_CONST_METHOD0(intervalStatistics, const HistogramStatistics&()); std::string name_; std::vector tags_; Store* store_; + std::shared_ptr histogram_stats_ = + std::make_shared(); }; class MockSink : public Sink { @@ -82,6 +107,7 @@ class MockSink : public Sink { MOCK_METHOD0(beginFlush, void()); MOCK_METHOD2(flushCounter, void(const Counter& counter, uint64_t delta)); MOCK_METHOD2(flushGauge, void(const Gauge& gauge, uint64_t value)); + MOCK_METHOD1(flushHistogram, void(const ParentHistogram& histogram)); MOCK_METHOD0(endFlush, void()); MOCK_METHOD2(onHistogramComplete, void(const Histogram& histogram, uint64_t value)); }; @@ -100,6 +126,7 @@ class MockStore : public Store { MOCK_METHOD1(gauge, Gauge&(const std::string&)); MOCK_CONST_METHOD0(gauges, std::list()); MOCK_METHOD1(histogram, Histogram&(const std::string& name)); + MOCK_CONST_METHOD0(histograms, std::list()); testing::NiceMock counter_; std::vector> histograms_; diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index fb9d3ab01331..af871695420d 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -28,6 +28,11 @@ class MockInstance : public Instance { SlotPtr allocateSlot_() { return SlotPtr{new SlotImpl(*this, current_slot_++)}; } void runOnAllThreads_(Event::PostCb cb) { cb(); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { + cb(); + main_callback(); + } + void shutdownThread_() { shutdown_ = true; // Reverse order which is same as the production code. @@ -53,6 +58,9 @@ class MockInstance : public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; } void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreads(cb, main_callback); + } void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); } MockInstance& parent_; diff --git a/test/server/server_test.cc b/test/server/server_test.cc index edba829b7fa1..b831f7246679 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -36,7 +36,7 @@ TEST(ServerInstanceUtil, flushHelper) { std::list sinks; sinks.emplace_back(std::move(sink)); - InstanceUtil::flushCountersAndGaugesToSinks(sinks, store); + InstanceUtil::flushMetricsToSinks(sinks, store); } class RunHelperTest : public testing::Test {