Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last_updated_timestamp for observers #522

Merged
merged 16 commits into from
Apr 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,9 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
# TODO: horrible hack to get original list of keys to then get the bound
# instrument
key = dict(metric_record.labels)
point = metrics_pb2.Point(
timestamp=utils.proto_timestamp_from_time_ns(
metric_record.metric.bind(key).last_update_timestamp
metric_record.aggregator.last_update_timestamp
lzchen marked this conversation as resolved.
Show resolved Hide resolved
)
)
if metric_record.metric.value_type == int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,11 @@ def test_translate_to_collector(self):
self.assertEqual(len(output_metrics[0].timeseries[0].points), 1)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.seconds,
record.metric.bind(self._labels).last_update_timestamp
// 1000000000,
record.aggregator.last_update_timestamp // 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].timestamp.nanos,
record.metric.bind(self._labels).last_update_timestamp
% 1000000000,
record.aggregator.last_update_timestamp % 1000000000,
)
self.assertEqual(
output_metrics[0].timeseries[0].points[0].int64_value, 123
Expand Down
14 changes: 3 additions & 11 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util import time_ns

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,7 +53,6 @@ def __init__(
self.value_type = value_type
self.enabled = enabled
self.aggregator = aggregator
self.last_update_timestamp = time_ns()
self._ref_count = 0
self._ref_count_lock = threading.Lock()

Expand All @@ -69,7 +67,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool:
return True

def update(self, value: metrics_api.ValueT):
self.last_update_timestamp = time_ns()
self.aggregator.update(value)

def release(self):
Expand All @@ -88,10 +85,8 @@ def ref_count(self):
return self._ref_count

def __repr__(self):
return '{}(data="{}", last_update_timestamp={})'.format(
type(self).__name__,
self.aggregator.current,
self.last_update_timestamp,
return '{}(data="{}")'.format(
type(self).__name__, self.aggregator.current
)


Expand Down Expand Up @@ -331,7 +326,6 @@ def _collect_observers(self) -> None:
if not observer.enabled:
continue

# TODO: capture timestamp?
if not observer.run():
continue

Expand Down Expand Up @@ -404,9 +398,7 @@ def unregister_observer(self, observer: "Observer") -> None:


class MeterProvider(metrics_api.MeterProvider):
def __init__(
self, resource: Resource = Resource.create_empty(),
):
def __init__(self, resource: Resource = Resource.create_empty()):
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.resource = resource

def get_meter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import threading
from collections import namedtuple

from opentelemetry.util import time_ns


class Aggregator(abc.ABC):
"""Base class for aggregators.
Expand Down Expand Up @@ -49,10 +51,12 @@ def __init__(self):
self.current = 0
self.checkpoint = 0
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
self.current += value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -62,6 +66,9 @@ def take_checkpoint(self):
def merge(self, other):
with self._lock:
self.checkpoint += other.checkpoint
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)


class MinMaxSumCountAggregator(Aggregator):
Expand All @@ -88,6 +95,7 @@ def __init__(self):
self.current = self._EMPTY
self.checkpoint = self._EMPTY
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
Expand All @@ -100,6 +108,7 @@ def update(self, value):
self.current.sum + value,
self.current.count + 1,
)
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
Expand All @@ -111,6 +120,9 @@ def merge(self, other):
self.checkpoint = self._merge_checkpoint(
self.checkpoint, other.checkpoint
)
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)


class ObserverAggregator(Aggregator):
Expand All @@ -123,20 +135,32 @@ def __init__(self):
self.mmsc = MinMaxSumCountAggregator()
self.current = None
self.checkpoint = self._TYPE(None, None, None, 0, None)
self.last_update_timestamp = None

def update(self, value):
self.mmsc.update(value)
self.current = value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
self.mmsc.take_checkpoint()
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,)))

def merge(self, other):
self.mmsc.merge(other.mmsc)
self.checkpoint = self._TYPE(
*(
self.mmsc.checkpoint
+ (other.checkpoint.last or self.checkpoint.last,)
)
last = self.checkpoint.last
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)
if self.last_update_timestamp == other.last_update_timestamp:
last = other.checkpoint.last
self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,)))


def get_latest_timestamp(time_stamp, other_timestamp):
if time_stamp is None:
return other_timestamp
if other_timestamp is not None:
if time_stamp < other_timestamp:
return other_timestamp
return time_stamp
94 changes: 91 additions & 3 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,14 @@ def call_update(counter):
update_total += val
return update_total

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
counter = CounterAggregator()
counter.update(1.0)
counter.update(2.0)
self.assertEqual(counter.current, 3.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_checkpoint(self):
counter = CounterAggregator()
Expand All @@ -246,8 +249,10 @@ def test_merge(self):
counter2 = CounterAggregator()
counter.checkpoint = 1.0
counter2.checkpoint = 3.0
counter2.last_update_timestamp = 123
counter.merge(counter2)
self.assertEqual(counter.checkpoint, 4.0)
self.assertEqual(counter.last_update_timestamp, 123)

def test_concurrent_update(self):
counter = CounterAggregator()
Expand Down Expand Up @@ -296,7 +301,9 @@ def call_update(mmsc):
count_ += 1
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)

def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY)
Expand All @@ -309,6 +316,7 @@ def test_update(self):
self.assertEqual(
mmsc.current, (min(values), max(values), sum(values), len(values))
)
self.assertEqual(mmsc.last_update_timestamp, 123)

def test_checkpoint(self):
mmsc = MinMaxSumCountAggregator()
Expand Down Expand Up @@ -340,6 +348,9 @@ def test_merge(self):
mmsc1.checkpoint = checkpoint1
mmsc2.checkpoint = checkpoint2

mmsc1.last_update_timestamp = 100
mmsc2.last_update_timestamp = 123

mmsc1.merge(mmsc2)

self.assertEqual(
Expand All @@ -348,6 +359,7 @@ def test_merge(self):
checkpoint1, checkpoint2
),
)
self.assertEqual(mmsc1.last_update_timestamp, 123)

def test_merge_checkpoint(self):
func = MinMaxSumCountAggregator._merge_checkpoint
Expand Down Expand Up @@ -421,7 +433,9 @@ def test_concurrent_update_and_checkpoint(self):


class TestObserverAggregator(unittest.TestCase):
def test_update(self):
@mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_update(self, time_mock):
time_mock.return_value = 123
observer = ObserverAggregator()
# test current values without any update
self.assertEqual(observer.mmsc.current, (None, None, None, 0))
Expand All @@ -436,6 +450,7 @@ def test_update(self):
observer.mmsc.current,
(min(values), max(values), sum(values), len(values)),
)
self.assertEqual(observer.last_update_timestamp, 123)

self.assertEqual(observer.current, values[-1])

Expand Down Expand Up @@ -471,6 +486,77 @@ def test_merge(self):
observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = 100
observer2.last_update_timestamp = 123

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

observer1.merge(observer2)

self.assertEqual(
observer1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
checkpoint2.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 123)

def test_merge_last_updated(self):
observer1 = ObserverAggregator()
observer2 = ObserverAggregator()

mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)

checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))

checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = 123
observer2.last_update_timestamp = 100

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

observer1.merge(observer2)

self.assertEqual(
observer1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
checkpoint1.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 123)

def test_merge_last_updated_none(self):
observer1 = ObserverAggregator()
observer2 = ObserverAggregator()

mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)

checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,)))

checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,)))

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer2.mmsc.checkpoint = mmsc_checkpoint2

observer1.last_update_timestamp = None
observer2.last_update_timestamp = 100

observer1.checkpoint = checkpoint1
observer2.checkpoint = checkpoint2

Expand All @@ -486,6 +572,7 @@ def test_merge(self):
checkpoint2.last,
),
)
self.assertEqual(observer1.last_update_timestamp, 100)

def test_merge_with_empty(self):
observer1 = ObserverAggregator()
Expand All @@ -496,6 +583,7 @@ def test_merge_with_empty(self):

observer1.mmsc.checkpoint = mmsc_checkpoint1
observer1.checkpoint = checkpoint1
observer1.last_update_timestamp = 100

observer1.merge(observer2)

Expand Down
10 changes: 2 additions & 8 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,10 @@ def test_add_incorrect_type(self, logger_mock):
self.assertEqual(bound_counter.aggregator.current, 0)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.CounterAggregator()
bound_counter = metrics.BoundCounter(int, True, aggregator)
time_mock.return_value = 123
bound_counter.update(4.0)
self.assertEqual(bound_counter.last_update_timestamp, 123)
self.assertEqual(bound_counter.aggregator.current, 4.0)


Expand Down Expand Up @@ -403,11 +400,8 @@ def test_record_incorrect_type(self, logger_mock):
)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.time_ns")
def test_update(self, time_mock):
def test_update(self):
aggregator = export.aggregate.MinMaxSumCountAggregator()
bound_measure = metrics.BoundMeasure(int, True, aggregator)
time_mock.return_value = 123
bound_measure.update(4.0)
self.assertEqual(bound_measure.last_update_timestamp, 123)
self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1))