Skip to content

Commit

Permalink
Implement live metrics filtering for charts (part 1) (Azure#37998)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored and allenkim0129 committed Nov 5, 2024
1 parent e6c2c82 commit 25d4b5d
Show file tree
Hide file tree
Showing 13 changed files with 1,067 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from enum import Enum
import sys

# cSpell:disable

Expand Down Expand Up @@ -47,21 +47,20 @@
_LONG_PING_INTERVAL_SECONDS = 60
_POST_CANCEL_INTERVAL_SECONDS = 20


# Live metrics data types
class _DocumentIngressDocumentType(Enum):
Request = "Request"
RemoteDependency = "RemoteDependency"
Exception = "Exception"
Event = "Event"
Trace = "Trace"


# Response Headers

# Names of headers looked up on live metrics (quickpulse) service responses.
_QUICKPULSE_ETAG_HEADER_NAME = "x-ms-qps-configuration-etag"
_QUICKPULSE_POLLING_HEADER_NAME = "x-ms-qps-service-polling-interval-hint"
_QUICKPULSE_REDIRECT_HEADER_NAME = "x-ms-qps-service-endpoint-redirect-v2"
_QUICKPULSE_SUBSCRIBED_HEADER_NAME = "x-ms-qps-subscribed"

# Projections (filtering)

# Projection names; presumably compared against the projection of a filter
# configuration received from the service -- TODO confirm against the filtering code.
_QUICKPULSE_PROJECTION_COUNT = "Count()"
_QUICKPULSE_PROJECTION_DURATION = "Duration"
# Note the trailing dot: presumably a prefix that is followed by a custom
# dimension name -- TODO confirm.
_QUICKPULSE_PROJECTION_CUSTOM = "CustomDimensions."

# Upper and lower projection bounds, derived from the platform's sys.maxsize.
_QUICKPULSE_PROJECTION_MAX_VALUE = sys.maxsize
_QUICKPULSE_PROJECTION_MIN_VALUE = -sys.maxsize - 1

# cSpell:enable
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,30 @@
MetricReader,
)

