Skip to content

Commit

Permalink
Merge branch 'main' into thread-safe-http-exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
Ignacio Bonafonte authored Oct 26, 2023
2 parents 33e1771 + f7061de commit d615964
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ public enum MetricsAdapter {
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_NumberDataPoint()
injectPointData(protoNumberPoint: &protoDataPoint, pointData: gaugeData)
protoDataPoint.value = .asInt(Int64(gaugeData.value))
protoMetric.sum.aggregationTemporality = .cumulative
protoMetric.sum.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
protoMetric.sum.dataPoints.append(protoDataPoint)
protoMetric.sum.isMonotonic = stableMetric.isMonotonic
case .DoubleGauge:
guard let gaugeData = $0 as? DoublePointData else {
break
Expand All @@ -120,8 +121,9 @@ public enum MetricsAdapter {
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_NumberDataPoint()
injectPointData(protoNumberPoint: &protoDataPoint, pointData: gaugeData)
protoDataPoint.value = .asDouble(gaugeData.value)
protoMetric.sum.aggregationTemporality = .cumulative
protoMetric.sum.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
protoMetric.sum.dataPoints.append(protoDataPoint)
protoMetric.sum.isMonotonic = stableMetric.isMonotonic
case .Summary:
guard let summaryData = $0 as? SummaryPointData else {
break
Expand All @@ -147,7 +149,7 @@ public enum MetricsAdapter {
protoDataPoint.count = UInt64(histogramData.count)
protoDataPoint.explicitBounds = histogramData.boundaries.map { Double($0) }
protoDataPoint.bucketCounts = histogramData.counts.map { UInt64($0) }
protoMetric.histogram.aggregationTemporality = .cumulative
protoMetric.histogram.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
protoMetric.histogram.dataPoints.append(protoDataPoint)
case .ExponentialHistogram:
// TODO: implement
Expand Down Expand Up @@ -394,3 +396,14 @@ public enum MetricsAdapter {
return protoMetric
}
}

extension AggregationTemporality {
func convertToProtoEnum() -> Opentelemetry_Proto_Metrics_V1_AggregationTemporality {
switch self {
case .cumulative:
return .cumulative
case .delta:
return .delta
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import SwiftProtobuf
import OpenTelemetryProtocolExporterCommon

public class StableOtlpHTTPExporterBase {
let endpoint: URL
let httpClient: HTTPClient
let envVarHeaders: [(String, String)]?
let config: OtlpConfiguration

// MARK: - Init

public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.envVarHeaders = envVarHeaders
self.endpoint = endpoint
self.config = config
if let providedSession = useSession {
self.httpClient = HTTPClient(session: providedSession)
} else {
self.httpClient = HTTPClient()
}
}

public func createRequest(body: Message, endpoint: URL) -> URLRequest {
var request = URLRequest(url: endpoint)

for header in config.headers ?? [] {
request.addValue(header.1, forHTTPHeaderField: header.0)
}

do {
request.httpMethod = "POST"
request.httpBody = try body.serializedData()
request.setValue(Headers.getUserAgentHeader(), forHTTPHeaderField: Constants.HTTP.userAgent)
request.setValue("application/x-protobuf", forHTTPHeaderField: "Content-Type")
} catch {
print("Error serializing body: \(error)")
}

return request
}

public func shutdown(explicitTimeout: TimeInterval? = nil) { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon

public func defaultStableOtlpHTTPMetricsEndpoint() -> URL {
URL(string: "http://localhost:4318/v1/metrics")!
}

public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMetricExporter {
var aggregationTemporalitySelector: AggregationTemporalitySelector
var defaultAggregationSelector: DefaultAggregationSelector

var pendingMetrics: [StableMetricData] = []

// MARK: - Init

public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(), defaultAggregationSelector: DefaultAggregationSelector = AggregationSelector.instance, useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {

self.aggregationTemporalitySelector = aggregationTemporalitySelector
self.defaultAggregationSelector = defaultAggregationSelector

super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
}


// MARK: - StableMetricsExporter

public func export(metrics : [StableMetricData]) -> ExportResult {
pendingMetrics.append(contentsOf: metrics)
let sendingMetrics = pendingMetrics
pendingMetrics = []
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: sendingMetrics)
}

let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { [weak self] result in
switch result {
case .success(_):
break
case .failure(let error):
self?.pendingMetrics.append(contentsOf: sendingMetrics)
print(error)
}
}

return .success
}

public func flush() -> ExportResult {
var exporterResult: ExportResult = .success

if !pendingMetrics.isEmpty {
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: pendingMetrics)
}
let semaphore = DispatchSemaphore(value: 0)
let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { result in
switch result {
case .success(_):
break
case .failure(let error):
print(error)
exporterResult = .failure
}
semaphore.signal()
}
semaphore.wait()
}

return exporterResult
}

public func shutdown() -> ExportResult {
return .success
}

// MARK: - AggregationTemporalitySelectorProtocol

public func getAggregationTemporality(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.AggregationTemporality {
return aggregationTemporalitySelector.getAggregationTemporality(for: instrument)
}

// MARK: - DefaultAggregationSelector

public func getDefaultAggregation(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.Aggregation {
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class AggregationTemporalitySelector : AggregationTemporalitySelectorProt
return aggregationTemporalitySelector(instrument)
}

init(aggregationTemporalitySelector: @escaping (InstrumentType) -> AggregationTemporality) {
public init(aggregationTemporalitySelector: @escaping (InstrumentType) -> AggregationTemporality) {
self.aggregationTemporalitySelector = aggregationTemporalitySelector
}

Expand All @@ -31,6 +31,12 @@ public enum AggregationTemporality {
}

}

public static func alwaysDelta() -> AggregationTemporalitySelector {
return AggregationTemporalitySelector() { (type) in
.delta
}
}

public static func deltaPreferred() -> AggregationTemporalitySelector {
return AggregationTemporalitySelector() { type in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class DoubleSumAggregator: SumAggregator, StableAggregator {
}

public func toMetricData(resource: Resource, scope: InstrumentationScopeInfo, descriptor: MetricDescriptor, points: [PointData], temporality: AggregationTemporality) -> StableMetricData {
StableMetricData.createDoubleSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, data: StableSumData(aggregationTemporality: temporality, points: points as! [DoublePointData]))
StableMetricData.createDoubleSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, isMonotonic: self.isMonotonic, data: StableSumData(aggregationTemporality: temporality, points: points as! [DoublePointData]))
}

init(instrumentDescriptor: InstrumentDescriptor, reservoirSupplier: @escaping () -> ExemplarReservoir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class LongSumAggregator: SumAggregator, StableAggregator {
}

public func toMetricData(resource: Resource, scope: InstrumentationScopeInfo, descriptor: MetricDescriptor, points: [PointData], temporality: AggregationTemporality) -> StableMetricData {
StableMetricData.createLongSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, data: StableSumData(aggregationTemporality: temporality, points: points as! [LongPointData]))
StableMetricData.createLongSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, isMonotonic: self.isMonotonic, data: StableSumData(aggregationTemporality: temporality, points: points as! [LongPointData]))
}

private class Handle: AggregatorHandle {
Expand Down
59 changes: 27 additions & 32 deletions Sources/OpenTelemetrySdk/Metrics/Stable/Data/StableMetricData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,33 @@ public struct StableMetricData: Equatable {
public private(set) var description: String
public private(set) var unit: String
public private(set) var type: MetricDataType
public private(set) var isMonotonic: Bool
public private(set) var data: Data

public static let empty = StableMetricData(resource: Resource.empty, instrumentationScopeInfo: InstrumentationScopeInfo(), name: "", description: "", unit: "", type: .Summary, data: StableMetricData.Data(points: [PointData]()))
public static let empty = StableMetricData(resource: Resource.empty, instrumentationScopeInfo: InstrumentationScopeInfo(), name: "", description: "", unit: "", type: .Summary, isMonotonic: false, data: StableMetricData.Data(aggregationTemporality: .cumulative, points: [PointData]()))

public class Data: Equatable {
public private(set) var points: [PointData]
public private(set) var aggregationTemporality: AggregationTemporality

internal init(points: [PointData]) {
internal init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
self.aggregationTemporality = aggregationTemporality
self.points = points
}

public static func == (lhs: StableMetricData.Data, rhs: StableMetricData.Data) -> Bool {
return lhs.points == rhs.points
return lhs.points == rhs.points && lhs.aggregationTemporality == rhs.aggregationTemporality
}
}

internal init(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, type: MetricDataType, data: StableMetricData.Data) {
internal init(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, type: MetricDataType, isMonotonic: Bool, data: StableMetricData.Data) {
self.resource = resource
self.instrumentationScopeInfo = instrumentationScopeInfo
self.name = name
self.description = description
self.unit = unit
self.type = type
self.isMonotonic = isMonotonic
self.data = data
}

Expand All @@ -56,33 +60,34 @@ public struct StableMetricData: Equatable {
lhs.description == rhs.description &&
lhs.unit == rhs.unit &&
lhs.type == rhs.type &&
lhs.data.points == rhs.data.points
lhs.isMonotonic == rhs.isMonotonic &&
lhs.data == rhs.data
}
}

extension StableMetricData {
static func createExponentialHistogram(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableExponentialHistogramData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .ExponentialHistogram, data: data)
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .ExponentialHistogram, isMonotonic: false, data: data)
}

static func createDoubleGauge(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableGaugeData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleGauge, data: data)
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleGauge, isMonotonic: false, data: data)
}

static func createLongGauge(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableGaugeData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongGauge, data: data)
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongGauge, isMonotonic: false, data: data)
}

static func createDoubleSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableSumData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleSum, data: data)
static func createDoubleSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, isMonotonic: Bool, data: StableSumData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleSum, isMonotonic: isMonotonic, data: data)
}

