Skip to content

Commit

Permalink
Add support for OTEL_BSP_* environment variables (#1120)
Browse files Browse the repository at this point in the history
Fixes #1105
  • Loading branch information
ocelotl authored Sep 24, 2020
1 parent 90d7400 commit 4af1341
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 6 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Update sampling result names
([#1128](https://github.com/open-telemetry/opentelemetry-python/pull/1128))
- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables
([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120))

## Version 0.13b0

Expand Down
35 changes: 30 additions & 5 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import typing
from enum import Enum

from opentelemetry.configuration import Configuration
from opentelemetry.context import attach, detach, set_value
from opentelemetry.sdk.trace import sampling
from opentelemetry.util import time_ns

from .. import Span, SpanProcessor
Expand Down Expand Up @@ -113,10 +113,30 @@ class BatchExportSpanProcessor(SpanProcessor):
def __init__(
self,
span_exporter: SpanExporter,
max_queue_size: int = 2048,
schedule_delay_millis: float = 5000,
max_export_batch_size: int = 512,
max_queue_size: int = None,
schedule_delay_millis: float = None,
max_export_batch_size: int = None,
export_timeout_millis: float = None,
):

if max_queue_size is None:
max_queue_size = Configuration().get("BSP_MAX_QUEUE_SIZE", 2048)

if schedule_delay_millis is None:
schedule_delay_millis = Configuration().get(
"BSP_SCHEDULE_DELAY_MILLIS", 5000
)

if max_export_batch_size is None:
max_export_batch_size = Configuration().get(
"BSP_MAX_EXPORT_BATCH_SIZE", 512
)

if export_timeout_millis is None:
export_timeout_millis = Configuration().get(
"BSP_EXPORT_TIMEOUT_MILLIS", 30000
)

if max_queue_size <= 0:
raise ValueError("max_queue_size must be a positive integer.")

Expand All @@ -143,6 +163,7 @@ def __init__(
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
self.export_timeout_millis = export_timeout_millis
self.done = False
# flag that indicates that spans are being dropped
self._spans_dropped = False
Expand Down Expand Up @@ -306,7 +327,11 @@ def _drain_queue(self):
while self.queue:
self._export_batch()

def force_flush(self, timeout_millis: int = 30000) -> bool:
def force_flush(self, timeout_millis: int = None) -> bool:

if timeout_millis is None:
timeout_millis = self.export_timeout_millis

if self.done:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True
Expand Down
22 changes: 21 additions & 1 deletion opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ def _create_start_and_end_span(name, span_processor):


class TestBatchExportSpanProcessor(unittest.TestCase):
@mock.patch.dict(
"os.environ",
{
"OTEL_BSP_MAX_QUEUE_SIZE": "10",
"OTEL_BSP_SCHEDULE_DELAY_MILLIS": "2",
"OTEL_BSP_MAX_EXPORT_BATCH_SIZE": "3",
"OTEL_BSP_EXPORT_TIMEOUT_MILLIS": "4",
},
)
def test_batch_span_processor_environment_variables(self):

batch_span_processor = export.BatchExportSpanProcessor(
MySpanExporter(destination=[])
)

self.assertEqual(batch_span_processor.max_queue_size, 10)
self.assertEqual(batch_span_processor.schedule_delay_millis, 2)
self.assertEqual(batch_span_processor.max_export_batch_size, 3)
self.assertEqual(batch_span_processor.export_timeout_millis, 4)

def test_shutdown(self):
spans_names_list = []

Expand Down Expand Up @@ -266,7 +286,7 @@ def test_batch_span_processor_many_spans(self):
for _ in range(256):
_create_start_and_end_span("foo", span_processor)

time.sleep(0.05) # give some time for the exporter to upload spans
time.sleep(0.1) # give some time for the exporter to upload spans

self.assertTrue(span_processor.force_flush())
self.assertEqual(len(spans_names_list), 1024)
Expand Down

0 comments on commit 4af1341

Please sign in to comment.