Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement live metrics filtering for charts (part 1) #37998

Merged
merged 32 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3209a6e
qp
lzchen Oct 11, 2024
691ea15
update config
lzchen Oct 14, 2024
59cb90d
telemetry type
lzchen Oct 14, 2024
57be629
init dervied metrics
lzchen Oct 14, 2024
bf0a70d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
lzchen Oct 15, 2024
3868560
target
lzchen Oct 15, 2024
569ca90
filtering + start of projections
lzchen Oct 17, 2024
0d1e46a
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
lzchen Oct 17, 2024
055b8be
collection + exporting
lzchen Oct 19, 2024
09b1b8f
tests
lzchen Oct 19, 2024
2afed98
exporter tests
lzchen Oct 19, 2024
2845946
test live metrics
lzchen Oct 21, 2024
1eefc4a
trace utils
lzchen Oct 21, 2024
0da3860
derive
lzchen Oct 24, 2024
cba3518
filters
lzchen Oct 24, 2024
bea3637
projections
lzchen Oct 24, 2024
be7859c
live
lzchen Oct 25, 2024
3a1523c
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
lzchen Oct 29, 2024
e87cb8b
Update _exporter.py
lzchen Oct 29, 2024
4ecc2d0
lint
lzchen Oct 29, 2024
fb5dbc8
black
lzchen Oct 29, 2024
ca864d8
lint
lzchen Oct 29, 2024
236f6f3
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
lzchen Oct 29, 2024
f0ccd8b
lint
lzchen Oct 29, 2024
10df783
blac
lzchen Oct 29, 2024
d12aec6
comment
lzchen Oct 30, 2024
179c24d
Update test_utils.py
lzchen Oct 30, 2024
dc0d0a0
Update test_live_metrics.py
lzchen Oct 30, 2024
8de5913
comment
lzchen Oct 30, 2024
b8fb7cb
test
lzchen Oct 30, 2024
6c6da94
tests
lzchen Oct 30, 2024
979096a
lint
lzchen Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from enum import Enum
import sys

# cSpell:disable

Expand Down Expand Up @@ -47,21 +47,20 @@
_LONG_PING_INTERVAL_SECONDS = 60
_POST_CANCEL_INTERVAL_SECONDS = 20


# Live metrics data types
class _DocumentIngressDocumentType(Enum):
    """Telemetry document types sent to the live metrics (quickpulse) ingestion endpoint.

    The member values are the wire-format type names used in each document payload.
    """

    Request = "Request"
    RemoteDependency = "RemoteDependency"
    Exception = "Exception"
    Event = "Event"
    Trace = "Trace"


# Response Headers

_QUICKPULSE_ETAG_HEADER_NAME = "x-ms-qps-configuration-etag"
_QUICKPULSE_POLLING_HEADER_NAME = "x-ms-qps-service-polling-interval-hint"
_QUICKPULSE_REDIRECT_HEADER_NAME = "x-ms-qps-service-endpoint-redirect-v2"
_QUICKPULSE_SUBSCRIBED_HEADER_NAME = "x-ms-qps-subscribed"

# Projections (filtering)

_QUICKPULSE_PROJECTION_COUNT = "Count()"
_QUICKPULSE_PROJECTION_DURATION = "Duration"
_QUICKPULSE_PROJECTION_CUSTOM = "CustomDimensions."

_QUICKPULSE_PROJECTION_MAX_VALUE = sys.maxsize
_QUICKPULSE_PROJECTION_MIN_VALUE = -sys.maxsize - 1

# cSpell:enable
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,30 @@
MetricReader,
)

