linting
cnnradams committed Jul 24, 2020
1 parent a281776 commit ab2c1fa
Showing 8 changed files with 358 additions and 125 deletions.
40 changes: 29 additions & 11 deletions docs/examples/exemplars/semantic_exemplars.py
@@ -20,22 +20,19 @@
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import (
MeterProvider,
ValueRecorder,
)
from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
)
from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator
from opentelemetry.sdk.metrics.view import View, ViewConfig

# Set up OpenTelemetry metrics
metrics.set_meter_provider(MeterProvider(stateful=False))
meter = metrics.get_meter(__name__)

# Use the Google Cloud Monitoring Metrics Exporter, since it's the only one that currently supports exemplars
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10)
metrics.get_meter_provider().start_pipeline(
meter, ConsoleMetricsExporter(), 10
)

# Create our duration metric
request_duration = meter.create_metric(
@@ -53,8 +50,26 @@
# [>=0ms, >=25ms, >=50ms, >=75ms, >=100ms, >=200ms, >=400ms, >=600ms, >=800ms, >=1s, >=2s, >=4s, >=6s]
# We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded.
# So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false)
HistogramAggregator(config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000],
"num_exemplars": 1}),
HistogramAggregator(
config={
"bounds": [
0,
25,
50,
75,
100,
200,
400,
600,
800,
1000,
2000,
4000,
6000,
],
"num_exemplars": 1,
}
),
label_keys=["environment"],
config=ViewConfig.LABEL_KEYS,
)
@@ -63,5 +78,8 @@

for i in range(100):
# Generate some random data for the histogram with a dropped label "customer_id"
request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)})
request_duration.record(
random.randint(1, 8000),
{"environment": "staging", "customer_id": random.randint(1, 100)},
)
time.sleep(1)
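
The bucket bounds above partition recorded durations, and with num_exemplars set to 1 the aggregator keeps at most one exemplar per bucket. As a quick aside, here is a minimal, SDK-independent sketch (the helper name pick_bucket is illustrative) of the same bucket-selection rule that HistogramAggregator.update applies further down in this diff:

import bisect

# Bounds from the example above, in milliseconds.
bounds = [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000]


def pick_bucket(value, bounds):
    # Index of the first bound strictly greater than the value; values at or
    # beyond the last bound fall into the overflow (">") bucket.
    index = bisect.bisect_right(bounds, value)
    return index  # len(bounds) means the overflow bucket


assert pick_bucket(30, bounds) == 2  # lands in the >=25ms, <50ms bucket
assert pick_bucket(7000, bounds) == len(bounds)  # beyond 6s: overflow bucket
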
90 changes: 71 additions & 19 deletions docs/examples/exemplars/statistical_exemplars.py
@@ -1,17 +1,18 @@
import numpy as np
import matplotlib.pyplot as plt
import random

from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter
from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
InMemoryMetricsExporter,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig

## set up opentelemetry
# set up opentelemetry

# Sets the global MeterProvider instance
metrics.set_meter_provider(MeterProvider())
@@ -46,7 +47,8 @@

meter.register_view(counter_view)

## generate the random metric data
# generate the random metric data


def unknown_customer_calls():
"""Generate customer call data to our application"""
@@ -57,23 +59,49 @@ def unknown_customer_calls():
random.seed(1)

# customer 123 is a big user, and made 1000 requests in this timeframe
requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100
requests = np.random.normal(
1000, 250, 1000
) # 1000 requests with an average of 1000 bytes and a standard deviation of 250

for request in requests:
bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123})
bytes_counter.add(
int(request),
{
"environment": "production",
"method": "REST",
"customer_id": 123,
},
)

# customer 247 is another big user, making fewer, but bigger requests
requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes
requests = np.random.normal(
5000, 1250, 200
) # 200 requests with average size of 5k bytes

for request in requests:
bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247})
bytes_counter.add(
int(request),
{
"environment": "production",
"method": "REST",
"customer_id": 247,
},
)

# There are many other smaller customers
for customer_id in range(250):
requests = np.random.normal(1000, 250, np.random.randint(1, 10))
method = "REST" if np.random.randint(2) else "gRPC"
for request in requests:
bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id})
bytes_counter.add(
int(request),
{
"environment": "production",
"method": method,
"customer_id": customer_id,
},
)


unknown_customer_calls()

@@ -93,10 +121,15 @@ def unknown_customer_calls():
customer_bytes_map[exemplar.dropped_labels] += exemplar.value


customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)
customer_bytes_list = sorted(
list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
)

# Save our top 5 customers and sum all of the rest into "Others".
top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
top_5_customers = [
("Customer {}".format(dict(val[0])["customer_id"]), val[1])
for val in customer_bytes_list[:5]
] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]

# unzip the data into X (sizes of each customer's contribution) and labels
labels, X = zip(*top_5_customers)
@@ -106,26 +139,45 @@ def unknown_customer_calls():
plt.show()

# Estimate how many bytes customer 123 sent
customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))]
customer_123_bytes = customer_bytes_map[
(("customer_id", 123), ("method", "REST"))
]

# Since the exemplars were randomly sampled, all sample_counts will be the same
sample_count = exemplars[0].sample_count
print("sample count", sample_count, "custmer", customer_123_bytes)
full_customer_123_bytes = sample_count * customer_123_bytes

# With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)
print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes)))
print(
"Customer 123 sent about {} bytes this interval".format(
int(full_customer_123_bytes)
)
)
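
The estimate above relies on the fact that, with random sampling, each retained exemplar stands in for sample_count raw measurements, so multiplying the summed exemplar values by sample_count approximates the true total. A self-contained sketch of that arithmetic with made-up numbers, independent of the SDK:

import random

random.seed(1)
measurements = [random.randint(1, 2000) for _ in range(10000)]  # the "true" raw data
true_total = sum(measurements)

# Keep a random sample of 1000 exemplars; each one then represents
# sample_count = 10000 / 1000 = 10 raw measurements.
exemplar_values = random.sample(measurements, 1000)
sample_count = len(measurements) / len(exemplar_values)

estimated_total = sample_count * sum(exemplar_values)
# The estimate is typically within a few percent of the true total.
print(true_total, int(estimated_total))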

# Determine the top 25 customers by how many bytes they sent in exemplars
top_25_customers = customer_bytes_list[:25]

# out of those 25 customers, determine how many used gRPC, and come up with a ratio
percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers)

print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100)))
percent_grpc = len(
list(
filter(
lambda customer_value: customer_value[0][1][1] == "gRPC",
top_25_customers,
)
)
) / len(top_25_customers)

print(
"~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(
int(percent_grpc * 100)
)
)
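
The [0][1][1] indexing works because each entry of customer_bytes_list pairs an exemplar's dropped_labels tuple with a byte total, and the labels are ordered as (("customer_id", ...), ("method", ...)). A small illustration with made-up values:

# Hypothetical entry from customer_bytes_list.
entry = ((("customer_id", 123), ("method", "gRPC")), 48000)

labels, total_bytes = entry
assert labels[1][1] == "gRPC"  # what customer_value[0][1][1] selects
assert dict(labels)["method"] == "gRPC"  # an equivalent, more readable lookup
print(dict(labels)["customer_id"], total_bytes)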

# Determine the 50th, 90th, and 99th percentile of byte size sent in
quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])
quantiles = np.quantile(
[exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]
)
print("50th Percentile Bytes In:", int(quantiles[0]))
print("90th Percentile Bytes In:", int(quantiles[1]))
print("99th Percentile Bytes In:", int(quantiles[2]))
@@ -82,7 +82,7 @@ def export(
record.instrument,
record.labels,
record.aggregator.checkpoint,
record.aggregator.checkpoint_exemplars
record.aggregator.checkpoint_exemplars,
)
)
return MetricsExportResult.SUCCESS
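
With this change, every metric record handed to an exporter carries the sampled exemplars next to the aggregated checkpoint. Here is a minimal sketch of how an exporter might consume them, assuming only the attributes used elsewhere in this commit (checkpoint_exemplars on the aggregator, value and dropped_labels on each exemplar); the helper name is illustrative:

from opentelemetry.sdk.metrics.export import MetricsExportResult


def export_with_exemplars(metric_records):
    """Print each record's sampled exemplars alongside its labels."""
    for record in metric_records:
        for exemplar in record.aggregator.checkpoint_exemplars:
            print(record.labels, exemplar.value, exemplar.dropped_labels)
    return MetricsExportResult.SUCCESS
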
48 changes: 32 additions & 16 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
@@ -16,17 +16,14 @@
import logging
import threading
from collections import OrderedDict, namedtuple
import itertools

from collections import namedtuple, OrderedDict
from opentelemetry.util import time_ns
from opentelemetry.sdk.metrics.export.exemplars import (
Exemplar,
RandomExemplarSampler,
MinMaxExemplarSampler,
BucketedExemplarSampler,
ExemplarManager
ExemplarManager,
MinMaxExemplarSampler,
RandomExemplarSampler,
)
from opentelemetry.util import time_ns

logger = logging.getLogger(__name__)

@@ -67,7 +64,9 @@ def __init__(self, config=None):
self.checkpoint = 0
self._lock = threading.Lock()
self.last_update_timestamp = None
self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
self.exemplar_manager = ExemplarManager(
config, MinMaxExemplarSampler, RandomExemplarSampler
)

def update(self, value, dropped_labels=None):
with self._lock:
@@ -89,7 +88,9 @@ def merge(self, other):
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)
self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
self.checkpoint_exemplars = self.exemplar_manager.merge(
self.checkpoint_exemplars, other.checkpoint_exemplars
)


class MinMaxSumCountAggregator(Aggregator):
@@ -118,7 +119,9 @@ def __init__(self, config=None):
self._lock = threading.Lock()
self.last_update_timestamp = None

self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
self.exemplar_manager = ExemplarManager(
config, MinMaxExemplarSampler, RandomExemplarSampler
)

def update(self, value, dropped_labels=None):
with self._lock:
@@ -151,7 +154,9 @@ def merge(self, other):
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)
self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
self.checkpoint_exemplars = self.exemplar_manager.merge(
self.checkpoint_exemplars, other.checkpoint_exemplars
)


class HistogramAggregator(Aggregator):
@@ -161,15 +166,20 @@ def __init__(self, config=None):
super().__init__(config=config)
self._lock = threading.Lock()
self.last_update_timestamp = None
boundaries = self.config.get('bounds', None)
boundaries = self.config.get("bounds", None)
if boundaries and self._validate_boundaries(boundaries):
self._boundaries = boundaries
else:
self._boundaries = (10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
self.current = OrderedDict([(bb, 0) for bb in self._boundaries])
self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries])

self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries)
self.exemplar_manager = ExemplarManager(
config,
BucketedExemplarSampler,
BucketedExemplarSampler,
boundaries=self._boundaries,
)

self.current[">"] = 0
self.checkpoint[">"] = 0
@@ -205,14 +215,18 @@ def update(self, value, dropped_labels=None):
# greater than max value
if value >= self._boundaries[len(self._boundaries) - 1]:
self.current[">"] += 1
self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries))
self.exemplar_manager.sample(
value, dropped_labels, bucket_index=len(self._boundaries)
)
else:
for index, bb in enumerate(self._boundaries):
# find first bucket that value is less than
if value < bb:
self.current[bb] += 1

self.exemplar_manager.sample(value, dropped_labels, bucket_index=index)
self.exemplar_manager.sample(
value, dropped_labels, bucket_index=index
)
break
self.last_update_timestamp = time_ns()

@@ -232,7 +246,9 @@ def merge(self, other):
self.checkpoint, other.checkpoint
)

self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
self.checkpoint_exemplars = self.exemplar_manager.merge(
self.checkpoint_exemplars, other.checkpoint_exemplars
)

self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
