Skip to content

Commit

Permalink
Add timeouts to metric SDK (#2653)
Browse files Browse the repository at this point in the history
* Add timeouts to metric SDK

* comments

* don't use TimeoutError as it is intended for OS related timeouts

* changelog and typo

* isort

* fix _time_ns import

* Update CHANGELOG.md

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* use self.fail in tests

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
aabmass and srikanthccv authored May 6, 2022
1 parent 7397605 commit a72c7de
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD)

- Add timeouts to metric SDK
([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653))
- Add variadic arguments to metric exporter/reader interfaces
([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654))
- Move Metrics API behind internal package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ def _translate_data(
)

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
return self._export(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None:
self._collector._callback = self.collect

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
self._collector.add_metrics_data(metrics)

def shutdown(self, *args, **kwargs) -> bool:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
REGISTRY.unregister(self._collector)
return True


class _CustomCollector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

_logger = getLogger(__name__)

Expand Down Expand Up @@ -369,16 +370,20 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

def force_flush(self) -> bool:

# FIXME implement a timeout
def force_flush(self, timeout_millis: float = 10_000) -> bool:
deadline_ns = _time_ns() + timeout_millis * 10**6

for metric_reader in self._sdk_config.metric_readers:
metric_reader.collect()
current_ts = _time_ns()
if current_ts >= deadline_ns:
raise Exception("Timed out while flushing metric readers")
metric_reader.collect(
timeout_millis=(deadline_ns - current_ts) / 10**6
)
return True

def shutdown(self):
# FIXME implement a timeout
def shutdown(self, timeout_millis: float = 30_000):
deadline_ns = _time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True
Expand All @@ -392,8 +397,15 @@ def _shutdown():
metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
current_ts = _time_ns()
try:
metric_reader.shutdown()
if current_ts >= deadline_ns:
raise Exception(
"Didn't get to execute, deadline already exceeded"
)
metric_reader.shutdown(
timeout_millis=(deadline_ns - current_ts) / 10**6
)

# pylint: disable=broad-except
except Exception as error:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

_logger = logging.getLogger(__name__)

Expand All @@ -53,8 +54,11 @@ class MetricExporter(ABC):

@abstractmethod
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> "MetricExportResult":
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
"""Exports a batch of telemetry data.
Args:
Expand All @@ -65,7 +69,7 @@ def export(
"""

@abstractmethod
def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""Shuts down the exporter.
Called when the SDK is shut down.
Expand All @@ -90,14 +94,17 @@ def __init__(
self.formatter = formatter

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -127,11 +134,16 @@ def get_metrics(self) -> List[Metric]:
self._metrics = []
return metrics

def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
with self._lock:
self._metrics = list(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -193,23 +205,28 @@ def _at_fork_reinit(self):
def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect()
self.collect(timeout_millis=self._export_timeout_millis)
# one last collection below before shutting down completely
self.collect()
self.collect(timeout_millis=self._export_interval_millis)

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export(metrics)
self._exporter.export(metrics, timeout_millis=timeout_millis)
except Exception as e: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
deadline_ns = _time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True

Expand All @@ -219,5 +236,5 @@ def _shutdown():
return

self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
self._daemon_thread.join(timeout=(deadline_ns - _time_ns()) / 10**9)
self._exporter.shutdown(timeout=(deadline_ns - _time_ns()) / 10**6)
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
self._instrument_class_aggregation.update(preferred_aggregation or {})

@final
def collect(self) -> None:
def collect(self, timeout_millis: float = 10_000) -> None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.
"""
Expand All @@ -148,7 +148,8 @@ def collect(self) -> None:
)
return
self._receive_metrics(
self._collect(self, self._instrument_class_temporality)
self._collect(self, self._instrument_class_temporality),
timeout_millis=timeout_millis,
)

@final
Expand All @@ -162,11 +163,16 @@ def _set_collect_callback(
self._collect = func

@abstractmethod
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""

@abstractmethod
def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down
89 changes: 89 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_backward_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The purpose of this test is to test for backward compatibility with any user-implementable
interfaces as they were originally defined. For example, changes to the MetricExporter ABC must
be made in such a way that existing implementations (outside of this repo) continue to work
when *called* by the SDK.
This does not apply to classes which are not intended to be overriden by the user e.g. Meter
and PeriodicExportingMetricReader concrete class. Those may freely be modified in a
backward-compatible way for *callers*.
Ideally, we could use mypy for this as well, but SDK is not type checked atm.
"""

from typing import Iterable, Sequence
from unittest import TestCase

from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import Metric


# Do not change these classes until after major version 1
class OrigMetricExporter(MetricExporter):
def export(
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
pass

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


class OrigMetricReader(MetricReader):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
pass

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self.collect()


class TestBackwardCompat(TestCase):
def test_metric_exporter(self):
exporter = OrigMetricExporter()
meter_provider = MeterProvider(
metric_readers=[PeriodicExportingMetricReader(exporter)]
)
# produce some data
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
try:
meter_provider.shutdown()
except Exception:
self.fail()

def test_metric_reader(self):
reader = OrigMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
# produce some data
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
try:
meter_provider.shutdown()
except Exception:
self.fail()
15 changes: 10 additions & 5 deletions opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# limitations under the License.

from os import environ
from typing import Dict
from typing import Dict, Iterable
from unittest import TestCase
from unittest.mock import patch

from opentelemetry.sdk._metrics._internal.aggregation import Aggregation
from opentelemetry.sdk._metrics.aggregation import (
Aggregation,
DefaultAggregation,
LastValueAggregation,
)
Expand All @@ -31,7 +31,7 @@
UpDownCounter,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk.environment_variables import (
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)
Expand All @@ -48,10 +48,15 @@ def __init__(
preferred_aggregation=preferred_aggregation,
)

def _receive_metrics(self, metrics):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
pass

def shutdown(self):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
return True


Expand Down
Loading

0 comments on commit a72c7de

Please sign in to comment.