from azure.core.exceptions import HttpResponseError
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_LONG_PING_INTERVAL_SECONDS,
_POST_CANCEL_INTERVAL_SECONDS,
_POST_INTERVAL_SECONDS,
_QUICKPULSE_ETAG_HEADER_NAME,
_QUICKPULSE_SUBSCRIBED_HEADER_NAME,
)
from azure.monitor.opentelemetry.exporter._quickpulse._generated._configuration import QuickpulseClientConfiguration
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
_get_and_clear_quickpulse_documents,
_get_global_quickpulse_state,
_get_quickpulse_etag,
_is_ping_state,
_set_global_quickpulse_state,
_get_and_clear_quickpulse_documents,
_set_quickpulse_etag,
_QuickpulseState,
)
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_metric_to_quick_pulse_data_points,
_update_filter_configuration,
)
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
from azure.monitor.opentelemetry.exporter._utils import (
Expand Down Expand Up @@ -143,13 +146,14 @@ def export(
base_monitoring_data_point=base_monitoring_data_point,
documents=_get_and_clear_quickpulse_documents(),
)

configuration_etag = _get_quickpulse_etag() or ""
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
post_response = self._client.publish( # type: ignore
endpoint=self._live_endpoint,
monitoring_data_points=data_points,
ikey=self._instrumentation_key,
ikey=self._instrumentation_key, # type: ignore
configuration_etag=configuration_etag,
jeremydvoss marked this conversation as resolved.
Show resolved Hide resolved
transmission_time=_ticks_since_dot_net_epoch(),
cls=_Response,
)
Expand All @@ -163,6 +167,19 @@ def export(
if header != "true":
# User leaving the live metrics page will be treated as an unsuccessful
result = MetricExportResult.FAILURE
else:
# Check if etag has changed
etag = post_response._response_headers.get( # pylint: disable=protected-access
_QUICKPULSE_ETAG_HEADER_NAME # pylint: disable=protected-access
)
if etag and etag != configuration_etag:
config = (
post_response._pipeline_response.http_response.content # pylint: disable=protected-access
)
# Content will only be populated if configuration has changed (etag is different)
if config:
# Update and apply configuration changes
_update_filter_configuration(etag, config)
except Exception: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception occurred while publishing live metrics.")
result = MetricExportResult.FAILURE
Expand Down Expand Up @@ -201,21 +218,23 @@ def shutdown(
def _ping(self, monitoring_data_point: MonitoringDataPoint) -> Optional[_Response]:
    """Ping the live metrics service to check subscription status.

    :param monitoring_data_point: Base monitoring data point identifying this SDK instance.
    :return: The ping response (including response headers), or ``None`` if the request failed.
    """
    ping_response = None
    # Suppress instrumentation so the ping request itself is not recorded as telemetry.
    token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
    # Send the currently known configuration etag so the service can tell us if it changed.
    etag = _get_quickpulse_etag() or ""
    try:
        ping_response = self._client.is_subscribed(  # type: ignore
            endpoint=self._live_endpoint,
            monitoring_data_point=monitoring_data_point,
            ikey=self._instrumentation_key,  # type: ignore
            transmission_time=_ticks_since_dot_net_epoch(),
            machine_name=monitoring_data_point.machine_name,
            instance_name=monitoring_data_point.instance,
            stream_id=monitoring_data_point.stream_id,
            role_name=monitoring_data_point.role_name,
            invariant_version=monitoring_data_point.invariant_version,  # type: ignore
            configuration_etag=etag,
            cls=_Response,
        )
    except Exception:  # pylint: disable=broad-except,invalid-name
        _logger.exception("Exception occurred while pinging live metrics.")
    finally:
        # Always restore the context; previously the success path returned inside
        # the try block and skipped detach(token), leaking the context token.
        detach(token)
    return ping_response  # type: ignore
Expand Down Expand Up @@ -243,28 +262,42 @@ def __init__(
)
self._worker.start()

# pylint: disable=protected-access
# pylint: disable=too-many-nested-blocks
def _ticker(self) -> None:
if _is_ping_state():
# Send a ping if elapsed number of request meets the threshold
if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0:
ping_response = self._exporter._ping( # pylint: disable=protected-access
ping_response = self._exporter._ping(
self._base_monitoring_data_point,
)
if ping_response:
header = ping_response._response_headers.get( # pylint: disable=protected-access
_QUICKPULSE_SUBSCRIBED_HEADER_NAME
)
if header and header == "true":
# Switch state to post if subscribed
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
self._elapsed_num_seconds = 0
else:
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if (
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
try:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
subscribed = ping_response._response_headers.get(_QUICKPULSE_SUBSCRIBED_HEADER_NAME)
if subscribed and subscribed == "true":
# Switch state to post if subscribed
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
self._elapsed_num_seconds = 0
# Update config etag
etag = ping_response._response_headers.get(_QUICKPULSE_ETAG_HEADER_NAME)
if etag is None:
etag = ""
if _get_quickpulse_etag() != etag:
_set_quickpulse_etag(etag)
lzchen marked this conversation as resolved.
Show resolved Hide resolved
# TODO: Set default document filter config from response body
# config = ping_response._pipeline_response.http_response.content
else:
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if (
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
# Reset etag to default if not subscribed
_set_quickpulse_etag("")
except Exception: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception occurred while pinging live metrics.")
_set_quickpulse_etag("")
# TODO: Implement redirect
else:
# Erroneous ping responses instigate backoff logic
Expand All @@ -274,6 +307,8 @@ def _ticker(self) -> None:
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
# Reset etag to default if error
_set_quickpulse_etag("")
else:
try:
self.collect()
Expand All @@ -283,6 +318,8 @@ def _ticker(self) -> None:
# And resume pinging
if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS:
_set_global_quickpulse_state(_QuickpulseState.PING_SHORT)
# Reset etag to default
_set_quickpulse_etag("")
self._elapsed_num_seconds = 0

self._elapsed_num_seconds += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Any, Iterable

import logging
import platform
import psutil

Expand Down Expand Up @@ -39,6 +40,7 @@
_QuickpulseState,
_is_post_state,
_append_quickpulse_document,
_get_quickpulse_derived_metric_infos,
_get_quickpulse_last_process_cpu,
_get_quickpulse_last_process_time,
_get_quickpulse_process_elapsed_time,
Expand All @@ -47,7 +49,9 @@
_set_quickpulse_last_process_time,
_set_quickpulse_process_elapsed_time,
)
from azure.monitor.opentelemetry.exporter._quickpulse._types import _TelemetryData
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_derive_metrics_from_telemetry_data,
_get_log_record_document,
_get_span_document,
)
Expand All @@ -61,6 +65,8 @@
Singleton,
)

_logger = logging.getLogger(__name__)


PROCESS = psutil.Process()
NUM_CPUS = psutil.cpu_count()
Expand Down Expand Up @@ -93,7 +99,8 @@ def __init__(self, **kwargs: Any) -> None:
id_generator = RandomIdGenerator()
self._base_monitoring_data_point = MonitoringDataPoint(
version=_get_sdk_version(),
invariant_version=1,
# Invariant version 5 indicates filtering is supported
invariant_version=5,
lzchen marked this conversation as resolved.
Show resolved Hide resolved
instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""),
role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""),
machine_name=platform.node(),
Expand Down Expand Up @@ -152,39 +159,60 @@ def __init__(self, **kwargs: Any) -> None:
def _record_span(self, span: ReadableSpan) -> None:
    """Record a completed span for live metrics.

    Appends a quickpulse document for the span, updates request/dependency
    rate and duration instruments, and — when filtering is enabled — derives
    filtered metrics from the span's telemetry data.

    :param span: The ended span to record.
    """
    # Only record if in post state
    if _is_post_state():
        try:
            document = _get_span_document(span)
            _append_quickpulse_document(document)
            duration_ms = 0
            if span.end_time and span.start_time:
                # Span times are in nanoseconds; convert to seconds-based duration value.
                duration_ms = (span.end_time - span.start_time) / 1e9  # type: ignore
            # TODO: Spec out what "success" is
            success = span.status.is_ok

            if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER):
                # Server/consumer spans count as requests.
                if success:
                    self._request_rate_counter.add(1)
                else:
                    self._request_failed_rate_counter.add(1)
                self._request_duration.record(duration_ms)
            else:
                # All other span kinds count as dependencies.
                if success:
                    self._dependency_rate_counter.add(1)
                else:
                    self._dependency_failure_rate_counter.add(1)
                self._dependency_duration.record(duration_ms)

            metric_infos_dict = _get_quickpulse_derived_metric_infos()
            # check if filtering is enabled
            if metric_infos_dict:
                # Derive metrics for quickpulse filtering
                data = _TelemetryData._from_span(span)
                _derive_metrics_from_telemetry_data(data)
            # TODO: derive exception metrics from span events
        except Exception:  # pylint: disable=broad-except
            # Recording must never propagate errors into the user's span pipeline.
            _logger.exception("Exception occurred while recording span.")

def _record_log_record(self, log_data: LogData) -> None:
    """Record an emitted log record for live metrics.

    Appends a quickpulse document for the log record, counts exceptions
    (records carrying exception type/message attributes), and — when
    filtering is enabled — derives filtered metrics from the record's
    telemetry data.

    :param log_data: The log data wrapper containing the log record.
    """
    # Only record if in post state
    if _is_post_state():
        try:
            if log_data.log_record:
                log_record = log_data.log_record
                if log_record.attributes:
                    document = _get_log_record_document(log_data)
                    _append_quickpulse_document(document)
                    exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
                    exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
                    # Presence of either exception attribute marks this record as an exception.
                    if exc_type is not None or exc_message is not None:
                        self._exception_rate_counter.add(1)

                    metric_infos_dict = _get_quickpulse_derived_metric_infos()
                    # check if filtering is enabled
                    if metric_infos_dict:
                        # Derive metrics for quickpulse filtering
                        data = _TelemetryData._from_log_record(log_record)
                        _derive_metrics_from_telemetry_data(data)
        except Exception:  # pylint: disable=broad-except
            # Recording must never propagate errors into the user's logging pipeline.
            _logger.exception("Exception occurred while recording log record.")


# pylint: disable=unused-argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# pylint: disable=protected-access
class _QuickpulseLogRecordProcessor(LogRecordProcessor):
    """Log record processor that forwards emitted log records to live metrics collection."""

    def emit(self, log_data: LogData) -> None:  # type: ignore
        # Forward to the quickpulse manager singleton only if live metrics is enabled.
        qpm = _QuickpulseManager._instance
        if qpm:
            qpm._record_log_record(log_data)
Expand Down
Loading
Loading