diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15274be986..d723ea2fe1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,7 @@ on: push: branches: [ main ] pull_request: - branches: [ main, async-changes ] + branches: [ main ] jobs: cmake_test: diff --git a/.github/workflows/dependencies_image.yml b/.github/workflows/dependencies_image.yml index 3a55a0b4e8..2e3aa9bb42 100644 --- a/.github/workflows/dependencies_image.yml +++ b/.github/workflows/dependencies_image.yml @@ -11,7 +11,7 @@ jobs: steps: - name: checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Set up QEMU uses: docker/setup-qemu-action@v2 diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 12731120c1..1818b2132e 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -847,20 +847,20 @@ void OtlpHttpClient::ReleaseSession( { bool has_session = false; - { - std::lock_guard guard{session_manager_lock_}; + std::lock_guard guard{session_manager_lock_}; - auto session_iter = running_sessions_.find(&session); - if (session_iter != running_sessions_.end()) - { - // Move session and handle into gc list, and they will be destroyed later - gc_sessions_.emplace_back(std::move(session_iter->second)); - running_sessions_.erase(session_iter); + auto session_iter = running_sessions_.find(&session); + if (session_iter != running_sessions_.end()) + { + // Move session and handle into gc list, and they will be destroyed later + gc_sessions_.emplace_back(std::move(session_iter->second)); + running_sessions_.erase(session_iter); - has_session = true; - } + has_session = true; } + // Call session_waker_.notify_all() with session_manager_lock_ locked to keep session_waker_ + // available when destroying OtlpHttpClient if (has_session) { session_waker_.notify_all(); diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index cc3aec47b2..45f4683973 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -41,13 +41,15 @@ class SimpleLogProcessor : public LogProcessor bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + bool IsShutdown() const noexcept; + private: // The configured exporter std::unique_ptr exporter_; // The lock used to ensure the exporter is not called concurrently opentelemetry::common::SpinLockMutex lock_; - // The atomic boolean flag to ensure the ShutDown() function is only called once - std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + // The atomic boolean to ensure the ShutDown() function is only called once + std::atomic is_shutdown_; }; } // namespace logs } // namespace sdk diff --git a/sdk/src/logs/logger_context.cc b/sdk/src/logs/logger_context.cc index b0025ff724..dd26d7e508 100644 --- a/sdk/src/logs/logger_context.cc +++ b/sdk/src/logs/logger_context.cc @@ -44,7 +44,7 @@ bool LoggerContext::ForceFlush(std::chrono::microseconds timeout) noexcept bool LoggerContext::Shutdown(std::chrono::microseconds timeout) noexcept { - return processor_->ForceFlush(timeout); + return processor_->Shutdown(timeout); } } // namespace logs diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 6e2fde9f14..c9ba4c7895 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -17,7 +17,7 @@ namespace logs * @param exporter the configured exporter where log records are sent */ SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) - : exporter_(std::move(exporter)) + : exporter_(std::move(exporter)), is_shutdown_(false) {} std::unique_ptr SimpleLogProcessor::MakeRecordable() noexcept @@ -51,13 +51,19 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { // Should only shutdown exporter ONCE. - if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr) + if (!is_shutdown_.exchange(true, std::memory_order_acq_rel) && exporter_ != nullptr) { return exporter_->Shutdown(timeout); } return true; } + +bool SimpleLogProcessor::IsShutdown() const noexcept +{ + return is_shutdown_.load(std::memory_order_acquire); +} + } // namespace logs } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 3eea56917b..eb29286d8d 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -34,9 +34,12 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; AggregationTemporality aggregation_temporarily = collector->GetAggregationTemporality(instrument_descriptor_.type_); - for (auto &col : collectors) + if (delta_metrics->Size()) { - unreported_metrics_[col.get()].push_back(delta_metrics); + for (auto &col : collectors) + { + unreported_metrics_[col.get()].push_back(delta_metrics); + } } // Get the unreported metrics for the `collector` from `unreported metrics stash` @@ -88,20 +91,20 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, if (aggregation_temporarily == AggregationTemporality::kCumulative) { // merge current delta to previous cumulative - last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, - Aggregation &aggregation) { - auto agg = merged_metrics->Get(attributes); - if (agg) - { - merged_metrics->Set(attributes, agg->Merge(aggregation)); - } - else - { - merged_metrics->Set( - attributes, DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr)); - } - return true; - }); + last_aggr_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + auto def_agg = DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr); + merged_metrics->Set(attributes, def_agg->Merge(aggregation)); + } + return true; + }); } last_reported_metrics_[collector] = LastReportedMetrics{std::move(merged_metrics), collection_ts}; @@ -137,4 +140,4 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, } // namespace sdk OPENTELEMETRY_END_NAMESPACE -#endif \ No newline at end of file +#endif diff --git a/sdk/test/logs/logger_provider_sdk_test.cc b/sdk/test/logs/logger_provider_sdk_test.cc index 36bbb4a0ec..3f02ace7e2 100644 --- a/sdk/test/logs/logger_provider_sdk_test.cc +++ b/sdk/test/logs/logger_provider_sdk_test.cc @@ -108,12 +108,14 @@ TEST(LoggerProviderSDK, GetResource) TEST(LoggerProviderSDK, Shutdown) { std::unique_ptr processor(new SimpleLogProcessor(nullptr)); + SimpleLogProcessor *processor_ptr = processor.get(); std::vector> processors; processors.push_back(std::move(processor)); LoggerProvider lp(std::make_shared(std::move(processors))); EXPECT_TRUE(lp.Shutdown()); + EXPECT_TRUE(processor_ptr->IsShutdown()); // It's safe to shutdown again EXPECT_TRUE(lp.Shutdown()); diff --git a/sdk/test/metrics/sync_metric_storage_test.cc b/sdk/test/metrics/sync_metric_storage_test.cc index 6c167c7a29..6bfd09d181 100644 --- a/sdk/test/metrics/sync_metric_storage_test.cc +++ b/sdk/test/metrics/sync_metric_storage_test.cc @@ -96,7 +96,7 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) } return true; }); - + EXPECT_EQ(count_attributes, 2); // GET and PUT // In case of delta temporarily, subsequent collection would contain new data points, so resetting // the counts if (temporality == AggregationTemporality::kDelta) @@ -105,6 +105,34 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) expected_total_put_requests = 0; } + // collect one more time. + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + } + } + return true; + }); + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(count_attributes, 2); // GET AND PUT + } + storage.RecordLong(50l, KeyValueIterableView>(attributes_get), opentelemetry::context::Context{}); expected_total_get_requests += 50; @@ -134,7 +162,9 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation) } return true; }); + EXPECT_EQ(count_attributes, 2); // GET and PUT } + INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, WritableMetricStorageTestFixture, ::testing::Values(AggregationTemporality::kCumulative, @@ -205,6 +235,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation) } return true; }); + EXPECT_EQ(count_attributes, 2); // GET and PUT // In case of delta temporarily, subsequent collection would contain new data points, so resetting // the counts @@ -214,6 +245,34 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation) expected_total_put_requests = 0; } + // collect one more time. + collection_ts = std::chrono::system_clock::now(); + count_attributes = 0; + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_get_requests); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(data.value_), expected_total_put_requests); + } + } + return true; + }); + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(count_attributes, 2); // GET AND PUT + } + storage.RecordDouble(50.0, KeyValueIterableView>(attributes_get), opentelemetry::context::Context{}); @@ -245,6 +304,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation) } return true; }); + EXPECT_EQ(count_attributes, 2); // GET and PUT } INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble, WritableMetricStorageTestFixture,