Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeriodicExportingMetricsReader with value = infinity to support explicit metric collection #3059

Merged
merged 29 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f99a4fb
ExportingMetricsReporter and unit tests added,
howardyoo Nov 29, 2022
3dd484c
included ExportingMetricReader to the list of importable classes
howardyoo Nov 29, 2022
aa5b196
fixed incorrect parameter for the method.
howardyoo Nov 29, 2022
e5f6101
fixed minor bug in unit test code
howardyoo Nov 29, 2022
95e0cc6
fixed a lint issue.
howardyoo Nov 29, 2022
8d6d6df
fixing lint issues on the order of library imports
howardyoo Nov 29, 2022
523b428
another lint fix on the test code on the order of the library imports
howardyoo Nov 29, 2022
b9a2ee2
Merge branch 'main' into main
srikanthccv Dec 10, 2022
3fdca7d
Merge branch 'main' into main
srikanthccv Jan 2, 2023
809c049
1. removed ExportMetricReader related tests and codes
howardyoo Jan 4, 2023
4df9ddc
Merge branch 'main' into main
srikanthccv Jan 8, 2023
d7b52d1
Added example for zero interval periodic exporting metrics reader,
howardyoo Jan 9, 2023
2e4fad3
reformatting...
howardyoo Jan 10, 2023
d0a0664
Merge branch 'main' into main
lzchen Jan 12, 2023
e8bb84a
removed the examples for non-interval reader as a result of SIG discu…
howardyoo Jan 13, 2023
83266b0
Merge branch 'main' into main
lzchen Jan 13, 2023
9b4979a
minor changes in the code comment and changelog
howardyoo Jan 17, 2023
1dc1852
Merge branch 'open-telemetry:main' into main
howardyoo Jan 28, 2023
a891ae0
1) added support for math.inf which will disable interval thread
howardyoo Jan 28, 2023
97fe654
Update CHANGELOG.md
howardyoo Jan 30, 2023
d59be9c
Update opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/expo…
howardyoo Jan 30, 2023
bd4c62e
Merge branch 'main' into main
lzchen Jan 30, 2023
5c06eac
Update opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/expo…
howardyoo Feb 2, 2023
2b5029a
Merge branch 'main' into main
srikanthccv Feb 2, 2023
c042bed
Merge branch 'main' into main
srikanthccv Feb 2, 2023
a5cbed0
Update opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/expo…
howardyoo Feb 2, 2023
24da061
Merge branch 'open-telemetry:main' into main
howardyoo Feb 3, 2023
16330da
fixed incorrect reference to self.
howardyoo Feb 3, 2023
436ad0f
make sure to have the _daemon_thread set to None
howardyoo Feb 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,22 +411,17 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass


class PeriodicExportingMetricReader(MetricReader):
"""`PeriodicExportingMetricReader` is an implementation of `MetricReader`
that collects metrics based on a user-configurable time interval, and passes the
metrics to the configured exporter.

The configured exporter's :py:meth:`~MetricExporter.export` method will not be called
concurrently.
class ExportingMetricReader(MetricReader):
lzchen marked this conversation as resolved.
Show resolved Hide resolved
"""`ExportingMetricReader` is an implementation of `MetricReader`
that collects metrics and passes the metrics to the configured exporter, explicitly.
"""

def __init__(
self,
exporter: MetricExporter,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
# PeriodicExportingMetricReader defers to exporter for configuration
# ExportingMetricReader defers to exporter for configuration
super().__init__(
preferred_temporality=exporter._preferred_temporality,
preferred_aggregation=exporter._preferred_aggregation,
Expand All @@ -438,16 +433,7 @@ def __init__(
self._export_lock = Lock()

self._exporter = exporter
if export_interval_millis is None:
try:
export_interval_millis = float(
environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000)
)
except ValueError:
_logger.warning(
"Found invalid value for export interval, using default"
)
export_interval_millis = 60000

if export_timeout_millis is None:
try:
export_timeout_millis = float(
Expand All @@ -458,36 +444,15 @@ def __init__(
"Found invalid value for export timeout, using default"
)
export_timeout_millis = 30000
self._export_interval_millis = export_interval_millis

self._export_timeout_millis = export_timeout_millis
self._shutdown = False
self._shutdown_event = Event()
self._shutdown_once = Once()
self._daemon_thread = Thread(
name="OtelPeriodicExportingMetricReader",
target=self._ticker,
daemon=True,
)
self._daemon_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access

def _at_fork_reinit(self):
self._daemon_thread = Thread(
name="OtelPeriodicExportingMetricReader",
target=self._ticker,
daemon=True,
)
self._daemon_thread.start()

def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect(timeout_millis=self._export_timeout_millis)
# one last collection below before shutting down completely
self.collect(timeout_millis=self._export_interval_millis)
def export(self, timeout_millis: float = 10_000) -> None:
# export is just a wrapper on force_flush.
return self.force_flush(timeout_millis=timeout_millis)

def _receive_metrics(
self,
Expand Down Expand Up @@ -519,10 +484,74 @@ def _shutdown():
return

self._shutdown_event.set()
self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9)
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
super().force_flush(timeout_millis=timeout_millis)
self._exporter.force_flush(timeout_millis=timeout_millis)
return True


class PeriodicExportingMetricReader(ExportingMetricReader):
"""`PeriodicExportingMetricReader` is an implementation of `ExportingMetricReader`
that collects metrics based on a user-configurable time interval, and passes the
metrics to the configured exporter.

