From 31d888a540c00e03fd430c724a15420ef3281e78 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 17 Mar 2022 18:15:35 -0700 Subject: [PATCH] Enable metric collection from MetricReader (#1241) --- .../exporters/ostream/metric_exporter.h | 1 + .../sdk/metrics/async_instruments.h | 55 ++---------- .../sdk/metrics/export/metric_producer.h | 36 ++++++++ .../opentelemetry/sdk/metrics/instruments.h | 7 -- sdk/include/opentelemetry/sdk/metrics/meter.h | 11 ++- .../opentelemetry/sdk/metrics/meter_context.h | 64 +++++++++---- .../sdk/metrics/meter_provider.h | 18 ++-- .../sdk/metrics/metric_exporter.h | 8 +- .../opentelemetry/sdk/metrics/metric_reader.h | 63 ++++++------- .../sdk/metrics/state/async_metric_storage.h | 11 ++- .../sdk/metrics/state/metric_collector.h | 57 ++++++++++++ .../sdk/metrics/state/metric_storage.h | 26 +++--- .../sdk/metrics/state/sync_metric_storage.h | 15 ++-- .../sdk/metrics/sync_instruments.h | 5 +- sdk/src/metrics/CMakeLists.txt | 2 + sdk/src/metrics/meter.cc | 47 ++++++---- sdk/src/metrics/meter_context.cc | 84 +++++++++++------ sdk/src/metrics/meter_provider.cc | 24 +++-- sdk/src/metrics/metric_reader.cc | 89 +++++++++++++++++++ sdk/src/metrics/state/metric_collector.cc | 55 ++++++++++++ sdk/src/metrics/sync_instruments.cc | 1 + sdk/test/metrics/BUILD | 16 ++++ sdk/test/metrics/CMakeLists.txt | 3 +- sdk/test/metrics/async_instruments_test.cc | 21 ++--- sdk/test/metrics/async_metric_storage_test.cc | 38 +++++--- sdk/test/metrics/meter_provider_sdk_test.cc | 12 ++- sdk/test/metrics/metric_reader_test.cc | 42 +++++++++ 27 files changed, 571 insertions(+), 240 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h create mode 100644 sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h create mode 100644 sdk/src/metrics/metric_reader.cc create mode 100644 sdk/src/metrics/state/metric_collector.cc create mode 100644 sdk/test/metrics/metric_reader_test.cc diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/metric_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/metric_exporter.h index 5658e3bff7..5f27db13d2 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/metric_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/metric_exporter.h @@ -8,6 +8,7 @@ # include # include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/nostd/span.h" +# include "opentelemetry/sdk/metrics/data/metric_data.h" # include "opentelemetry/sdk/metrics/instruments.h" # include "opentelemetry/sdk/metrics/metric_exporter.h" # include "opentelemetry/sdk/metrics/recordable.h" diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index d66bb69890..f38ad6a3b4 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -5,10 +5,8 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/metrics/async_instruments.h" # include "opentelemetry/metrics/observer_result.h" -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" -# include "opentelemetry/sdk/metrics/measurement_processor.h" - # include "opentelemetry/nostd/string_view.h" +# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/instruments.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -23,13 +21,11 @@ class Asynchronous Asynchronous(nostd::string_view name, const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - MeasurementProcessor *measurement_processor, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") : name_(name), instrumentation_library_{instrumentation_library}, - measurement_processor_{measurement_processor}, callback_(callback), description_(description), unit_(unit) @@ -39,7 +35,6 @@ class Asynchronous std::string name_; const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library_; - const MeasurementProcessor *measurement_processor_; void (*callback_)(opentelemetry::metrics::ObserverResult &); std::string description_; std::string unit_; @@ -52,16 +47,10 @@ class LongObservableCounter : public opentelemetry::metrics::ObservableCounter &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; @@ -73,16 +62,10 @@ class DoubleObservableCounter : public opentelemetry::metrics::ObservableCounter DoubleObservableCounter(nostd::string_view name, const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - MeasurementProcessor *measurement_processor, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; @@ -94,16 +77,10 @@ class LongObservableGauge : public opentelemetry::metrics::ObservableGauge LongObservableGauge(nostd::string_view name, const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - MeasurementProcessor *measurement_processor, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; @@ -115,16 +92,10 @@ class DoubleObservableGauge : public opentelemetry::metrics::ObservableGauge &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; @@ -137,16 +108,10 @@ class LongObservableUpDownCounter : public opentelemetry::metrics::ObservableUpD nostd::string_view name, const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - MeasurementProcessor *measurement_processor, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; @@ -160,16 +125,10 @@ class DoubleObservableUpDownCounter nostd::string_view name, const opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - MeasurementProcessor *measurement_processor, void (*callback)(opentelemetry::metrics::ObserverResult &), nostd::string_view description = "", nostd::string_view unit = "") - : Asynchronous(name, - instrumentation_library, - measurement_processor, - callback, - description, - unit) + : Asynchronous(name, instrumentation_library, callback, description, unit) {} }; diff --git a/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h new file mode 100644 index 0000000000..7c9ee57755 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/data/metric_data.h" +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +/** + * MetricProducer is the interface that is used to make metric data available to the + * OpenTelemetry exporters. Implementations should be stateful, in that each call to + * `Collect` will return any metric generated since the last call was made. + * + *

Implementations must be thread-safe. + */ + +class MetricProducer +{ +public: + /** + * The callback to be called for each metric exporter. This will only be those + * metrics that have been produced since the last time this method was called. + * + * @return a status of completion of method. + */ + virtual bool Collect(nostd::function_ref callback) noexcept = 0; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index ff8ed297fc..ad64ce718b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -54,13 +54,6 @@ struct InstrumentDescriptor using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap; -// TBD -> Remove once MetricCollector is imoplemeted -class MetricCollector -{ -public: - AggregationTemporarily aggregation_temporarily_; -}; - /*class InstrumentSelector { public: InstrumentSelector(opentelemetry::nostd::string_view name, diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index 24c60b454a..fc9fb36503 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -7,7 +7,6 @@ # include "opentelemetry/metrics/meter.h" # include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" # include "opentelemetry/sdk/metrics/instruments.h" -# include "opentelemetry/sdk/metrics/measurement_processor.h" # include "opentelemetry/sdk/metrics/meter_context.h" # include "opentelemetry/sdk/resource/resource.h" # include "opentelemetry/version.h" @@ -17,6 +16,10 @@ namespace sdk { namespace metrics { + +class MetricStorage; +class WritableMetricStorage; + class Meter final : public opentelemetry::metrics::Meter { public: @@ -99,8 +102,10 @@ class Meter final : public opentelemetry::metrics::Meter const sdk::instrumentationlibrary::InstrumentationLibrary *GetInstrumentationLibrary() const noexcept; - /** Returns the associated measurement processor */ - MeasurementProcessor *GetMeasurementProcessor() const noexcept; + /** collect metrics across all the meters **/ + bool collect(CollectorHandle *collector, + opentelemetry::common::SystemTimestamp collect_ts, + nostd::function_ref callback) noexcept; private: // order of declaration is important here - instrumentation library should destroy after diff --git a/sdk/include/opentelemetry/sdk/metrics/meter_context.h b/sdk/include/opentelemetry/sdk/metrics/meter_context.h index f0fdc6d946..8a31821353 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter_context.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter_context.h @@ -3,27 +3,35 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include -# include -# include -# include "opentelemetry/sdk/metrics/metric_exporter.h" -# include "opentelemetry/sdk/metrics/metric_reader.h" + +# include "opentelemetry/common/spin_lock_mutex.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/view/instrument_selector.h" # include "opentelemetry/sdk/metrics/view/meter_selector.h" # include "opentelemetry/sdk/metrics/view/view_registry.h" # include "opentelemetry/sdk/resource/resource.h" # include "opentelemetry/version.h" +# include +# include +# include + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { + +// forward declaration +class Meter; +class MetricExporter; +class MetricReader; + /** * A class which stores the MeterProvider context. */ -class MeterContext +class MeterContext : public std::enable_shared_from_this { public: /** @@ -35,7 +43,6 @@ class MeterContext */ MeterContext( std::vector> &&exporters, - std::vector> &&readers, std::unique_ptr views = std::unique_ptr(new ViewRegistry()), opentelemetry::sdk::resource::Resource resource = opentelemetry::sdk::resource::Resource::Create({})) noexcept; @@ -47,16 +54,28 @@ class MeterContext const opentelemetry::sdk::resource::Resource &GetResource() const noexcept; /** - * Obtain the reference of measurement_processor. + * Obtain the View Registry configured + * @return The reference to view registry + */ + ViewRegistry *GetViewRegistry() const noexcept; + + /** + * Obtain the configured meters. * */ - MeasurementProcessor *GetMeasurementProcessor() const noexcept; + nostd::span> GetMeters() noexcept; /** - * Obtain the View Registry configured - * @return The reference to view registry + * Obtain the configured collectors. + * */ - ViewRegistry *GetViewRegistry() const noexcept; + nostd::span> GetCollectors() noexcept; + + /** + * GET SDK Start time + * + */ + opentelemetry::common::SystemTimestamp GetSDKStartTime() noexcept; /** * Attaches a metric exporter to list of configured exporters for this Meter context. @@ -91,23 +110,36 @@ class MeterContext std::unique_ptr view) noexcept; /** - * Force all active Exporters and Readers to flush any buffered meter data + * Adds a meter to the list of configured meters. + * + * Note: This method is INTERNAL to sdk not thread safe. + * + * @param meter + */ + void AddMeter(std::shared_ptr meter); + + /** + * Force all active Exporters and Collectors to flush any buffered meter data * within the given timeout. */ bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; /** - * Shutdown the Exporters and Readers associated with this meter provider. + * Shutdown the Exporters and Collectors associated with this meter provider. */ bool Shutdown() noexcept; private: opentelemetry::sdk::resource::Resource resource_; std::vector> exporters_; - std::vector> readers_; + std::vector> collectors_; std::unique_ptr views_; - std::unique_ptr measurement_processor_; + opentelemetry::common::SystemTimestamp sdk_start_ts_; + std::vector> meters_; + + std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT; + opentelemetry::common::SpinLockMutex forceflush_lock_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/meter_provider.h b/sdk/include/opentelemetry/sdk/metrics/meter_provider.h index 7d7966147f..c6efba6222 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter_provider.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter_provider.h @@ -6,9 +6,9 @@ # include # include # include +# include "opentelemetry/metrics/meter.h" # include "opentelemetry/metrics/meter_provider.h" # include "opentelemetry/nostd/shared_ptr.h" -# include "opentelemetry/sdk/metrics/measurement_processor.h" # include "opentelemetry/sdk/metrics/meter.h" # include "opentelemetry/sdk/metrics/meter_context.h" # include "opentelemetry/sdk/resource/resource.h" @@ -19,19 +19,23 @@ namespace sdk { namespace metrics { + +// forward declaration +class MetricCollector; +class MetricExporter; +class MetricReader; + class MeterProvider final : public opentelemetry::metrics::MeterProvider { public: /** * Initialize a new meter provider * @param exporters The span exporters for this meter provider - * @param readers The readers for this meter provider. * @param views The views for this meter provider * @param resource The resources for this meter provider. */ MeterProvider( std::vector> &&exporters, - std::vector> &&readers, std::unique_ptr views = std::unique_ptr(new ViewRegistry()), sdk::resource::Resource resource = sdk::resource::Resource::Create({})) noexcept; @@ -52,12 +56,6 @@ class MeterProvider final : public opentelemetry::metrics::MeterProvider */ const sdk::resource::Resource &GetResource() const noexcept; - /** - * Obtain the reference of measurement processor. - * - */ - MeasurementProcessor *GetMeasurementProcessor() const noexcept; - /** * Attaches a metric exporter to list of configured exporters for this Meter provider. * @param exporter The metric exporter for this meter provider. This @@ -101,8 +99,6 @@ class MeterProvider final : public opentelemetry::metrics::MeterProvider bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; private: - // // order of declaration is important here - meter should destroy only after resource. - std::vector> meters_; std::shared_ptr context_; std::mutex lock_; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 039d3819ec..96898c9ad1 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -3,19 +3,21 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include -# include + # include "opentelemetry/nostd/span.h" # include "opentelemetry/sdk/common/exporter_utils.h" -# include "opentelemetry/sdk/metrics/data/metric_data.h" # include "opentelemetry/version.h" +# include +# include + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { +class MetricData; /** * MetricExporter defines the interface to be used by metrics libraries to * push metrics data to the OpenTelemetry exporters. diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 16a1880113..fdd5d41291 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -3,18 +3,23 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include +# include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/metrics/data/metric_data.h" # include "opentelemetry/sdk/metrics/instruments.h" + # include "opentelemetry/version.h" +# include +# include + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { +class MetricProducer; /** * MetricReader defines the interface to collect metrics from SDK */ @@ -22,56 +27,44 @@ class MetricReader { public: MetricReader( - AggregationTemporarily aggregation_temporarily = AggregationTemporarily::kCummulative) - : aggregation_temporarily_(aggregation_temporarily), measurement_processor_callback_({}) - {} + AggregationTemporarily aggregation_temporarily = AggregationTemporarily::kCummulative); - virtual ~MetricReader() = default; + void SetMetricProducer(MetricProducer *metric_producer); /** * Collect the metrics from SDK. * @return return the status of the operation. */ - bool Collect() noexcept - { - if (!measurement_processor_callback_) - { - OTEL_INTERNAL_LOG_WARN( - "Cannot invoke Collect() for MetricReader. No collection callback registered!") - } - return measurement_processor_callback_( - *this, aggregation_temporarily_, - [&](MetricData metric_data) noexcept { return ProcessReceivedMetrics(metric_data); }); - } + bool Collect(nostd::function_ref callback) noexcept; - /** - * Register the callback to Measurement Processor - * This function is internal to SDK. - */ - void RegisterCollectorCallback( - std::function)> measurement_processor_callback) - { - measurement_processor_callback_ = measurement_processor_callback; - } + AggregationTemporarily GetAggregationTemporarily() const noexcept; /** - * Process the metrics received through Measurement Processor. + * Shutdown the meter reader. */ - virtual bool ProcessReceivedMetrics(MetricData &metric_data) noexcept = 0; + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept; /** - * Shut down the metric reader. - * @param timeout an optional timeout. - * @return return the status of the operation. + * Force flush the metric read by the reader. */ - virtual bool Shutdown() noexcept = 0; + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept; + + virtual ~MetricReader() = default; + +private: + virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept = 0; + + virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept = 0; + + virtual void OnInitialized() noexcept {} + + bool IsShutdown() const noexcept; private: - std::function)> - measurement_processor_callback_; + MetricProducer *metric_producer_; AggregationTemporarily aggregation_temporarily_; + mutable opentelemetry::common::SpinLockMutex lock_; + bool shutdown_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index d5491e91cb..019d533006 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -37,12 +37,11 @@ class AsyncMetricStorage : public MetricStorage active_attributes_hashmap_(new AttributesHashMap()) {} - bool Collect( - MetricCollector *collector, - nostd::span collectors, - opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref metric_collection_callback) noexcept override + bool Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref metric_collection_callback) noexcept override { opentelemetry::sdk::metrics::ObserverResult ob_res(attributes_processor_); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h new file mode 100644 index 0000000000..3049440d9d --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/data/metric_data.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +class MetricReader; +class MeterContext; + +class CollectorHandle +{ +public: + virtual AggregationTemporarily GetAggregationTemporarily() noexcept = 0; +}; + +/** + * An internal opaque interface that the MetricReader receives as + * MetricProducer. It acts as the storage key to the internal metric stream + * state for each MetricReader. + */ + +class MetricCollector : public MetricProducer, public CollectorHandle +{ +public: + MetricCollector(std::shared_ptr &&context, + std::unique_ptr metric_reader); + + AggregationTemporarily GetAggregationTemporarily() noexcept override; + + /** + * The callback to be called for each metric exporter. This will only be those + * metrics that have been produced since the last time this method was called. + * + * @return a status of completion of method. + */ + bool Collect(nostd::function_ref callback) noexcept override; + + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept; + + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept; + +private: + std::shared_ptr meter_context_; + std::shared_ptr metric_reader_; +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h index a2fcb87679..8ae50554f1 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h @@ -4,6 +4,7 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/common/key_value_iterable_view.h" +# include "opentelemetry/common/timestamp.h" # include "opentelemetry/sdk/metrics/data/metric_data.h" # include "opentelemetry/sdk/metrics/instruments.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -13,19 +14,17 @@ namespace metrics { /* Represent the storage from which to collect the metrics */ +class CollectorHandle; class MetricStorage { public: /* collect the metrics from this storage */ - virtual bool Collect( - MetricCollector *collector, - nostd::span collectors, - opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept = 0; - - virtual ~MetricStorage() = default; + virtual bool Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref callback) noexcept = 0; }; class WritableMetricStorage @@ -46,12 +45,11 @@ class WritableMetricStorage class NoopMetricStorage : public MetricStorage { public: - bool Collect( - MetricCollector *collector, - nostd::span collectors, - opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept override + bool Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref callback) noexcept override { MetricData metric_data; if (callback(metric_data)) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index e3ee673339..f8c2a86aa4 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -82,15 +82,14 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } - bool Collect( - MetricCollector *collector, - nostd::span collectors, - opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, - opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept override + bool Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref callback) noexcept override { - MetricData metric_data; - if (callback(metric_data)) + MetricData data; + if (callback(data)) { return true; } diff --git a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h index aa0348f0b0..3e8c5c8623 100644 --- a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h @@ -7,7 +7,6 @@ # include "opentelemetry/metrics/sync_instruments.h" # include "opentelemetry/nostd/string_view.h" # include "opentelemetry/sdk/metrics/instruments.h" -# include "opentelemetry/sdk/metrics/measurement_processor.h" # include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" @@ -16,6 +15,10 @@ namespace sdk { namespace metrics { + +// forward declaration +class WritableMetricStorage; + class Synchronous { public: diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index f9b6d546ed..119477eb06 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -3,6 +3,8 @@ add_library( meter_provider.cc meter.cc meter_context.cc + metric_reader.cc + state/metric_collector.cc aggregation/histogram_aggregation.cc aggregation/lastvalue_aggregation.cc aggregation/sum_aggregation.cc diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 52ecfc9e4e..360630e786 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -7,6 +7,7 @@ # include "opentelemetry/nostd/shared_ptr.h" # include "opentelemetry/sdk/metrics/async_instruments.h" # include "opentelemetry/sdk/metrics/state/multi_metric_storage.h" +# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" # include "opentelemetry/sdk/metrics/sync_instruments.h" # include "opentelemetry/sdk_config.h" @@ -61,8 +62,8 @@ nostd::shared_ptr> Meter::CreateLongObservableC nostd::string_view description, nostd::string_view unit) noexcept { - return nostd::shared_ptr>{new LongObservableCounter( - name, GetInstrumentationLibrary(), GetMeasurementProcessor(), callback, description, unit)}; + return nostd::shared_ptr>{ + new LongObservableCounter(name, GetInstrumentationLibrary(), callback, description, unit)}; } nostd::shared_ptr> Meter::CreateDoubleObservableCounter( @@ -71,8 +72,8 @@ nostd::shared_ptr> Meter::CreateDoubleObserva nostd::string_view description, nostd::string_view unit) noexcept { - return nostd::shared_ptr>{new DoubleObservableCounter( - name, GetInstrumentationLibrary(), GetMeasurementProcessor(), callback, description, unit)}; + return nostd::shared_ptr>{ + new DoubleObservableCounter(name, GetInstrumentationLibrary(), callback, description, unit)}; } nostd::shared_ptr> Meter::CreateLongHistogram( @@ -109,8 +110,8 @@ nostd::shared_ptr> Meter::CreateLongObservableGau nostd::string_view description, nostd::string_view unit) noexcept { - return nostd::shared_ptr>{new LongObservableGauge( - name, GetInstrumentationLibrary(), GetMeasurementProcessor(), callback, description, unit)}; + return nostd::shared_ptr>{ + new LongObservableGauge(name, GetInstrumentationLibrary(), callback, description, unit)}; } nostd::shared_ptr> Meter::CreateDoubleObservableGauge( @@ -119,8 +120,8 @@ nostd::shared_ptr> Meter::CreateDoubleObservabl nostd::string_view description, nostd::string_view unit) noexcept { - return nostd::shared_ptr>{new DoubleObservableGauge( - name, GetInstrumentationLibrary(), GetMeasurementProcessor(), callback, description, unit)}; + return nostd::shared_ptr>{ + new DoubleObservableGauge(name, GetInstrumentationLibrary(), callback, description, unit)}; } nostd::shared_ptr> Meter::CreateLongUpDownCounter( @@ -158,7 +159,7 @@ nostd::shared_ptr> Meter::CreateLongObser nostd::string_view unit) noexcept { return nostd::shared_ptr>{new LongObservableUpDownCounter( - name, GetInstrumentationLibrary(), GetMeasurementProcessor(), callback, description, unit)}; + name, GetInstrumentationLibrary(), callback, description, unit)}; } nostd::shared_ptr> @@ -168,8 +169,8 @@ Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name, nostd::string_view unit) noexcept { return nostd::shared_ptr>{ - new DoubleObservableUpDownCounter(name, GetInstrumentationLibrary(), - GetMeasurementProcessor(), callback, description, unit)}; + new DoubleObservableUpDownCounter(name, GetInstrumentationLibrary(), callback, description, + unit)}; } const sdk::instrumentationlibrary::InstrumentationLibrary *Meter::GetInstrumentationLibrary() @@ -178,11 +179,6 @@ const sdk::instrumentationlibrary::InstrumentationLibrary *Meter::GetInstrumenta return instrumentation_library_.get(); } -MeasurementProcessor *Meter::GetMeasurementProcessor() const noexcept -{ - return meter_context_->GetMeasurementProcessor(); -} - std::unique_ptr Meter::RegisterMetricStorage( InstrumentDescriptor &instrument_descriptor) { @@ -212,6 +208,25 @@ std::unique_ptr Meter::RegisterMetricStorage( return std::move(storages); } +/** collect metrics across all the meters **/ +bool Meter::collect(CollectorHandle *collector, + opentelemetry::common::SystemTimestamp collect_ts, + nostd::function_ref callback) noexcept +{ + std::vector data; + for (auto &metric_storage : storage_registry_) + { + // TBD - this needs to be asynchronous + metric_storage.second->Collect(collector, meter_context_->GetCollectors(), + meter_context_->GetSDKStartTime(), collect_ts, + [&callback](MetricData &metric_data) { + callback(metric_data); + return true; + }); + } + return true; +} + } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/metrics/meter_context.cc b/sdk/src/metrics/meter_context.cc index bc2f75de2a..f23135fa0c 100644 --- a/sdk/src/metrics/meter_context.cc +++ b/sdk/src/metrics/meter_context.cc @@ -2,8 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 #ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/meter_context.h" # include "opentelemetry/sdk/common/global_log_handler.h" -# include "opentelemetry/sdk/metrics/meter.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" # include "opentelemetry/sdk_config.h" # include "opentelemetry/version.h" @@ -14,13 +16,12 @@ namespace metrics { MeterContext::MeterContext(std::vector> &&exporters, - std::vector> &&readers, std::unique_ptr views, opentelemetry::sdk::resource::Resource resource) noexcept - : exporters_(std::move(exporters)), - readers_(std::move(readers)), + : resource_{resource}, + exporters_(std::move(exporters)), views_(std::move(views)), - resource_{resource} + sdk_start_ts_{std::chrono::system_clock::now()} {} const resource::Resource &MeterContext::GetResource() const noexcept @@ -28,14 +29,24 @@ const resource::Resource &MeterContext::GetResource() const noexcept return resource_; } -MeasurementProcessor *MeterContext::GetMeasurementProcessor() const noexcept +ViewRegistry *MeterContext::GetViewRegistry() const noexcept { - return measurement_processor_.get(); + return views_.get(); } -ViewRegistry *MeterContext::GetViewRegistry() const noexcept +nostd::span> MeterContext::GetMeters() noexcept { - return views_.get(); + return nostd::span>{meters_}; +} + +nostd::span> MeterContext::GetCollectors() noexcept +{ + return nostd::span>(collectors_); +} + +opentelemetry::common::SystemTimestamp MeterContext::GetSDKStartTime() noexcept +{ + return sdk_start_ts_; } void MeterContext::AddMetricExporter(std::unique_ptr exporter) noexcept @@ -45,7 +56,9 @@ void MeterContext::AddMetricExporter(std::unique_ptr exporter) n void MeterContext::AddMetricReader(std::unique_ptr reader) noexcept { - readers_.push_back(std::move(reader)); + auto collector = + std::shared_ptr{new MetricCollector(shared_from_this(), std::move(reader))}; + collectors_.push_back(collector); } void MeterContext::AddView(std::unique_ptr instrument_selector, @@ -55,34 +68,47 @@ void MeterContext::AddView(std::unique_ptr instrument_select views_->AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); } +void MeterContext::AddMeter(std::shared_ptr meter) +{ + meters_.push_back(meter); +} + bool MeterContext::Shutdown() noexcept { - bool result_exporter = true; - bool result_reader = true; - for (auto &exporter : exporters_) - { - bool status = exporter->Shutdown(); - result_exporter = result_exporter && status; - } - if (!result_exporter) - { - OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Unable to shutdown all metric exporters"); - } - for (auto &reader : readers_) + bool return_status = true; + if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) { - bool status = reader->Shutdown(); - result_reader = result_reader && status; - } - if (!result_reader) - { - OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Unable to shutdown all metric readers"); + bool result_exporter = true; + bool result_reader = true; + bool result_collector = true; + + for (auto &exporter : exporters_) + { + bool status = exporter->Shutdown(); + result_exporter = result_exporter && status; + } + if (!result_exporter) + { + OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Unable to shutdown all metric exporters"); + } + for (auto &collector : collectors_) + { + bool status = std::static_pointer_cast(collector)->Shutdown(); + result_collector = result_reader && status; + } + if (!result_collector) + { + OTEL_INTERNAL_LOG_WARN("[MeterContext::Shutdown] Unable to shutdown all metric readers"); + } + return_status = result_exporter && result_collector; } - return result_exporter && result_reader; + return return_status; } bool MeterContext::ForceFlush(std::chrono::microseconds timeout) noexcept { // TODO - Implement timeout logic. + const std::lock_guard locked(forceflush_lock_); bool result_exporter = true; for (auto &exporter : exporters_) { diff --git a/sdk/src/metrics/meter_provider.cc b/sdk/src/metrics/meter_provider.cc index db1737ce94..8a9572c88c 100644 --- a/sdk/src/metrics/meter_provider.cc +++ b/sdk/src/metrics/meter_provider.cc @@ -3,6 +3,10 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/meter_provider.h" +# include "opentelemetry/metrics/meter.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" + # include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk_config.h" # include "opentelemetry/version.h" @@ -20,13 +24,9 @@ namespace metrics_api = opentelemetry::metrics; MeterProvider::MeterProvider(std::shared_ptr context) noexcept : context_{context} {} MeterProvider::MeterProvider(std::vector> &&exporters, - std::vector> &&readers, std::unique_ptr views, sdk::resource::Resource resource) noexcept - : context_(std::make_shared(std::move(exporters), - std::move(readers), - std::move(views), - resource)) + : context_(std::make_shared(std::move(exporters), std::move(views), resource)) {} nostd::shared_ptr MeterProvider::GetMeter( @@ -42,7 +42,7 @@ nostd::shared_ptr MeterProvider::GetMeter( const std::lock_guard guard(lock_); - for (auto &meter : meters_) + for (auto &meter : context_->GetMeters()) { auto meter_lib = meter->GetInstrumentationLibrary(); if (meter_lib->equal(name, version, schema_url)) @@ -50,9 +50,10 @@ nostd::shared_ptr MeterProvider::GetMeter( return nostd::shared_ptr{meter}; } } - auto lib = instrumentationlibrary::InstrumentationLibrary::Create(name, version, schema_url); - meters_.push_back(std::shared_ptr(new Meter(context_, std::move(lib)))); - return nostd::shared_ptr{meters_.back()}; + auto lib = instrumentationlibrary::InstrumentationLibrary::Create(name, version, schema_url); + auto meter = std::shared_ptr(new Meter(context_, std::move(lib))); + context_->AddMeter(meter); + return nostd::shared_ptr{meter}; } const resource::Resource &MeterProvider::GetResource() const noexcept @@ -60,11 +61,6 @@ const resource::Resource &MeterProvider::GetResource() const noexcept return context_->GetResource(); } -MeasurementProcessor *MeterProvider::GetMeasurementProcessor() const noexcept -{ - return context_->GetMeasurementProcessor(); -} - void MeterProvider::AddMetricExporter(std::unique_ptr exporter) noexcept { return context_->AddMetricExporter(std::move(exporter)); diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc new file mode 100644 index 0000000000..71aedc227f --- /dev/null +++ b/sdk/src/metrics/metric_reader.cc @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" + +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +MetricReader::MetricReader(AggregationTemporarily aggregation_temporarily) + : aggregation_temporarily_(aggregation_temporarily) +{} + +void MetricReader::SetMetricProducer(MetricProducer *metric_producer) +{ + metric_producer_ = metric_producer; +} + +AggregationTemporarily MetricReader::GetAggregationTemporarily() const noexcept +{ + return aggregation_temporarily_; +} + +bool MetricReader::Collect(nostd::function_ref callback) noexcept +{ + if (!metric_producer_) + { + OTEL_INTERNAL_LOG_WARN( + "MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for " + "collection!") + } + if (IsShutdown()) + { + OTEL_INTERNAL_LOG_WARN("MetricReader::Collect Cannot invoke Collect(). Shutdown in progress!"); + } + + return metric_producer_->Collect(callback); +} + +bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept +{ + bool status = true; + + if (IsShutdown()) + { + OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!"); + } + if (!OnShutDown(timeout)) + { + status = false; + OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!"); + } + const std::lock_guard locked(lock_); + shutdown_ = true; + return status; +} + +/** Flush metric read by this reader **/ +bool MetricReader::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + bool status = true; + if (shutdown_) + { + OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown Cannot invoke Force flush on shutdown reader!"); + } + if (!OnForceFlush(timeout)) + { + status = false; + OTEL_INTERNAL_LOG_ERROR("MetricReader::OnForceFlush failed!"); + } + return status; +} + +bool MetricReader::IsShutdown() const noexcept +{ + const std::lock_guard locked(lock_); + return shutdown_; +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/state/metric_collector.cc b/sdk/src/metrics/state/metric_collector.cc new file mode 100644 index 0000000000..a52c6e9b1c --- /dev/null +++ b/sdk/src/metrics/state/metric_collector.cc @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/state/metric_collector.h" +# include "opentelemetry/sdk/common/global_log_handler.h" +# include "opentelemetry/sdk/metrics/meter.h" +# include "opentelemetry/sdk/metrics/meter_context.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include "opentelemetry/sdk_config.h" +# include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +MetricCollector::MetricCollector( + std::shared_ptr &&context, + std::unique_ptr metric_reader) + : meter_context_{std::move(context)}, metric_reader_{std::move(metric_reader)} +{ + metric_reader_->SetMetricProducer(this); +} + +AggregationTemporarily MetricCollector::GetAggregationTemporarily() noexcept +{ + return metric_reader_->GetAggregationTemporarily(); +} + +bool MetricCollector::Collect(nostd::function_ref callback) noexcept +{ + for (auto &meter : meter_context_->GetMeters()) + { + auto collection_ts = std::chrono::system_clock::now(); + meter->collect(this, collection_ts, callback); + } + return true; +} + +bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return metric_reader_->ForceFlush(timeout); +} + +bool MetricCollector::Shutdown(std::chrono::microseconds timeout) noexcept +{ + return metric_reader_->Shutdown(timeout); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/sync_instruments.cc b/sdk/src/metrics/sync_instruments.cc index 85e770a715..863e5b6323 100644 --- a/sdk/src/metrics/sync_instruments.cc +++ b/sdk/src/metrics/sync_instruments.cc @@ -3,6 +3,7 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/sync_instruments.h" +# include "opentelemetry/sdk/metrics/state/metric_storage.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index cf780b1120..9ca191574f 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -16,6 +16,22 @@ cc_test( ], ) +cc_test( + name = "metric_reader_test", + srcs = [ + "metric_reader_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "//sdk/src/resource", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "view_registry_test", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index a2080b6e46..3ee2be6552 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -10,7 +10,8 @@ foreach( multi_metric_storage_test observer_result_test sync_instruments_test - async_instruments_test) + async_instruments_test + metric_reader_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/async_instruments_test.cc b/sdk/test/metrics/async_instruments_test.cc index abaeb83028..ad3a81b031 100644 --- a/sdk/test/metrics/async_instruments_test.cc +++ b/sdk/test/metrics/async_instruments_test.cc @@ -4,7 +4,6 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/async_instruments.h" # include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" -# include "opentelemetry/sdk/metrics/measurement_processor.h" # include @@ -13,7 +12,6 @@ using namespace opentelemetry::sdk::instrumentationlibrary; using namespace opentelemetry::sdk::metrics; auto instrumentation_library = InstrumentationLibrary::Create("opentelemetry-cpp", "0.1.0"); -DefaultMeasurementProcessor measurement_processor; using M = std::map; @@ -25,48 +23,43 @@ TEST(AsyncInstruments, LongObservableCounter) { auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; EXPECT_NO_THROW(LongObservableCounter counter("long_counter", instrumentation_library.get(), - &measurement_processor, asyc_generate_meas_long, - "description", "1")); + asyc_generate_meas_long, "description", "1")); } TEST(AsyncInstruments, DoubleObservableCounter) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; EXPECT_NO_THROW(DoubleObservableCounter counter("long_counter", instrumentation_library.get(), - &measurement_processor, asyc_generate_meas_double, - "description", "1")); + asyc_generate_meas_double, "description", "1")); } TEST(AsyncInstruments, LongObservableGauge) { auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; EXPECT_NO_THROW(LongObservableGauge counter("long_counter", instrumentation_library.get(), - &measurement_processor, asyc_generate_meas_long, - "description", "1")); + asyc_generate_meas_long, "description", "1")); } TEST(AsyncInstruments, DoubleObservableGauge) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; EXPECT_NO_THROW(DoubleObservableGauge counter("long_counter", instrumentation_library.get(), - &measurement_processor, asyc_generate_meas_double, - "description", "1")); + asyc_generate_meas_double, "description", "1")); } TEST(AsyncInstruments, LongObservableUpDownCounter) { auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; EXPECT_NO_THROW(LongObservableUpDownCounter counter("long_counter", instrumentation_library.get(), - &measurement_processor, asyc_generate_meas_long, "description", "1")); } TEST(AsyncInstruments, DoubleObservableUpDownCounter) { auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(DoubleObservableUpDownCounter counter( - "long_counter", instrumentation_library.get(), &measurement_processor, - asyc_generate_meas_double, "description", "1")); + EXPECT_NO_THROW( + DoubleObservableUpDownCounter counter("long_counter", instrumentation_library.get(), + asyc_generate_meas_double, "description", "1")); } #endif diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index fd2a41d325..fd7d24c6f3 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -5,18 +5,31 @@ # include "opentelemetry/sdk/metrics/state/async_metric_storage.h" # include "opentelemetry/common/key_value_iterable_view.h" # include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/meter_context.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" # include "opentelemetry/sdk/metrics/observer_result.h" - -# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" -# include "opentelemetry/sdk/resource/resource.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" # include -# include +# include using namespace opentelemetry::sdk::metrics; using namespace opentelemetry::sdk::instrumentationlibrary; using namespace opentelemetry::sdk::resource; +class MockMetricReader : public MetricReader +{ +public: + MockMetricReader(AggregationTemporarily aggr_temporarily) : MetricReader(aggr_temporarily) {} + + virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } + + virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } + + virtual void OnInitialized() noexcept override {} +}; + void measurement_fetch(opentelemetry::metrics::ObserverResult &observer_result) { observer_result.Observe(20l); @@ -28,14 +41,19 @@ TEST(AsyncMetricStorageTest, BasicTests) auto metric_callback = [](MetricData &metric_data) { return true; }; InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; - auto instrumentation_library = InstrumentationLibrary::Create("instr_lib"); - auto resource = Resource::Create({}); - MetricCollector collector; - std::vector collectors; + + std::vector> exporters; + std::shared_ptr meter_context(new MeterContext(std::move(exporters))); + std::unique_ptr metric_reader(new MockMetricReader(AggregationTemporarily::kDelta)); + + std::shared_ptr collector = std::shared_ptr( + new MetricCollector(std::move(meter_context), std::move(metric_reader))); + + std::vector> collectors{collector}; opentelemetry::sdk::metrics::AsyncMetricStorage storage( instr_desc, AggregationType::kSum, &measurement_fetch, new DefaultAttributesProcessor()); - EXPECT_NO_THROW(storage.Collect(&collector, collectors, instrumentation_library.get(), &resource, - metric_callback)); + storage.Collect(collector.get(), collectors, std::chrono::system_clock::now(), + std::chrono::system_clock::now(), metric_callback); } #endif \ No newline at end of file diff --git a/sdk/test/metrics/meter_provider_sdk_test.cc b/sdk/test/metrics/meter_provider_sdk_test.cc index e321a29a96..015d3023ae 100644 --- a/sdk/test/metrics/meter_provider_sdk_test.cc +++ b/sdk/test/metrics/meter_provider_sdk_test.cc @@ -3,8 +3,11 @@ #ifndef ENABLE_METRICS_PREVIEW # include +# include "opentelemetry/sdk/metrics/export/metric_producer.h" # include "opentelemetry/sdk/metrics/meter.h" # include "opentelemetry/sdk/metrics/meter_provider.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" # include "opentelemetry/sdk/metrics/view/instrument_selector.h" # include "opentelemetry/sdk/metrics/view/meter_selector.h" @@ -36,17 +39,18 @@ class MockMetricExporter : public MetricExporter class MockMetricReader : public MetricReader { public: - bool ProcessReceivedMetrics(MetricData &metric_data) noexcept override { return true; } + virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } - bool Shutdown() noexcept override { return true; } + virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } + + virtual void OnInitialized() noexcept override {} }; TEST(MeterProvider, GetMeter) { std::vector> exporters; - std::vector> readers; - MeterProvider mp1(std::move(exporters), std::move(readers)); + MeterProvider mp1(std::move(exporters)); // std::unique_ptr view{std::unique_ptr()}; // MeterProvider mp1(std::move(exporters), std::move(readers), std::move(views); auto m1 = mp1.GetMeter("test"); diff --git a/sdk/test/metrics/metric_reader_test.cc b/sdk/test/metrics/metric_reader_test.cc new file mode 100644 index 0000000000..d0f7c14981 --- /dev/null +++ b/sdk/test/metrics/metric_reader_test.cc @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include +# include "opentelemetry/sdk/metrics/meter_context.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationlibrary; +using namespace opentelemetry::sdk::metrics; + +class MockMetricReader : public MetricReader +{ +public: + MockMetricReader(AggregationTemporarily aggr_temporarily) : MetricReader(aggr_temporarily) {} + + virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } + + virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } + + virtual void OnInitialized() noexcept override {} +}; + +TEST(MetricReaderTest, BasicTests) +{ + AggregationTemporarily aggr_temporarily = AggregationTemporarily::kDelta; + std::unique_ptr metric_reader1(new MockMetricReader(aggr_temporarily)); + EXPECT_EQ(metric_reader1->GetAggregationTemporarily(), aggr_temporarily); + + std::vector> exporters; + std::shared_ptr meter_context1(new MeterContext(std::move(exporters))); + EXPECT_NO_THROW(meter_context1->AddMetricReader(std::move(metric_reader1))); + + std::unique_ptr metric_reader2(new MockMetricReader(aggr_temporarily)); + std::shared_ptr meter_context2(new MeterContext(std::move(exporters))); + MetricProducer *metric_producer = + new MetricCollector(std::move(meter_context2), std::move(metric_reader2)); + EXPECT_NO_THROW(metric_producer->Collect([](MetricData data) { return true; })); +} +#endif