From 4af1341c7966a95bcb53d475f3ae48b831dbbd78 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Wed, 23 Sep 2020 22:31:17 -0600 Subject: [PATCH] Add support for OTEL_BSP_* environment variables (#1120) Fixes #1105 --- opentelemetry-sdk/CHANGELOG.md | 2 ++ .../sdk/trace/export/__init__.py | 35 ++++++++++++++++--- .../tests/trace/export/test_export.py | 22 +++++++++++- 3 files changed, 53 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 7fd47150ca9..c9b4c3538db 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -4,6 +4,8 @@ - Update sampling result names ([#1128](https://github.com/open-telemetry/opentelemetry-python/pull/1128)) +- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables + ([#1120](https://github.com/open-telemetry/opentelemetry-python/pull/1120)) ## Version 0.13b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 857537b90a1..59231e60f36 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -20,8 +20,8 @@ import typing from enum import Enum +from opentelemetry.configuration import Configuration from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk.trace import sampling from opentelemetry.util import time_ns from ..
import Span, SpanProcessor @@ -113,10 +113,30 @@ class BatchExportSpanProcessor(SpanProcessor): def __init__( self, span_exporter: SpanExporter, - max_queue_size: int = 2048, - schedule_delay_millis: float = 5000, - max_export_batch_size: int = 512, + max_queue_size: int = None, + schedule_delay_millis: float = None, + max_export_batch_size: int = None, + export_timeout_millis: float = None, ): + + if max_queue_size is None: + max_queue_size = Configuration().get("BSP_MAX_QUEUE_SIZE", 2048) + + if schedule_delay_millis is None: + schedule_delay_millis = Configuration().get( + "BSP_SCHEDULE_DELAY_MILLIS", 5000 + ) + + if max_export_batch_size is None: + max_export_batch_size = Configuration().get( + "BSP_MAX_EXPORT_BATCH_SIZE", 512 + ) + + if export_timeout_millis is None: + export_timeout_millis = Configuration().get( + "BSP_EXPORT_TIMEOUT_MILLIS", 30000 + ) + if max_queue_size <= 0: raise ValueError("max_queue_size must be a positive integer.") @@ -143,6 +163,7 @@ def __init__( self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.max_queue_size = max_queue_size + self.export_timeout_millis = export_timeout_millis self.done = False # flag that indicates that spans are being dropped self._spans_dropped = False @@ -306,7 +327,11 @@ def _drain_queue(self): while self.queue: self._export_batch() - def force_flush(self, timeout_millis: int = 30000) -> bool: + def force_flush(self, timeout_millis: int = None) -> bool: + + if timeout_millis is None: + timeout_millis = self.export_timeout_millis + if self.done: logger.warning("Already shutdown, ignoring call to force_flush().") return True diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index e6fcdb9c22c..8c43f731b2c 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -136,6 +136,26 @@ def _create_start_and_end_span(name, 
span_processor): class TestBatchExportSpanProcessor(unittest.TestCase): + @mock.patch.dict( + "os.environ", + { + "OTEL_BSP_MAX_QUEUE_SIZE": "10", + "OTEL_BSP_SCHEDULE_DELAY_MILLIS": "2", + "OTEL_BSP_MAX_EXPORT_BATCH_SIZE": "3", + "OTEL_BSP_EXPORT_TIMEOUT_MILLIS": "4", + }, + ) + def test_batch_span_processor_environment_variables(self): + + batch_span_processor = export.BatchExportSpanProcessor( + MySpanExporter(destination=[]) + ) + + self.assertEqual(batch_span_processor.max_queue_size, 10) + self.assertEqual(batch_span_processor.schedule_delay_millis, 2) + self.assertEqual(batch_span_processor.max_export_batch_size, 3) + self.assertEqual(batch_span_processor.export_timeout_millis, 4) + def test_shutdown(self): spans_names_list = [] @@ -266,7 +286,7 @@ def test_batch_span_processor_many_spans(self): for _ in range(256): _create_start_and_end_span("foo", span_processor) - time.sleep(0.05) # give some time for the exporter to upload spans + time.sleep(0.1) # give some time for the exporter to upload spans self.assertTrue(span_processor.force_flush()) self.assertEqual(len(spans_names_list), 1024)