from azure.core.exceptions import HttpResponseError
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_LONG_PING_INTERVAL_SECONDS,
_POST_CANCEL_INTERVAL_SECONDS,
_POST_INTERVAL_SECONDS,
_QUICKPULSE_ETAG_HEADER_NAME,
_QUICKPULSE_SUBSCRIBED_HEADER_NAME,
)
from azure.monitor.opentelemetry.exporter._quickpulse._generated._configuration import QuickpulseClientConfiguration
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
_get_and_clear_quickpulse_documents,
_get_global_quickpulse_state,
_get_quickpulse_etag,
_is_ping_state,
_set_global_quickpulse_state,
_get_and_clear_quickpulse_documents,
_set_quickpulse_etag,
_QuickpulseState,
)
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_metric_to_quick_pulse_data_points,
_update_filter_configuration,
)
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
from azure.monitor.opentelemetry.exporter._utils import (
Expand Down Expand Up @@ -143,13 +146,14 @@ def export(
base_monitoring_data_point=base_monitoring_data_point,
documents=_get_and_clear_quickpulse_documents(),
)

configuration_etag = _get_quickpulse_etag() or ""
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
post_response = self._client.publish( # type: ignore
endpoint=self._live_endpoint,
monitoring_data_points=data_points,
ikey=self._instrumentation_key,
ikey=self._instrumentation_key, # type: ignore
configuration_etag=configuration_etag,
transmission_time=_ticks_since_dot_net_epoch(),
cls=_Response,
)
Expand All @@ -163,6 +167,19 @@ def export(
if header != "true":
# User leaving the live metrics page will be treated as an unsuccessful export
result = MetricExportResult.FAILURE
else:
# Check if etag has changed
etag = post_response._response_headers.get( # pylint: disable=protected-access
_QUICKPULSE_ETAG_HEADER_NAME # pylint: disable=protected-access
)
if etag and etag != configuration_etag:
config = (
post_response._pipeline_response.http_response.content # pylint: disable=protected-access
)
# Content will only be populated if configuration has changed (etag is different)
if config:
# Update and apply configuration changes
_update_filter_configuration(etag, config)
except Exception: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception occurred while publishing live metrics.")
result = MetricExportResult.FAILURE
Expand Down Expand Up @@ -201,21 +218,23 @@ def shutdown(
def _ping(self, monitoring_data_point: MonitoringDataPoint) -> Optional[_Response]:
    """Send one ``is_subscribed`` (ping) request to the live metrics service.

    The currently stored configuration etag (an empty string when none is
    stored) is sent along with the identifying fields of the data point.

    :param monitoring_data_point: Data point sent with the ping; its machine
        name, instance, stream id, role name and invariant version are also
        passed as separate request arguments.
    :return: The response produced by the client call, or ``None`` when the
        call raised (the exception is logged, not propagated).
    """
    ping_response = None
    etag = _get_quickpulse_etag() or ""
    # Flag the current context with _SUPPRESS_INSTRUMENTATION_KEY for the
    # duration of the request.
    token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
    try:
        ping_response = self._client.is_subscribed(  # type: ignore
            endpoint=self._live_endpoint,
            monitoring_data_point=monitoring_data_point,
            ikey=self._instrumentation_key,  # type: ignore
            transmission_time=_ticks_since_dot_net_epoch(),
            machine_name=monitoring_data_point.machine_name,
            instance_name=monitoring_data_point.instance,
            stream_id=monitoring_data_point.stream_id,
            role_name=monitoring_data_point.role_name,
            invariant_version=monitoring_data_point.invariant_version,  # type: ignore
            configuration_etag=etag,
            cls=_Response,
        )
    except Exception:  # pylint: disable=broad-except,invalid-name
        _logger.exception("Exception occurred while pinging live metrics.")
    finally:
        # Always restore the previous context. Returning from inside the
        # ``try`` (as before) skipped this on every successful ping.
        detach(token)
    return ping_response  # type: ignore
Expand Down Expand Up @@ -243,28 +262,42 @@ def __init__(
)
self._worker.start()

# pylint: disable=protected-access
# pylint: disable=too-many-nested-blocks
def _ticker(self) -> None:
if _is_ping_state():
# Send a ping when the elapsed number of seconds is a multiple of the current state's value
if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0:
ping_response = self._exporter._ping( # pylint: disable=protected-access
ping_response = self._exporter._ping(
self._base_monitoring_data_point,
)
if ping_response:
header = ping_response._response_headers.get( # pylint: disable=protected-access
_QUICKPULSE_SUBSCRIBED_HEADER_NAME
)
if header and header == "true":
# Switch state to post if subscribed
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
self._elapsed_num_seconds = 0
else:
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if (
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
try:
subscribed = ping_response._response_headers.get(_QUICKPULSE_SUBSCRIBED_HEADER_NAME)
if subscribed and subscribed == "true":
# Switch state to post if subscribed
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
self._elapsed_num_seconds = 0
# Update config etag
etag = ping_response._response_headers.get(_QUICKPULSE_ETAG_HEADER_NAME)
if etag is None:
etag = ""
if _get_quickpulse_etag() != etag:
_set_quickpulse_etag(etag)
# TODO: Set default document filter config from response body
# config = ping_response._pipeline_response.http_response.content
else:
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
if (
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
# Reset etag to default if not subscribed
_set_quickpulse_etag("")
except Exception: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception occurred while pinging live metrics.")
_set_quickpulse_etag("")
# TODO: Implement redirect
else:
# Erroneous ping responses instigate backoff logic
Expand All @@ -274,6 +307,8 @@ def _ticker(self) -> None:
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
):
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
# Reset etag to default if error
_set_quickpulse_etag("")
else:
try:
self.collect()
Expand All @@ -283,6 +318,8 @@ def _ticker(self) -> None:
# And resume pinging
if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS:
_set_global_quickpulse_state(_QuickpulseState.PING_SHORT)
# Reset etag to default
_set_quickpulse_etag("")
self._elapsed_num_seconds = 0

self._elapsed_num_seconds += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Any, Iterable

import logging
import platform
import psutil

Expand Down Expand Up @@ -39,6 +40,7 @@
_QuickpulseState,
_is_post_state,
_append_quickpulse_document,
_get_quickpulse_derived_metric_infos,
_get_quickpulse_last_process_cpu,
_get_quickpulse_last_process_time,
_get_quickpulse_process_elapsed_time,
Expand All @@ -47,7 +49,9 @@
_set_quickpulse_last_process_time,
_set_quickpulse_process_elapsed_time,
)
from azure.monitor.opentelemetry.exporter._quickpulse._types import _TelemetryData
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_derive_metrics_from_telemetry_data,
_get_log_record_document,
_get_span_document,
)
Expand All @@ -61,6 +65,8 @@
Singleton,
)

_logger = logging.getLogger(__name__)


PROCESS = psutil.Process()
NUM_CPUS = psutil.cpu_count()
Expand Down Expand Up @@ -93,7 +99,8 @@ def __init__(self, **kwargs: Any) -> None:
id_generator = RandomIdGenerator()
self._base_monitoring_data_point = MonitoringDataPoint(
version=_get_sdk_version(),
invariant_version=1,
# Invariant version 5 indicates filtering is supported
invariant_version=5,
instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""),
role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""),
machine_name=platform.node(),
Expand Down Expand Up @@ -152,39 +159,60 @@ def __init__(self, **kwargs: Any) -> None:
def _record_span(self, span: ReadableSpan) -> None:
    """Record a span as a live metrics document and update the span metrics.

    Nothing is recorded unless the global state is a post state. Server and
    consumer spans update the request counters and request duration; every
    other span kind updates the dependency counters and dependency duration.
    Any exception raised while recording is logged and not propagated.

    :param span: The span to record.
    """
    # Only record if in post state
    if not _is_post_state():
        return
    try:
        _append_quickpulse_document(_get_span_document(span))

        # NOTE(review): if start_time/end_time are nanosecond timestamps, dividing
        # by 1e9 yields seconds, not milliseconds -- confirm the intended unit.
        elapsed = 0
        if span.end_time and span.start_time:
            elapsed = (span.end_time - span.start_time) / 1e9  # type: ignore
        # TODO: Spec out what "success" is
        succeeded = span.status.is_ok

        if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER):
            rate_counter = self._request_rate_counter if succeeded else self._request_failed_rate_counter
            rate_counter.add(1)
            self._request_duration.record(elapsed)
        else:
            rate_counter = self._dependency_rate_counter if succeeded else self._dependency_failure_rate_counter
            rate_counter.add(1)
            self._dependency_duration.record(elapsed)

        # Filtering is enabled only when derived metric infos are present
        if _get_quickpulse_derived_metric_infos():
            # Derive metrics for quickpulse filtering
            _derive_metrics_from_telemetry_data(_TelemetryData._from_span(span))
            # TODO: derive exception metrics from span events
    except Exception:  # pylint: disable=broad-except
        _logger.exception("Exception occurred while recording span.")

