From 88a7e342a1451c039195eebd167dfb8c05d14669 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 13 Dec 2022 15:35:18 -0800 Subject: [PATCH 1/7] fix --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 1 + sdk/test/metrics/periodic_exporting_metric_reader_test.cc | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 96b37f0d40..ecb5cc7e72 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -57,6 +57,7 @@ class PeriodicExportingMetricReader : public MetricReader std::chrono::milliseconds export_timeout_millis_; void DoBackgroundWork(); + bool CollectAndExportOnce(); /* The background worker thread */ std::thread worker_thread_; diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd789b5028..cd2a1a7522 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -69,7 +69,7 @@ TEST(PeriodicExporingMetricReader, BasicTests) PeriodicExportingMetricReader reader(std::move(exporter), options); MockMetricProducer producer; reader.SetMetricProducer(&producer); - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); From 4fdfb88e2afda250a8b360a94235bc6338133b9e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 13 Dec 2022 15:37:29 -0800 Subject: [PATCH 2/7] revery --- .../periodic_exporting_metric_reader.cc | 65 ++++++++++--------- .../periodic_exporting_metric_reader_test.cc | 2 +- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 4220509b74..4fc2424b30 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -46,40 +46,45 @@ void PeriodicExportingMetricReader::DoBackgroundWork() std::unique_lock lk(cv_m_); do { - if (IsShutdown()) - { - break; - } - std::atomic cancel_export_for_timeout{false}; - auto start = std::chrono::steady_clock::now(); - auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { - Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - this->exporter_->Export(metric_data); - return true; - }); - }); - std::future_status status; - do - { - status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) - { - cancel_export_for_timeout = true; - break; - } - } while (status != std::future_status::ready); + auto start = std::chrono::steady_clock::now(); + CollectAndExportOnce(); auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; cv_.wait_for(lk, remaining_wait_interval_ms); - } while (true); + } while (IsShutdown() != true); + // One last Collect and Export before shutdown + CollectAndExportOnce(); +} + +bool PeriodicExportingMetricReader::CollectAndExportOnce() +{ + std::atomic cancel_export_for_timeout{false}; + auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { + Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (cancel_export_for_timeout) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + std::cout << "\n Export done\n"; + return true; + }); + }); + std::future_status status; + do + { + status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout = true; + break; + } + } while (status != std::future_status::ready); + return true; } bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd2a1a7522..cd789b5028 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -69,7 +69,7 @@ TEST(PeriodicExporingMetricReader, BasicTests) PeriodicExportingMetricReader reader(std::move(exporter), options); MockMetricProducer producer; reader.SetMetricProducer(&producer); - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); From f20a7ee4c73cf4f759e0769c6a3f43a0836f9f78 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 13 Dec 2022 15:40:06 -0800 Subject: [PATCH 3/7] remove debug stmt --- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 4fc2424b30..6fc1754c26 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -70,7 +70,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() return false; } this->exporter_->Export(metric_data); - std::cout << "\n Export done\n"; return true; }); }); From f5374245ebf71b8b9614b82bab942f5422a6d77f Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 13 Dec 2022 16:56:28 -0800 Subject: [PATCH 4/7] error handling --- .../export/periodic_exporting_metric_reader.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 6fc1754c26..8a9ac84ac2 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -46,15 +46,23 @@ void PeriodicExportingMetricReader::DoBackgroundWork() std::unique_lock lk(cv_m_); do { - auto start = std::chrono::steady_clock::now(); - CollectAndExportOnce(); + auto start = std::chrono::steady_clock::now(); + auto status = CollectAndExportOnce(); + if (!status) + { + OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") + } auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; cv_.wait_for(lk, remaining_wait_interval_ms); } while (IsShutdown() != true); // One last Collect and Export before shutdown - CollectAndExportOnce(); + auto status = CollectAndExportOnce(); + if (!status) + { + OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") + } } bool PeriodicExportingMetricReader::CollectAndExportOnce() From fb628db92d105ed63f4cf680acb01ac7e8f77bbf Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 13 Dec 2022 19:53:36 -0800 Subject: [PATCH 5/7] add test --- sdk/src/metrics/metric_reader.cc | 4 ++- .../periodic_exporting_metric_reader_test.cc | 33 ++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index 9ac7fc21ba..a778f0c41c 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -28,10 +28,12 @@ bool MetricReader::Collect( OTEL_INTERNAL_LOG_WARN( "MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for " "collection!") + return false; } if (IsShutdown()) { - OTEL_INTERNAL_LOG_WARN("MetricReader::Collect Cannot invoke Collect(). Shutdown in progress!"); + // Continue with warning, and let pull and push MetricReader state machine handle this. + OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!"); } return metric_producer_->Collect(callback); diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd789b5028..5cd439fb04 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -61,16 +61,25 @@ class MockMetricProducer : public MetricProducer TEST(PeriodicExporingMetricReader, BasicTests) { - std::unique_ptr exporter(new MockPushMetricExporter()); - PeriodicExportingMetricReaderOptions options; - options.export_timeout_millis = std::chrono::milliseconds(200); - options.export_interval_millis = std::chrono::milliseconds(500); - auto exporter_ptr = exporter.get(); - PeriodicExportingMetricReader reader(std::move(exporter), options); - MockMetricProducer producer; - reader.SetMetricProducer(&producer); - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - reader.Shutdown(); - EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), - static_cast(&producer)->GetDataCount()); + size_t num_of_iterations = 20; + + for (size_t i = 0; i < num_of_iterations; i++) + { + std::unique_ptr exporter(new MockPushMetricExporter()); + PeriodicExportingMetricReaderOptions options; + size_t lowest_ms = 100; + size_t range_ms = 200; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); + + auto sleep_ms = lowest_ms + rand() % range_ms; + PeriodicExportingMetricReader reader(std::move(exporter), options); + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + reader.Shutdown(); + EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), + static_cast(&producer)->GetDataCount()); + } } From 333b8d27f299f77e1e17ac2118927f529cbb10db Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 13 Dec 2022 19:56:43 -0800 Subject: [PATCH 6/7] Revert test --- .../periodic_exporting_metric_reader_test.cc | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index 5cd439fb04..64e7aec710 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -63,23 +63,16 @@ TEST(PeriodicExporingMetricReader, BasicTests) { size_t num_of_iterations = 20; - for (size_t i = 0; i < num_of_iterations; i++) - { - std::unique_ptr exporter(new MockPushMetricExporter()); - PeriodicExportingMetricReaderOptions options; - size_t lowest_ms = 100; - size_t range_ms = 200; - options.export_timeout_millis = std::chrono::milliseconds(200); - options.export_interval_millis = std::chrono::milliseconds(500); - auto exporter_ptr = exporter.get(); - - auto sleep_ms = lowest_ms + rand() % range_ms; - PeriodicExportingMetricReader reader(std::move(exporter), options); - MockMetricProducer producer; - reader.SetMetricProducer(&producer); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); - reader.Shutdown(); - EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), - static_cast(&producer)->GetDataCount()); - } + std::unique_ptr exporter(new MockPushMetricExporter()); + PeriodicExportingMetricReaderOptions options; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); + PeriodicExportingMetricReader reader(std::move(exporter), options); + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + reader.Shutdown(); + EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), + static_cast(&producer)->GetDataCount()); } From 91e1aa9b1aaabe801335e25d77f26a783bfce7d5 Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 13 Dec 2022 19:59:30 -0800 Subject: [PATCH 7/7] revert test --- sdk/test/metrics/periodic_exporting_metric_reader_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index 64e7aec710..cd789b5028 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -61,8 +61,6 @@ class MockMetricProducer : public MetricProducer TEST(PeriodicExporingMetricReader, BasicTests) { - size_t num_of_iterations = 20; - std::unique_ptr exporter(new MockPushMetricExporter()); PeriodicExportingMetricReaderOptions options; options.export_timeout_millis = std::chrono::milliseconds(200);