Skip to content

Commit

Permalink
Add AsyncSpanEndStrategy for Reactor 3.x instrumentation (#2714)
Browse files Browse the repository at this point in the history
  • Loading branch information
HaloFour authored Apr 20, 2021
1 parent fa359a4 commit c7e4314
Show file tree
Hide file tree
Showing 7 changed files with 627 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ dependencies {
implementation project(':instrumentation:reactor-3.1:library')

testLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'

testImplementation project(':instrumentation:reactor-3.1:testing')
testImplementation deps.opentelemetryExtAnnotations

latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.UnicastProcessor
import reactor.test.StepVerifier

class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecification {

def "should capture span for already completed Mono"() {
setup:
def source = Mono.just("Value")
def result = new TracedWithSpan()
.mono(source)

expect:
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}

def "should capture span for eventually completed Mono"() {
setup:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onNext("Value")
source.onComplete()

verifier.expectNext("Value")
.verifyComplete()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = Mono.error(error)
def result = new TracedWithSpan()
.mono(source)

expect:
StepVerifier.create(result)
.verifyErrorMatches({ it == error })

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}

def "should capture span for eventually errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onError(error)

verifier
.verifyErrorMatches({ it == error })

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}

def "should capture span for already completed Flux"() {
setup:
def source = Flux.just("Value")
def result = new TracedWithSpan()
.flux(source)

expect:
StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}

def "should capture span for eventually completed Flux"() {
setup:
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onNext("Value")
source.onComplete()

verifier.expectNext("Value")
.verifyComplete()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored false
attributes {
}
}
}
}
}

def "should capture span for already errored Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = Flux.error(error)
def result = new TracedWithSpan()
.flux(source)

expect:
StepVerifier.create(result)
.verifyErrorMatches({ it == error })

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}

def "should capture span for eventually errored Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onError(error)

verifier.verifyErrorMatches({ it == error })

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
errored true
errorEvent(IllegalArgumentException, "Boom")
attributes {
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.extension.annotations.WithSpan;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TracedWithSpan {
@WithSpan
public Mono<String> mono(Mono<String> mono) {
return mono;
}

@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ apply from: "$rootDir/gradle/instrumentation-library.gradle"

dependencies {
library group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'

testImplementation project(':instrumentation:reactor-3.1:testing')

latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-test', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
INSTANCE;

@Override
public boolean supports(Class<?> returnType) {
return returnType == Publisher.class || returnType == Mono.class || returnType == Flux.class;
}

@Override
public Object end(BaseTracer tracer, Context context, Object returnValue) {

EndOnFirstNotificationConsumer notificationConsumer =
new EndOnFirstNotificationConsumer(tracer, context);
if (returnValue instanceof Mono) {
Mono<?> mono = (Mono<?>) returnValue;
return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess);
} else {
Flux<?> flux = Flux.from((Publisher<?>) returnValue);
return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer);
}
}

/**
* Helper class to ensure that the span is ended exactly once regardless of how many OnComplete or
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private static final class EndOnFirstNotificationConsumer extends AtomicBoolean
implements Runnable, Consumer<Throwable> {

private final BaseTracer tracer;
private final Context context;

public EndOnFirstNotificationConsumer(BaseTracer tracer, Context context) {
super(false);
this.tracer = tracer;
this.context = context;
}

public <T> void onSuccess(T ignored) {
accept(null);
}

@Override
public void run() {
accept(null);
}

@Override
public void accept(Throwable exception) {
if (compareAndSet(false, true)) {
if (exception != null) {
tracer.endExceptionally(context, exception);
} else {
tracer.end(context);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategies;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
Expand All @@ -43,11 +44,13 @@ public class TracingOperator {
*/
public static void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift());
AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public static void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
Expand Down
Loading

0 comments on commit c7e4314

Please sign in to comment.