Skip to content

Commit

Permalink
Instrument akka-http bindAndHandle (#8174)
Browse files Browse the repository at this point in the history
Resolves
#8143
Resolves
#6081
Resolves
#5137
Using the same approach as in
#6243
and as used by DataDog. Unlike in #6243 this pr does not attempt to
prevent leaking scopes into actors but rather instruments the actor to
reset context to get rid of the leaked scopes (DataDog does the same).
  • Loading branch information
laurit authored Apr 5, 2023
1 parent 2f0819a commit d87f40c
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -52,6 +53,11 @@ public static void exit(@Advice.Enter Scope scope) {
if (scope != null) {
scope.close();
}
// akka-http instrumentation can leak scopes
// reset the context to clear the leaked scopes
if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
Java8BytecodeBridge.rootContext().makeCurrent();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ dependencies {
library("com.typesafe.akka:akka-http_2.11:10.0.0")
library("com.typesafe.akka:akka-stream_2.11:2.4.14")

// these instrumentations are not needed for the tests to pass
// they are here to test for context leaks
testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent"))
testInstrumentation(project(":instrumentation:akka:akka-actor-fork-join-2.5:javaagent"))
testInstrumentation(project(":instrumentation:scala-fork-join-2.8:javaagent"))

latestDepTestLibrary("com.typesafe.akka:akka-http_2.13:+")
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
Expand All @@ -48,6 +47,8 @@ tasks.withType<Test>().configureEach {
jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED")
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")

jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp.server;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Attributes;
import akka.stream.BidiShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.scaladsl.Flow;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.ArrayDeque;
import java.util.Deque;

public class AkkaFlowWrapper
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
private final Inlet<HttpRequest> requestIn = Inlet.create("otel.requestIn");
private final Outlet<HttpRequest> requestOut = Outlet.create("otel.requestOut");
private final Inlet<HttpResponse> responseIn = Inlet.create("otel.responseIn");
private final Outlet<HttpResponse> responseOut = Outlet.create("otel.responseOut");

private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
BidiShape.of(responseIn, responseOut, requestIn, requestOut);

public static Flow<HttpRequest, HttpResponse, ?> wrap(
Flow<HttpRequest, HttpResponse, ?> handler) {
return handler.join(new AkkaFlowWrapper());
}

@Override
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
return shape;
}

@Override
public GraphStageLogic createLogic(Attributes attributes) {
return new TracingLogic();
}

private class TracingLogic extends GraphStageLogic {
private final Deque<TracingRequest> requests = new ArrayDeque<>();

public TracingLogic() {
super(shape);

// server pulls response, pass response from user code to server
setHandler(
responseOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(responseIn);
}

@Override
public void onDownstreamFinish() {
cancel(responseIn);
}
});

// user code pulls request, pass request from server to user code
setHandler(
requestOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(requestIn);
}

@Override
public void onDownstreamFinish() {
// Invoked on errors. Don't complete this stage to allow error-capturing
cancel(requestIn);
}
});

// new request from server
setHandler(
requestIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpRequest request = grab(requestIn);

TracingRequest tracingRequest = TracingRequest.EMPTY;
Context parentContext = currentContext();
if (instrumenter().shouldStart(parentContext, request)) {
Context context = instrumenter().start(parentContext, request);
// scope opened here may leak, actor instrumentation will close it
Scope scope = context.makeCurrent();
tracingRequest = new TracingRequest(context, scope, request);
}
// event if span wasn't started we need to push TracingRequest to match response
// with request
requests.push(tracingRequest);

push(requestOut, request);
}

@Override
public void onUpstreamFinish() {
complete(requestOut);
}

@Override
public void onUpstreamFailure(Throwable exception) {
fail(requestOut, exception);
}
});

// response from user code
setHandler(
responseIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpResponse response = grab(responseIn);

TracingRequest tracingRequest = requests.poll();
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
// this may happen on a different thread from the one that opened the scope
// actor instrumentation will take care of the leaked scopes
tracingRequest.scope.close();
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
}
push(responseOut, response);
}

@Override
public void onUpstreamFailure(Throwable exception) {
TracingRequest tracingRequest;
while ((tracingRequest = requests.poll()) != null) {
if (tracingRequest == TracingRequest.EMPTY) {
continue;
}
tracingRequest.scope.close();
instrumenter()
.end(
tracingRequest.context, tracingRequest.request, errorResponse(), exception);
}

fail(responseOut, exception);
}

@Override
public void onUpstreamFinish() {
completeStage();
}
});
}
}

private static class TracingRequest {
static final TracingRequest EMPTY = new TracingRequest(null, null, null);
final Context context;
final Scope scope;
final HttpRequest request;

TracingRequest(Context context, Scope scope, HttpRequest request) {
this.context = context;
this.scope = scope;
this.request = request;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,12 @@

package io.opentelemetry.javaagent.instrumentation.akkahttp.server;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
import static java.util.Collections.singletonList;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;

@AutoService(InstrumentationModule.class)
public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
Expand All @@ -33,73 +22,4 @@ public AkkaHttpServerInstrumentationModule() {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpExtServerInstrumentation());
}

public static class SyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
private final Function1<HttpRequest, HttpResponse> userHandler;

public SyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
this.userHandler = userHandler;
}

@Override
public HttpResponse apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
HttpResponse response = userHandler.apply(request);
instrumenter().end(context, request, response, null);
return response;
} catch (Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
throw t;
}
}
}

public static class AsyncWrapper extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
private final ExecutionContext executionContext;

public AsyncWrapper(
Function1<HttpRequest, Future<HttpResponse>> userHandler,
ExecutionContext executionContext) {
this.userHandler = userHandler;
this.executionContext = executionContext;
}

@Override
public Future<HttpResponse> apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
return userHandler
.apply(request)
.transform(
new AbstractFunction1<HttpResponse, HttpResponse>() {
@Override
public HttpResponse apply(HttpResponse response) {
instrumenter().end(context, request, response, null);
return response;
}
},
new AbstractFunction1<Throwable, Throwable>() {
@Override
public Throwable apply(Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
return t;
}
},
executionContext);
} catch (Throwable t) {
instrumenter().end(context, request, null, t);
throw t;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Materializer;
import akka.stream.scaladsl.Flow;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Function1;
import scala.concurrent.Future;

public class HttpExtServerInstrumentation implements TypeInstrumentation {
@Override
Expand All @@ -27,42 +25,18 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
// Instrumenting akka-streams bindAndHandle api was previously attempted.
// This proved difficult as there was no clean way to close the async scope
// in the graph logic after the user's request handler completes.
//
// Instead, we're instrumenting the bindAndHandle function helpers by
// wrapping the scala functions with our own handlers.
transformer.applyAdviceToMethod(
named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpSyncAdvice");
transformer.applyAdviceToMethod(
named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpAsyncAdvice");
}

@SuppressWarnings("unused")
public static class AkkaHttpSyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, HttpResponse> handler) {
handler = new AkkaHttpServerInstrumentationModule.SyncWrapper(handler);
}
named("bindAndHandle").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
this.getClass().getName() + "$AkkaBindAndHandleAdvice");
}

@SuppressWarnings("unused")
public static class AkkaHttpAsyncAdvice {
public static class AkkaBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(7) Materializer materializer) {
handler =
new AkkaHttpServerInstrumentationModule.AsyncWrapper(
handler, materializer.executionContext());
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
handler = AkkaFlowWrapper.wrap(handler);
}
}
}
Loading

0 comments on commit d87f40c

Please sign in to comment.