diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md
index 3b15e34e4c..323ab2817d 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md
@@ -5,3 +5,5 @@
   ((#180)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180])
 - Add Exporter constructor validation methods
   ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
+- Add conversion to TimeSeries methods
+  ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index 9f21aa2ae6..ee4dcc1fd3 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -12,8 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import re
 from typing import Dict, Sequence
 
+from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
+    WriteRequest,
+)
 from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
     Label,
     Sample,
@@ -24,6 +28,13 @@
     MetricsExporter,
     MetricsExportResult,
 )
+from opentelemetry.sdk.metrics.export.aggregate import (
+    HistogramAggregator,
+    LastValueAggregator,
+    MinMaxSumCountAggregator,
+    SumAggregator,
+    ValueObserverAggregator,
+)
 
 
 class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
@@ -142,50 +153,144 @@ def export(
     def shutdown(self) -> None:
         raise NotImplementedError()
 
-    def convert_to_timeseries(
+    def _convert_to_timeseries(
         self, export_records: Sequence[ExportRecord]
     ) -> Sequence[TimeSeries]:
-        raise NotImplementedError()
-
-    def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
-        raise NotImplementedError()
-
-    def convert_from_min_max_sum_count(
+        converter_map = {
+            MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
+            SumAggregator: self._convert_from_sum,
+            HistogramAggregator: self._convert_from_histogram,
+            LastValueAggregator: self._convert_from_last_value,
+            ValueObserverAggregator: self._convert_from_value_observer,
+        }
+        timeseries = []
+        for export_record in export_records:
+            aggregator_type = type(export_record.aggregator)
+            converter = converter_map.get(aggregator_type)
+            if converter is None:
+                raise ValueError(
+                    str(aggregator_type) + " conversion is not supported"
+                )
+            timeseries.extend(converter(export_record))
+        return timeseries
+
+    def _convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
+        return [
+            self._create_timeseries(
+                sum_record,
+                sum_record.instrument.name,
+                sum_record.aggregator.checkpoint,
+            )
+        ]
+
+    def _convert_from_min_max_sum_count(
         self, min_max_sum_count_record: ExportRecord
     ) -> TimeSeries:
-        raise NotImplementedError()
-
-    def convert_from_histogram(
+        timeseries = []
+        for agg_type in ["min", "max", "sum", "count"]:
+            name = min_max_sum_count_record.instrument.name + "_" + agg_type
+            value = getattr(
+                min_max_sum_count_record.aggregator.checkpoint, agg_type
+            )
+            timeseries.append(
+                self._create_timeseries(min_max_sum_count_record, name, value)
+            )
+        return timeseries
+
+    def _convert_from_histogram(
         self, histogram_record: ExportRecord
     ) -> TimeSeries:
-        raise NotImplementedError()
-
-    def convert_from_last_value(
+        count = 0
+        timeseries = []
+        for bound in histogram_record.aggregator.checkpoint.keys():
+            bound_str = "+Inf" if bound == float("inf") else str(bound)
+            value = histogram_record.aggregator.checkpoint[bound]
+            timeseries.append(
+                self._create_timeseries(
+                    histogram_record,
+                    histogram_record.instrument.name,
+                    value,
+                    labels=[("le", bound_str)],
+                )
+            )
+            count += value
+        name = histogram_record.instrument.name + "_count"
+        timeseries.append(
+            self._create_timeseries(histogram_record, name, float(count))
+        )
+        return timeseries
+
+    def _convert_from_last_value(
         self, last_value_record: ExportRecord
     ) -> TimeSeries:
-        raise NotImplementedError()
-
-    def convert_from_value_observer(
+        return [
+            self._create_timeseries(
+                last_value_record,
+                last_value_record.instrument.name,
+                last_value_record.aggregator.checkpoint,
+            )
+        ]
+
+    def _convert_from_value_observer(
         self, value_observer_record: ExportRecord
     ) -> TimeSeries:
-        raise NotImplementedError()
+        timeseries = []
+        for agg_type in ["min", "max", "sum", "count", "last"]:
+            timeseries.append(
+                self._create_timeseries(
+                    value_observer_record,
+                    value_observer_record.instrument.name + "_" + agg_type,
+                    getattr(
+                        value_observer_record.aggregator.checkpoint, agg_type
+                    ),
+                )
+            )
+        return timeseries
 
-    def convert_from_quantile(
+    # TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
+    def _convert_from_quantile(
         self, summary_record: ExportRecord
     ) -> TimeSeries:
         raise NotImplementedError()
 
     # pylint: disable=no-member
-    def create_timeseries(
-        self, export_record: ExportRecord, name, value: float
+    def _create_timeseries(
+        self, export_record: ExportRecord, name, value: float, labels=None
     ) -> TimeSeries:
-        raise NotImplementedError()
-
-    def create_sample(self, timestamp: int, value: float) -> Sample:
-        raise NotImplementedError()
-
-    def create_label(self, name: str, value: str) -> Label:
-        raise NotImplementedError()
+        timeseries = TimeSeries()
+        seen = set()
+
+        def add_label(label_name, label_value):
+            # Label name must contain only alphanumeric characters and underscores
+            label_name = re.sub("[^\\w_]", "_", label_name)
+            if label_name not in seen:
+                label = Label()
+                label.name = label_name
+                label.value = label_value
+                timeseries.labels.append(label)
+                seen.add(label_name)
+
+        add_label("__name__", name)
+        if labels:
+            for [label_name, label_value] in labels:
+                add_label(label_name, label_value)
+        if export_record.resource.attributes:
+            for (
+                label_name,
+                label_value,
+            ) in export_record.resource.attributes.items():
+                add_label(label_name, label_value)
+        if export_record.labels:
+            for [label_name, label_value] in export_record.labels:
+                add_label(label_name, label_value)
+
+        sample = Sample()
+        sample.timestamp = int(
+            export_record.aggregator.last_update_timestamp / 1000000
+        )
+        sample.value = value
+        timeseries.samples.append(sample)
+        return timeseries
 
     def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
         raise NotImplementedError()
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
index 5208828fc1..227c4ec3bf 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
@@ -13,10 +13,27 @@
 # limitations under the License.
 
 import unittest
+from unittest.mock import MagicMock, Mock, patch
 
 from opentelemetry.exporter.prometheus_remote_write import (
     PrometheusRemoteWriteMetricsExporter,
 )
+from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
+    Label,
+    Sample,
+    TimeSeries,
+)
+from opentelemetry.sdk.metrics import Counter
+from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
+from opentelemetry.sdk.metrics.export.aggregate import (
+    HistogramAggregator,
+    LastValueAggregator,
+    MinMaxSumCountAggregator,
+    SumAggregator,
+    ValueObserverAggregator,
+)
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.util import get_dict_as_key
 
 
 class TestValidation(unittest.TestCase):
@@ -115,44 +132,230 @@ def test_invalid_tls_config_key_only_param(self):
 class TestConversion(unittest.TestCase):
     # Initializes test data that is reused across tests
     def setUp(self):
-        pass
+        self.exporter = PrometheusRemoteWriteMetricsExporter(
+            endpoint="/prom/test_endpoint"
+        )
 
     # Ensures conversion to timeseries function works with valid aggregation types
     def test_valid_convert_to_timeseries(self):
-        pass
+        test_records = [
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                SumAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                MinMaxSumCountAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                HistogramAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                LastValueAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                ValueObserverAggregator(),
+                Resource({}),
+            ),
+        ]
+        for record in test_records:
+            record.aggregator.update(5)
+            record.aggregator.take_checkpoint()
+        data = self.exporter._convert_to_timeseries(test_records)
+        self.assertEqual(len(data), 14)
+        for timeseries in data:
+            self.assertIsInstance(timeseries, TimeSeries)
 
     # Ensures conversion to timeseries fails for unsupported aggregation types
     def test_invalid_convert_to_timeseries(self):
-        pass
+        with self.assertRaises(ValueError):
+            self.exporter._convert_to_timeseries(
+                [ExportRecord(None, None, None, Resource({}))]
+            )
 
     # Ensures sum aggregator is correctly converted to timeseries
    def test_convert_from_sum(self):
-        pass
+        sum_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            SumAggregator(),
+            Resource({}),
+        )
+        sum_record.aggregator.update(3)
+        sum_record.aggregator.update(2)
+        sum_record.aggregator.take_checkpoint()
+
+        expected_timeseries = self.exporter._create_timeseries(
+            sum_record, "testname", 5.0
+        )
+        timeseries = self.exporter._convert_from_sum(sum_record)
+        self.assertEqual(timeseries[0], expected_timeseries)
 
     # Ensures sum min_max_count aggregator is correctly converted to timeseries
     def test_convert_from_min_max_sum_count(self):
-        pass
+        min_max_sum_count_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            MinMaxSumCountAggregator(),
+            Resource({}),
+        )
+        min_max_sum_count_record.aggregator.update(5)
+        min_max_sum_count_record.aggregator.update(1)
+        min_max_sum_count_record.aggregator.take_checkpoint()
+
+        expected_min_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_min", 1.0
+        )
+        expected_max_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_max", 5.0
+        )
+        expected_sum_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_sum", 6.0
+        )
+        expected_count_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_count", 2.0
+        )
+
+        timeseries = self.exporter._convert_from_min_max_sum_count(
+            min_max_sum_count_record
+        )
+        self.assertEqual(timeseries[0], expected_min_timeseries)
+        self.assertEqual(timeseries[1], expected_max_timeseries)
+        self.assertEqual(timeseries[2], expected_sum_timeseries)
+        self.assertEqual(timeseries[3], expected_count_timeseries)
 
     # Ensures histogram aggregator is correctly converted to timeseries
     def test_convert_from_histogram(self):
-        pass
+        histogram_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            HistogramAggregator(),
+            Resource({}),
+        )
+        histogram_record.aggregator.update(5)
+        histogram_record.aggregator.update(2)
+        histogram_record.aggregator.update(-1)
+        histogram_record.aggregator.take_checkpoint()
+
+        expected_le_0_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname", 1.0, [("le", "0")]
+        )
+        expected_le_inf_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname", 2.0, [("le", "+Inf")]
+        )
+        expected_count_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname_count", 3.0
+        )
+        timeseries = self.exporter._convert_from_histogram(histogram_record)
+        self.assertEqual(timeseries[0], expected_le_0_timeseries)
+        self.assertEqual(timeseries[1], expected_le_inf_timeseries)
+        self.assertEqual(timeseries[2], expected_count_timeseries)
 
     # Ensures last value aggregator is correctly converted to timeseries
     def test_convert_from_last_value(self):
-        pass
+        last_value_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            LastValueAggregator(),
+            Resource({}),
+        )
+        last_value_record.aggregator.update(1)
+        last_value_record.aggregator.update(5)
+        last_value_record.aggregator.take_checkpoint()
+
+        expected_timeseries = self.exporter._create_timeseries(
+            last_value_record, "testname", 5.0
+        )
+        timeseries = self.exporter._convert_from_last_value(last_value_record)
+        self.assertEqual(timeseries[0], expected_timeseries)
 
     # Ensures value observer aggregator is correctly converted to timeseries
     def test_convert_from_value_observer(self):
-        pass
+        value_observer_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            ValueObserverAggregator(),
+            Resource({}),
+        )
+        value_observer_record.aggregator.update(5)
+        value_observer_record.aggregator.update(1)
+        value_observer_record.aggregator.update(2)
+        value_observer_record.aggregator.take_checkpoint()
+
+        expected_min_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_min", 1.0
+        )
+        expected_max_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_max", 5.0
+        )
+        expected_sum_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_sum", 8.0
+        )
+        expected_count_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_count", 3.0
+        )
+        expected_last_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_last", 2.0
+        )
+        timeseries = self.exporter._convert_from_value_observer(
+            value_observer_record
+        )
+        self.assertEqual(timeseries[0], expected_min_timeseries)
+        self.assertEqual(timeseries[1], expected_max_timeseries)
+        self.assertEqual(timeseries[2], expected_sum_timeseries)
+        self.assertEqual(timeseries[3], expected_count_timeseries)
+        self.assertEqual(timeseries[4], expected_last_timeseries)
 
     # Ensures quantile aggregator is correctly converted to timeseries
-    # TODO: Add test once method is implemented
-    def test_convert_from_quantile(self):
-        pass
+    # TODO: Add test_convert_from_quantile once method is implemented
 
     # Ensures timeseries produced contains appropriate sample and labels
     def test_create_timeseries(self):
-        pass
+        def create_label(name, value):
+            label = Label()
+            label.name = name
+            label.value = value
+            return label
+
+        sum_aggregator = SumAggregator()
+        sum_aggregator.update(5)
+        sum_aggregator.take_checkpoint()
+        export_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            get_dict_as_key({"record_name": "record_value"}),
+            sum_aggregator,
+            Resource({"resource_name": "resource_value"}),
+        )
+
+        expected_timeseries = TimeSeries()
+        expected_timeseries.labels.append(create_label("__name__", "testname"))
+        expected_timeseries.labels.append(
+            create_label("resource_name", "resource_value")
+        )
+        expected_timeseries.labels.append(
+            create_label("record_name", "record_value")
+        )
+
+        sample = expected_timeseries.samples.add()
+        sample.timestamp = int(sum_aggregator.last_update_timestamp / 1000000)
+        sample.value = 5.0
+
+        timeseries = self.exporter._create_timeseries(
+            export_record, "testname", 5.0
+        )
+        self.assertEqual(timeseries, expected_timeseries)
 
 
 class TestExport(unittest.TestCase):
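Note (outside the diff above): build_message is still a stub in this change, while the newly imported WriteRequest points at the follow-up step of packing the converted series into a Remote Write payload. A minimal sketch of that step, assuming the standard Remote Write wire format (snappy-compressed protobuf) and the python-snappy package; the helper name and structure below are illustrative, not this exporter's final implementation:

    import snappy  # assumption: python-snappy is installed

    # Same generated protobuf class the diff imports in __init__.py.
    from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
        WriteRequest,
    )

    def build_write_request_payload(timeseries) -> bytes:
        # Collect the converted TimeSeries messages into one WriteRequest and
        # snappy-compress the serialized protobuf, as Remote Write expects.
        write_request = WriteRequest()
        write_request.timeseries.extend(timeseries)
        return snappy.compress(write_request.SerializeToString())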