From 30c3b84c7ca978709474f327853e1e1a4b4ff4bf Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Sep 2022 11:53:09 -0700 Subject: [PATCH 1/4] commit --- .../sdk/metrics/state/async_metric_storage.h | 27 ++++++++++++++++--- .../sdk/metrics/state/attributes_hashmap.h | 16 +---------- .../sdk/metrics/state/sync_metric_storage.h | 5 ++++ sdk/src/metrics/state/sync_metric_storage.cc | 8 ++++-- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index b23443595b..acd866d2a8 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -48,21 +48,31 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora { auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); aggr->Aggregate(measurement.second); + cumulative_hashmap_lock_.lock(); auto prev = cumulative_hash_map_->Get(measurement.first); + cumulative_hashmap_lock_.unlock(); if (prev) { auto delta = prev->Diff(*aggr); + cumulative_hashmap_lock_.lock(); cumulative_hash_map_->Set(measurement.first, DefaultAggregation::CloneAggregation( aggregation_type_, instrument_descriptor_, *delta)); + cumulative_hashmap_lock_.unlock(); + delta_hashmap_lock_.lock(); delta_hash_map_->Set(measurement.first, std::move(delta)); + delta_hashmap_lock_.unlock(); } else { + cumulative_hashmap_lock_.lock(); cumulative_hash_map_->Set( measurement.first, DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); + cumulative_hashmap_lock_.unlock(); + delta_hashmap_lock_.lock(); delta_hash_map_->Set(measurement.first, std::move(aggr)); + delta_hashmap_lock_.unlock(); } } } @@ -96,10 +106,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora nostd::function_ref metric_collection_callback) noexcept override { - auto status = temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, - collection_ts, std::move(delta_hash_map_), - metric_collection_callback); - delta_hash_map_.reset(new AttributesHashMap()); + std::shared_ptr delta_metrics = nullptr; + { + std::lock_guard guard(delta_hashmap_lock_); + delta_metrics = std::move(delta_hash_map_); + delta_hash_map_.reset(new AttributesHashMap); + } + + auto status = + temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, + delta_metrics, metric_collection_callback); return status; } @@ -109,7 +125,10 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora const AttributesProcessor *attributes_processor_; void *state_; std::unique_ptr cumulative_hash_map_; + opentelemetry::common::SpinLockMutex cumulative_hashmap_lock_; std::unique_ptr delta_hash_map_; + opentelemetry::common::SpinLockMutex delta_hashmap_lock_; + TemporalMetricStorage temporal_metric_storage_; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h index be86c96826..2f93b32f66 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h @@ -3,7 +3,6 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/nostd/function_ref.h" # include "opentelemetry/sdk/common/attribute_utils.h" # include "opentelemetry/sdk/common/attributemap_hash.h" @@ -13,7 +12,6 @@ # include # include -# include # include OPENTELEMETRY_BEGIN_NAMESPACE @@ -37,7 +35,6 @@ class AttributesHashMap public: Aggregation *Get(const MetricAttributes &attributes) const { - std::lock_guard guard(lock_); auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -52,7 +49,6 @@ class AttributesHashMap */ bool Has(const MetricAttributes &attributes) const { - std::lock_guard guard(lock_); return (hash_map_.find(attributes) == hash_map_.end()) ? false : true; } @@ -64,8 +60,6 @@ class AttributesHashMap Aggregation *GetOrSetDefault(const MetricAttributes &attributes, std::function()> aggregation_callback) { - std::lock_guard guard(lock_); - auto it = hash_map_.find(attributes); if (it != hash_map_.end()) { @@ -81,7 +75,6 @@ class AttributesHashMap */ void Set(const MetricAttributes &attributes, std::unique_ptr value) { - std::lock_guard guard(lock_); hash_map_[attributes] = std::move(value); } @@ -91,7 +84,6 @@ class AttributesHashMap bool GetAllEnteries( nostd::function_ref callback) const { - std::lock_guard guard(lock_); for (auto &kv : hash_map_) { if (!callback(kv.first, *(kv.second.get()))) @@ -105,17 +97,11 @@ class AttributesHashMap /** * Return the size of hash. */ - size_t Size() - { - std::lock_guard guard(lock_); - return hash_map_.size(); - } + size_t Size() { return hash_map_.size(); } private: std::unordered_map, AttributeHashGenerator> hash_map_; - - mutable opentelemetry::common::SpinLockMutex lock_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index a9b0604f05..993a18f744 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -52,6 +52,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); + std::lock_guard guard(lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -67,6 +68,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); + std::lock_guard guard(lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -77,6 +79,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); + std::lock_guard guard(lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -93,6 +96,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); + std::lock_guard guard(lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -117,6 +121,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; TemporalMetricStorage temporal_metric_storage_; + opentelemetry::common::SpinLockMutex lock_; }; } // namespace metrics diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index eec68ed6d7..546dc5822e 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -20,8 +20,12 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, // Add the current delta metrics to `unreported metrics stash` for all the collectors, // this will also empty the delta metrics hashmap, and make it available for // recordings - std::shared_ptr delta_metrics = std::move(attributes_hashmap_); - attributes_hashmap_.reset(new AttributesHashMap); + std::shared_ptr delta_metrics = nullptr; + { + std::lock_guard guard(lock_); + delta_metrics = std::move(attributes_hashmap_); + attributes_hashmap_.reset(new AttributesHashMap); + } return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, std::move(delta_metrics), callback); From fb1ed25d1f2fae765d2d770fe09d26663890c7f4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Sep 2022 12:53:16 -0700 Subject: [PATCH 2/4] fix --- sdk/test/metrics/attributes_hashmap_benchmark.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/test/metrics/attributes_hashmap_benchmark.cc b/sdk/test/metrics/attributes_hashmap_benchmark.cc index 2ebf03175a..92035809e0 100644 --- a/sdk/test/metrics/attributes_hashmap_benchmark.cc +++ b/sdk/test/metrics/attributes_hashmap_benchmark.cc @@ -25,14 +25,17 @@ void BM_AttributseHashMap(benchmark::State &state) std::vector attributes = {{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}}; - auto work = [&attributes, &hash_map](const size_t i) { + std::mutex m; + + auto work = [&attributes, &hash_map, &m](const size_t i) { std::function()> create_default_aggregation = []() -> std::unique_ptr { return std::unique_ptr(new DropAggregation); }; - + m.lock(); hash_map.GetOrSetDefault(attributes[i % 2], create_default_aggregation)->Aggregate(1l); benchmark::DoNotOptimize(hash_map.Has(attributes[i % 2])); + m.unlock(); }; while (state.KeepRunning()) { From 118f04888c2d369ac0301e66adf4df9ec6584be8 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Sep 2022 14:32:48 -0700 Subject: [PATCH 3/4] Fix --- .../sdk/metrics/state/async_metric_storage.h | 17 +++-------------- .../sdk/metrics/state/sync_metric_storage.h | 10 +++++----- sdk/src/metrics/state/sync_metric_storage.cc | 2 +- 3 files changed, 9 insertions(+), 20 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index acd866d2a8..79731a80bc 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -44,35 +44,26 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora opentelemetry::common::SystemTimestamp /* observation_time */) noexcept { // process the read measurements - aggregate and store in hashmap + std::lock_guard guard(hashmap_lock_); for (auto &measurement : measurements) { auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); aggr->Aggregate(measurement.second); - cumulative_hashmap_lock_.lock(); auto prev = cumulative_hash_map_->Get(measurement.first); - cumulative_hashmap_lock_.unlock(); if (prev) { auto delta = prev->Diff(*aggr); - cumulative_hashmap_lock_.lock(); cumulative_hash_map_->Set(measurement.first, DefaultAggregation::CloneAggregation( aggregation_type_, instrument_descriptor_, *delta)); - cumulative_hashmap_lock_.unlock(); - delta_hashmap_lock_.lock(); delta_hash_map_->Set(measurement.first, std::move(delta)); - delta_hashmap_lock_.unlock(); } else { - cumulative_hashmap_lock_.lock(); cumulative_hash_map_->Set( measurement.first, DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); - cumulative_hashmap_lock_.unlock(); - delta_hashmap_lock_.lock(); delta_hash_map_->Set(measurement.first, std::move(aggr)); - delta_hashmap_lock_.unlock(); } } } @@ -108,7 +99,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora std::shared_ptr delta_metrics = nullptr; { - std::lock_guard guard(delta_hashmap_lock_); + std::lock_guard guard(hashmap_lock_); delta_metrics = std::move(delta_hash_map_); delta_hash_map_.reset(new AttributesHashMap); } @@ -125,10 +116,8 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora const AttributesProcessor *attributes_processor_; void *state_; std::unique_ptr cumulative_hash_map_; - opentelemetry::common::SpinLockMutex cumulative_hashmap_lock_; std::unique_ptr delta_hash_map_; - opentelemetry::common::SpinLockMutex delta_hashmap_lock_; - + opentelemetry::common::SpinLockMutex hashmap_lock_; TemporalMetricStorage temporal_metric_storage_; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index 993a18f744..db56bf6b49 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -52,7 +52,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); - std::lock_guard guard(lock_); + std::lock_guard guard(hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -68,7 +68,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); - std::lock_guard guard(lock_); + std::lock_guard guard(hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -79,7 +79,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); - std::lock_guard guard(lock_); + std::lock_guard guard(hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -96,7 +96,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); - std::lock_guard guard(lock_); + std::lock_guard guard(hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -121,7 +121,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; TemporalMetricStorage temporal_metric_storage_; - opentelemetry::common::SpinLockMutex lock_; + opentelemetry::common::SpinLockMutex hashmap_lock_; }; } // namespace metrics diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index 546dc5822e..530b2668ef 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -22,7 +22,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, // recordings std::shared_ptr delta_metrics = nullptr; { - std::lock_guard guard(lock_); + std::lock_guard guard(hashmap_lock_); delta_metrics = std::move(attributes_hashmap_); attributes_hashmap_.reset(new AttributesHashMap); } From 73c07ddf33752a55e4b49dfcd6ec14bccfd27281 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 27 Sep 2022 16:53:59 -0700 Subject: [PATCH 4/4] fix comments --- .../sdk/metrics/state/sync_metric_storage.h | 10 +++++----- sdk/src/metrics/state/sync_metric_storage.cc | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index db56bf6b49..16bac34079 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -52,7 +52,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); - std::lock_guard guard(hashmap_lock_); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -68,7 +68,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); - std::lock_guard guard(hashmap_lock_); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -79,7 +79,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage return; } exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now()); - std::lock_guard guard(hashmap_lock_); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } @@ -96,7 +96,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage exemplar_reservoir_->OfferMeasurement(value, attributes, context, std::chrono::system_clock::now()); auto attr = attributes_processor_->process(attributes); - std::lock_guard guard(hashmap_lock_); + std::lock_guard guard(attribute_hashmap_lock_); attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } @@ -121,7 +121,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage std::function()> create_default_aggregation_; nostd::shared_ptr exemplar_reservoir_; TemporalMetricStorage temporal_metric_storage_; - opentelemetry::common::SpinLockMutex hashmap_lock_; + opentelemetry::common::SpinLockMutex attribute_hashmap_lock_; }; } // namespace metrics diff --git a/sdk/src/metrics/state/sync_metric_storage.cc b/sdk/src/metrics/state/sync_metric_storage.cc index 530b2668ef..be753ca98f 100644 --- a/sdk/src/metrics/state/sync_metric_storage.cc +++ b/sdk/src/metrics/state/sync_metric_storage.cc @@ -22,7 +22,7 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector, // recordings std::shared_ptr delta_metrics = nullptr; { - std::lock_guard guard(hashmap_lock_); + std::lock_guard guard(attribute_hashmap_lock_); delta_metrics = std::move(attributes_hashmap_); attributes_hashmap_.reset(new AttributesHashMap); }