Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BatchSpanProcessor metrics on drop and export span #635

Merged
26 changes: 13 additions & 13 deletions Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DefaultStableMeter : StableMeter {
}
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
private class NoopLongGaugeBuilder : LongGaugeBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -123,18 +123,18 @@ public class DefaultStableMeter : StableMeter {
func add(value: Int, attribute: [String : AttributeValue]) {}
}

private class NoopLongCounterBuilder : LongCounterBuilder {
func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}

func build() -> LongCounter {
NoopLongCounter()
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
private class NoopLongCounterBuilder : LongCounterBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func ofDoubles() -> DoubleCounterBuilder {
nachoBonafonte marked this conversation as resolved.
Show resolved Hide resolved
NoopDoubleCounterBuilder()
}
func build() -> LongCounter {
NoopLongCounter()
}
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
}

private class NoopDoubleCounterBuilder : DoubleCounterBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Foundation
public class DefaultStableMeterProvider: StableMeterProvider {
static let noopMeterBuilder = NoopMeterBuilder()

public init() {}
mamunto marked this conversation as resolved.
Show resolved Hide resolved

public static func noop() -> MeterBuilder {
noopMeterBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge {
registerLongAsynchronousInstrument(type: type, updater: callback)
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker


fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}
public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -57,40 +72,113 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()
do {
self.queueSizeGauge = try meter.gaugeBuilder(name: "queueSize")
.ofLongs()
.asLongSdk()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
} catch {}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()

do {
processedSpansCounter = try meter.counterBuilder(name: "processedSpans")
.asLongSdk()
.setUnit("1")
.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
.build()
} catch {}

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}
Comment on lines +158 to +162
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up here cc: @bryce-b


func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
spanList.append(span)

// If there is any observer before, let's close it
self.spanGaugeObserver?.close()
mamunto marked this conversation as resolved.
Show resolved Hide resolved
// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand Down Expand Up @@ -148,11 +236,39 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
}
}

public enum GuageError: Error {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
case invalidType
}

/// Helper function to handle
extension LongGaugeBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
public func asLongSdk() throws -> LongGaugeBuilderSdk {
guard let sdkType = self as? LongGaugeBuilderSdk else {
throw GuageError.invalidType
}
return sdkType
}
}

extension LongCounterBuilder {
public func asLongSdk() throws -> LongCounterMeterBuilderSdk {
guard let sdkType = self as? LongCounterMeterBuilderSdk else {
throw GuageError.invalidType
}
return sdkType
mamunto marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading
Loading