Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread safety issue in HTTP exporters #481

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public func defaultOltpHttpLoggingEndpoint() -> URL {
public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {

var pendingLogRecords: [ReadableLogRecord] = []

let dispatchQueue = DispatchQueue(label: "OtlpHttpLogExporter Queue")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if you imported Locks.swift file which is in other parts of the project (e.g in the sdk, but its methods are internal) and you used it instead of GCD queues like:

private let exporterLock = Lock()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that Lock is internal to OpenTelemetrySdk. Would you like me to make it public, or copy it to OpenTelemetryProtocolHttp?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy in the exporter folder. Thanks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.


override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
Expand All @@ -23,22 +24,28 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {
}

public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult {
pendingLogRecords.append(contentsOf: logRecords)
let sendingLogRecords = pendingLogRecords
pendingLogRecords = []

var sendingLogRecords: [ReadableLogRecord]!
dispatchQueue.sync {
pendingLogRecords.append(contentsOf: logRecords)
sendingLogRecords = pendingLogRecords
pendingLogRecords = []
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of this you can use:

var sendingLogRecords: [ReadableLogRecord] = []
exporterLock.withLockVoid {
    pendingLogRecords.append(contentsOf: logRecords)
    sendingLogRecords = pendingLogRecords
    pendingLogRecords = []      
  }


let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords)
}

var request = createRequest(body: body, endpoint: endpoint)
request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout)
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is wrong, we would ideally like to know if there was an error, which now is not printing anymore

switch result {
case .success(_):
break
case .failure(let error):
self?.pendingLogRecords.append(contentsOf: sendingLogRecords)
self.dispatchQueue.sync { [weak self] in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the exporterLock.withLockVoid

self?.pendingLogRecords.append(contentsOf: sendingLogRecords)
}
print(error)
}
}
Expand All @@ -52,7 +59,10 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {

public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
var exporterResult: ExportResult = .success

var pendingLogRecords: [ReadableLogRecord]!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with the exporterLock.withLockVoid

dispatchQueue.sync {
pendingLogRecords = self.pendingLogRecords
}
if !pendingLogRecords.isEmpty {
let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: pendingLogRecords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,35 @@ public func defaultOltpHTTPMetricsEndpoint() -> URL {

public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
var pendingMetrics: [Metric] = []

let dispatchQueue = DispatchQueue(label: "OtlpHttpMetricExporter Queue")

override
public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), config : OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
pendingMetrics.append(contentsOf: metrics)
let sendingMetrics = pendingMetrics
pendingMetrics = []
var sendingMetrics: [Metric]!
dispatchQueue.sync {
pendingMetrics.append(contentsOf: metrics)
sendingMetrics = pendingMetrics
pendingMetrics = []
}

let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: sendingMetrics)
}

let request = createRequest(body: body, endpoint: endpoint)
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
switch result {
case .success(_):
break
case .failure(let error):
self?.pendingMetrics.append(contentsOf: sendingMetrics)
self.dispatchQueue.sync { [weak self] in
self?.pendingMetrics.append(contentsOf: sendingMetrics)
}
print(error)
}
}
Expand All @@ -43,7 +51,10 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {

public func flush() -> MetricExporterResultCode {
var exporterResult: MetricExporterResultCode = .success

var pendingMetrics: [Metric]!
dispatchQueue.sync {
pendingMetrics = self.pendingMetrics
}
if !pendingMetrics.isEmpty {
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: pendingMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {


var pendingSpans: [SpanData] = []
let dispatchQueue = DispatchQueue(label: "OtlpHttpTraceExporter Queue")

override
public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession)
}

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
pendingSpans.append(contentsOf: spans)
let sendingSpans = pendingSpans
pendingSpans = []

var sendingSpans: [SpanData]!
dispatchQueue.sync {
pendingSpans.append(contentsOf: spans)
sendingSpans = pendingSpans
pendingSpans = []
}

let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: sendingSpans)
}
var request = createRequest(body: body, endpoint: endpoint)
if let headers = envVarHeaders {
Expand All @@ -41,11 +46,14 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {
}
}
httpClient.send(request: request) { [weak self] result in
guard let self = self else { return }
switch result {
case .success:
break
case .failure(let error):
self?.pendingSpans.append(contentsOf: sendingSpans)
self.dispatchQueue.sync { [weak self] in
self?.pendingSpans.append(contentsOf: sendingSpans)
}
print(error)
}
}
Expand All @@ -54,6 +62,10 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
var resultValue: SpanExporterResultCode = .success
var pendingSpans: [SpanData]!
dispatchQueue.sync {
pendingSpans = self.pendingSpans
}
if !pendingSpans.isEmpty {
let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: pendingSpans)
Expand Down