Skip to content

Commit

Permalink
Merge pull request #122 from rabeyta/asyncHttpClient2Module
Browse files Browse the repository at this point in the history
add riposte-async-http-client2 module that uses async http client 2.x
  • Loading branch information
nicmunroe authored Jun 4, 2019
2 parents b68d86e + 330bc21 commit 03f9f45
Show file tree
Hide file tree
Showing 20 changed files with 4,279 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ classes/
.classpath

### Misc stuff
.vagrant
.vagrant
.DS_Store
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ Riposte is a collection of several libraries, mainly divided up based on depende

* [riposte-spi](riposte-spi/) - Contains the main interfaces and classes necessary to define the interface for a Riposte server.
* [riposte-core](riposte-core/) - Builds on `riposte-spi` to provide a fully functioning Riposte server.
* [riposte-async-http-client](riposte-async-http-client/) - Contains [`AsyncHttpClientHelper`](https://github.com/Nike-Inc/riposte/blob/master/riposte-async-http-client/src/main/java/com/nike/riposte/client/asynchttp/ning/AsyncHttpClientHelper.java), an HTTP client for performing async nonblocking calls using `CompletableFuture`s with distributed tracing baked in.
* [riposte-async-http-client](riposte-async-http-client/) - **DEPRECATED - Please see riposte-async-http-clien2** Contains [`AsyncHttpClientHelper`](https://github.com/Nike-Inc/riposte/blob/master/riposte-async-http-client/src/main/java/com/nike/riposte/client/asynchttp/ning/AsyncHttpClientHelper.java), an HTTP client for performing async nonblocking calls using `CompletableFuture`s with distributed tracing baked in.
* [riposte-async-http-client2](riposte-async-http-client2/) - Contains [`AsyncHttpClientHelper`](https://github.com/Nike-Inc/riposte/blob/master/riposte-async-http-client2/src/main/java/com/nike/riposte/client/asynchttp/ning/AsyncHttpClientHelper.java), an HTTP client for performing async nonblocking calls using `CompletableFuture`s with distributed tracing baked in.
* [riposte-metrics-codahale](riposte-metrics-codahale/) - Contains metrics support for Riposte using the `io.dropwizard` version of Codahale metrics.
* [riposte-metrics-codahale-signalfx](riposte-metrics-codahale-signalfx/) - Contains SignalFx-specific extensions of the `riposte-metrics-codahale` library module.
* [riposte-auth](riposte-auth/) - Contains a few implementations of the Riposte [`RequestSecurityValidator`](https://github.com/Nike-Inc/riposte/blob/master/riposte-spi/src/main/java/com/nike/riposte/server/error/validation/RequestSecurityValidator.java), e.g. for basic auth and other security schemes.
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ ext {
javassistVersion = '3.18.2-GA'
jacksonVersion = '2.9.9'
ningAsyncHttpClientVersion = '1.9.38'
asyncHttpClientVersion = '2.9.0'

servletApiVersion = '3.1.0'

Expand Down
21 changes: 21 additions & 0 deletions riposte-async-http-client2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
evaluationDependsOn(':')

dependencies {
compile(
project(":riposte-core"),
"org.asynchttpclient:async-http-client:$asyncHttpClientVersion"
)
compileOnly(
"org.jetbrains:annotations:$jetbrainsAnnotationsVersion"
)
testCompile(
"org.jetbrains:annotations:$jetbrainsAnnotationsVersion",
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion",
"io.rest-assured:rest-assured:$restAssuredVersion",
"org.assertj:assertj-core:$assertJVersion",
"com.tngtech.java:junit-dataprovider:$junitDataproviderVersion",
"ch.qos.logback:logback-classic:$logbackVersion",
"ch.qos.logback:logback-core:$logbackVersion"
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package com.nike.riposte.client.asynchttp;

import com.nike.fastbreak.CircuitBreaker;
import com.nike.internal.util.Pair;
import com.nike.internal.util.StringUtils;
import com.nike.riposte.server.config.distributedtracing.SpanNamingAndTaggingStrategy;
import com.nike.riposte.util.AsyncNettyHelper;
import com.nike.wingtips.Span;
import com.nike.wingtips.Tracer;
import com.nike.wingtips.http.HttpRequestTracingUtils;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.Response;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.nike.riposte.util.AsyncNettyHelper.linkTracingAndMdcToCurrentThread;

/**
* Extension of {@link org.asynchttpclient.AsyncCompletionHandler} that handles distributed tracing and MDC issues so
* that the dtrace and MDC info you want are attached to the thread performing the work for the downstream call. The
* {@link #completableFutureResponse} you pass in will be completed or completed exceptionally depending on the result
* of the downstream call, and it will be completed with the result of the {@link #responseHandlerFunction} you pass
* in.
* <p/>
* Used by {@link AsyncHttpClientHelper}
*
* @author Nic Munroe
*/
@SuppressWarnings({"WeakerAccess", "OptionalUsedAsFieldOrParameterType"})
class AsyncCompletionHandlerWithTracingAndMdcSupport<O> extends AsyncCompletionHandler<Response> {

private static final Logger logger = LoggerFactory.getLogger(AsyncCompletionHandlerWithTracingAndMdcSupport.class);

/**
* The {@link CompletableFuture} that should be completed with {@link #responseHandlerFunction}'s value (or
* completed exceptionally if an error occurs) when the downstream call returns.
*/
protected final CompletableFuture<O> completableFutureResponse;
/**
* The handler that will get notified with the downstream call's response. The value of {@link
* AsyncResponseHandler#handleResponse(Response)} will be used to complete {@link #completableFutureResponse}.
*/
protected final AsyncResponseHandler<O> responseHandlerFunction;
/**
* Whether or not the downstream call should be surrounded with a subspan. If true then {@link
* #distributedTraceStackToUse} will have a subspan placed on top, otherwise it will be used as-is.
*/
protected final boolean performSubSpanAroundDownstreamCalls;
/**
* The distributed tracing span stack to use for the downstream call. If {@link
* #performSubSpanAroundDownstreamCalls} is true then a new subspan will be placed on top of this, otherwise it will
* be used as-is.
*/
protected final Deque<Span> distributedTraceStackToUse;
/**
* The MDC context to associate with the downstream call.
*/
protected final Map<String, String> mdcContextToUse;
/**
* The circuit breaker manual mode task to notify of response events or exceptions, or empty if circuit breaking has
* been disabled for this call.
*/
protected final Optional<CircuitBreaker.ManualModeTask<Response>> circuitBreakerManualTask;
/**
* A copy of the {@link RequestBuilderWrapper} that will be used to execute the HTTP client call, with ony the
* HTTP method and URL populated. This copy performs two purposes - it keeps us from holding onto the original,
* which may have a large body that we don't want to hold onto, and it lets the original be reused (i.e. adjusting
* HTTP method and/or URL, etc) without affecting this class' referencing of the original values.
*
*/
protected final RequestBuilderWrapper rbwCopyWithHttpMethodAndUrlOnly;
/**
* The {@link SpanNamingAndTaggingStrategy} to use when creating subspan names and response/error tagging for the
* subspan (only used if {@link #performSubSpanAroundDownstreamCalls} is true).
*/
protected final SpanNamingAndTaggingStrategy<RequestBuilderWrapper, Response, Span> tagAndNamingStrategy;

/**
* @param completableFutureResponse
* The {@link CompletableFuture} that should be completed with {@code responseHandlerFunction}'s value (or
* completed exceptionally if an error occurs) when the downstream call returns.
* @param responseHandlerFunction
* The handler that will get notified with the downstream call's response. The value of {@link
* AsyncResponseHandler#handleResponse(Response)} will be used to complete {@code completableFutureResponse}.
* @param performSubSpanAroundDownstreamCalls
* Whether or not the downstream call should be surrounded with a subspan. If true then {@code
* distributedTraceStackToUse} will have a subspan placed on top, otherwise it will be used as-is.
* @param requestBuilderWrapper The {@link RequestBuilderWrapper} that will be used to execute the HTTP client call.
* @param circuitBreakerManualTask
* The circuit breaker manual mode task to notify of response events or exceptions, or empty if circuit breaking
* has been disabled for this call.
* @param distributedTraceStackToUse
* The distributed trace stack to use for the downstream call. If {@code performSubSpanAroundDownstreamCalls} is
* true then a new subspan will be placed on top of this, otherwise it will be used as-is.
* @param mdcContextToUse The MDC context to associate with the downstream call.
* @param tagAndNamingStrategy The {@link SpanNamingAndTaggingStrategy} to use when creating subspan names and
* response/error tagging for the subspan (only used if {@link #performSubSpanAroundDownstreamCalls} is true).
*/
AsyncCompletionHandlerWithTracingAndMdcSupport(
CompletableFuture<O> completableFutureResponse,
AsyncResponseHandler<O> responseHandlerFunction,
boolean performSubSpanAroundDownstreamCalls,
RequestBuilderWrapper requestBuilderWrapper,
Optional<CircuitBreaker.ManualModeTask<Response>> circuitBreakerManualTask,
Deque<Span> distributedTraceStackToUse,
Map<String, String> mdcContextToUse,
SpanNamingAndTaggingStrategy<RequestBuilderWrapper, Response, Span> tagAndNamingStrategy
) {
this.completableFutureResponse = completableFutureResponse;
this.responseHandlerFunction = responseHandlerFunction;
this.performSubSpanAroundDownstreamCalls = performSubSpanAroundDownstreamCalls;
this.circuitBreakerManualTask = circuitBreakerManualTask;
this.rbwCopyWithHttpMethodAndUrlOnly = new RequestBuilderWrapper(
requestBuilderWrapper.getUrl(),
requestBuilderWrapper.getHttpMethod(),
null,
null,
true
);
this.tagAndNamingStrategy = tagAndNamingStrategy;

// Grab the calling thread's dtrace stack and MDC info so we can set it back when this constructor completes.
Pair<Deque<Span>, Map<String, String>> originalThreadInfo = null;

try {
// Do a subspan around the downstream call if desired.
if (performSubSpanAroundDownstreamCalls) {
// Start by setting up the distributed trace stack and MDC for the call as specified in the method
// arguments, and grab the return value so we have the original calling thread's dtrace stack and
// MDC info (used to set everything back to original state when this constructor completes).
originalThreadInfo = linkTracingAndMdcToCurrentThread(distributedTraceStackToUse, mdcContextToUse);

// Then add the subspan.
String spanName = getSubspanSpanName(requestBuilderWrapper, tagAndNamingStrategy);
// Start a new child/subspan for this call if possible, falling back to a new trace (rather
// than child/subspan) if there's no current span on the thread. The
// startSpanInCurrentContext() method will do the right thing here in either case.
Tracer.getInstance().startSpanInCurrentContext(spanName, Span.SpanPurpose.CLIENT);

// Since we modified the stack/MDC we need to update the args that will be used for the downstream call.
distributedTraceStackToUse = Tracer.getInstance().getCurrentSpanStackCopy();
mdcContextToUse = MDC.getCopyOfContextMap();
}

this.distributedTraceStackToUse = distributedTraceStackToUse;
this.mdcContextToUse = mdcContextToUse;
} finally {
// Reset the tracing and MDC info to what it was when the constructor was called if we messed around with
// stuff. If originalThreadInfo is null then nothing needs to be done.
if (originalThreadInfo != null)
AsyncNettyHelper.unlinkTracingAndMdcFromCurrentThread(originalThreadInfo);
}
}

/**
* @return The span that will be used for the downstream call, or null if no span will be used.
*/
public Span getSpanForCall() {
if (distributedTraceStackToUse == null || distributedTraceStackToUse.isEmpty())
return null;

return distributedTraceStackToUse.peek();
}

/**
* Returns the name that should be used for the subspan surrounding the call. Defaults to whatever {@link
* SpanNamingAndTaggingStrategy#getInitialSpanName(Object)} returns, with a fallback
* of {@link HttpRequestTracingUtils#getFallbackSpanNameForHttpRequest(String, String)} if the naming strategy
* returned null or blank string. You can override this method to return something else if you want different
* behavior and you don't want to adjust the naming strategy or adapter.
*
* @param request The request that is about to be executed.
* @param namingStrategy The {@link SpanNamingAndTaggingStrategy} being used.
* @return The name that should be used for the subspan surrounding the call.
*/
protected @NotNull String getSubspanSpanName(
@NotNull RequestBuilderWrapper request,
@NotNull SpanNamingAndTaggingStrategy<RequestBuilderWrapper, ?, ?> namingStrategy
) {
// Try the naming strategy first.
String subspanNameFromStrategy = namingStrategy.getInitialSpanName(request);

if (StringUtils.isNotBlank(subspanNameFromStrategy)) {
return subspanNameFromStrategy;
}

// The naming strategy didn't have anything for us. Fall back to something reasonable.
return HttpRequestTracingUtils.getFallbackSpanNameForHttpRequest(
"async_downstream_call", request.httpMethod
);
}

@Override
public Response onCompleted(Response response) {
Pair<Deque<Span>, Map<String, String>> originalThreadInfo = null;

try {
// Link up the distributed tracing and MDC information to the current thread
originalThreadInfo = linkTracingAndMdcToCurrentThread(distributedTraceStackToUse, mdcContextToUse);

// Notify the circuit breaker of an event.
try {
circuitBreakerManualTask.ifPresent(cb -> cb.handleEvent(response));
} catch (Throwable t) {
logger.error(
"Circuit breaker threw an exception during handleEvent. This should never happen and means the "
+ "CircuitBreaker is malfunctioning. Ignoring exception.", t
);
}

// If a subspan was started for the downstream call, it should now be completed
if (performSubSpanAroundDownstreamCalls) {
Span spanAroundCall = Tracer.getInstance().getCurrentSpan();

// Handle the final span naming and response tagging.
tagAndNamingStrategy.handleResponseTaggingAndFinalSpanName(
spanAroundCall, rbwCopyWithHttpMethodAndUrlOnly, response, null
);

// The Span.close() method will do the right thing whether or not this is an overall request span or
// subspan.
spanAroundCall.close();
}

// If the completableFutureResponse is already done it means we were cancelled or some other error occurred,
// and we should not do any more processing here.
if (completableFutureResponse.isDone())
return response;

// Pass the response to our responseHandlerFunction to get the resulting object to complete the
// completableFutureResponse with.
try {
O responseInfo = responseHandlerFunction.handleResponse(response);
completableFutureResponse.complete(responseInfo);
} catch (Throwable throwable) {
// responseHandlerFunction threw an error. Complete completableFutureResponse exceptionally.
completableFutureResponse.completeExceptionally(throwable);
}

return response;
} finally {
AsyncNettyHelper.unlinkTracingAndMdcFromCurrentThread(originalThreadInfo);
}
}

@Override
public void onThrowable(Throwable t) {
Pair<Deque<Span>, Map<String, String>> originalThreadInfo = null;

try {
// Link up the distributed trace and MDC information to the current thread
originalThreadInfo =
linkTracingAndMdcToCurrentThread(distributedTraceStackToUse, mdcContextToUse);

// Notify the circuit breaker of an exception.
try {
circuitBreakerManualTask.ifPresent(cb -> cb.handleException(t));
} catch (Throwable cbError) {
logger.error(
"Circuit breaker threw an exception during handleException. This should never happen and means the "
+ "CircuitBreaker is malfunctioning. Ignoring exception.", cbError
);
}

// If a subspan was started for the downstream call, it should now be completed
if (performSubSpanAroundDownstreamCalls) {
Span spanAroundCall = Tracer.getInstance().getCurrentSpan();

// Handle the final span naming and response tagging.
tagAndNamingStrategy.handleResponseTaggingAndFinalSpanName(
spanAroundCall, rbwCopyWithHttpMethodAndUrlOnly, null, t
);

// The Span.close() method will do the right thing whether or not this is an overall request span or
// subspan.
spanAroundCall.close();
}

// If the completableFutureResponse is already done it means we were cancelled or some other error occurred,
// and we should not do any more processing here.
if (completableFutureResponse.isDone())
return;

// Complete the completableFutureResponse with the exception.
completableFutureResponse.completeExceptionally(t);
} finally {
AsyncNettyHelper.unlinkTracingAndMdcFromCurrentThread(originalThreadInfo);
}
}
}
Loading

0 comments on commit 03f9f45

Please sign in to comment.