Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1663 Threading issue between Meter::RegisterSyncMetricStorage and Meter::Collect #1666

Merged
merged 11 commits into from
Oct 11, 2022
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class Meter final : public opentelemetry::metrics::Meter
InstrumentDescriptor &instrument_descriptor);
std::unique_ptr<AsyncWritableMetricStorage> RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor);
opentelemetry::common::SpinLockMutex storage_lock_;
};
} // namespace metrics
} // namespace sdk
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation
std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -251,6 +252,7 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -293,6 +295,7 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp collect_ts) noexcept
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
ThomsonTan marked this conversation as resolved.
Show resolved Hide resolved
observable_registry_->Observe(collect_ts);
std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
Expand Down
54 changes: 52 additions & 2 deletions sdk/test/metrics/meter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ class MockMetricReader : public MetricReader

namespace
{
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr)
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr,
std::string meter_name = "meter_name")
{
static std::shared_ptr<metrics::MeterProvider> provider(new MeterProvider());
std::unique_ptr<MetricReader> metric_reader(new MockMetricReader());
*metricReaderPtr = metric_reader.get();
auto p = std::static_pointer_cast<MeterProvider>(provider);
p->AddMetricReader(std::move(metric_reader));
auto meter = provider->GetMeter("meter_name");
auto meter = provider->GetMeter(meter_name);
return meter;
}
} // namespace
Expand Down Expand Up @@ -70,6 +71,55 @@ TEST(MeterTest, BasicAsyncTests)
}
return true;
});
observable_counter->RemoveCallback(asyc_generate_measurements, nullptr);
}

constexpr static unsigned MAX_THREADS = 25;
constexpr static unsigned MAX_ITERATIONS_MT = 1000;

TEST(MeterTest, StressMultiThread)
{
MetricReader *metric_reader_ptr = nullptr;
auto meter = InitMeter(&metric_reader_ptr, "stress_test_meter");
std::atomic<unsigned> threadCount(0);
size_t numIterations = MAX_ITERATIONS_MT;
std::atomic<bool> do_collect{false}, do_sync_create{true}, do_async_create{false};
std::vector<nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>>
observable_instruments;
while (numIterations--)
{
for (size_t i = 0; i < MAX_THREADS; i++)
{
if (threadCount++ < MAX_THREADS)
{
auto t = std::thread([&]() {
std::this_thread::yield();
esigo marked this conversation as resolved.
Show resolved Hide resolved
if (do_sync_create.exchange(false))
{
std::string instrument_name = "test_couter_" + std::to_string(numIterations);
meter->CreateLongCounter(instrument_name, "", "");
do_async_create.store(true);
}
if (do_async_create.exchange(false))
{
auto observable_instrument =
meter->CreateLongObservableGauge("test_gauge" + std::to_string(numIterations));
esigo marked this conversation as resolved.
Show resolved Hide resolved
observable_instrument->AddCallback(asyc_generate_measurements, nullptr);
observable_instruments.push_back(std::move(observable_instrument));
do_collect.store(true);
}
if (do_collect.exchange(false))
{
metric_reader_ptr->Collect([](ResourceMetrics &metric_data) { return true; });
do_sync_create.store(true);
}
});
t.detach();
esigo marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
// random wait for all callbacks to complete
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

#endif