diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift new file mode 100644 index 00000000..5a51c654 --- /dev/null +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift @@ -0,0 +1,198 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Metrics API open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Metrics API project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Metrics API project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) +import Darwin +#else +import Glibc +#endif + +/// A threading lock based on `libpthread` instead of `libdispatch`. +/// +/// This object provides a lock on top of a single `pthread_mutex_t`. This kind +/// of lock is safe to use with `libpthread`-based threading models, such as the +/// one used by NIO. +internal final class Lock { + fileprivate let mutex: UnsafeMutablePointer = UnsafeMutablePointer.allocate(capacity: 1) + + /// Create a new lock. + public init() { + let err = pthread_mutex_init(self.mutex, nil) + precondition(err == 0, "pthread_mutex_init failed with error \(err)") + } + + deinit { + let err = pthread_mutex_destroy(self.mutex) + precondition(err == 0, "pthread_mutex_destroy failed with error \(err)") + self.mutex.deallocate() + } + + /// Acquire the lock. + /// + /// Whenever possible, consider using `withLock` instead of this method and + /// `unlock`, to simplify lock handling. + public func lock() { + let err = pthread_mutex_lock(self.mutex) + precondition(err == 0, "pthread_mutex_lock failed with error \(err)") + } + + /// Release the lock. + /// + /// Whenever possible, consider using `withLock` instead of this method and + /// `lock`, to simplify lock handling. + public func unlock() { + let err = pthread_mutex_unlock(self.mutex) + precondition(err == 0, "pthread_mutex_unlock failed with error \(err)") + } +} + +extension Lock { + /// Acquire the lock for the duration of the given block. + /// + /// This convenience method should be preferred to `lock` and `unlock` in + /// most situations, as it ensures that the lock will be released regardless + /// of how `body` exits. + /// + /// - Parameter body: The block to execute while holding the lock. + /// - Returns: The value returned by the block. + @inlinable + internal func withLock(_ body: () throws -> T) rethrows -> T { + self.lock() + defer { + self.unlock() + } + return try body() + } + + // specialise Void return (for performance) + @inlinable + internal func withLockVoid(_ body: () throws -> Void) rethrows { + try self.withLock(body) + } +} + +/// A threading lock based on `libpthread` instead of `libdispatch`. +/// +/// This object provides a lock on top of a single `pthread_mutex_t`. This kind +/// of lock is safe to use with `libpthread`-based threading models, such as the +/// one used by NIO. +internal final class ReadWriteLock { + fileprivate let rwlock: UnsafeMutablePointer = UnsafeMutablePointer.allocate(capacity: 1) + + /// Create a new lock. + public init() { + let err = pthread_rwlock_init(self.rwlock, nil) + precondition(err == 0, "pthread_rwlock_init failed with error \(err)") + } + + deinit { + let err = pthread_rwlock_destroy(self.rwlock) + precondition(err == 0, "pthread_rwlock_destroy failed with error \(err)") + self.rwlock.deallocate() + } + + /// Acquire a reader lock. + /// + /// Whenever possible, consider using `withLock` instead of this method and + /// `unlock`, to simplify lock handling. + public func lockRead() { + let err = pthread_rwlock_rdlock(self.rwlock) + precondition(err == 0, "pthread_rwlock_rdlock failed with error \(err)") + } + + /// Acquire a writer lock. + /// + /// Whenever possible, consider using `withLock` instead of this method and + /// `unlock`, to simplify lock handling. + public func lockWrite() { + let err = pthread_rwlock_wrlock(self.rwlock) + precondition(err == 0, "pthread_rwlock_wrlock failed with error \(err)") + } + + /// Release the lock. + /// + /// Whenever possible, consider using `withLock` instead of this method and + /// `lock`, to simplify lock handling. + public func unlock() { + let err = pthread_rwlock_unlock(self.rwlock) + precondition(err == 0, "pthread_rwlock_unlock failed with error \(err)") + } +} + +extension ReadWriteLock { + /// Acquire the reader lock for the duration of the given block. + /// + /// This convenience method should be preferred to `lock` and `unlock` in + /// most situations, as it ensures that the lock will be released regardless + /// of how `body` exits. + /// + /// - Parameter body: The block to execute while holding the lock. + /// - Returns: The value returned by the block. + @inlinable + internal func withReaderLock(_ body: () throws -> T) rethrows -> T { + self.lockRead() + defer { + self.unlock() + } + return try body() + } + + /// Acquire the writer lock for the duration of the given block. + /// + /// This convenience method should be preferred to `lock` and `unlock` in + /// most situations, as it ensures that the lock will be released regardless + /// of how `body` exits. + /// + /// - Parameter body: The block to execute while holding the lock. + /// - Returns: The value returned by the block. + @inlinable + internal func withWriterLock(_ body: () throws -> T) rethrows -> T { + self.lockWrite() + defer { + self.unlock() + } + return try body() + } + + // specialise Void return (for performance) + @inlinable + internal func withReaderLockVoid(_ body: () throws -> Void) rethrows { + try self.withReaderLock(body) + } + + // specialise Void return (for performance) + @inlinable + internal func withWriterLockVoid(_ body: () throws -> Void) rethrows { + try self.withWriterLock(body) + } +} diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift index d3e9cb97..4e982913 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift @@ -14,7 +14,7 @@ public func defaultOltpHttpLoggingEndpoint() -> URL { public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter { var pendingLogRecords: [ReadableLogRecord] = [] - + private let exporterLock = Lock() override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(), config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, @@ -24,8 +24,13 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter { public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult { pendingLogRecords.append(contentsOf: logRecords) - let sendingLogRecords = pendingLogRecords - pendingLogRecords = [] + var sendingLogRecords: [ReadableLogRecord] = [] + + exporterLock.withLockVoid { + pendingLogRecords.append(contentsOf: logRecords) + sendingLogRecords = pendingLogRecords + pendingLogRecords = [] + } let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords) @@ -38,7 +43,9 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter { case .success(_): break case .failure(let error): - self?.pendingLogRecords.append(contentsOf: sendingLogRecords) + self?.exporterLock.withLockVoid { + self?.pendingLogRecords.append(contentsOf: sendingLogRecords) + } print(error) } } @@ -52,6 +59,10 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter { public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult { var exporterResult: ExportResult = .success + var pendingLogRecords: [ReadableLogRecord] = [] + exporterLock.withLockVoid { + pendingLogRecords = self.pendingLogRecords + } if !pendingLogRecords.isEmpty { let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/metric/OltpHTTPMetricExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/metric/OltpHTTPMetricExporter.swift index 2ac04ab8..c8c253e0 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/metric/OltpHTTPMetricExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/metric/OltpHTTPMetricExporter.swift @@ -13,6 +13,7 @@ public func defaultOltpHTTPMetricsEndpoint() -> URL { public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter { var pendingMetrics: [Metric] = [] + private let exporterLock = Lock() override public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), config : OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) { @@ -20,9 +21,12 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter { } public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode { - pendingMetrics.append(contentsOf: metrics) - let sendingMetrics = pendingMetrics - pendingMetrics = [] + var sendingMetrics: [Metric] = [] + exporterLock.withLockVoid { + pendingMetrics.append(contentsOf: metrics) + sendingMetrics = pendingMetrics + pendingMetrics = [] + } let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { $0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: sendingMetrics) } @@ -33,7 +37,9 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter { case .success(_): break case .failure(let error): - self?.pendingMetrics.append(contentsOf: sendingMetrics) + self?.exporterLock.withLockVoid { + self?.pendingMetrics.append(contentsOf: sendingMetrics) + } print(error) } } diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/metric/StableOtlpHTTPMetricExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/metric/StableOtlpHTTPMetricExporter.swift index f83ed5cc..cf0999de 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/metric/StableOtlpHTTPMetricExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/metric/StableOtlpHTTPMetricExporter.swift @@ -16,6 +16,7 @@ public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMet var defaultAggregationSelector: DefaultAggregationSelector var pendingMetrics: [StableMetricData] = [] + private let exporterLock = Lock() // MARK: - Init @@ -31,9 +32,12 @@ public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMet // MARK: - StableMetricsExporter public func export(metrics : [StableMetricData]) -> ExportResult { - pendingMetrics.append(contentsOf: metrics) - let sendingMetrics = pendingMetrics - pendingMetrics = [] + var sendingMetrics: [StableMetricData] = [] + exporterLock.withLockVoid { + pendingMetrics.append(contentsOf: metrics) + sendingMetrics = pendingMetrics + pendingMetrics = [] + } let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { $0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: sendingMetrics) } @@ -44,7 +48,9 @@ public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMet case .success(_): break case .failure(let error): - self?.pendingMetrics.append(contentsOf: sendingMetrics) + self?.exporterLock.withLockVoid { + self?.pendingMetrics.append(contentsOf: sendingMetrics) + } print(error) } } @@ -54,7 +60,10 @@ public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMet public func flush() -> ExportResult { var exporterResult: ExportResult = .success - + var pendingMetrics: [StableMetricData] = [] + exporterLock.withLockVoid { + pendingMetrics = self.pendingMetrics + } if !pendingMetrics.isEmpty { let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { $0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: pendingMetrics) diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift index b0545865..ceee8362 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift @@ -15,6 +15,7 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter { var pendingSpans: [SpanData] = [] + private let exporterLock = Lock() override public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) { @@ -22,12 +23,15 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter { } public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { - pendingSpans.append(contentsOf: spans) - let sendingSpans = pendingSpans - pendingSpans = [] + var sendingSpans: [SpanData] = [] + exporterLock.withLockVoid { + pendingSpans.append(contentsOf: spans) + sendingSpans = pendingSpans + pendingSpans = [] + } let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { - $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans) + $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: sendingSpans) } var request = createRequest(body: body, endpoint: endpoint) if let headers = envVarHeaders { @@ -45,7 +49,9 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter { case .success: break case .failure(let error): - self?.pendingSpans.append(contentsOf: sendingSpans) + self?.exporterLock.withLockVoid { + self?.pendingSpans.append(contentsOf: sendingSpans) + } print(error) } } @@ -54,6 +60,10 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter { public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { var resultValue: SpanExporterResultCode = .success + var pendingSpans: [SpanData] = [] + exporterLock.withLockVoid { + pendingSpans = self.pendingSpans + } if !pendingSpans.isEmpty { let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: pendingSpans)