Skip to content

Commit

Permalink
added initial exporter metric code (#637)
Browse files Browse the repository at this point in the history
  • Loading branch information
mamunto authored Nov 19, 2024
1 parent 725e134 commit 6570e78
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//
import Foundation
import OpenTelemetryApi

/// `ExporterMetrics` will provide a way to track how many data have been seen or successfully exported,
/// as well as how many failed. The exporter will adopt an instance of this and inject the provider as a dependency.
/// The host application can then track different types of exporters, such as `http, grpc, and log`
public class ExporterMetrics {
public enum TransporterType: String {
case grpc = "grpc"
case protoBuf = "http"
case httpJson = "http-json"
}

public static let ATTRIBUTE_KEY_TYPE: String = "type"
public static let ATTRIBUTE_KEY_SUCCESS: String = "success"

private let meterProvider: StableMeterProvider
private let exporterName: String
private let transportName: String
private var seenAttrs: [String: AttributeValue] = [:]
private var successAttrs: [String: AttributeValue] = [:]
private var failedAttrs: [String: AttributeValue] = [:]

private var seen: LongCounter?
private var exported: LongCounter?

/// - Parameters:
/// - type: That represent what type of exporter it is. `otlp`
/// - meterProvider: Injected `StableMeterProvider` for metric
/// - exporterName: Could be `span`, `log` etc
/// - transportName: Kind of exporter defined by type `TransporterType`
public init(
type: String,
meterProvider: StableMeterProvider,
exporterName: String,
transportName: TransporterType
) {
self.meterProvider = meterProvider
self.exporterName = exporterName
self.transportName = transportName.rawValue
self.seenAttrs = [
ExporterMetrics.ATTRIBUTE_KEY_TYPE: .string(type)
]
self.successAttrs = [
ExporterMetrics.ATTRIBUTE_KEY_SUCCESS: .bool(true)
]
self.failedAttrs = [
ExporterMetrics.ATTRIBUTE_KEY_SUCCESS: .bool(false)
]

self.seen = meter.counterBuilder(name: "\(exporterName).exporter.seen").build()
self.exported = meter.counterBuilder(name: "\(exporterName).exporter.exported").build()

}

public func addSeen(value: Int) -> Void {
seen?.add(value: value, attribute: seenAttrs)
}

public func addSuccess(value: Int) -> Void {
exported?.add(value: value, attribute: successAttrs)
}

public func addFailed(value: Int) -> Void {
exported?.add(value: value, attribute: failedAttrs)
}

// MARK: - Private functions

/***
* Create an instance for recording exporter metrics under the meter
* "io.opentelemetry.exporters." + exporterName + "-transporterType".
**/
private var meter: StableMeter {
meterProvider.get(name: "io.opentelemetry.exporters.\(exporterName)-\(transportName)")
}

// MARK: - Static function

public static func makeExporterMetric(
type: String,
meterProvider: StableMeterProvider,
exporterName: String,
transportName: TransporterType
) -> ExporterMetrics {
ExporterMetrics(
type: type,
meterProvider: meterProvider,
exporterName: exporterName,
transportName: transportName
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ public struct OtlpConfiguration {
public let headers : [(String,String)]?
public let timeout : TimeInterval
public let compression: CompressionType

public let exportAsJson: Bool
public init(
timeout : TimeInterval = OtlpConfiguration.DefaultTimeoutInterval,
compression: CompressionType = .gzip,
headers: [(String,String)]? = nil
headers: [(String,String)]? = nil,
exportAsJson: Bool = true
) {
self.headers = headers
self.timeout = timeout
self.compression = compression
self.exportAsJson = exportAsJson
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@ import FoundationNetworking
#endif

public class OtlpHttpExporterBase {
let endpoint: URL
let httpClient: HTTPClient
let envVarHeaders : [(String,String)]?

let config : OtlpConfiguration
public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.envVarHeaders = envVarHeaders

self.endpoint = endpoint
self.config = config
if let providedSession = useSession {
self.httpClient = HTTPClient(session: providedSession)
} else {
self.httpClient = HTTPClient()
}
let endpoint: URL
let httpClient: HTTPClient
let envVarHeaders : [(String,String)]?
let config : OtlpConfiguration

public init(
endpoint: URL,
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes
) {
self.envVarHeaders = envVarHeaders
self.endpoint = endpoint
self.config = config
if let providedSession = useSession {
self.httpClient = HTTPClient(session: providedSession)
} else {
self.httpClient = HTTPClient()
}
}

public func createRequest(body: Message, endpoint: URL) -> URLRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import Foundation
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk
import OpenTelemetryApi
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif
Expand All @@ -17,13 +18,47 @@ public func defaultOltpHttpLoggingEndpoint() -> URL {
public class OtlpHttpLogExporter: OtlpHttpExporterBase, LogRecordExporter {
var pendingLogRecords: [ReadableLogRecord] = []
private let exporterLock = Lock()
override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
private var exporterMetrics: ExporterMetrics?

override public init(
endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes
) {
super.init(
endpoint: endpoint,
config: config,
useSession: useSession,
envVarHeaders: envVarHeaders
)
}

/// A `convenience` constructor to provide support for exporter metric using`StableMeterProvider` type
/// - Parameters:
/// - endpoint: Exporter endpoint injected as dependency
/// - config: Exporter configuration including type of exporter
/// - meterProvider: Injected `StableMeterProvider` for metric
/// - useSession: Overridden `URLSession` if any
/// - envVarHeaders: Extra header key-values
convenience public init(
endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
meterProvider: StableMeterProvider,
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes
) {
self.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
exporterMetrics = ExporterMetrics(
type: "otlp",
meterProvider: meterProvider,
exporterName: "log",
transportName: config.exportAsJson ?
ExporterMetrics.TransporterType.httpJson :
ExporterMetrics.TransporterType.grpc
)
}

public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult {
var sendingLogRecords: [ReadableLogRecord] = []
exporterLock.withLockVoid {
Expand All @@ -47,12 +82,15 @@ public class OtlpHttpLogExporter: OtlpHttpExporterBase, LogRecordExporter {
request.addValue(value, forHTTPHeaderField: key)
}
}
exporterMetrics?.addSeen(value: sendingLogRecords.count)
request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
httpClient.send(request: request) { [weak self] result in
switch result {
case .success:
self?.exporterMetrics?.addSuccess(value: sendingLogRecords.count)
break
case let .failure(error):
self?.exporterMetrics?.addFailed(value: sendingLogRecords.count)
self?.exporterLock.withLockVoid {
self?.pendingLogRecords.append(contentsOf: sendingLogRecords)
}
Expand Down Expand Up @@ -90,11 +128,13 @@ public class OtlpHttpLogExporter: OtlpHttpExporterBase, LogRecordExporter {
request.addValue(value, forHTTPHeaderField: key)
}
}
httpClient.send(request: request) { result in
httpClient.send(request: request) { [weak self] result in
switch result {
case .success:
self?.exporterMetrics?.addSuccess(value: pendingLogRecords.count)
exporterResult = ExportResult.success
case let .failure(error):
self?.exporterMetrics?.addFailed(value: pendingLogRecords.count)
print(error)
exporterResult = ExportResult.failure
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import Foundation
import OpenTelemetryApi
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif
Expand All @@ -16,14 +17,50 @@ public func defaultOltpHTTPMetricsEndpoint() -> URL {

@available(*, deprecated, renamed: "StableOtlpHTTPMetricExporter")
public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
var pendingMetrics: [Metric] = []
private let exporterLock = Lock()

var pendingMetrics: [Metric] = []
private let exporterLock = Lock()
private var exporterMetrics: ExporterMetrics?

override
public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), config : OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
public init(
endpoint: URL = defaultOltpHTTPMetricsEndpoint(),
config : OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes
) {
super.init(
endpoint: endpoint,
config: config,
useSession: useSession,
envVarHeaders: envVarHeaders
)
}


/// A `convenience` constructor to provide support for exporter metric using`StableMeterProvider` type
/// - Parameters:
/// - endpoint: Exporter endpoint injected as dependency
/// - config: Exporter configuration including type of exporter
/// - meterProvider: Injected `StableMeterProvider` for metric
/// - useSession: Overridden `URLSession` if any
/// - envVarHeaders: Extra header key-values
convenience public init(
endpoint: URL = defaultOltpHTTPMetricsEndpoint(),
config : OtlpConfiguration = OtlpConfiguration(),
meterProvider: StableMeterProvider,
useSession: URLSession? = nil,
envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes
) {
self.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
exporterMetrics = ExporterMetrics(
type: "otlp",
meterProvider: meterProvider,
exporterName: "metric",
transportName: config.exportAsJson ?
ExporterMetrics.TransporterType.httpJson :
ExporterMetrics.TransporterType.grpc
)
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
var sendingMetrics: [Metric] = []
exporterLock.withLockVoid {
Expand All @@ -46,11 +83,14 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
request.addValue(value, forHTTPHeaderField: key)
}
}
exporterMetrics?.addSeen(value: sendingMetrics.count)
httpClient.send(request: request) { [weak self] result in
switch result {
case .success(_):
self?.exporterMetrics?.addSuccess(value: sendingMetrics.count)
break
case .failure(let error):
self?.exporterMetrics?.addFailed(value: sendingMetrics.count)
self?.exporterLock.withLockVoid {
self?.pendingMetrics.append(contentsOf: sendingMetrics)
}
Expand All @@ -64,25 +104,27 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
public func flush() -> MetricExporterResultCode {
var exporterResult: MetricExporterResultCode = .success

if !pendingMetrics.isEmpty {
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: pendingMetrics)
if !pendingMetrics.isEmpty {
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: pendingMetrics)
}

let semaphore = DispatchSemaphore(value: 0)
let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { [weak self, count = pendingMetrics.count] result in
switch result {
case .success(_):
self?.exporterMetrics?.addSuccess(value: count)
break
case .failure(let error):
self?.exporterMetrics?.addFailed(value: count)
print(error)
exporterResult = MetricExporterResultCode.failureNotRetryable
}
semaphore.signal()
}
semaphore.wait()
}

let semaphore = DispatchSemaphore(value: 0)
let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { result in
switch result {
case .success(_):
break
case .failure(let error):
print(error)
exporterResult = MetricExporterResultCode.failureNotRetryable
}
semaphore.signal()
}
semaphore.wait()
}
return exporterResult
return exporterResult
}
}
Loading

0 comments on commit 6570e78

Please sign in to comment.