Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renaming batcher to processor #1203

Merged
merged 6 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/getting_started/prometheus_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
# Start Prometheus client
start_http_server(port=8000, addr="localhost")

batcher_mode = "stateful"
processor_mode = "stateful"
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__, batcher_mode == "stateful")
meter = metrics.get_meter(__name__, processor_mode == "stateful")
exporter = PrometheusMetricsExporter("MyAppPrefix")
controller = PushController(meter, exporter, 5)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
opentelemetry.sdk.metrics.export.batcher
opentelemetry.sdk.metrics.export.processor
==========================================

.. toctree::

metrics.export

.. automodule:: opentelemetry.sdk.metrics.export.batcher
.. automodule:: opentelemetry.sdk.metrics.export.processor
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion docs/sdk/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Submodules
.. toctree::

metrics.export.aggregate
metrics.export.batcher
metrics.export.processor
util.instrumentation

.. automodule:: opentelemetry.sdk.metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def translate_to_collector(

# If cumulative and stateful, explicitly set the start_timestamp to
# exporter start time.
if metric_record.instrument.meter.batcher.stateful:
if metric_record.instrument.meter.processor.stateful:
start_timestamp = exporter_start_timestamp
else:
start_timestamp = None
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120))
- Allow for Custom Trace and Span IDs Generation - `IdsGenerator` for TracerProvider
([#1153](https://github.com/open-telemetry/opentelemetry-python/pull/1153))
- Renaming metrics Batcher to Processor
([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203))

## Version 0.13b0

Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
MetricsExporter,
)
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor
from opentelemetry.sdk.metrics.view import (
ViewData,
ViewManager,
Expand Down Expand Up @@ -325,7 +325,7 @@ class ValueObserver(Observer, metrics_api.ValueObserver):


class Record:
"""Container class used for processing in the `Batcher`"""
"""Container class used for processing in the `Processor`"""

def __init__(
self,
Expand All @@ -352,7 +352,7 @@ def __init__(
instrumentation_info: "InstrumentationInfo",
):
self.instrumentation_info = instrumentation_info
self.batcher = Batcher(source.stateful)
self.processor = Processor(source.stateful)
self.resource = source.resource
self.metrics = set()
self.observers = set()
Expand All @@ -363,7 +363,7 @@ def __init__(
def collect(self) -> None:
"""Collects all the metrics created with this `Meter` for export.

Utilizes the batcher to create checkpoints of the current values in
Utilizes the processor to create checkpoints of the current values in
each aggregator belonging to the metrics that were created with this
meter instance.
"""
Expand All @@ -385,7 +385,7 @@ def _collect_metrics(self) -> None:
record = Record(
metric, view_data.labels, view_data.aggregator
)
self.batcher.process(record)
self.processor.process(record)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)
Expand All @@ -405,7 +405,7 @@ def _collect_observers(self) -> None:

for labels, aggregator in observer.aggregators.items():
record = Record(observer, labels, aggregator)
self.batcher.process(record)
self.processor.process(record)

def record_batch(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def tick(self):
self.meter.collect()
# Export the collected metrics
token = attach(set_value("suppress_instrumentation", True))
self.exporter.export(self.meter.batcher.checkpoint_set())
self.exporter.export(self.meter.processor.checkpoint_set())
detach(token)
# Perform post-exporting logic
self.meter.batcher.finished_collection()
self.meter.processor.finished_collection()
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
from opentelemetry.sdk.util import get_dict_as_key


class Batcher:
"""Base class for all batcher types.
class Processor:
"""Base class for all processor types.

The batcher is responsible for storing the aggregators and aggregated
The processor is responsible for storing the aggregators and aggregated
values received from updates from metrics in the meter. The stored values
will be sent to an exporter for exporting.
"""

def __init__(self, stateful: bool):
self._batch_map = {}
# stateful=True indicates the batcher computes checkpoints from over
# the process lifetime. False indicates the batcher computes
# stateful=True indicates the processor computes checkpoints from over
# the process lifetime. False indicates the processor computes
# checkpoints which describe the updates of a single collection period
# (deltas)
self.stateful = stateful
Expand All @@ -38,7 +38,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
"""Returns a list of MetricRecords used for exporting.

The list of MetricRecords is a snapshot created from the current
data in all of the aggregators in this batcher.
data in all of the aggregators in this processor.
"""
metric_records = []
# pylint: disable=W0612
Expand Down
70 changes: 35 additions & 35 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
SumAggregator,
ValueObserverAggregator,
)
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor


# pylint: disable=protected-access
Expand Down Expand Up @@ -64,7 +64,7 @@ def test_export(self):
class TestBatcher(unittest.TestCase):
def test_checkpoint_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -73,21 +73,21 @@ def test_checkpoint_set(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
records = batcher.checkpoint_set()
processor._batch_map = _batch_map
records = processor.checkpoint_set()
self.assertEqual(len(records), 1)
self.assertEqual(records[0].instrument, metric)
self.assertEqual(records[0].labels, labels)
self.assertEqual(records[0].aggregator, aggregator)

def test_checkpoint_set_empty(self):
batcher = Batcher(True)
records = batcher.checkpoint_set()
processor = Processor(True)
records = processor.checkpoint_set()
self.assertEqual(len(records), 0)

def test_finished_collection_stateless(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(False)
processor = Processor(False)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -96,13 +96,13 @@ def test_finished_collection_stateless(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 0)
processor._batch_map = _batch_map
processor.finished_collection()
self.assertEqual(len(processor._batch_map), 0)

def test_finished_collection_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -111,13 +111,13 @@ def test_finished_collection_stateful(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 1)
processor._batch_map = _batch_map
processor.finished_collection()
self.assertEqual(len(processor._batch_map), 1)

def test_batcher_process_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
aggregator2 = SumAggregator()
metric = metrics.Counter(
Expand All @@ -128,17 +128,17 @@ def test_batcher_process_exists(self):
batch_key = (metric, SumAggregator, tuple(), labels)
_batch_map[batch_key] = aggregator
aggregator2.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator2)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_batcher_process_not_exists(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove "batcher" from test names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed, thanks for the catch

meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -147,17 +147,17 @@ def test_batcher_process_not_exists(self):
_batch_map = {}
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_batcher_process_not_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -166,13 +166,13 @@ def test_batcher_process_not_stateful(self):
_batch_map = {}
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)


class TestSumAggregator(unittest.TestCase):
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestMeterProvider(unittest.TestCase):
def test_stateful(self):
meter_provider = metrics.MeterProvider(stateful=False)
meter = meter_provider.get_meter(__name__)
self.assertIs(meter.batcher.stateful, False)
self.assertIs(meter.processor.stateful, False)

def test_resource(self):
resource = resources.Resource.create({})
Expand Down Expand Up @@ -75,7 +75,7 @@ def test_extends_api(self):
def test_collect_metrics(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher_mock = mock.Mock()
meter.batcher = batcher_mock
meter.processor = batcher_mock
counter = meter.create_metric(
"name", "desc", "unit", float, metrics.Counter
)
Expand All @@ -88,14 +88,14 @@ def test_collect_metrics(self):
def test_collect_no_metrics(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher_mock = mock.Mock()
meter.batcher = batcher_mock
meter.processor = batcher_mock
meter.collect()
self.assertFalse(batcher_mock.process.called)

def test_collect_not_registered(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher_mock = mock.Mock()
meter.batcher = batcher_mock
meter.processor = batcher_mock
counter = metrics.Counter("name", "desc", "unit", float, meter)
labels = {"key1": "value1"}
counter.add(1.0, labels)
Expand All @@ -105,7 +105,7 @@ def test_collect_not_registered(self):
def test_collect_disabled_metric(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher_mock = mock.Mock()
meter.batcher = batcher_mock
meter.processor = batcher_mock
counter = metrics.Counter("name", "desc", "unit", float, meter, False)
labels = {"key1": "value1"}
meter.register_view(View(counter, SumAggregator))
Expand All @@ -116,7 +116,7 @@ def test_collect_disabled_metric(self):
def test_collect_observers(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher_mock = mock.Mock()
meter.batcher = batcher_mock
meter.processor = batcher_mock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename batcher_mock to processor_mock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def callback(observer):
self.assertIsInstance(observer, metrics_api.Observer)
Expand Down