Skip to content

Commit

Permalink
sdk: Implement observer instrument (#425)
Browse files Browse the repository at this point in the history
Observer instruments are used to capture a current set of values at a point in
time [1].

This commit extends the Meter interface to allow to register an observer
instrument by pasing a callback that will be executed at collection time.
The logic inside collection is updated to consider these instruments and
a new ObserverAggregator is implemented.

[1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments
  • Loading branch information
mauriciovasquezbernal authored Mar 2, 2020
1 parent 344d72b commit 888bed9
Show file tree
Hide file tree
Showing 13 changed files with 517 additions and 191 deletions.
72 changes: 72 additions & 0 deletions examples/metrics/observer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This example shows how the Observer metric instrument can be used to capture
asynchronous metrics data.
"""
import psutil

from opentelemetry import metrics
from opentelemetry.sdk.metrics import LabelSet, MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController

# Configure a stateful batcher
batcher = UngroupedBatcher(stateful=True)

metrics.set_preferred_meter_provider_implementation(lambda _: MeterProvider())
meter = metrics.get_meter(__name__)

# Exporter to export metrics to the console
exporter = ConsoleMetricsExporter()

# Configure a push controller
controller = PushController(meter=meter, exporter=exporter, interval=2)


# Callback to gather cpu usage
def get_cpu_usage_callback(observer):
for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
label_set = meter.get_label_set({"cpu_number": str(number)})
observer.observe(percent, label_set)


meter.register_observer(
callback=get_cpu_usage_callback,
name="cpu_percent",
description="per-cpu usage",
unit="1",
value_type=float,
label_keys=("cpu_number",),
)


# Callback to gather RAM memory usage
def get_ram_usage_callback(observer):
ram_percent = psutil.virtual_memory().percent
observer.observe(ram_percent, LabelSet())


meter.register_observer(
callback=get_ram_usage_callback,
name="ram_percent",
description="RAM memory usage",
unit="1",
value_type=float,
label_keys=(),
)

input("Press a key to finish...\n")
16 changes: 13 additions & 3 deletions examples/metrics/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,25 @@
exporter = ConsoleMetricsExporter()
# controller collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter, exporter, 5)
controller = PushController(meter=meter, exporter=exporter, interval=5)

# Example to show how to record using the meter
counter = meter.create_metric(
"requests", "number of requests", 1, int, Counter, ("environment",)
name="requests",
description="number of requests",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

counter2 = meter.create_metric(
"clicks", "number of clicks", 1, int, Counter, ("environment",)
name="clicks",
description="number of clicks",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

# Labelsets are used to identify key-values that are associated with a specific
Expand Down
27 changes: 18 additions & 9 deletions examples/metrics/simple_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,30 @@ def usage(argv):

# Metric instruments allow to capture measurements
requests_counter = meter.create_metric(
"requests", "number of requests", 1, int, Counter, ("environment",)
name="requests",
description="number of requests",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

clicks_counter = meter.create_metric(
"clicks", "number of clicks", 1, int, Counter, ("environment",)
name="clicks",
description="number of clicks",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

requests_size = meter.create_metric(
"requests_size", "size of requests", 1, int, Measure, ("environment",)
name="requests_size",
description="size of requests",
unit="1",
value_type=int,
metric_type=Measure,
label_keys=("environment",),
)

# Labelsets are used to identify key-values that are associated with a specific
Expand All @@ -82,21 +97,15 @@ def usage(argv):
# Update the metric instruments using the direct calling convention
requests_size.record(100, staging_label_set)
requests_counter.add(25, staging_label_set)
# Sleep for 5 seconds, exported value should be 25
time.sleep(5)

requests_size.record(5000, staging_label_set)
requests_counter.add(50, staging_label_set)
# Exported value should be 75
time.sleep(5)

requests_size.record(2, testing_label_set)
requests_counter.add(35, testing_label_set)
# There should be two exported values 75 and 35, one for each labelset
time.sleep(5)

clicks_counter.add(5, staging_label_set)
# There should be three exported values, labelsets can be reused for different
# metrics but will be recorded seperately, 75, 35 and 5

time.sleep(5)
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
REGISTRY,
CollectorRegistry,
CounterMetricFamily,
GaugeMetricFamily,
UnknownMetricFamily,
)

from opentelemetry.metrics import Counter, Gauge, Measure, Metric
from opentelemetry.metrics import Counter, Measure, Metric
from opentelemetry.sdk.metrics.export import (
MetricRecord,
MetricsExporter,
Expand Down Expand Up @@ -112,17 +111,6 @@ def _translate_to_prometheus(self, metric_record: MetricRecord):
prometheus_metric.add_metric(
labels=label_values, value=metric_record.aggregator.checkpoint
)

elif isinstance(metric_record.metric, Gauge):
prometheus_metric = GaugeMetricFamily(
name=metric_name,
documentation=metric_record.metric.description,
labels=label_keys,
)
prometheus_metric.add_metric(
labels=label_values, value=metric_record.aggregator.checkpoint
)

# TODO: Add support for histograms when supported in OT
elif isinstance(metric_record.metric, Measure):
prometheus_metric = UnknownMetricFamily(
Expand Down
127 changes: 76 additions & 51 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ def add(self, value: ValueT) -> None:
value: The value to add to the handle.
"""

def set(self, value: ValueT) -> None:
"""No-op implementation of `GaugeHandle` set.
Args:
value: The value to set to the handle.
"""

def record(self, value: ValueT) -> None:
"""No-op implementation of `MeasureHandle` record.
Expand All @@ -74,15 +67,6 @@ def add(self, value: ValueT) -> None:
"""


class GaugeHandle:
def set(self, value: ValueT) -> None:
"""Sets the current value of the handle to ``value``.
Args:
value: The value to set to the handle.
"""


class MeasureHandle:
def record(self, value: ValueT) -> None:
"""Records the given ``value`` to this handle.
Expand Down Expand Up @@ -124,7 +108,7 @@ def get_handle(self, label_set: LabelSet) -> "object":
Handles are useful to reduce the cost of repeatedly recording a metric
with a pre-defined set of label values. All metric kinds (counter,
gauge, measure) support declaring a set of required label keys. The
measure) support declaring a set of required label keys. The
values corresponding to these keys should be specified in every handle.
"Unspecified" label values, in cases where a handle is requested but
a value was not provided are permitted.
Expand Down Expand Up @@ -153,14 +137,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None:
label_set: `LabelSet` to associate with the returned handle.
"""

def set(self, value: ValueT, label_set: LabelSet) -> None:
"""No-op implementation of `Gauge` set.
Args:
value: The value to set the gauge metric to.
label_set: `LabelSet` to associate with the returned handle.
"""

def record(self, value: ValueT, label_set: LabelSet) -> None:
"""No-op implementation of `Measure` record.
Expand All @@ -186,28 +162,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None:
"""


class Gauge(Metric):
"""A gauge type metric that expresses a pre-calculated value.
Gauge metrics have a value that is either ``Set`` by explicit
instrumentation or observed through a callback. This kind of metric
should be used when the metric cannot be expressed as a sum or because
the measurement interval is arbitrary.
"""

def get_handle(self, label_set: LabelSet) -> "GaugeHandle":
"""Gets a `GaugeHandle`."""
return GaugeHandle()

def set(self, value: ValueT, label_set: LabelSet) -> None:
"""Sets the value of the gauge to ``value``.
Args:
value: The value to set the gauge metric to.
label_set: `LabelSet` to associate with the returned handle.
"""


class Measure(Metric):
"""A measure type metric that represent raw stats that are recorded.
Expand All @@ -227,6 +181,37 @@ def record(self, value: ValueT, label_set: LabelSet) -> None:
"""


class Observer(abc.ABC):
"""An observer type metric instrument used to capture a current set of values.
Observer instruments are asynchronous, a callback is invoked with the
observer instrument as argument allowing the user to capture multiple
values per collection interval.
"""

@abc.abstractmethod
def observe(self, value: ValueT, label_set: LabelSet) -> None:
"""Captures ``value`` to the observer.
Args:
value: The value to capture to this observer metric.
label_set: `LabelSet` associated to ``value``.
"""


class DefaultObserver(Observer):
"""No-op implementation of ``Observer``."""

def observe(self, value: ValueT, label_set: LabelSet) -> None:
"""Captures ``value`` to the observer.
Args:
value: The value to capture to this observer metric.
label_set: `LabelSet` associated to ``value``.
"""


class MeterProvider(abc.ABC):
@abc.abstractmethod
def get_meter(
Expand Down Expand Up @@ -277,15 +262,16 @@ def get_meter(
return DefaultMeter()


MetricT = TypeVar("MetricT", Counter, Gauge, Measure)
MetricT = TypeVar("MetricT", Counter, Measure, Observer)
ObserverCallbackT = Callable[[Observer], None]


# pylint: disable=unused-argument
class Meter(abc.ABC):
"""An interface to allow the recording of metrics.
`Metric` s are used for recording pre-defined aggregation (gauge and
counter), or raw values (measure) in which the aggregation and labels
`Metric` s are used for recording pre-defined aggregation (counter),
or raw values (measure) in which the aggregation and labels
for the exported metric are deferred.
"""

Expand Down Expand Up @@ -325,14 +311,41 @@ def create_metric(
Args:
name: The name of the metric.
description: Human-readable description of the metric.
unit: Unit of the metric values.
unit: Unit of the metric values following the UCUM convention
(https://unitsofmeasure.org/ucum.html).
value_type: The type of values being recorded by the metric.
metric_type: The type of metric being created.
label_keys: The keys for the labels with dynamic values.
enabled: Whether to report the metric by default.
Returns: A new ``metric_type`` metric with values of ``value_type``.
"""

@abc.abstractmethod
def register_observer(
self,
callback: ObserverCallbackT,
name: str,
description: str,
unit: str,
value_type: Type[ValueT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> "Observer":
"""Registers an ``Observer`` metric instrument.
Args:
callback: Callback invoked each collection interval with the
observer as argument.
name: The name of the metric.
description: Human-readable description of the metric.
unit: Unit of the metric values following the UCUM convention
(https://unitsofmeasure.org/ucum.html).
value_type: The type of values being recorded by the metric.
label_keys: The keys for the labels with dynamic values.
enabled: Whether to report the metric by default.
Returns: A new ``Observer`` metric instrument.
"""

@abc.abstractmethod
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
"""Gets a `LabelSet` with the given labels.
Expand Down Expand Up @@ -367,6 +380,18 @@ def create_metric(
# pylint: disable=no-self-use
return DefaultMetric()

def register_observer(
self,
callback: ObserverCallbackT,
name: str,
description: str,
unit: str,
value_type: Type[ValueT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> "Observer":
return DefaultObserver()

def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
# pylint: disable=no-self-use
return DefaultLabelSet()
Expand Down
Loading

0 comments on commit 888bed9

Please sign in to comment.