Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding OpenTelemetry exporter code #14784

Merged
merged 30 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
55343e0
Adding OpenTelemetry exporter code
hectorhdzg Oct 26, 2020
3590ac8
Adding OT requires
hectorhdzg Oct 26, 2020
b55b73b
Update ci.yml
rakshith91 Oct 28, 2020
5abed09
Update README.md
rakshith91 Oct 28, 2020
e9eb839
rename samples
rakshith91 Oct 29, 2020
495c119
Merge remote-tracking branch 'upstream/master' into hectorhdzg/otexpo…
hectorhdzg Nov 4, 2020
78a6953
Merge branch 'hectorhdzg/otexporter' of https://github.com/hectorhdzg…
hectorhdzg Nov 4, 2020
22026dc
Addressing comments
hectorhdzg Nov 4, 2020
fc05356
Merge remote-tracking branch 'upstream/master' into hectorhdzg/otexpo…
hectorhdzg Nov 4, 2020
a5e2b89
Updating generated client name
hectorhdzg Nov 4, 2020
e08fa26
changelog fix
rakshith91 Nov 5, 2020
bf1093b
ignore 27
rakshith91 Nov 5, 2020
51489a9
oops
rakshith91 Nov 5, 2020
549f203
3.5
rakshith91 Nov 5, 2020
6fdfb37
Update sdk/monitor/opentelemetry-exporter-azuremonitor/opentelemetry/…
rakshith91 Nov 5, 2020
1ce6bbb
Merge remote-tracking branch 'upstream/master' into hectorhdzg/otexpo…
hectorhdzg Nov 10, 2020
de56d02
Fixing lint
hectorhdzg Nov 10, 2020
1adc16c
Adding check for storage test path
hectorhdzg Nov 10, 2020
871b55e
Adding check in tearDown
hectorhdzg Nov 10, 2020
75e1912
Adding file check in tests
hectorhdzg Nov 10, 2020
11d17cb
Add check in setUp
hectorhdzg Nov 10, 2020
bd971d8
Disable put test
hectorhdzg Nov 11, 2020
c6ab18b
Adding ignore_errors = true for rmtree method
hectorhdzg Nov 11, 2020
611bb43
Merge remote-tracking branch 'upstream/master' into hectorhdzg/otexpo…
hectorhdzg Nov 11, 2020
b8a57cc
Renaming import update
hectorhdzg Nov 11, 2020
faaf083
Merge remote-tracking branch 'upstream/master' into hectorhdzg/otexpo…
hectorhdzg Nov 11, 2020
7c8bbe1
fix set up
rakshith91 Nov 11, 2020
8e2500a
Merge branch 'hectorhdzg/otexporter' of https://github.com/hectorhdzg…
hectorhdzg Nov 11, 2020
3bd0773
Updating swagger readme
hectorhdzg Nov 11, 2020
f4e9524
Adding exist_ok=True to makedirs method
hectorhdzg Nov 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
# Release History

## Unreleased
## 1.0.0b1 (Unreleased)

## 0.5b.0
Released 2020-09-24
**Breaking Changes**
- This library is renamed to `microsoft-opentelemetry-exporter-azuremonitor`.

## 0.5b.0 (2020-09-24)

