Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeouts to metric SDK #2653

Merged
merged 8 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD)

- Add timeouts to metric SDK
([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653))
- Add variadic arguments to metric exporter/reader interfaces
([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654))
- Move Metrics API behind internal package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ def _translate_data(
)

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
return self._export(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None:
self._collector._callback = self.collect

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
self._collector.add_metrics_data(metrics)

def shutdown(self, *args, **kwargs) -> bool:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
REGISTRY.unregister(self._collector)
return True


class _CustomCollector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

_logger = getLogger(__name__)

Expand Down Expand Up @@ -369,16 +370,20 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

def force_flush(self) -> bool:

# FIXME implement a timeout
def force_flush(self, timeout_millis: float = 10_000) -> bool:
deadline_ns = _time_ns() + timeout_millis * 10**6

for metric_reader in self._sdk_config.metric_readers:
metric_reader.collect()
current_ts = _time_ns()
if current_ts >= deadline_ns:
raise Exception("Timed out while flushing metric readers")
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
metric_reader.collect(
timeout_millis=(deadline_ns - current_ts) / 10**6
)
return True

def shutdown(self):
# FIXME implement a timeout
def shutdown(self, timeout_millis: float = 30_000):
deadline_ns = _time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True
Expand All @@ -392,8 +397,15 @@ def _shutdown():
metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
current_ts = _time_ns()
try:
metric_reader.shutdown()
if current_ts >= deadline_ns:
raise Exception(
"Didn't get to execute, deadline already exceeded"
)
metric_reader.shutdown(
timeout_millis=(deadline_ns - current_ts) / 10**6
)

# pylint: disable=broad-except
except Exception as error:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

_logger = logging.getLogger(__name__)

Expand All @@ -53,8 +54,11 @@ class MetricExporter(ABC):

@abstractmethod
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> "MetricExportResult":
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
"""Exports a batch of telemetry data.

Args:
Expand All @@ -65,7 +69,7 @@ def export(
"""

@abstractmethod
def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""Shuts down the exporter.

Called when the SDK is shut down.
Expand All @@ -90,14 +94,17 @@ def __init__(
self.formatter = formatter

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -127,11 +134,16 @@ def get_metrics(self) -> List[Metric]:
self._metrics = []
return metrics

def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
with self._lock:
self._metrics = list(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -193,23 +205,28 @@ def _at_fork_reinit(self):
def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect()
self.collect(timeout_millis=self._export_timeout_millis)
# one last collection below before shutting down completely
self.collect()
self.collect(timeout_millis=self._export_interval_millis)

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export(metrics)
self._exporter.export(metrics, timeout_millis=timeout_millis)
except Exception as e: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
deadline_ns = _time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True

Expand All @@ -219,5 +236,5 @@ def _shutdown():
return

self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
self._daemon_thread.join(timeout=(deadline_ns - _time_ns()) / 10**9)
self._exporter.shutdown(timeout=(deadline_ns - _time_ns()) / 10**6)
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
self._instrument_class_aggregation.update(preferred_aggregation or {})

@final
def collect(self) -> None:
def collect(self, timeout_millis: float = 10_000) -> None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.
"""
Expand All @@ -148,7 +148,8 @@ def collect(self) -> None:
)
return
self._receive_metrics(
self._collect(self, self._instrument_class_temporality)
self._collect(self, self._instrument_class_temporality),
timeout_millis=timeout_millis,
)

@final
Expand All @@ -162,11 +163,16 @@ def _set_collect_callback(
self._collect = func

@abstractmethod
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""

@abstractmethod
def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down
89 changes: 89 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_backward_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The purpose of this test is to test for backward compatibility with any user-implementable
interfaces as they were originally defined. For example, changes to the MetricExporter ABC must
be made in such a way that existing implementations (outside of this repo) continue to work
when *called* by the SDK.

This does not apply to classes which are not intended to be overriden by the user e.g. Meter
and PeriodicExportingMetricReader concrete class. Those may freely be modified in a
backward-compatible way for *callers*.

Ideally, we could use mypy for this as well, but SDK is not type checked atm.
"""

from typing import Iterable, Sequence
from unittest import TestCase

from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import Metric


# Do not change these classes until after major version 1
class OrigMetricExporter(MetricExporter):
def export(
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
pass

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


class OrigMetricReader(MetricReader):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
pass

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self.collect()


class TestBackwardCompat(TestCase):
def test_metric_exporter(self):
exporter = OrigMetricExporter()
meter_provider = MeterProvider(
metric_readers=[PeriodicExportingMetricReader(exporter)]
)
# produce some data
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
try:
meter_provider.shutdown()
except Exception:
self.fail()

def test_metric_reader(self):
reader = OrigMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
# produce some data
meter_provider.get_meter("foo").create_counter("mycounter").add(12)
try:
meter_provider.shutdown()
except Exception:
self.fail()
15 changes: 10 additions & 5 deletions opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# limitations under the License.

from os import environ
from typing import Dict
from typing import Dict, Iterable
from unittest import TestCase
from unittest.mock import patch

from opentelemetry.sdk._metrics._internal.aggregation import Aggregation
from opentelemetry.sdk._metrics.aggregation import (
Aggregation,
DefaultAggregation,
LastValueAggregation,
)
Expand All @@ -31,7 +31,7 @@
UpDownCounter,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk.environment_variables import (
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)
Expand All @@ -48,10 +48,15 @@ def __init__(
preferred_aggregation=preferred_aggregation,
)

def _receive_metrics(self, metrics):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
pass

def shutdown(self):
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
return True


Expand Down
Loading