def _record_log_record(self, log_data: LogData) -> None:
# Records a log record for live metrics: appends a document for it, counts it
# as an exception when it carries exception attributes, and derives filtering
# metrics when derived metric infos are present.
# NOTE(review): this listing has lost its indentation and appears to show the
# pre-change body (the statements before `try:`) followed by the post-change
# body (the same statements repeated after `try:`) -- confirm against the real file.
# Only record if in post state
if _is_post_state():
if log_data.log_record:
log_record = log_data.log_record
if log_record.attributes:
document = _get_log_record_document(log_data)
_append_quickpulse_document(document)
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
if exc_type is not None or exc_message is not None:
self._exception_rate_counter.add(1)
try:
if log_data.log_record:
log_record = log_data.log_record
if log_record.attributes:
document = _get_log_record_document(log_data)
_append_quickpulse_document(document)
# A record counts as an exception when either exception attribute is present
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
if exc_type is not None or exc_message is not None:
self._exception_rate_counter.add(1)

# NOTE(review): the nesting level of the section below (inside the attributes
# check or directly under the log_record check) cannot be told from this listing.
metric_infos_dict = _get_quickpulse_derived_metric_infos()
# check if filtering is enabled
if metric_infos_dict:
# Derive metrics for quickpulse filtering
data = _TelemetryData._from_log_record(log_record)
_derive_metrics_from_telemetry_data(data)
# Failures while recording are logged and not propagated to the caller
except Exception: # pylint: disable=broad-except
_logger.exception("Exception occurred while recording log record.")


# pylint: disable=unused-argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# pylint: disable=protected-access
class _QuickpulseLogRecordProcessor(LogRecordProcessor):

def emit(self, log_data: LogData) -> None:  # type: ignore
    """Hand the emitted log data to the live metrics manager, if one is set.

    :param log_data: The log data being emitted.
    """
    manager = _QuickpulseManager._instance
    if not manager:
        # No manager instance is available, so there is nothing to record to.
        return
    manager._record_log_record(log_data)
Expand Down
Loading

0 comments on commit 25d4b5d

Please sign in to comment.