static func createLongSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableSumData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongSum, data: data)
static func createLongSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, isMonotonic: Bool, data: StableSumData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongSum, isMonotonic: isMonotonic, data: data)
}

static func createHistogram(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableHistogramData) -> StableMetricData {
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .Histogram, data: data)
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .Histogram, isMonotonic: false, data: data)
}

func isEmpty() -> Bool {
Expand All @@ -99,41 +104,31 @@ extension StableMetricData {
}

public class StableHistogramData: StableMetricData.Data {
public private(set) var aggregationTemporality: AggregationTemporality
init(aggregationTemporality: AggregationTemporality, points: [HistogramPointData]) {
self.aggregationTemporality = aggregationTemporality
super.init(points: points)
super.init(aggregationTemporality: aggregationTemporality, points: points)
}
}

public class StableExponentialHistogramData: StableMetricData.Data {
public private(set) var aggregationTemporality: AggregationTemporality
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
self.aggregationTemporality = aggregationTemporality
super.init(points: points)
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
super.init(aggregationTemporality: aggregationTemporality, points: points)
}
}

public class StableGaugeData: StableMetricData.Data {
public private(set) var aggregationTemporality: AggregationTemporality
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
self.aggregationTemporality = aggregationTemporality
super.init(points: points)
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
super.init(aggregationTemporality: aggregationTemporality, points: points)
}
}

public class StableSumData: StableMetricData.Data {
public private(set) var aggregationTemporality: AggregationTemporality
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
self.aggregationTemporality = aggregationTemporality
super.init(points: points)
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
super.init(aggregationTemporality: aggregationTemporality, points: points)
}
}

public class StableSummaryData: StableMetricData.Data {
public private(set) var aggregationTemporality: AggregationTemporality
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
self.aggregationTemporality = aggregationTemporality
super.init(points: points)
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
super.init(aggregationTemporality: aggregationTemporality, points: points)
}
}
Loading

0 comments on commit d615964

Please sign in to comment.