The configured exporter's :py:meth:`~MetricExporter.export` method will not be called
concurrently.
"""

def __init__(
self,
exporter: MetricExporter,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
# PeriodicExportingMetricReader defers to exporter for configuration
super().__init__(
exporter=exporter,
export_timeout_millis=export_timeout_millis,
)

if export_interval_millis is None:
try:
export_interval_millis = float(
environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000)
)
except ValueError:
_logger.warning(
"Found invalid value for export interval, using default"
)
export_interval_millis = 60000

self._export_interval_millis = export_interval_millis
self._daemon_thread = Thread(
name="OtelPeriodicExportingMetricReader",
target=self._ticker,
daemon=True,
)
self._daemon_thread.start()
if hasattr(os, "register_at_fork"):
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access

def _at_fork_reinit(self):
self._daemon_thread = Thread(
name="OtelPeriodicExportingMetricReader",
target=self._ticker,
daemon=True,
)
self._daemon_thread.start()

def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect(timeout_millis=self._export_timeout_millis)
# one last collection below before shutting down completely
self.collect(timeout_millis=self._export_interval_millis)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
deadline_ns = time_ns() + timeout_millis * 10**6
super().shutdown(timeout_millis=timeout_millis)
self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9)
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from opentelemetry.sdk.metrics._internal.export import ( # noqa: F401
AggregationTemporality,
ConsoleMetricExporter,
ExportingMetricReader,
InMemoryMetricReader,
MetricExporter,
MetricExportResult,
Expand Down
185 changes: 185 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_exporting_metric_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep, time_ns
from typing import Sequence
from unittest.mock import Mock

from flaky import flaky

from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics._internal import _Counter
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
ExportingMetricReader,
Gauge,
Metric,
MetricExporter,
MetricExportResult,
NumberDataPoint,
Sum,
)
from opentelemetry.sdk.metrics.view import (
DefaultAggregation,
LastValueAggregation,
)
from opentelemetry.test.concurrency_test import ConcurrencyTestBase


class FakeMetricsExporter(MetricExporter):
def __init__(
self, wait=0, preferred_temporality=None, preferred_aggregation=None
):
self.wait = wait
self.metrics = []
self._shutdown = False
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)

def export(
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
sleep(self.wait)
self.metrics.extend(metrics)
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._shutdown = True

def force_flush(self, timeout_millis: float = 10_000) -> bool:
return True


metrics_list = [
Metric(
name="sum_name",
description="",
unit="",
data=Sum(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=time_ns(),
time_unix_nano=time_ns(),
value=2,
)
],
aggregation_temporality=1,
is_monotonic=True,
),
),
Metric(
name="gauge_name",
description="",
unit="",
data=Gauge(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=time_ns(),
time_unix_nano=time_ns(),
value=2,
)
]
),
),
]


class TestExportingMetricReader(ConcurrencyTestBase):
def test_defaults(self):
pmr = ExportingMetricReader(FakeMetricsExporter())
self.assertEqual(pmr._export_timeout_millis, 30000)
pmr.shutdown()

def _create_exporting_reader(
self, metrics, exporter, collect_wait=0, timeout=30000
):

pmr = ExportingMetricReader(exporter, export_timeout_millis=timeout)

def _collect(reader, timeout_millis):
sleep(collect_wait)
pmr._receive_metrics(metrics, timeout_millis)

pmr._set_collect_callback(_collect)
return pmr

def test_export(self):
collect_mock = Mock()
exporter = FakeMetricsExporter()
exporter.export = Mock()
pmr = ExportingMetricReader(exporter, export_timeout_millis=1)
pmr._set_collect_callback(collect_mock)
pmr.export()
self.assertTrue(collect_mock.assert_called_once)
pmr.shutdown()

@flaky(max_runs=3, min_passes=1)
def test_collects_metrics(self):
exporter = FakeMetricsExporter()

pmr = self._create_exporting_reader(
metrics_list, exporter, timeout=1000
)
pmr.collect()
self.assertEqual(exporter.metrics, metrics_list)
pmr.shutdown()

def test_shutdown(self):
exporter = FakeMetricsExporter()

pmr = self._create_exporting_reader([], exporter)
pmr.shutdown()
self.assertEqual(exporter.metrics, [])
self.assertTrue(pmr._shutdown)
self.assertTrue(exporter._shutdown)

def test_shutdown_multiple_times(self):
pmr = self._create_exporting_reader([], FakeMetricsExporter())
with self.assertLogs(level="WARNING") as w:
self.run_with_many_threads(pmr.shutdown)
self.assertTrue("Can't shutdown multiple times", w.output[0])
pmr.shutdown()

def test_exporter_temporality_preference(self):
exporter = FakeMetricsExporter(
preferred_temporality={
Counter: AggregationTemporality.DELTA,
},
)
pmr = ExportingMetricReader(exporter)
for key, value in pmr._instrument_class_temporality.items():
if key is not _Counter:
self.assertEqual(value, AggregationTemporality.CUMULATIVE)
else:
self.assertEqual(value, AggregationTemporality.DELTA)

def test_exporter_aggregation_preference(self):
exporter = FakeMetricsExporter(
preferred_aggregation={
Counter: LastValueAggregation(),
},
)
pmr = ExportingMetricReader(exporter)
for key, value in pmr._instrument_class_aggregation.items():
if key is not _Counter:
self.assertTrue(isinstance(value, DefaultAggregation))
else:
self.assertTrue(isinstance(value, LastValueAggregation))