Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#7463] Fix Reactor-Netty HTTP client plugin #7497

Merged
merged 1 commit into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public String clientGet() {
return response;
}

@RequestMapping(value = "/client/get_local", method = RequestMethod.GET)
@RequestMapping(value = "/client/local", method = RequestMethod.GET)
@ResponseBody
public String clientError(HttpServletRequest request) {
HttpClient client = HttpClient.create().port(request.getLocalPort());
Expand All @@ -63,7 +63,7 @@ public String clientPost() {
return response.toString();
}

@RequestMapping(value = "/client/unknown_host", method = RequestMethod.GET)
@RequestMapping(value = "/client/unknown", method = RequestMethod.GET)
@ResponseBody
public String clientError() {
HttpClient client = HttpClient.create().port(80);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnInboundNextInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundCompleteInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsOnOutboundErrorInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpClientOperationsSendInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleHttpServerStateInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpServerHandleStateInterceptor;
import com.navercorp.pinpoint.plugin.reactor.netty.interceptor.HttpTcpClientConnectInterceptor;
Expand Down Expand Up @@ -189,6 +190,15 @@ public static class HttpClientOperationsTransform implements TransformCallback {
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);
target.addField(AsyncContextAccessor.class);

final InstrumentMethod sendMethod = target.getDeclaredMethod("send");
if (sendMethod != null) {
sendMethod.addInterceptor(HttpClientOperationsSendInterceptor.class);
}
final InstrumentMethod sendArgMethod = target.getDeclaredMethod("send", "org.reactivestreams.Publisher");
if (sendArgMethod != null) {
sendArgMethod.addInterceptor(HttpClientOperationsSendInterceptor.class);
}
final InstrumentMethod onOutboundCompleteMethod = target.getDeclaredMethod("onOutboundComplete");
if (onOutboundCompleteMethod != null) {
onOutboundCompleteMethod.addInterceptor(HttpClientOperationsOnOutboundCompleteInterceptor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.SpanEventSimpleAroundInterceptorForPlugin;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;

import java.net.InetSocketAddress;

/**
* @author jaehong.kim
*/
Expand All @@ -40,15 +35,6 @@ public HttpClientHandlerConstructorInterceptor(TraceContext traceContext, Method

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, Object target, Object[] args) throws Exception {
if (args != null && args.length >= 2 && args[1] instanceof InetSocketAddress) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) args[1];
if (inetSocketAddress != null) {
final String hostName = SocketAddressUtils.getHostNameFirst(inetSocketAddress);
if (hostName != null) {
recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, HostAndPort.toHostAndPortString(hostName, inetSocketAddress.getPort()));
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,122 +17,59 @@
package com.navercorp.pinpoint.plugin.reactor.netty.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessor;
import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventSimpleAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestRecorder;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapper;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapperAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.DefaultRequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.request.RequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.util.SocketAddressUtils;
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
import com.navercorp.pinpoint.common.trace.AnnotationKey;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyPluginConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.channel.ChannelOperations;

import java.net.InetSocketAddress;

/**
* @author jaehong.kim
*/
public class HttpClientHandlerRequestWithBodyInterceptor extends AsyncContextSpanEventSimpleAroundInterceptor {
private final PLogger logger = PLoggerFactory.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final ClientRequestRecorder<ClientRequestWrapper> clientRequestRecorder;
private final RequestTraceWriter<HttpClientRequest> requestTraceWriter;

public HttpClientHandlerRequestWithBodyInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);

final ReactorNettyPluginConfig config = new ReactorNettyPluginConfig(traceContext.getProfilerConfig());
final boolean param = config.isParam();
final ClientRequestAdaptor<ClientRequestWrapper> clientRequestAdaptor = ClientRequestWrapperAdaptor.INSTANCE;
this.clientRequestRecorder = new ClientRequestRecorder<ClientRequestWrapper>(param, clientRequestAdaptor);
final HttpClientRequestHeaderAdaptor clientHeaderAdaptor = new HttpClientRequestHeaderAdaptor();
this.requestTraceWriter = new DefaultRequestTraceWriter<HttpClientRequest>(clientHeaderAdaptor, traceContext);
}

// BEFORE
@Override
public AsyncContext getAsyncContext(Object target, Object[] args) {
if (Boolean.FALSE == validate(args)) {
return null;
}

final HttpClientRequest request = (HttpClientRequest) args[0];
final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target);
if (asyncContext == null) {
// Set sampling rate to false
this.requestTraceWriter.write(request);
return null;
}
return asyncContext;
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected error, Current async trace is null");
}
if (args == null || args.length < 1) {
// Skip
return;
}
final TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT);

final HttpClientRequest request = (HttpClientRequest) args[0];
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.requestTraceWriter.write(request, nextId, clientRequestWrapper.getDestinationId());

// Set HttpClientOptions
if (request instanceof AsyncContextAccessor) {
((AsyncContextAccessor) request)._$PINPOINT$_setAsyncContext(asyncContext);
if (args[0] instanceof AsyncContextAccessor) {
((AsyncContextAccessor) args[0])._$PINPOINT$_setAsyncContext(asyncContext);
}
}

// AFTER
@Override
public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
if (Boolean.FALSE == validate(args)) {
return null;
// Set hostname
if (args[0] instanceof ChannelOperations) {
try {
final ChannelOperations channelOperations = (ChannelOperations) args[0];
final InetSocketAddress inetSocketAddress = (InetSocketAddress) channelOperations.channel().remoteAddress();
if (inetSocketAddress != null) {
final String hostName = SocketAddressUtils.getHostNameFirst(inetSocketAddress);
if (hostName != null) {
recorder.recordAttribute(AnnotationKey.HTTP_INTERNAL_DISPLAY, HostAndPort.toHostAndPortString(hostName, inetSocketAddress.getPort()));
}
}
} catch (Exception ignored) {
}
}

return AsyncContextAccessorUtils.getAsyncContext(target);
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordException(throwable);

final HttpClientRequest request = (HttpClientRequest) args[0];
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.clientRequestRecorder.record(recorder, clientRequestWrapper, throwable);
}

private boolean validate(final Object[] args) {
if (args == null || args.length < 1) {
if (isDebug) {
logger.debug("Invalid args object. args={}.", args);
}
return false;
}

if (!(args[0] instanceof HttpClientRequest)) {
if (isDebug) {
logger.debug("Invalid args[0] object. Need ClientHttpRequest, args[0]={}.", args[0]);
}
return false;
}

return true;
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT_INTERNAL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.reactor.netty.interceptor;

import com.navercorp.pinpoint.bootstrap.async.AsyncContextAccessorUtils;
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventSimpleAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestRecorder;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapper;
import com.navercorp.pinpoint.bootstrap.plugin.request.ClientRequestWrapperAdaptor;
import com.navercorp.pinpoint.bootstrap.plugin.request.DefaultRequestTraceWriter;
import com.navercorp.pinpoint.bootstrap.plugin.request.RequestTraceWriter;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyConstants;
import com.navercorp.pinpoint.plugin.reactor.netty.ReactorNettyPluginConfig;
import reactor.netty.http.client.HttpClientRequest;

/**
* @author jaehong.kim
*/
public class HttpClientOperationsSendInterceptor extends AsyncContextSpanEventSimpleAroundInterceptor {
private final ClientRequestRecorder<ClientRequestWrapper> clientRequestRecorder;
private final RequestTraceWriter<HttpClientRequest> requestTraceWriter;

public HttpClientOperationsSendInterceptor(TraceContext traceContext, MethodDescriptor methodDescriptor) {
super(traceContext, methodDescriptor);

final ReactorNettyPluginConfig config = new ReactorNettyPluginConfig(traceContext.getProfilerConfig());
final boolean param = config.isParam();
final ClientRequestAdaptor<ClientRequestWrapper> clientRequestAdaptor = ClientRequestWrapperAdaptor.INSTANCE;
this.clientRequestRecorder = new ClientRequestRecorder<ClientRequestWrapper>(param, clientRequestAdaptor);
final HttpClientRequestHeaderAdaptor clientHeaderAdaptor = new HttpClientRequestHeaderAdaptor();
this.requestTraceWriter = new DefaultRequestTraceWriter<HttpClientRequest>(clientHeaderAdaptor, traceContext);
}

// BEFORE
@Override
public AsyncContext getAsyncContext(Object target, Object[] args) {
if (Boolean.FALSE == validate(target)) {
return null;
}

final HttpClientRequest request = (HttpClientRequest) target;
final AsyncContext asyncContext = AsyncContextAccessorUtils.getAsyncContext(target);
if (asyncContext == null) {
// Set sampling rate to false
this.requestTraceWriter.write(request);
return null;
}
return asyncContext;
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, Object[] args) {
final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected error, Current async trace is null");
}
return;
}
final TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
recorder.recordServiceType(ReactorNettyConstants.REACTOR_NETTY_CLIENT);

final HttpClientRequest request = (HttpClientRequest) target;
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.requestTraceWriter.write(request, nextId, clientRequestWrapper.getDestinationId());
}

// AFTER
@Override
public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
if (Boolean.FALSE == validate(target)) {
return null;
}

return AsyncContextAccessorUtils.getAsyncContext(target);
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
recorder.recordApi(methodDescriptor);
recorder.recordException(throwable);

final HttpClientRequest request = (HttpClientRequest) target;
final ClientRequestWrapper clientRequestWrapper = new HttpClientRequestWrapper(request);
this.clientRequestRecorder.record(recorder, clientRequestWrapper, throwable);
}

private boolean validate(final Object target) {
if (Boolean.FALSE == (target instanceof HttpClientRequest)) {
if (isDebug) {
logger.debug("Invalid target object. Need ClientHttpRequest, target={}.", target);
}
return false;
}

return true;
}
}