Skip to content

Commit

Permalink
Adding speced out env vars and args to BLRP
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydvoss committed Mar 24, 2023
1 parent b6a1b22 commit 84fbeaf
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2964](https://github.com/open-telemetry/opentelemetry-python/pull/2964))
- Add OpenCensus trace bridge/shim
([#3210](https://github.com/open-telemetry/opentelemetry-python/pull/3210))
- Add speced out environment variables and arguments for BatchLogRecordProcessor
([#3237](https://github.com/open-telemetry/opentelemetry-python/pull/3237))

## Version 1.16.0/0.37b0 (2023-02-17)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
import os
import sys
import threading
from os import linesep
from os import environ, linesep
from time import time_ns
from typing import IO, Callable, Deque, List, Optional, Sequence

from opentelemetry.context import attach, detach, set_value
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
from opentelemetry.sdk.environment_variables import (
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_SCHEDULE_DELAY,
)
from opentelemetry.util._once import Once

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -137,20 +143,66 @@ class BatchLogRecordProcessor(LogRecordProcessor):
"""This is an implementation of LogRecordProcessor which creates batches of
received logs in the export-friendly LogData representation and
send to the configured LogExporter, as soon as they are emitted.
`BatchLogRecordProcessor` is configurable with the following environment
variables which correspond to constructor parameters:
- :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
- :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
- :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
- :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
"""

def __init__(
self,
exporter: LogExporter,
schedule_delay_millis: int = 5000,
max_export_batch_size: int = 512,
export_timeout_millis: int = 30000,
max_queue_size: int = None,
schedule_delay_millis: int = None,
max_export_batch_size: int = None,
export_timeout_millis: int = None,
):
if max_queue_size is None:
max_queue_size = int(environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, 2048))

if schedule_delay_millis is None:
schedule_delay_millis = int(
environ.get(OTEL_BLRP_SCHEDULE_DELAY, 5000)
)

if max_export_batch_size is None:
max_export_batch_size = int(
environ.get(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, 512)
)

if export_timeout_millis is None:
export_timeout_millis = int(
environ.get(OTEL_BLRP_EXPORT_TIMEOUT, 30000)
)

if max_queue_size <= 0:
raise ValueError("max_queue_size must be a positive integer.")

if schedule_delay_millis <= 0:
raise ValueError("schedule_delay_millis must be positive.")

if max_export_batch_size <= 0:
raise ValueError(
"max_export_batch_size must be a positive integer."
)

if max_export_batch_size > max_queue_size:
raise ValueError(
"max_export_batch_size must be less than or equal to max_queue_size."
)

self._exporter = exporter
self._max_queue_size = max_queue_size
self._schedule_delay_millis = schedule_delay_millis
self._max_export_batch_size = max_export_batch_size
self._export_timeout_millis = export_timeout_millis
self._queue = collections.deque() # type: Deque[LogData]
self._queue = collections.deque(
[], max_queue_size
) # type: Deque[LogData]
self._worker_thread = threading.Thread(
name="OtelBatchLogRecordProcessor",
target=self.worker,
Expand Down
40 changes: 36 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,35 +66,67 @@
i.e. the SDK behaves as if OTEL_TRACES_SAMPLER_ARG is not set.
"""

OTEL_BLRP_SCHEDULE_DELAY = "OTEL_BLRP_SCHEDULE_DELAY"
"""
.. envvar:: OTEL_BLRP_SCHEDULE_DELAY
The :envvar:`OTEL_BLRP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchLogRecordProcessor.
Default: 5000
"""

OTEL_BLRP_EXPORT_TIMEOUT = "OTEL_BLRP_EXPORT_TIMEOUT"
"""
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT
The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
Default: 30000
"""

OTEL_BLRP_MAX_QUEUE_SIZE = "OTEL_BLRP_MAX_QUEUE_SIZE"
"""
.. envvar:: OTEL_BLRP_MAX_QUEUE_SIZE
The :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchLogRecordProcessor.
Default: 2048
"""

OTEL_BLRP_MAX_EXPORT_BATCH_SIZE = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
"""
.. envvar:: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE
The :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchLogRecordProcessor.
Default: 512
"""

OTEL_BSP_SCHEDULE_DELAY = "OTEL_BSP_SCHEDULE_DELAY"
"""
.. envvar:: OTEL_BSP_SCHEDULE_DELAY
The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports.
The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchSpanProcessor.
Default: 5000
"""

OTEL_BSP_EXPORT_TIMEOUT = "OTEL_BSP_EXPORT_TIMEOUT"
"""
.. envvar:: OTEL_BSP_EXPORT_TIMEOUT
The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data.
The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchSpanProcessor.
Default: 30000
"""

OTEL_BSP_MAX_QUEUE_SIZE = "OTEL_BSP_MAX_QUEUE_SIZE"
"""
.. envvar:: OTEL_BSP_MAX_QUEUE_SIZE
The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export.
The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchSpanProcessor.
Default: 2048
"""

OTEL_BSP_MAX_EXPORT_BATCH_SIZE = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
"""
.. envvar:: OTEL_BSP_MAX_EXPORT_BATCH_SIZE
The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export.
The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchSpanProcessor.
Default: 512
"""

Expand Down
109 changes: 109 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
InMemoryLogExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.environment_variables import (
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
Expand Down Expand Up @@ -175,6 +181,109 @@ def test_emit_call_log_record(self):
logger.error("error")
self.assertEqual(log_record_processor.emit.call_count, 1)

def test_args(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(
exporter,
max_queue_size=1024,
schedule_delay_millis=2500,
max_export_batch_size=256,
export_timeout_millis=15000,
)
self.assertEqual(log_record_processor._exporter, exporter)
self.assertEqual(log_record_processor._max_queue_size, 1024)
self.assertEqual(log_record_processor._schedule_delay_millis, 2500)
self.assertEqual(log_record_processor._max_export_batch_size, 256)
self.assertEqual(log_record_processor._export_timeout_millis, 15000)

@patch.dict(
"os.environ",
{
OTEL_BLRP_MAX_QUEUE_SIZE: "1024",
OTEL_BLRP_SCHEDULE_DELAY: "2500",
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: "256",
OTEL_BLRP_EXPORT_TIMEOUT: "15000",
},
)
def test_env_vars(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(exporter)
self.assertEqual(log_record_processor._exporter, exporter)
self.assertEqual(log_record_processor._max_queue_size, 1024)
self.assertEqual(log_record_processor._schedule_delay_millis, 2500)
self.assertEqual(log_record_processor._max_export_batch_size, 256)
self.assertEqual(log_record_processor._export_timeout_millis, 15000)

def test_args_defaults(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(exporter)
self.assertEqual(log_record_processor._exporter, exporter)
self.assertEqual(log_record_processor._max_queue_size, 2048)
self.assertEqual(log_record_processor._schedule_delay_millis, 5000)
self.assertEqual(log_record_processor._max_export_batch_size, 512)
self.assertEqual(log_record_processor._export_timeout_millis, 30000)

def test_args_none_defaults(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(
exporter,
max_queue_size=None,
schedule_delay_millis=None,
max_export_batch_size=None,
export_timeout_millis=None,
)
self.assertEqual(log_record_processor._exporter, exporter)
self.assertEqual(log_record_processor._max_queue_size, 2048)
self.assertEqual(log_record_processor._schedule_delay_millis, 5000)
self.assertEqual(log_record_processor._max_export_batch_size, 512)
self.assertEqual(log_record_processor._export_timeout_millis, 30000)

def test_validation_negative_max_queue_size(self):
exporter = InMemoryLogExporter()
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
max_queue_size=0,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
max_queue_size=-1,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
schedule_delay_millis=0,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
schedule_delay_millis=-1,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
max_export_batch_size=0,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
max_export_batch_size=-1,
)
self.assertRaises(
ValueError,
BatchLogRecordProcessor,
exporter,
max_queue_size=100,
max_export_batch_size=101,
)

def test_shutdown(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(exporter)
Expand Down

0 comments on commit 84fbeaf

Please sign in to comment.