Skip to content

Commit

Permalink
Complete metric exporter format and update OTLP exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jan 12, 2022
1 parent 2a7e3fc commit 9cfa7eb
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
)
from opentelemetry.sdk._metrics.data import (
MetricData,
from opentelemetry.sdk._metrics.point import (
Metric,
)

from opentelemetry.sdk._metrics.export import (
Expand All @@ -45,9 +45,7 @@

class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[
MetricData, ExportMetricsServiceRequest, MetricExportResult
],
OTLPExporterMixin[Metric, ExportMetricsServiceRequest, MetricExportResult],
):
_result = MetricExportResult
_stub = MetricsServiceStub
Expand Down Expand Up @@ -79,13 +77,13 @@ def __init__(
)

def _translate_data(
self, data: Sequence[MetricData]
self, data: Sequence[Metric]
) -> ExportMetricsServiceRequest:
sdk_resource_instrumentation_library_metrics = {}
self._collector_metric_kwargs = {}

for metric_data in data:
resource = metric_data.metric.resource
for metric in data:
resource = metric.resource
instrumentation_library_map = (
sdk_resource_instrumentation_library_metrics.get(resource, {})
)
Expand All @@ -95,26 +93,26 @@ def _translate_data(
] = instrumentation_library_map

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
metric.instrumentation_info
)

if not instrumentation_library_metrics:
if metric_data.instrumentation_info is not None:
if metric.instrumentation_info is not None:
instrumentation_library_map[
metric_data.instrumentation_info
metric.instrumentation_info
] = InstrumentationLibraryMetrics(
instrumentation_library=InstrumentationLibrary(
name=metric_data.instrumentation_info.name,
version=metric_data.instrumentation_info.version,
name=metric.instrumentation_info.name,
version=metric.instrumentation_info.version,
)
)
else:
instrumentation_library_map[
metric_data.instrumentation_info
metric.instrumentation_info
] = InstrumentationLibraryMetrics()

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
metric.instrumentation_info
)

instrumentation_library_metrics.metrics.append(
Expand All @@ -128,7 +126,8 @@ def _translate_data(
)
)

def export(self, metrics: Sequence[MetricData]) -> MetricExportResult:
def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
print("Got metrics!!!\n", metrics)
return self._export(metrics)

def shutdown(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
MetricsServiceServicer,
add_MetricsServiceServicer_to_server,
)
from opentelemetry.sdk._metrics.data import Metric, MetricData
from opentelemetry.sdk._metrics.export import MetricExportResult
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Metric,
Sum,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
)
Expand Down Expand Up @@ -96,14 +100,25 @@ def setUp(self):

self.server.start()

self.metric_data_1 = MetricData(
metric=Metric(
self.metric_data_1 = [
Metric(
resource=SDKResource({"key": "value"}),
instrumentation_info=InstrumentationInfo(
"first_name", "first_version"
),
attributes={},
description="foo",
name="foometric",
unit="s",
point=Sum(
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
start_time_unix_nano=1641946015139533244,
time_unix_nano=1641946016139533244,
value=33,
),
),
instrumentation_info=InstrumentationInfo(
"first_name", "first_version"
),
)
]

def tearDown(self):
self.server.stop(None)
Expand Down Expand Up @@ -219,7 +234,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
MetricsServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
self.exporter.export(self.metric_data_1),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(1)
Expand All @@ -234,7 +249,7 @@ def test_unavailable_delay(self, mock_sleep, mock_expo):
MetricsServiceServicerUNAVAILABLEDelay(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
self.exporter.export(self.metric_data_1),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(4)
Expand All @@ -244,7 +259,7 @@ def test_success(self):
MetricsServiceServicerSUCCESS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
self.exporter.export(self.metric_data_1),
MetricExportResult.SUCCESS,
)

Expand All @@ -253,6 +268,6 @@ def test_failure(self):
MetricsServiceServicerALREADY_EXISTS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
self.exporter.export(self.metric_data_1),
MetricExportResult.FAILURE,
)
16 changes: 7 additions & 9 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,21 @@

from abc import ABC, abstractmethod
from collections import OrderedDict
from enum import IntEnum
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, Optional, Sequence, TypeVar

from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Histogram, PointT, Sum
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Gauge,
Histogram,
PointT,
Sum,
)
from opentelemetry.util._time import _time_ns


class AggregationTemporality(IntEnum):
UNSPECIFIED = 0
DELTA = 1
CUMULATIVE = 2


_PointVarT = TypeVar("_PointVarT", bound=PointT)

_logger = getLogger(__name__)
Expand Down
35 changes: 0 additions & 35 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/data.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sys import stdout
from typing import IO, Callable, Sequence

from opentelemetry.sdk._metrics.data import Metric, MetricData
from opentelemetry.sdk._metrics.point import Metric


class MetricExportResult(Enum):
Expand All @@ -33,7 +33,8 @@ class MetricExporter(ABC):
in their own format.
"""

def export(self, metrics: Sequence[MetricData]) -> "MetricExportResult":
@abstractmethod
def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
"""Exports a batch of telemetry data.
Args:
Expand Down Expand Up @@ -68,9 +69,9 @@ def __init__(
self.out = out
self.formatter = formatter

def export(self, metrics: Sequence[MetricData]) -> MetricExportResult:
for data in metrics:
self.out.write(self.formatter(data.metric))
def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

Expand Down
40 changes: 37 additions & 3 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
import json
from dataclasses import asdict, dataclass
from enum import IntEnum
from typing import Sequence, Union

from opentelemetry.sdk.resources import Attributes, Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo


class AggregationTemporality(IntEnum):
UNSPECIFIED = 0
DELTA = 1
CUMULATIVE = 2


@dataclass(frozen=True)
class Sum:
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]
aggregation_temporality: int
aggregation_temporality: AggregationTemporality
is_monotonic: bool


Expand All @@ -37,7 +48,30 @@ class Histogram:
time_unix_nano: int
bucket_counts: Sequence[int]
explicit_bounds: Sequence[float]
aggregation_temporality: int
aggregation_temporality: AggregationTemporality


PointT = Union[Sum, Gauge, Histogram]


@dataclass(frozen=True)
class Metric:
"""Represents a metric point in the OpenTelemetry data model to be exported
Concrete metric types contain all the information as in the OTLP proto definitions
(https://tinyurl.com/7h6yx24v) but are flattened as much as possible.
"""

# common fields to all metric kinds
attributes: Attributes
description: str
instrumentation_info: InstrumentationInfo
name: str
resource: Resource
unit: str

point: PointT
"""Contains non-common fields for the given metric"""

def to_json(self) -> str:
return json.dumps(asdict(self))

0 comments on commit 9cfa7eb

Please sign in to comment.