Skip to content

Commit

Permalink
[#7463] Fix Reactor-Netty HTTP client plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Dec 16, 2020
1 parent a4246c7 commit b2a0d2c
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public String clientGet() {
return response;
}

@RequestMapping(value = "/client/get_local", method = RequestMethod.GET)
@RequestMapping(value = "/client/local", method = RequestMethod.GET)
@ResponseBody
public String clientError(HttpServletRequest request) {
HttpClient client = HttpClient.create().port(request.getLocalPort());
Expand All @@ -63,7 +63,7 @@ public String clientPost() {
return response.toString();
}

@RequestMapping(value = "/client/unknown_host", method = RequestMethod.GET)
@RequestMapping(value = "/client/unknown", method = RequestMethod.GET)
@ResponseBody
public String clientError() {
HttpClient client = HttpClient.create().port(80);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnInboundNextInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundCompleteInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundErrorInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsSendInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleHttpServerStateInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleStateInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpTcpClientConnectInterceptor;
Expand Down Expand Up @@ -189,6 +190,15 @@ public static class HttpClientOperationsTransform implements TransformCallback {
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);
target.addField(AsyncContextAccessor.class);

final InstrumentMethod sendMethod = target.getDeclaredMethod("send");
if (sendMethod != null) {
sendMethod.addInterceptor(HttpClientOperationsSendInterceptor.class);
}
final InstrumentMethod sendArgMethod = target.getDeclaredMethod("send", "org.reactivestreams.Publisher");
if (sendArgMethod != null) {
sendArgMethod.addInterceptor(HttpClientOperationsSendInterceptor.class);
}
final InstrumentMethod onOutboundCompleteMethod = target.getDeclaredMethod("onOutboundComplete");
if (onOutboundCompleteMethod != null) {
onOutboundCompleteMethod.addInterceptor(HttpClientOperationsOnOutboundCompleteInterceptor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;

import java.net.InetSocketAddress;

/**
* @author jaehong.kim
*/
Expand All @@ -40,15 +35,6 @@ public HttpClientHandlerConstructorInterceptor(TraceContext traceContext, Method

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) throws Exception {
if (args != null && args.length >= 2 && args[1] instanceof InetSocketAddress) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) args[1];
if (inetSocketAddress != null) {
final String hostName = SocketAddressUtils.getHostNameFirst(inetSocketAddress);
if (hostName != null) {
recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, HostAndPort.toHostAndPortString(hostName, inetSocketAddress.getPort()));
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,122 +17,59 @@
package com.navercorp.pinpoint.plugin.reactor.netty.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventSimpleAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestRecorder;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapper;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapperAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.DefaultRequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.request.RequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyPluginConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.channel.ChannelOperations;

import java.net.InetSocketAddress;

/**
* @author jaehong.kim
*/
public class HttpClientHandlerRequestWithBodyInterceptor extends AsyncContextSpanEventSimpleAroundInterceptor {
private final PLogger logger = PLoggerFactory.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final ClientRequestRecorder<ClientRequestWrapper> clientRequestRecorder;
private final RequestTraceWriter<HttpClientRequest> requestTraceWriter;

public HttpClientHandlerRequestWithBodyInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);

final ReactorNettyPluginConfig config = new ReactorNettyPluginConfig(traceContext.getProfilerConfig());
final boolean param = config.isParam();
final ClientRequestAdaptor<ClientRequestWrapper> clientRequestAdaptor = ClientRequestWrapperAdaptor.INSTANCE;
this.clientRequestRecorder = new ClientRequestRecorder<ClientRequestWrapper>(param, clientRequestAdaptor);
final HttpClientRequestHeaderAdaptor clientHeaderAdaptor = new HttpClientRequestHeaderAdaptor();
this.requestTraceWriter = new DefaultRequestTraceWriter<HttpClientRequest>(clientHeaderAdaptor, traceContext);
}

// BEFORE
@Override
public AsyncContext getAsyncContext(Object target, Object[] args) {
if (Boolean.FALSE == validate(args)) {
return null;
}

final HttpClientRequest request = (HttpClientRequest) args[0];
final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target);
if (asyncContext == null) {
// Set sampling rate to false
this.requestTraceWriter.write(request);
return null;
}
return asyncContext;
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected error, Current async trace is null");
}
if (args == null || args.length < 1) {
// Skip
return;
}
final TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT);

final HttpClientRequest request = (HttpClientRequest) args[0];
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.requestTraceWriter.write(request, nextId, clientRequestWrapper.getDestinationId());

// Set HttpClientOptions
if (request instanceof AsyncContextAccessor) {
((AsyncContextAccessor) request)._$PINPOINT$_setAsyncContext(asyncContext);
if (args[0] instanceof AsyncContextAccessor) {
((AsyncContextAccessor) args[0])._$PINPOINT$_setAsyncContext(asyncContext);
}
}

// AFTER
@Override
public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
if (Boolean.FALSE == validate(args)) {
return null;
// Set hostname
if (args[0] instanceof ChannelOperations) {
try {
final ChannelOperations channelOperations = (ChannelOperations) args[0];
final InetSocketAddress inetSocketAddress = (InetSocketAddress) channelOperations.channel().remoteAddress();
if (inetSocketAddress != null) {
final String hostName = SocketAddressUtils.getHostNameFirst(inetSocketAddress);
if (hostName != null) {
recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, HostAndPort.toHostAndPortString(hostName, inetSocketAddress.getPort()));
}
}
} catch (Exception ignored) {
}
}

return AsyncContextAccessorUtils.getAsyncContext(target);
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordException(throwable);

final HttpClientRequest request = (HttpClientRequest) args[0];
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.clientRequestRecorder.record(recorder, clientRequestWrapper, throwable);
}

private boolean validate(final Object[] args) {
if (args == null || args.length < 1) {
if (isDebug) {
logger.debug("Invalid args object. args={}.", args);
}
return false;
}

if (!(args[0] instanceof HttpClientRequest)) {
if (isDebug) {
logger.debug("Invalid args[0] object. Need ClientHttpRequest, args[0]={}.", args[0]);
}
return false;
}

return true;
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT_INTERNAL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.reactor.netty.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventSimpleAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestRecorder;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapper;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapperAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.DefaultRequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.request.RequestTraceWriter;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyPluginConfig;
import reactor.netty.http.client.HttpClientRequest;

/**
* @author jaehong.kim
*/
public class HttpClientOperationsSendInterceptor extends AsyncContextSpanEventSimpleAroundInterceptor {
private final ClientRequestRecorder<ClientRequestWrapper> clientRequestRecorder;
private final RequestTraceWriter<HttpClientRequest> requestTraceWriter;

public HttpClientOperationsSendInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);

final ReactorNettyPluginConfig config = new ReactorNettyPluginConfig(traceContext.getProfilerConfig());
final boolean param = config.isParam();
final ClientRequestAdaptor<ClientRequestWrapper> clientRequestAdaptor = ClientRequestWrapperAdaptor.INSTANCE;
this.clientRequestRecorder = new ClientRequestRecorder<ClientRequestWrapper>(param, clientRequestAdaptor);
final HttpClientRequestHeaderAdaptor clientHeaderAdaptor = new HttpClientRequestHeaderAdaptor();
this.requestTraceWriter = new DefaultRequestTraceWriter<HttpClientRequest>(clientHeaderAdaptor, traceContext);
}

// BEFORE
@Override
public AsyncContext getAsyncContext(Object target, Object[] args) {
if (Boolean.FALSE == validate(target)) {
return null;
}

final HttpClientRequest request = (HttpClientRequest) target;
final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target);
if (asyncContext == null) {
// Set sampling rate to false
this.requestTraceWriter.write(request);
return null;
}
return asyncContext;
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected error, Current async trace is null");
}
return;
}
final TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT);

final HttpClientRequest request = (HttpClientRequest) target;
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.requestTraceWriter.write(request, nextId, clientRequestWrapper.getDestinationId());
}

// AFTER
@Override
public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
if (Boolean.FALSE == validate(target)) {
return null;
}

return AsyncContextAccessorUtils.getAsyncContext(target);
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordException(throwable);

final HttpClientRequest request = (HttpClientRequest) target;
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.clientRequestRecorder.record(recorder, clientRequestWrapper, throwable);
}

private boolean validate(final Object target) {
if (Boolean.FALSE == (target instanceof HttpClientRequest)) {
if (isDebug) {
logger.debug("Invalid target object. Need ClientHttpRequest, target={}.", target);
}
return false;
}

return true;
}
}

0 comments on commit b2a0d2c

Please sign in to comment.