Skip to content

Commit

Permalink
Fix thread safety issue in HTTP exporters
Browse files Browse the repository at this point in the history
We were seeing occasional crashes that were caused by pendingLogRecords being accessed from multiple threads.
  • Loading branch information
justinhporter committed Oct 25, 2023
1 parent dea8ace commit 409f18b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public func defaultOltpHttpLoggingEndpoint() -> URL {
public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {

var pendingLogRecords: [ReadableLogRecord] = []

let dispatchQueue = DispatchQueue(label: "OtlpHttpLogExporter Queue")

override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
Expand All @@ -23,22 +24,28 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {
}

public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult {
pendingLogRecords.append(contentsOf: logRecords)
let sendingLogRecords = pendingLogRecords
pendingLogRecords = []

var sendingLogRecords: [ReadableLogRecord]!
dispatchQueue.sync {
pendingLogRecords.append(contentsOf: logRecords)
sendingLogRecords = pendingLogRecords
pendingLogRecords = []
}

let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords)
}

var request = createRequest(body: body, endpoint: endpoint)
request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout)
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
switch result {
case .success(_):
break
case .failure(let error):
self?.pendingLogRecords.append(contentsOf: sendingLogRecords)
self.dispatchQueue.sync { [weak self] in
self?.pendingLogRecords.append(contentsOf: sendingLogRecords)
}
print(error)
}
}
Expand All @@ -52,7 +59,10 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {

public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
var exporterResult: ExportResult = .success

var pendingLogRecords: [ReadableLogRecord]!
dispatchQueue.sync {
pendingLogRecords = self.pendingLogRecords
}
if !pendingLogRecords.isEmpty {
let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: pendingLogRecords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,35 @@ public func defaultOltpHTTPMetricsEndpoint() -> URL {

public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
var pendingMetrics: [Metric] = []

let dispatchQueue = DispatchQueue(label: "OtlpHttpMetricExporter Queue")

override
public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), config : OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
pendingMetrics.append(contentsOf: metrics)
let sendingMetrics = pendingMetrics
pendingMetrics = []
var sendingMetrics: [Metric]!
dispatchQueue.sync {
pendingMetrics.append(contentsOf: metrics)
sendingMetrics = pendingMetrics
pendingMetrics = []
}

let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: sendingMetrics)
}

let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
switch result {
case .success(_):
break
case .failure(let error):
self?.pendingMetrics.append(contentsOf: sendingMetrics)
self.dispatchQueue.sync { [weak self] in
self?.pendingMetrics.append(contentsOf: sendingMetrics)
}
print(error)
}
}
Expand All @@ -43,7 +51,10 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {

public func flush() -> MetricExporterResultCode {
var exporterResult: MetricExporterResultCode = .success

var pendingMetrics: [Metric]!
dispatchQueue.sync {
pendingMetrics = self.pendingMetrics
}
if !pendingMetrics.isEmpty {
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: pendingMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {


var pendingSpans: [SpanData] = []
let dispatchQueue = DispatchQueue(label: "OtlpHttpTraceExporter Queue")

override
public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession)
}

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
pendingSpans.append(contentsOf: spans)
let sendingSpans = pendingSpans
pendingSpans = []

var sendingSpans: [SpanData]!
dispatchQueue.sync {
pendingSpans.append(contentsOf: spans)
sendingSpans = pendingSpans
pendingSpans = []
}

let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: sendingSpans)
}
var request = createRequest(body: body, endpoint: endpoint)
if let headers = envVarHeaders {
Expand All @@ -41,11 +46,14 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {
}
}
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
switch result {
case .success:
break
case .failure(let error):
self?.pendingSpans.append(contentsOf: sendingSpans)
self.dispatchQueue.sync { [weak self] in
self?.pendingSpans.append(contentsOf: sendingSpans)
}
print(error)
}
}
Expand All @@ -54,6 +62,10 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
var resultValue: SpanExporterResultCode = .success
var pendingSpans: [SpanData]!
dispatchQueue.sync {
pendingSpans = self.pendingSpans
}
if !pendingSpans.isEmpty {
let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: pendingSpans)
Expand Down

0 comments on commit 409f18b

Please sign in to comment.