- Change epoch for live metrics
([#115](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/115))
- Dropping support for Python 3.4
([#117](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/117))

## 0.4b.0
Released 2020-06-29
## 0.4b.0 (2020-06-29)

- Added live metrics
([#96](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/96))
Expand All @@ -24,14 +25,12 @@ Released 2020-06-29
- Remove request failed per second metrics from auto-collection
([#102](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/102))

## 0.3b.1
Released 2020-05-21
## 0.3b.1 (2020-05-21)

- Fix metrics exporter serialization bug
([#92](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/92))

## 0.3b.0
Released 2020-05-19
## 0.3b.0 (2020-05-19)

- Implement max size logic for local storage
([#74](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/74))
Expand All @@ -44,12 +43,10 @@ Released 2020-05-19
- Fix breaking changes from OT release 0.7b.0
([#86](https://github.com/microsoft/opentelemetry-azure-monitor-python/pull/86))

## 0.2b.0
Released 2020-03-31
## 0.2b.0 (2020-03-31)

- Initial beta release

## 0.1a.0
Released 2019-11-06
## 0.1a.0 (2019-11-06)

- Initial alpha release
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
include *.md
include opentelemetry/__init__.py
include opentelemetry/exporter/__init__.py
include microsoft/opentelemetry/__init__.py
include microsoft/opentelemetry/exporter/__init__.py
include LICENSE.txt
recursive-include tests *.py
recursive-include samples *.py *.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------

from ._generated._azure_monitor_exporter import AzureMonitorExporter
from microsoft.opentelemetry.exporter.azuremonitor.export.trace import AzureMonitorSpanExporter
from ._version import VERSION

__all__ = ['AzureMonitorExporter']
__all__ = ["AzureMonitorSpanExporter"]
__version__ = VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
import typing
from enum import Enum

from azure.core.exceptions import HttpResponseError
from azure.core.pipeline.policies import ProxyPolicy, RetryPolicy

from opentelemetry.sdk.metrics.export import MetricsExportResult
from opentelemetry.sdk.trace.export import SpanExportResult
from microsoft.opentelemetry.exporter.azuremonitor._generated import AzureMonitorClient
from microsoft.opentelemetry.exporter.azuremonitor._generated.models import TelemetryItem
from microsoft.opentelemetry.exporter.azuremonitor.options import ExporterOptions
from microsoft.opentelemetry.exporter.azuremonitor.storage import LocalFileStorage


logger = logging.getLogger(__name__)


class ExportResult(Enum):
SUCCESS = 0
FAILED_RETRYABLE = 1
FAILED_NOT_RETRYABLE = 2


# pylint: disable=broad-except
class BaseExporter:
"""Azure Monitor base exporter for OpenTelemetry.

Args:
options: :doc:`export.options` to allow configuration for the exporter
"""

def __init__(self, **options):
self._telemetry_processors = []
self.options = ExporterOptions(**options)
retry_policy = RetryPolicy(timeout=self.options.timeout)
proxy_policy = ProxyPolicy(proxies=self.options.proxies)
self.client = AzureMonitorClient(
self.options.endpoint, proxy_policy=proxy_policy, retry_policy=retry_policy)
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)

def add_telemetry_processor(
self, processor: typing.Callable[..., any]
) -> None:
"""Adds telemetry processor to the collection.

Telemetry processors will be called one by one before telemetry
item is pushed for sending and in the order they were added.

Args:
processor: Processor to add
"""
self._telemetry_processors.append(processor)

def clear_telemetry_processors(self) -> None:
"""Removes all telemetry processors"""
self._telemetry_processors = []

def _apply_telemetry_processors(
self, envelopes: typing.List[TelemetryItem]
) -> typing.List[TelemetryItem]:
"""Applies all telemetry processors in the order they were added.

This function will return the list of envelopes to be exported after
each processor has been run sequentially. Individual processors can
throw exceptions and fail, but the applying of all telemetry processors
will proceed (not fast fail). Processors also return True if envelope
should be included for exporting, False otherwise.

Args:
envelopes: The envelopes to apply each processor to.
"""
filtered_envelopes = []
for envelope in envelopes:
accepted = True
for processor in self._telemetry_processors:
try:
if processor(envelope) is False:
accepted = False
break
except Exception as ex:
logger.warning("Telemetry processor failed with: %s.", ex)
if accepted:
filtered_envelopes.append(envelope)
return filtered_envelopes

def _transmit_from_storage(self) -> None:
for blob in self.storage.gets():
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get()
result = self._transmit(envelopes)
if result == ExportResult.FAILED_RETRYABLE:
blob.lease(1)
else:
blob.delete()

# pylint: disable=too-many-branches
# pylint: disable=too-many-nested-blocks
# pylint: disable=too-many-return-statements
def _transmit(self, envelopes: typing.List[TelemetryItem]) -> ExportResult:
"""
Transmit the data envelopes to the ingestion service.

Returns an ExportResult, this function should never
throw an exception.
"""
if len(envelopes) > 0:
try:
track_response = self.client.track(envelopes)
if not track_response.errors:
logger.info("Transmission succeeded: Item received: %s. Items accepted: %s",
track_response.items_received, track_response.items_accepted)
return ExportResult.SUCCESS
resend_envelopes = []
for error in track_response.errors:
if is_retryable_code(error.statusCode):
resend_envelopes.append(
envelopes[error.index]
)
else:
logger.error(
"Data drop %s: %s %s.",
error.statusCode,
error.message,
envelopes[error.index],
)
if resend_envelopes:
self.storage.put(resend_envelopes)

except HttpResponseError as response_error:
if is_retryable_code(response_error.status_code):
return ExportResult.FAILED_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
except Exception as ex:
logger.warning(
"Retrying due to transient client side error %s.", ex
)
# client side error (retryable)
return ExportResult.FAILED_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
# No spans to export
return ExportResult.SUCCESS


def is_retryable_code(response_code: int) -> bool:
"""
Determine if response is retryable
"""
return bool(response_code in (
206, # Retriable
408, # Timeout
429, # Throttle, too Many Requests
439, # Quota, too Many Requests over extended time
500, # Internal Server Error
503, # Service Unavailable
))


def get_trace_export_result(result: ExportResult) -> SpanExportResult:
if result == ExportResult.SUCCESS:
return SpanExportResult.SUCCESS
if result in (
ExportResult.FAILED_RETRYABLE,
ExportResult.FAILED_NOT_RETRYABLE,
):
return SpanExportResult.FAILURE
return None


def get_metrics_export_result(result: ExportResult) -> MetricsExportResult:
if result == ExportResult.SUCCESS:
return MetricsExportResult.SUCCESS
if result in (
ExportResult.FAILED_RETRYABLE,
ExportResult.FAILED_NOT_RETRYABLE,
):
return MetricsExportResult.FAILURE
return None
Loading