From 8bbc03535c2ad91c0ac7ebf26b449a77aaf132b0 Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 24 Mar 2022 00:45:08 -0700 Subject: [PATCH 01/13] add periodic exporting metric reader --- .../export/periodic_exporting_metric_reader.h | 67 +++++++++++++++++++ .../sdk/metrics/metric_exporter.h | 4 +- .../periodic_exporting_metric_reader.cc | 60 +++++++++++++++++ sdk/src/metrics/metric_reader.cc | 1 + 4 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h create mode 100644 sdk/src/metrics/export/periodic_exporting_metric_reader.cc diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h new file mode 100644 index 0000000000..6601ba2ea1 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include "opentelemetry/version.h" + +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +class MetricExporter; +/** + * Struct to hold batch PeriodicExortingMetricReader options. + */ + +struct PeriodicExortingMetricReaderOptions +{ + + /* The time interval between two consecutive exports. */ + std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(60000); + + /* how long the export can run before it is cancelled. */ + std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(30000); +}; + +class PeriodicExortingMetricReader : public MetricReader +{ + +public: + PeriodicExortingMetricReader(std::unique_ptr exporter, + const PeriodicExortingMetricReaderOptions &option); + +private: + bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; + + bool OnShutDown(std::chrono::microseconds timeout) noexcept override; + + void OnInitialized() noexcept override; + + std::unique_ptr exporter_; + std::chrono::milliseconds schedule_delay_millis_; + std::chrono::milliseconds export_timeout_millis_; + + void DoBackgroundWork(); + + /* The background worker thread */ + std::thread worker_thread_; + + /* Synchronization primitives */ + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 96898c9ad1..5d2107a879 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -32,9 +32,7 @@ class MetricExporter * concurrently for the same exporter instance. * @param spans a span of unique pointers to metrics data */ - virtual opentelemetry::sdk::common::ExportResult Export( - const nostd::span> - &records) noexcept = 0; + virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept = 0; /** * Force flush the exporter. diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc new file mode 100644 index 0000000000..9a4c37e31a --- /dev/null +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +PeriodicExortingMetricReader::PeriodicExortingMetricReader( + std::unique_ptr exporter, + const PeriodicExortingMetricReaderOptions &option) + : exporter_{std::move(exporter)}, + schedule_delay_millis_{option.schedule_delay_millis}, + export_timeout_millis_{option.export_timeout_millis} +{} + +void PeriodicExortingMetricReader::OnInitialized() noexcept +{ + worker_thread_ = std::thread(&PeriodicExortingMetricReader::DoBackgroundWork, this); +} + +void PeriodicExortingMetricReader::DoBackgroundWork() +{ + auto timeout = schedule_delay_millis_; + std::unique_lock lk(cv_m_); + cv_.wait_for(lk, timeout); + + std::atomic stop{false}; + auto future_receive = std::async(std::launch::async, [this, &stop] { + Collect([this, &stop](MetricData data) { + if (stop) + { + return false; + } + this->exporter_->Export(data); + return true; + }); + }); + std::future_status status; + do + { + status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + stop = true; + } + } while (status != std::future_status::ready()); +} +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index 71aedc227f..51a8695abe 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporarily aggregation_temporarily) void MetricReader::SetMetricProducer(MetricProducer *metric_producer) { metric_producer_ = metric_producer; + OnInitialized(); } AggregationTemporarily MetricReader::GetAggregationTemporarily() const noexcept From 0023a3750e9591a4f25f7ea70a74f27c671c59be Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 24 Mar 2022 00:57:25 -0700 Subject: [PATCH 02/13] fix class name --- .../metrics/export/periodic_exporting_metric_reader.h | 8 ++++---- .../metrics/export/periodic_exporting_metric_reader.cc | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 6601ba2ea1..0adae652b5 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -23,7 +23,7 @@ class MetricExporter; * Struct to hold batch PeriodicExortingMetricReader options. */ -struct PeriodicExortingMetricReaderOptions +struct PeriodicExportingMetricReaderOptions { /* The time interval between two consecutive exports. */ @@ -33,12 +33,12 @@ struct PeriodicExortingMetricReaderOptions std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(30000); }; -class PeriodicExortingMetricReader : public MetricReader +class PeriodicExportingMetricReader : public MetricReader { public: - PeriodicExortingMetricReader(std::unique_ptr exporter, - const PeriodicExortingMetricReaderOptions &option); + PeriodicExportingMetricReader(std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option); private: bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 9a4c37e31a..200d79abd6 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -14,20 +14,20 @@ namespace sdk namespace metrics { -PeriodicExortingMetricReader::PeriodicExortingMetricReader( +PeriodicExportingMetricReader::PeriodicExportingMetricReader( std::unique_ptr exporter, - const PeriodicExortingMetricReaderOptions &option) + const PeriodicExportingMetricReaderOptions &option) : exporter_{std::move(exporter)}, schedule_delay_millis_{option.schedule_delay_millis}, export_timeout_millis_{option.export_timeout_millis} {} -void PeriodicExortingMetricReader::OnInitialized() noexcept +void PeriodicExportingMetricReader::OnInitialized() noexcept { - worker_thread_ = std::thread(&PeriodicExortingMetricReader::DoBackgroundWork, this); + worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this); } -void PeriodicExortingMetricReader::DoBackgroundWork() +void PeriodicExportingMetricReader::DoBackgroundWork() { auto timeout = schedule_delay_millis_; std::unique_lock lk(cv_m_); From 4a2f69c2289dc5c6dc8cce7de3f698aeb294a5a5 Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 24 Mar 2022 09:53:43 -0700 Subject: [PATCH 03/13] fix build --- sdk/src/metrics/CMakeLists.txt | 1 + sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 2 +- sdk/test/metrics/meter_provider_sdk_test.cc | 3 +-- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index 119477eb06..00292c16cc 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -4,6 +4,7 @@ add_library( meter.cc meter_context.cc metric_reader.cc + export/periodic_exporting_metric_reader.cc state/metric_collector.cc aggregation/histogram_aggregation.cc aggregation/lastvalue_aggregation.cc diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 200d79abd6..87e8b8d9af 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -52,7 +52,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() { stop = true; } - } while (status != std::future_status::ready()); + } while (status != std::future_status::ready); } } // namespace metrics } // namespace sdk diff --git a/sdk/test/metrics/meter_provider_sdk_test.cc b/sdk/test/metrics/meter_provider_sdk_test.cc index 015d3023ae..89708516c9 100644 --- a/sdk/test/metrics/meter_provider_sdk_test.cc +++ b/sdk/test/metrics/meter_provider_sdk_test.cc @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter public: MockMetricExporter() = default; - opentelemetry::sdk::common::ExportResult Export( - const opentelemetry::nostd::span> &records) noexcept override + opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override { return opentelemetry::sdk::common::ExportResult::kSuccess; } From 81c65b55bb6fb7e55b2141ddfcbb15d4b89e63d8 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 25 Mar 2022 00:47:37 -0700 Subject: [PATCH 04/13] tests --- .../export/periodic_exporting_metric_reader.h | 6 +- .../sdk/metrics/metric_exporter.h | 3 - .../opentelemetry/sdk/metrics/metric_reader.h | 4 +- .../periodic_exporting_metric_reader.cc | 55 ++++++++----- .../periodic_exporting_metric_reader_test.cc | 79 +++++++++++++++++++ 5 files changed, 120 insertions(+), 27 deletions(-) create mode 100644 sdk/test/metrics/periodic_exporting_metric_reader_test.cc diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 0adae652b5..58c8872c45 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -37,8 +37,10 @@ class PeriodicExportingMetricReader : public MetricReader { public: - PeriodicExportingMetricReader(std::unique_ptr exporter, - const PeriodicExportingMetricReaderOptions &option); + PeriodicExportingMetricReader( + std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative); private: bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 8f7b039697..26193f4cce 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -47,9 +47,6 @@ class MetricExporter */ virtual bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; - -private: - AggregationTemporality aggregation_temporality_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 1f7cee8d30..240ca9ba15 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -26,8 +26,7 @@ class MetricProducer; class MetricReader { public: - MetricReader( - AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative); + MetricReader(AggregationTemporality aggr_temp = AggregationTemporality::kCummulative); void SetMetricProducer(MetricProducer *metric_producer); @@ -58,6 +57,7 @@ class MetricReader virtual void OnInitialized() noexcept {} +protected: bool IsShutdown() const noexcept; private: diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 87e8b8d9af..f26c4875c0 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -3,6 +3,7 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/metrics/metric_exporter.h" # include @@ -16,8 +17,10 @@ namespace metrics PeriodicExportingMetricReader::PeriodicExportingMetricReader( std::unique_ptr exporter, - const PeriodicExportingMetricReaderOptions &option) - : exporter_{std::move(exporter)}, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality) + : MetricReader(aggregation_temporality), + exporter_{std::move(exporter)}, schedule_delay_millis_{option.schedule_delay_millis}, export_timeout_millis_{option.export_timeout_millis} {} @@ -31,28 +34,40 @@ void PeriodicExportingMetricReader::DoBackgroundWork() { auto timeout = schedule_delay_millis_; std::unique_lock lk(cv_m_); - cv_.wait_for(lk, timeout); - - std::atomic stop{false}; - auto future_receive = std::async(std::launch::async, [this, &stop] { - Collect([this, &stop](MetricData data) { - if (stop) - { - return false; - } - this->exporter_->Export(data); - return true; - }); - }); - std::future_status status; do { - status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) + cv_.wait_for(lk, timeout); + + if (IsShutdown()) { - stop = true; + break; } - } while (status != std::future_status::ready); + + std::atomic cancel_export_for_timeout{false}; + auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { + Collect([this, &cancel_export_for_timeout](MetricData data) { + if (cancel_export_for_timeout) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(data); + return true; + }); + }); + std::future_status status; + do + { + status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout = true; + } + } while (status != std::future_status::ready); + + } while (true); } } // namespace metrics } // namespace sdk diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc new file mode 100644 index 0000000000..b95083aa76 --- /dev/null +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationlibrary; +using namespace opentelemetry::sdk::metrics; + +class MockPushMetricExporter : public MetricExporter +{ +public: + opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override + { + records.push_back(record); + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + return false; + } + + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override + { + return true; + } + + size_t GetDataCount() { return records.size(); } + +private: + std::vector records; +}; + +class MockMetricProducer : public MetricProducer +{ +public: + MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero()) + : sleep_ms_{sleep_ms} + {} + + bool Collect(nostd::function_ref callback) noexcept override + { + std::this_thread::sleep_for(sleep_ms_); + MetricData data; + callback(data); + data_sent_size_++; + return true; + } + + size_t GetDataCount() { return data_sent_size_; } + +private: + std::chrono::microseconds sleep_ms_; + size_t data_sent_size_; +}; + +TEST(PeriodicExporingMetricReader, BasicTests) +{ + std::unique_ptr exporter(new MockPushMetricExporter()); + PeriodicExportingMetricReaderOptions options; + auto exporter_ptr = exporter.get(); + PeriodicExportingMetricReader reader(std::move(exporter), options); + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + reader.Shutdown(); + EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), + static_cast(&producer)->GetDataCount()); +} + +#endif \ No newline at end of file From 45d9b52744c43f5f1199f316044b7d9ad2357fe5 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 25 Mar 2022 16:05:55 -0700 Subject: [PATCH 05/13] fix test --- .../opentelemetry/sdk/metrics/metric_reader.h | 2 +- .../periodic_exporting_metric_reader.cc | 19 +++++++++++++++++-- sdk/src/metrics/metric_reader.cc | 8 ++++++-- sdk/test/metrics/CMakeLists.txt | 3 ++- .../periodic_exporting_metric_reader_test.cc | 10 ++++++---- 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 240ca9ba15..c9fcde6276 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -50,7 +50,7 @@ class MetricReader virtual ~MetricReader() = default; -private: + // private: virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept = 0; virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept = 0; diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index f26c4875c0..1c547c3177 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -37,12 +37,10 @@ void PeriodicExportingMetricReader::DoBackgroundWork() do { cv_.wait_for(lk, timeout); - if (IsShutdown()) { break; } - std::atomic cancel_export_for_timeout{false}; auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { Collect([this, &cancel_export_for_timeout](MetricData data) { @@ -64,11 +62,28 @@ void PeriodicExportingMetricReader::DoBackgroundWork() if (status == std::future_status::timeout) { cancel_export_for_timeout = true; + break; } } while (status != std::future_status::ready); } while (true); } + +bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept +{ + return exporter_->ForceFlush(timeout); +} + +bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept +{ + if (worker_thread_.joinable()) + { + cv_.notify_one(); + worker_thread_.join(); + } + return exporter_->Shutdown(timeout); +} + } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index f3bc605471..7931111db7 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -47,18 +47,22 @@ bool MetricReader::Collect(nostd::function_ref callback) noexc bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept { bool status = true; - if (IsShutdown()) { OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!"); } + + { + const std::lock_guard locked(lock_); + shutdown_ = true; + } + if (!OnShutDown(timeout)) { status = false; OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!"); } const std::lock_guard locked(lock_); - shutdown_ = true; return status; } diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index fa1f22c73a..faf2f2b49f 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -11,7 +11,8 @@ foreach( observer_result_test sync_instruments_test async_instruments_test - metric_reader_test) + metric_reader_test + periodic_exporting_metric_reader_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index b95083aa76..b322c0f4c7 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -43,15 +43,15 @@ class MockMetricProducer : public MetricProducer { public: MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero()) - : sleep_ms_{sleep_ms} + : sleep_ms_{sleep_ms}, data_sent_size_(0) {} bool Collect(nostd::function_ref callback) noexcept override { std::this_thread::sleep_for(sleep_ms_); + data_sent_size_++; MetricData data; callback(data); - data_sent_size_++; return true; } @@ -66,11 +66,13 @@ TEST(PeriodicExporingMetricReader, BasicTests) { std::unique_ptr exporter(new MockPushMetricExporter()); PeriodicExportingMetricReaderOptions options; - auto exporter_ptr = exporter.get(); + options.export_timeout_millis = std::chrono::milliseconds(2000); + options.schedule_delay_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); PeriodicExportingMetricReader reader(std::move(exporter), options); MockMetricProducer producer; reader.SetMetricProducer(&producer); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); From 3e60d9682218102a43f4c96b67c0cfba437b9cf4 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 25 Mar 2022 20:54:14 -0700 Subject: [PATCH 06/13] review comment --- .../export/periodic_exporting_metric_reader.h | 12 +++++++---- .../opentelemetry/sdk/metrics/metric_reader.h | 2 +- .../periodic_exporting_metric_reader.cc | 21 ++++++++++++++----- .../periodic_exporting_metric_reader_test.cc | 6 +++--- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 58c8872c45..db521c33d8 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -20,17 +20,21 @@ namespace metrics class MetricExporter; /** - * Struct to hold batch PeriodicExortingMetricReader options. + * Struct to hold PeriodicExortingMetricReader options. */ +constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000); +; +constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); struct PeriodicExportingMetricReaderOptions { /* The time interval between two consecutive exports. */ - std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(60000); + std::chrono::milliseconds export_interval_millis = + std::chrono::milliseconds(kExportIntervalMillis); /* how long the export can run before it is cancelled. */ - std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(30000); + std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis); }; class PeriodicExportingMetricReader : public MetricReader @@ -50,7 +54,7 @@ class PeriodicExportingMetricReader : public MetricReader void OnInitialized() noexcept override; std::unique_ptr exporter_; - std::chrono::milliseconds schedule_delay_millis_; + std::chrono::milliseconds export_interval_millis_; std::chrono::milliseconds export_timeout_millis_; void DoBackgroundWork(); diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index c9fcde6276..240ca9ba15 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -50,7 +50,7 @@ class MetricReader virtual ~MetricReader() = default; - // private: +private: virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept = 0; virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept = 0; diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 1c547c3177..e9a91a9e06 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -21,9 +21,18 @@ PeriodicExportingMetricReader::PeriodicExportingMetricReader( AggregationTemporality aggregation_temporality) : MetricReader(aggregation_temporality), exporter_{std::move(exporter)}, - schedule_delay_millis_{option.schedule_delay_millis}, + export_interval_millis_{option.export_interval_millis}, export_timeout_millis_{option.export_timeout_millis} -{} +{ + if (export_interval_millis_ <= export_timeout_millis_) + { + OTEL_INTERNAL_LOG_WARN( + "[Periodic Exporting Metric Reader] Invalid configuration: " + "export_interval_millis_ should be less than export_timeout_millis_, using default values"); + export_interval_millis_ = kExportIntervalMillis; + export_timeout_millis_ = kExportTimeOutMillis; + } +} void PeriodicExportingMetricReader::OnInitialized() noexcept { @@ -32,16 +41,15 @@ void PeriodicExportingMetricReader::OnInitialized() noexcept void PeriodicExportingMetricReader::DoBackgroundWork() { - auto timeout = schedule_delay_millis_; std::unique_lock lk(cv_m_); do { - cv_.wait_for(lk, timeout); if (IsShutdown()) { break; } std::atomic cancel_export_for_timeout{false}; + auto start = std::chrono::steady_clock::now(); auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { Collect([this, &cancel_export_for_timeout](MetricData data) { if (cancel_export_for_timeout) @@ -65,7 +73,10 @@ void PeriodicExportingMetricReader::DoBackgroundWork() break; } } while (status != std::future_status::ready); - + auto end = std::chrono::steady_clock::now(); + auto export_time_ms = std::chrono::duration_cast(end - start); + auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; + cv_.wait_for(lk, remaining_wait_interval_ms); } while (true); } diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index b322c0f4c7..66d3832b0b 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -66,9 +66,9 @@ TEST(PeriodicExporingMetricReader, BasicTests) { std::unique_ptr exporter(new MockPushMetricExporter()); PeriodicExportingMetricReaderOptions options; - options.export_timeout_millis = std::chrono::milliseconds(2000); - options.schedule_delay_millis = std::chrono::milliseconds(500); - auto exporter_ptr = exporter.get(); + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); PeriodicExportingMetricReader reader(std::move(exporter), options); MockMetricProducer producer; reader.SetMetricProducer(&producer); From 1ef7d662c60dc7cbd68b8d1b9b43b920b11862fc Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 25 Mar 2022 20:57:49 -0700 Subject: [PATCH 07/13] comment metrics ostream exporter --- exporters/ostream/BUILD | 59 +++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/exporters/ostream/BUILD b/exporters/ostream/BUILD index cca74d6693..f74d896344 100644 --- a/exporters/ostream/BUILD +++ b/exporters/ostream/BUILD @@ -43,36 +43,37 @@ cc_library( ], ) -cc_library( - name = "ostream_metric_exporter", - srcs = [ - "src/metric_exporter.cc", - ], - hdrs = [ - "include/opentelemetry/exporters/ostream/metric_exporter.h", - ], - strip_include_prefix = "include", - tags = [ - "metrics", - "ostream", - ], - deps = [ - "//sdk/src/metrics", - ], -) +# TODO - Uncomment once MetricData interface is finalised +#cc_library( +# name = "ostream_metric_exporter", +# srcs = [ +# "src/metric_exporter.cc", +# ], +# hdrs = [ +# "include/opentelemetry/exporters/ostream/metric_exporter.h", +# ], +# strip_include_prefix = "include", +# tags = [ +# "metrics", +# "ostream", +# ], +# deps = [ +# "//sdk/src/metrics", +# ], +#) -cc_test( - name = "ostream_metric_test", - srcs = ["test/ostream_metric_test.cc"], - tags = [ - "ostream", - "test", - ], - deps = [ - ":ostream_metric_exporter", - "@com_google_googletest//:gtest_main", - ], -) +#cc_test( +# name = "ostream_metric_test", +# srcs = ["test/ostream_metric_test.cc"], +# tags = [ +# "ostream", +# "test", +# ], +# deps = [ +# ":ostream_metric_exporter", +# "@com_google_googletest//:gtest_main", +# ], +#) cc_test( name = "ostream_metrics_test_deprecated", From 00337e51e6bc03ef86c1c4d2c2ed96679d7456ba Mon Sep 17 00:00:00 2001 From: Lalit Date: Wed, 30 Mar 2022 15:39:28 -0700 Subject: [PATCH 08/13] fix review comments --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 4 ++-- sdk/test/metrics/periodic_exporting_metric_reader_test.cc | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index db521c33d8..43fcb61600 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -63,8 +63,8 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; + std::condition_variable cv_; + std::mutex cv_m_; }; } // namespace metrics diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index 66d3832b0b..f13fbb0e04 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -18,7 +18,7 @@ class MockPushMetricExporter : public MetricExporter public: opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override { - records.push_back(record); + records_.push_back(record); return opentelemetry::sdk::common::ExportResult::kSuccess; } @@ -33,10 +33,10 @@ class MockPushMetricExporter : public MetricExporter return true; } - size_t GetDataCount() { return records.size(); } + size_t GetDataCount() { return records_.size(); } private: - std::vector records; + std::vector records_; }; class MockMetricProducer : public MetricProducer From 405b0c0794dc12eddd036e14d910cfefe8f9c01d Mon Sep 17 00:00:00 2001 From: Lalit Date: Wed, 30 Mar 2022 15:47:11 -0700 Subject: [PATCH 09/13] fix comment --- sdk/include/opentelemetry/sdk/metrics/metric_exporter.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 26193f4cce..2488edd7ed 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -30,9 +30,9 @@ class MetricExporter /** * Exports a batch of metrics recordables. This method must not be called * concurrently for the same exporter instance. - * @param spans a span of unique pointers to metrics data + * @param data metrics data */ - virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept = 0; + virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &data) noexcept = 0; /** * Force flush the exporter. From 582de6b3fa122dc837c6a0a6934f1e3c77a177ca Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 31 Mar 2022 08:39:38 -0700 Subject: [PATCH 10/13] review comment - remove unnecessart lock --- sdk/src/metrics/metric_reader.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index 7931111db7..abf73d766a 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -62,7 +62,6 @@ bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept status = false; OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!"); } - const std::lock_guard locked(lock_); return status; } From 144d5a277a5f1f3f3390814fe2c6f144d833501f Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 31 Mar 2022 08:46:41 -0700 Subject: [PATCH 11/13] conflict fix --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 43fcb61600..d6dc86b6ee 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -44,7 +44,7 @@ class PeriodicExportingMetricReader : public MetricReader PeriodicExportingMetricReader( std::unique_ptr exporter, const PeriodicExportingMetricReaderOptions &option, - AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative); + AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative); private: bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; From 3bbe566f6e6783c97602dcc07d25fcb2ea88e906 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 31 Mar 2022 16:22:39 -0700 Subject: [PATCH 12/13] Update sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h Co-authored-by: Tom Tan --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index d6dc86b6ee..732af64f0d 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -24,7 +24,6 @@ class MetricExporter; */ constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000); -; constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); struct PeriodicExportingMetricReaderOptions { From ca0bb8c0896181f9932725714fa16e501e3db477 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 31 Mar 2022 16:43:56 -0700 Subject: [PATCH 13/13] format --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 732af64f0d..29125a6ea2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -24,7 +24,7 @@ class MetricExporter; */ constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000); -constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); +constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); struct PeriodicExportingMetricReaderOptions {