From 22e6ecc29a0d9294569aae168ff5b45acdd795c2 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sat, 2 Dec 2023 20:33:54 +0800 Subject: [PATCH] [improvement](spinlock) remove some potential bad spinlock usage (#27904) * [improvement](spinlock) remove some potential spinlock usage --------- Co-authored-by: yiguolei --- be/src/runtime/query_statistics.cpp | 6 +- be/src/runtime/query_statistics.h | 4 +- be/src/runtime/runtime_filter_mgr.cpp | 1 - be/src/runtime/user_function_cache.cpp | 1 - be/src/util/metrics.cpp | 36 +++--- be/src/util/metrics.h | 23 ++-- be/src/util/streaming_sampler.h | 150 ------------------------- 7 files changed, 34 insertions(+), 187 deletions(-) delete mode 100644 be/src/util/streaming_sampler.h diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index 0a20f6d6e0a56d..68bc5c97de51e4 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -122,7 +122,7 @@ QueryStatistics::~QueryStatistics() { } void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); if (!_query_statistics.contains(sender_id)) { _query_statistics[sender_id] = std::make_shared(); } @@ -132,12 +132,12 @@ void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) { if (!statistics->collected()) return; if (_query_statistics.contains(sender_id)) return; - std::lock_guard l(_lock); + std::lock_guard l(_lock); _query_statistics[sender_id] = statistics; } QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto it = _query_statistics.find(sender_id); if (it != _query_statistics.end()) { return it->second; diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index e9df0e338abdc8..24f14323bd508a 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -155,14 +155,14 @@ class QueryStatisticsRecvr { friend class QueryStatistics; void merge(QueryStatistics* statistics) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); for (auto& pair : _query_statistics) { statistics->merge(*(pair.second)); } } std::map _query_statistics; - SpinLock _lock; + std::mutex _lock; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 73fcae57fcef27..93a6d6addffffa 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -319,7 +319,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ std::to_string(request->filter_id())); } } - // iter->second = pair{CntlVal,SpinLock} cntVal = iter->second.first; { std::lock_guard l(*iter->second.second); diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 05b2fe0882e341..964a38ba9dcc8d 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -90,7 +90,6 @@ struct UserFunctionCacheEntry { // used to lookup a symbol void* lib_handle = nullptr; - SpinLock map_lock; // from symbol_name to function pointer std::unordered_map fptr_map; diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index de2d43c8548ceb..23dbb628a0d95b 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -123,7 +123,7 @@ std::string Metric::to_prometheus(const std::string& display_name, const Labels& std::map HistogramMetric::_s_output_percentiles = { {"0.50", 50.0}, {"0.75", 75.0}, {"0.90", 90.0}, {"0.95", 95.0}, {"0.99", 99.0}}; void HistogramMetric::clear() { - std::lock_guard l(_lock); + std::lock_guard l(_lock); _stats.clear(); } @@ -136,12 +136,12 @@ void HistogramMetric::add(const uint64_t& value) { } void HistogramMetric::merge(const HistogramMetric& other) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); _stats.merge(other._stats); } void HistogramMetric::set_histogram(const HistogramStat& stats) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); _stats.clear(); _stats.merge(stats); } @@ -228,7 +228,7 @@ std::string MetricPrototype::to_prometheus(const std::string& registry_name) con } void MetricEntity::deregister_metric(const MetricPrototype* metric_type) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto metric = _metrics.find(metric_type); if (metric != _metrics.end()) { delete metric->second; @@ -238,7 +238,7 @@ void MetricEntity::deregister_metric(const MetricPrototype* metric_type) { Metric* MetricEntity::get_metric(const std::string& name, const std::string& group_name) const { MetricPrototype dummy(MetricType::UNTYPED, MetricUnit::NOUNIT, name, "", group_name); - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto it = _metrics.find(&dummy); if (it == _metrics.end()) { return nullptr; @@ -247,7 +247,7 @@ Metric* MetricEntity::get_metric(const std::string& name, const std::string& gro } void MetricEntity::register_hook(const std::string& name, const std::function& hook) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); #ifndef BE_TEST DCHECK(_hooks.find(name) == _hooks.end()) << "hook is already exist! " << _name << ":" << name; #endif @@ -255,7 +255,7 @@ void MetricEntity::register_hook(const std::string& name, const std::function l(_lock); + std::lock_guard l(_lock); _hooks.erase(name); } @@ -276,7 +276,7 @@ std::shared_ptr MetricRegistry::register_entity(const std::string& const Labels& labels, MetricEntityType type) { std::shared_ptr entity = std::make_shared(type, name, labels); - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto inserted_entity = _entities.insert(std::make_pair(entity, 1)); if (!inserted_entity.second) { // If exist, increase the registered count @@ -286,7 +286,7 @@ std::shared_ptr MetricRegistry::register_entity(const std::string& } void MetricRegistry::deregister_entity(const std::shared_ptr& entity) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto found_entity = _entities.find(entity); if (found_entity != _entities.end()) { // Decrease the registered count @@ -303,7 +303,7 @@ std::shared_ptr MetricRegistry::get_entity(const std::string& name MetricEntityType type) { std::shared_ptr dummy = std::make_shared(type, name, labels); - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto entity = _entities.find(dummy); if (entity == _entities.end()) { return std::shared_ptr(); @@ -312,9 +312,9 @@ std::shared_ptr MetricRegistry::get_entity(const std::string& name } void MetricRegistry::trigger_all_hooks(bool force) const { - std::lock_guard l(_lock); + std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.first->_lock); + std::lock_guard l(entity.first->_lock); entity.first->trigger_hook_unlocked(force); } } @@ -322,12 +322,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const { std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const { // Reorder by MetricPrototype EntityMetricsByType entity_metrics_by_types; - std::lock_guard l(_lock); + std::lock_guard l(_lock); for (const auto& entity : _entities) { if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { continue; } - std::lock_guard l(entity.first->_lock); + std::lock_guard l(entity.first->_lock); entity.first->trigger_hook_unlocked(false); for (const auto& metric : entity.first->_metrics) { std::pair new_elem = @@ -365,12 +365,12 @@ std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const { std::string MetricRegistry::to_json(bool with_tablet_metrics) const { rj::Document doc {rj::kArrayType}; rj::Document::AllocatorType& allocator = doc.GetAllocator(); - std::lock_guard l(_lock); + std::lock_guard l(_lock); for (const auto& entity : _entities) { if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { continue; } - std::lock_guard l(entity.first->_lock); + std::lock_guard l(entity.first->_lock); entity.first->trigger_hook_unlocked(false); for (const auto& metric : entity.first->_metrics) { rj::Value metric_obj(rj::kObjectType); @@ -406,9 +406,9 @@ std::string MetricRegistry::to_json(bool with_tablet_metrics) const { std::string MetricRegistry::to_core_string() const { std::stringstream ss; - std::lock_guard l(_lock); + std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.first->_lock); + std::lock_guard l(entity.first->_lock); entity.first->trigger_hook_unlocked(false); for (const auto& metric : entity.first->_metrics) { if (metric.first->is_core_metric) { diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index 6007562eb05bc4..ac7e69a4ef8ab4 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -35,7 +35,6 @@ #include "util/core_local.h" #include "util/histogram.h" -#include "util/spinlock.h" namespace doris { @@ -111,17 +110,17 @@ class LockSimpleMetric : public Metric { std::string to_string() const override { return std::to_string(value()); } T value() const { - std::lock_guard l(_lock); + std::lock_guard l(_lock); return _value; } void increment(const T& delta) { - std::lock_guard l(this->_lock); + std::lock_guard l(this->_lock); _value += delta; } void set_value(const T& value) { - std::lock_guard l(this->_lock); + std::lock_guard l(this->_lock); _value = value; } @@ -130,14 +129,14 @@ class LockSimpleMetric : public Metric { } protected: - // We use spinlock instead of std::atomic is because atomic don't support + // We use std::mutex instead of std::atomic is because atomic don't support // double's fetch_add // TODO(zc): If this is atomic is bottleneck, we change to thread local. // performance: on Intel(R) Xeon(R) CPU E5-2450 int64_t // original type: 2ns/op - // single thread spinlock: 26ns/op - // multiple thread(8) spinlock: 2500ns/op - mutable SpinLock _lock; + // single thread std::mutex: 26ns/op + // multiple thread(8) std::mutex: 2500ns/op + mutable std::mutex _lock; T _value; }; @@ -202,7 +201,7 @@ class HistogramMetric : public Metric { protected: static std::map _s_output_percentiles; - mutable SpinLock _lock; + mutable std::mutex _lock; HistogramStat _stats; }; @@ -351,7 +350,7 @@ class MetricEntity { template Metric* register_metric(const MetricPrototype* metric_type) { - std::lock_guard l(_lock); + std::lock_guard l(_lock); auto inserted_metric = _metrics.insert(std::make_pair(metric_type, nullptr)); if (inserted_metric.second) { // If not exist, make a new metric pointer @@ -377,7 +376,7 @@ class MetricEntity { std::string _name; Labels _labels; - mutable SpinLock _lock; + mutable std::mutex _lock; MetricMap _metrics; std::map> _hooks; }; @@ -421,7 +420,7 @@ class MetricRegistry { private: const std::string _name; - mutable SpinLock _lock; + mutable std::mutex _lock; // MetricEntity -> register count std::unordered_map, int32_t, MetricEntityHash, MetricEntityEqualTo> diff --git a/be/src/util/streaming_sampler.h b/be/src/util/streaming_sampler.h deleted file mode 100644 index f5b727ac430aec..00000000000000 --- a/be/src/util/streaming_sampler.h +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include - -#include "util/spinlock.h" - -namespace doris { - -/// A fixed-size sampler to collect samples over time. AddSample should be -/// called periodically with the sampled value. Samples are added at the max -/// resolution possible. When the sample buffer is full, the current samples -/// are collapsed and the collection period is doubled. -/// The input period and the streaming sampler period do not need to match, the -/// streaming sampler will average values. -/// T is the type of the sample and must be a native numerical type (e.g. int or float). -template -class StreamingSampler { -public: - StreamingSampler(int initial_period = 500) - : samples_collected_(0), - period_(initial_period), - current_sample_sum_(0), - current_sample_count_(0), - current_sample_total_time_(0) {} - - /// Initialize the sampler with values. - StreamingSampler(int period, const std::vector& initial_samples) - : samples_collected_(initial_samples.size()), - period_(period), - current_sample_sum_(0), - current_sample_count_(0), - current_sample_total_time_(0) { - DCHECK_LE(samples_collected_, MAX_SAMPLES); - memcpy(samples_, &initial_samples[0], sizeof(T) * samples_collected_); - } - - /// Add a sample to the sampler. 'ms' is the time elapsed since the last time this - /// was called. - /// The input value is accumulated into current_*. If the total time elapsed - /// in current_sample_total_time_ is higher than the storage period, the value is - /// stored. 'sample' should be interpreted as a representative sample from - /// (now - ms, now]. - /// TODO: we can make this more complex by taking a weighted average of samples - /// accumulated in a period. - void AddSample(T sample, int ms) { - std::lock_guard l(lock_); - ++current_sample_count_; - current_sample_sum_ += sample; - current_sample_total_time_ += ms; - - if (current_sample_total_time_ >= period_) { - samples_[samples_collected_++] = current_sample_sum_ / current_sample_count_; - current_sample_count_ = 0; - current_sample_sum_ = 0; - current_sample_total_time_ = 0; - - if (samples_collected_ == MAX_SAMPLES) { - /// collapse the samples in half by averaging them and doubling the storage period - period_ *= 2; - for (int i = 0; i < MAX_SAMPLES / 2; ++i) { - samples_[i] = (samples_[i * 2] + samples_[i * 2 + 1]) / 2; - } - samples_collected_ /= 2; - } - } - } - - /// Get the samples collected. Returns the number of samples and - /// the period they were collected at. - /// If lock is non-null, the lock will be taken before returning. The caller - /// must unlock it. - const T* GetSamples(int* num_samples, int* period, SpinLock** lock = nullptr) const { - if (lock != nullptr) { - lock_.lock(); - *lock = &lock_; - } - *num_samples = samples_collected_; - *period = period_; - return samples_; - } - - /// Set the underlying data to period/samples - void SetSamples(int period, const std::vector& samples) { - DCHECK_LE(samples.size(), MAX_SAMPLES); - - std::lock_guard l(lock_); - period_ = period; - samples_collected_ = samples.size(); - memcpy(samples_, &samples[0], sizeof(T) * samples_collected_); - current_sample_sum_ = 0; - current_sample_count_ = 0; - current_sample_total_time_ = 0; - } - - std::string DebugString(const std::string& prefix = "") const { - std::lock_guard l(lock_); - std::stringstream ss; - ss << prefix << "Period = " << period_ << std::endl - << prefix << "Num = " << samples_collected_ << std::endl - << prefix << "Samples = {"; - for (int i = 0; i < samples_collected_; ++i) { - ss << samples_[i] << ", "; - } - ss << prefix << "}" << std::endl; - return ss.str(); - } - -private: - mutable SpinLock lock_; - - /// Aggregated samples collected. Note: this is not all the input samples from - /// AddSample(), as logically, those samples get resampled and aggregated. - T samples_[MAX_SAMPLES]; - - /// Number of samples collected <= MAX_SAMPLES. - int samples_collected_; - - /// Storage period in ms. - int period_; - - /// The sum of input samples that makes up the next stored sample. - T current_sample_sum_; - - /// The number of input samples that contribute to current_sample_sum_. - int current_sample_count_; - - /// The total time that current_sample_sum_ represents - int current_sample_total_time_; -}; - -} // namespace doris