From dda381239f312934765e6e6588e0ae2d300eba85 Mon Sep 17 00:00:00 2001 From: owent Date: Mon, 1 Jul 2024 15:45:28 +0800 Subject: [PATCH 1/8] Fix crash in `PeriodicExportingMetricReader`. --- .../opentelemetry/sdk/metrics/metric_reader.h | 3 +- .../periodic_exporting_metric_reader.cc | 35 +++++++++++-------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 30d5e60823..545fc56b7a 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -5,6 +5,7 @@ #include #include +#include #include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/sdk/metrics/export/metric_producer.h" @@ -19,7 +20,7 @@ namespace metrics /** * MetricReader defines the interface to collect metrics from SDK */ -class MetricReader +class MetricReader : public std::enable_shared_from_this { public: MetricReader(); diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index f6ae47977d..c9dd3d8810 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -89,19 +89,26 @@ void PeriodicExportingMetricReader::DoBackgroundWork() bool PeriodicExportingMetricReader::CollectAndExportOnce() { - std::atomic cancel_export_for_timeout{false}; - auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { - Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - this->exporter_->Export(metric_data); - return true; - }); + std::shared_ptr> cancel_export_for_timeout = + std::make_shared>(false); + auto keep_lifetime = shared_from_this(); + + auto future_receive = std::async(std::launch::async, [keep_lifetime, cancel_export_for_timeout] { + static_cast(keep_lifetime.get()) + ->Collect([keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (*cancel_export_for_timeout) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << static_cast(keep_lifetime.get()) + ->export_timeout_millis_.count() + << " ms, and timed out"); + return false; + } + static_cast(keep_lifetime.get()) + ->exporter_->Export(metric_data); + return true; + }); }); std::future_status status; @@ -111,7 +118,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); if (status == std::future_status::timeout) { - cancel_export_for_timeout = true; + *cancel_export_for_timeout = true; break; } } while (status != std::future_status::ready); From d5e0eb777106bc45f0e92541e922b167f64677a1 Mon Sep 17 00:00:00 2001 From: owent Date: Mon, 1 Jul 2024 17:28:45 +0800 Subject: [PATCH 2/8] Add unit test --- .../periodic_exporting_metric_reader_test.cc | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index e115f79f75..f65c10ca05 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -7,6 +7,10 @@ #include +#include +#include +#include + using namespace opentelemetry; using namespace opentelemetry::sdk::instrumentationscope; using namespace opentelemetry::sdk::metrics; @@ -14,8 +18,14 @@ using namespace opentelemetry::sdk::metrics; class MockPushMetricExporter : public PushMetricExporter { public: + MockPushMetricExporter(std::chrono::milliseconds wait) : wait_(wait) {} + opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override { + if (wait_ > std::chrono::milliseconds::zero()) + { + std::this_thread::sleep_for(wait_); + } records_.push_back(record); return opentelemetry::sdk::common::ExportResult::kSuccess; } @@ -34,6 +44,7 @@ class MockPushMetricExporter : public PushMetricExporter private: std::vector records_; + std::chrono::milliseconds wait_; }; class MockMetricProducer : public MetricProducer @@ -61,17 +72,34 @@ class MockMetricProducer : public MetricProducer TEST(PeriodicExporingMetricReader, BasicTests) { - std::unique_ptr exporter(new MockPushMetricExporter()); + std::unique_ptr exporter( + new MockPushMetricExporter(std::chrono::milliseconds{0})); PeriodicExportingMetricReaderOptions options; options.export_timeout_millis = std::chrono::milliseconds(200); options.export_interval_millis = std::chrono::milliseconds(500); auto exporter_ptr = exporter.get(); - PeriodicExportingMetricReader reader(std::move(exporter), options); + std::shared_ptr reader = + std::make_shared(std::move(exporter), options); MockMetricProducer producer; - reader.SetMetricProducer(&producer); + reader->SetMetricProducer(&producer); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - EXPECT_NO_THROW(reader.ForceFlush()); - reader.Shutdown(); + EXPECT_NO_THROW(reader->ForceFlush()); + reader->Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount()); } + +TEST(PeriodicExporingMetricReader, Timeout) +{ + std::unique_ptr exporter( + new MockPushMetricExporter(std::chrono::milliseconds{2000})); + PeriodicExportingMetricReaderOptions options; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + std::shared_ptr reader = + std::make_shared(std::move(exporter), options); + MockMetricProducer producer; + reader->SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + reader->Shutdown(); +} From 06d14caa4c39c7cc9485eb3d7eb855a81008da5b Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 5 Jul 2024 15:24:58 +0800 Subject: [PATCH 3/8] Optimize for readibility --- .../metrics/export/periodic_exporting_metric_reader.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index c9dd3d8810..a28c87e7d0 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -94,9 +94,9 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() auto keep_lifetime = shared_from_this(); auto future_receive = std::async(std::launch::async, [keep_lifetime, cancel_export_for_timeout] { - static_cast(keep_lifetime.get()) - ->Collect([keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (*cancel_export_for_timeout) + keep_lifetime->Collect( + [keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (cancel_export_for_timeout->load(std::memory_order_acquire)) { OTEL_INTERNAL_LOG_ERROR( "[Periodic Exporting Metric Reader] Collect took longer configured time: " @@ -118,7 +118,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); if (status == std::future_status::timeout) { - *cancel_export_for_timeout = true; + cancel_export_for_timeout->store(true, std::memory_order_release); break; } } while (status != std::future_status::ready); From 46e303279bed43b0262b5b1012ca608f76242e4f Mon Sep 17 00:00:00 2001 From: owent Date: Sun, 7 Jul 2024 16:35:52 +0800 Subject: [PATCH 4/8] Reduce the scope of `enable_shared_from_this` for `PeriodicExportingMetricReader` --- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 4 +++- sdk/include/opentelemetry/sdk/metrics/metric_reader.h | 2 +- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 7 ++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index d72ec08839..25167fbbb2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -23,7 +23,9 @@ namespace sdk namespace metrics { -class PeriodicExportingMetricReader : public MetricReader +class PeriodicExportingMetricReader + : public MetricReader, + public std::enable_shared_from_this { public: diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 545fc56b7a..d3660289ae 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -20,7 +20,7 @@ namespace metrics /** * MetricReader defines the interface to collect metrics from SDK */ -class MetricReader : public std::enable_shared_from_this +class MetricReader { public: MetricReader(); diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index a28c87e7d0..ee9d299b7b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -100,13 +100,10 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() { OTEL_INTERNAL_LOG_ERROR( "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << static_cast(keep_lifetime.get()) - ->export_timeout_millis_.count() - << " ms, and timed out"); + << keep_lifetime->export_timeout_millis_.count() << " ms, and timed out"); return false; } - static_cast(keep_lifetime.get()) - ->exporter_->Export(metric_data); + keep_lifetime->exporter_->Export(metric_data); return true; }); }); From 24733e45d7a4a957d832a0abb4ab85e6b7b877fd Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 10 Jul 2024 19:53:07 +0800 Subject: [PATCH 5/8] Remove unused header --- sdk/include/opentelemetry/sdk/metrics/metric_reader.h | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index d3660289ae..30d5e60823 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -5,7 +5,6 @@ #include #include -#include #include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/sdk/metrics/export/metric_producer.h" From e73b275bfffd2ecd5b6ef6c44e642891f82d7665 Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 10 Jul 2024 20:04:19 +0800 Subject: [PATCH 6/8] Cleanup IWYU --- examples/plugin/plugin/tracer.cc | 1 + exporters/otlp/src/otlp_file_client.cc | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/plugin/plugin/tracer.cc b/examples/plugin/plugin/tracer.cc index 5361197584..05ff971fb1 100644 --- a/examples/plugin/plugin/tracer.cc +++ b/examples/plugin/plugin/tracer.cc @@ -9,6 +9,7 @@ #include "opentelemetry/common/attribute_value.h" #include "opentelemetry/common/timestamp.h" #include "opentelemetry/context/context_value.h" +#include "opentelemetry/nostd/utility.h" #include "opentelemetry/trace/span_context.h" #include "opentelemetry/trace/span_metadata.h" #include "tracer.h" diff --git a/exporters/otlp/src/otlp_file_client.cc b/exporters/otlp/src/otlp_file_client.cc index bd36a602c4..9d71f4f91c 100644 --- a/exporters/otlp/src/otlp_file_client.cc +++ b/exporters/otlp/src/otlp_file_client.cc @@ -14,8 +14,6 @@ // clang-format on #include "google/protobuf/message.h" -#include "google/protobuf/reflection.h" -#include "google/protobuf/stubs/common.h" #include "nlohmann/json.hpp" // clang-format off @@ -28,15 +26,26 @@ #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/version.h" +#ifdef _MSC_VER +# include +# define strcasecmp _stricmp +#else +# include +#endif + +#include #include #include +#include #include -#include #include +#include #include +#include #include #include #include +#include #include #if !defined(__CYGWIN__) && defined(_WIN32) @@ -64,11 +73,8 @@ #else -# include -# include # include # include -# include # include # define FS_ACCESS(x) access(x, F_OK) @@ -89,10 +95,6 @@ # undef GetMessage #endif -#ifdef _MSC_VER -# define strcasecmp _stricmp -#endif - #if (defined(_MSC_VER) && _MSC_VER >= 1600) || \ (defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L) || defined(__STDC_LIB_EXT1__) # ifdef _MSC_VER From 38b6a70707dbf3f3d1a51cbd148d17832fc49190 Mon Sep 17 00:00:00 2001 From: owent Date: Tue, 16 Jul 2024 16:14:35 +0800 Subject: [PATCH 7/8] Only keep lifetime when in callback of std::async --- .../export/periodic_exporting_metric_reader.cc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index ee9d299b7b..715c30b1b4 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -91,11 +91,16 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() { std::shared_ptr> cancel_export_for_timeout = std::make_shared>(false); - auto keep_lifetime = shared_from_this(); + auto watch_lifetime = std::weak_ptr(shared_from_this()); - auto future_receive = std::async(std::launch::async, [keep_lifetime, cancel_export_for_timeout] { + auto future_receive = std::async(std::launch::async, [watch_lifetime, cancel_export_for_timeout] { + auto keep_lifetime = watch_lifetime.lock(); + if (!keep_lifetime) + { + return; + } keep_lifetime->Collect( - [keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { + [&keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { if (cancel_export_for_timeout->load(std::memory_order_acquire)) { OTEL_INTERNAL_LOG_ERROR( From f6f3467baea66742b2c32c75c79a02545fd8ae37 Mon Sep 17 00:00:00 2001 From: owent Date: Tue, 16 Jul 2024 22:03:27 +0800 Subject: [PATCH 8/8] Use `std::thread` instead of `std::async` to avoid BUGs in some STL implementations. --- api/include/opentelemetry/common/macros.h | 31 +++++++ .../export/periodic_exporting_metric_reader.h | 4 +- .../periodic_exporting_metric_reader.cc | 88 ++++++++++++------- 3 files changed, 89 insertions(+), 34 deletions(-) diff --git a/api/include/opentelemetry/common/macros.h b/api/include/opentelemetry/common/macros.h index b4a270084d..b74c1048fc 100644 --- a/api/include/opentelemetry/common/macros.h +++ b/api/include/opentelemetry/common/macros.h @@ -387,6 +387,37 @@ point. #endif +// OPENTELEMETRY_HAVE_EXCEPTIONS +// +// Checks whether the compiler both supports and enables exceptions. Many +// compilers support a "no exceptions" mode that disables exceptions. +// +// Generally, when OPENTELEMETRY_HAVE_EXCEPTIONS is not defined: +// +// * Code using `throw` and `try` may not compile. +// * The `noexcept` specifier will still compile and behave as normal. +// * The `noexcept` operator may still return `false`. +// +// For further details, consult the compiler's documentation. +#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS +# if defined(__clang__) && ((__clang_major__ * 100) + __clang_minor__) < 306 +// Clang < 3.6 +// http://releases.llvm.org/3.6.0/tools/clang/docs/ReleaseNotes.html#the-exceptions-macro +# if defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +# endif // defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# elif OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +// Handle remaining special cases and default to exceptions being supported. +# elif !(defined(__GNUC__) && !defined(__EXCEPTIONS) && !defined(__cpp_exceptions)) && \ + !(defined(_MSC_VER) && !defined(_CPPUNWIND)) +# define OPENTELEMETRY_HAVE_EXCEPTIONS 1 +# endif +#endif +#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS +# define OPENTELEMETRY_HAVE_EXCEPTIONS 0 +#endif + /* OPENTELEMETRY_ATTRIBUTE_LIFETIME_BOUND indicates that a resource owned by a function parameter or implicit object parameter is retained by the return value of the diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index 25167fbbb2..d72ec08839 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -23,9 +23,7 @@ namespace sdk namespace metrics { -class PeriodicExportingMetricReader - : public MetricReader, - public std::enable_shared_from_this +class PeriodicExportingMetricReader : public MetricReader { public: diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 715c30b1b4..7ca2747337 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -13,6 +13,7 @@ #include #include +#include "opentelemetry/common/macros.h" #include "opentelemetry/common/timestamp.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/metrics/export/metric_producer.h" @@ -29,6 +30,10 @@ # include #endif +#if OPENTELEMETRY_HAVE_EXCEPTIONS +# include +#endif + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { @@ -89,41 +94,62 @@ void PeriodicExportingMetricReader::DoBackgroundWork() bool PeriodicExportingMetricReader::CollectAndExportOnce() { - std::shared_ptr> cancel_export_for_timeout = - std::make_shared>(false); - auto watch_lifetime = std::weak_ptr(shared_from_this()); + std::atomic cancel_export_for_timeout{false}; - auto future_receive = std::async(std::launch::async, [watch_lifetime, cancel_export_for_timeout] { - auto keep_lifetime = watch_lifetime.lock(); - if (!keep_lifetime) - { - return; - } - keep_lifetime->Collect( - [&keep_lifetime, cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout->load(std::memory_order_acquire)) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << keep_lifetime->export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - keep_lifetime->exporter_->Export(metric_data); - return true; - }); - }); - - std::future_status status; std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); - do + std::unique_ptr task_thread; + +#if OPENTELEMETRY_HAVE_EXCEPTIONS + try { - status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) +#endif + std::promise sender; + auto receiver = sender.get_future(); + + task_thread.reset(new std::thread([this, &cancel_export_for_timeout] { + this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { + if (cancel_export_for_timeout.load(std::memory_order_acquire)) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << this->export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + return true; + }); + })); + + std::future_status status; + do { - cancel_export_for_timeout->store(true, std::memory_order_release); - break; - } - } while (status != std::future_status::ready); + status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout.store(true, std::memory_order_release); + break; + } + } while (status != std::future_status::ready); +#if OPENTELEMETRY_HAVE_EXCEPTIONS + } + catch (std::exception &e) + { + OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect failed with exception " + << e.what()); + return false; + } + catch (...) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect failed with unknown exception"); + return false; + } +#endif + + if (task_thread && task_thread->joinable()) + { + task_thread->join(); + } std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); while (notify_force_flush > notified_sequence)