Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MeterContext::ForEachMeter() method to process callbacks on Meter in thread-safe manner #1783

Merged
merged 2 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/meter_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>
ViewRegistry *GetViewRegistry() const noexcept;

/**
* Obtain the configured meters.
* NOTE - INTERNAL method, can change in future.
* Process callback for each meter in thread-safe manner
*/
bool ForEachMeter(nostd::function_ref<bool(std::shared_ptr<Meter> &meter)> callback) noexcept;

/**
* NOTE - INTERNAL method, can change in future.
* Get the configured meters.
* This method is NOT thread safe, and only called through MeterProvider
*
*/
nostd::span<std::shared_ptr<Meter>> GetMeters() noexcept;
Expand Down Expand Up @@ -96,8 +104,8 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>
std::unique_ptr<View> view) noexcept;

/**
* Adds a meter to the list of configured meters.
* Note: This method is INTERNAL to sdk not thread safe.
* NOTE - INTERNAL method, can change in future.
* Adds a meter to the list of configured meters in thread safe manner.
*
* @param meter
*/
Expand All @@ -124,7 +132,7 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>

std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT;
opentelemetry::common::SpinLockMutex forceflush_lock_;
opentelemetry::common::SpinLockMutex storage_lock_;
opentelemetry::common::SpinLockMutex meter_lock_;
};

} // namespace metrics
Expand Down
18 changes: 16 additions & 2 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,23 @@ ViewRegistry *MeterContext::GetViewRegistry() const noexcept
return views_.get();
}

bool MeterContext::ForEachMeter(
nostd::function_ref<bool(std::shared_ptr<Meter> &meter)> callback) noexcept
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(meter_lock_);
for (auto &meter : meters_)
{
if (!callback(meter))
{
return false;
}
}
return true;
}

nostd::span<std::shared_ptr<Meter>> MeterContext::GetMeters() noexcept
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
// no lock required, as this is called by MeterProvider in thread-safe manner.
return nostd::span<std::shared_ptr<Meter>>{meters_};
}

Expand Down Expand Up @@ -59,7 +73,7 @@ void MeterContext::AddView(std::unique_ptr<InstrumentSelector> instrument_select

void MeterContext::AddMeter(std::shared_ptr<Meter> meter)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(meter_lock_);
meters_.push_back(meter);
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ bool MetricCollector::Collect(
return false;
}
ResourceMetrics resource_metrics;
for (auto &meter : meter_context_->GetMeters())
{
meter_context_->ForEachMeter([&](std::shared_ptr<Meter> meter) noexcept {
auto collection_ts = std::chrono::system_clock::now();
ScopeMetrics scope_metrics;
scope_metrics.metric_data_ = meter->Collect(this, collection_ts);
scope_metrics.scope_ = meter->GetInstrumentationScope();
resource_metrics.scope_metric_data_.push_back(scope_metrics);
}
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
callback(resource_metrics);
return true;
Expand Down