Skip to content

Commit

Permalink
Merge branch 'main' into fix_1600
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Sep 9, 2022
2 parents fd8c9c0 + b8504d9 commit e9da800
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ main ]
pull_request:
branches: [ main, async-changes ]
branches: [ main ]

jobs:
cmake_test:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dependencies_image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
steps:
-
name: checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
-
name: Set up QEMU
uses: docker/setup-qemu-action@v2
Expand Down
20 changes: 10 additions & 10 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,20 +847,20 @@ void OtlpHttpClient::ReleaseSession(
{
bool has_session = false;

{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};

auto session_iter = running_sessions_.find(&session);
if (session_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(session_iter->second));
running_sessions_.erase(session_iter);
auto session_iter = running_sessions_.find(&session);
if (session_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(session_iter->second));
running_sessions_.erase(session_iter);

has_session = true;
}
has_session = true;
}

// Call session_waker_.notify_all() with session_manager_lock_ locked to keep session_waker_
// available when destroying OtlpHttpClient
if (has_session)
{
session_waker_.notify_all();
Expand Down
6 changes: 4 additions & 2 deletions sdk/include/opentelemetry/sdk/logs/simple_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ class SimpleLogProcessor : public LogProcessor
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

bool IsShutdown() const noexcept;

private:
// The configured exporter
std::unique_ptr<LogExporter> exporter_;
// The lock used to ensure the exporter is not called concurrently
opentelemetry::common::SpinLockMutex lock_;
// The atomic boolean flag to ensure the ShutDown() function is only called once
std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT;
// The atomic boolean to ensure the ShutDown() function is only called once
std::atomic<bool> is_shutdown_;
};
} // namespace logs
} // namespace sdk
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/logs/logger_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool LoggerContext::ForceFlush(std::chrono::microseconds timeout) noexcept

bool LoggerContext::Shutdown(std::chrono::microseconds timeout) noexcept
{
return processor_->ForceFlush(timeout);
return processor_->Shutdown(timeout);
}

} // namespace logs
Expand Down
10 changes: 8 additions & 2 deletions sdk/src/logs/simple_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace logs
* @param exporter the configured exporter where log records are sent
*/
SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter)
: exporter_(std::move(exporter))
: exporter_(std::move(exporter)), is_shutdown_(false)
{}

std::unique_ptr<Recordable> SimpleLogProcessor::MakeRecordable() noexcept
Expand Down Expand Up @@ -51,13 +51,19 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
// Should only shutdown exporter ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr)
if (!is_shutdown_.exchange(true, std::memory_order_acq_rel) && exporter_ != nullptr)
{
return exporter_->Shutdown(timeout);
}

return true;
}

bool SimpleLogProcessor::IsShutdown() const noexcept
{
return is_shutdown_.load(std::memory_order_acquire);
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Expand Down
37 changes: 20 additions & 17 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
AggregationTemporality aggregation_temporarily =
collector->GetAggregationTemporality(instrument_descriptor_.type_);
for (auto &col : collectors)
if (delta_metrics->Size())
{
unreported_metrics_[col.get()].push_back(delta_metrics);
for (auto &col : collectors)
{
unreported_metrics_[col.get()].push_back(delta_metrics);
}
}

// Get the unreported metrics for the `collector` from `unreported metrics stash`
Expand Down Expand Up @@ -88,20 +91,20 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
merged_metrics->Set(
attributes, DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr));
}
return true;
});
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down Expand Up @@ -137,4 +140,4 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,

} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
#endif
2 changes: 2 additions & 0 deletions sdk/test/logs/logger_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ TEST(LoggerProviderSDK, GetResource)
TEST(LoggerProviderSDK, Shutdown)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
SimpleLogProcessor *processor_ptr = processor.get();
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.Shutdown());
EXPECT_TRUE(processor_ptr->IsShutdown());

// It's safe to shutdown again
EXPECT_TRUE(lp.Shutdown());
Expand Down
62 changes: 61 additions & 1 deletion sdk/test/metrics/sync_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});

EXPECT_EQ(count_attributes, 2); // GET and PUT
// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
if (temporality == AggregationTemporality::kDelta)
Expand All @@ -105,6 +105,34 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordLong(50l, KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
expected_total_get_requests += 50;
Expand Down Expand Up @@ -134,7 +162,9 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
WritableMetricStorageTestFixture,
::testing::Values(AggregationTemporality::kCumulative,
Expand Down Expand Up @@ -205,6 +235,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT

// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
Expand All @@ -214,6 +245,34 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordDouble(50.0,
KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
Expand Down Expand Up @@ -245,6 +304,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble,
WritableMetricStorageTestFixture,
Expand Down

0 comments on commit e9da800

Please sign in to comment.