Skip to content

Commit

Permalink
Fix the thread safety bug of finishing operation for the span named "…
Browse files Browse the repository at this point in the history
…SpringCloudGateway/sendRequest" (#555)
  • Loading branch information
gzlicanyi authored Jun 16, 2023
1 parent 35388ac commit 7af3f06
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Release Notes.
* Support Jetty 11.x plugin
* Fix the scenario of using the HBase plugin with spring-data-hadoop.
* Add RocketMQ 5.x plugin
* Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest"

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import java.net.URL;
import java.util.function.BiConsumer;
import java.util.function.Function;

import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
Expand All @@ -38,13 +39,14 @@

import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.SPRING_CLOUD_GATEWAY;

public class HttpClientRequestInterceptor implements InstanceMethodsAroundInterceptor {
public class HttpClientRequestInterceptor implements InstanceMethodsAroundInterceptorV2 {

@Override
public void beforeMethod(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
final MethodInterceptResult result) throws Throwable {
final MethodInvocationContext context) throws Throwable {

/*
In this plug-in, the HttpClientFinalizerSendInterceptor depends on the NettyRoutingFilterInterceptor
Expand All @@ -54,13 +56,13 @@ public void beforeMethod(final EnhancedInstance objInst,
if (!ContextManager.isActive()) {
return;
}

AbstractSpan span = ContextManager.activeSpan();

URL url = new URL((String) allArguments[1]);
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan abstractSpan = ContextManager.createExitSpan(
"SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
"SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
abstractSpan.prepareForAsync();
Tags.URL.set(abstractSpan, String.valueOf(allArguments[1]));
abstractSpan.setLayer(SpanLayer.HTTP);
Expand All @@ -80,36 +82,53 @@ public Publisher<Void> apply(final HttpClientRequest httpClientRequest) {
}
};

objInst.setSkyWalkingDynamicField(new EnhanceCacheObject(span, abstractSpan));
context.setContext(new EnhanceCacheObject(span, abstractSpan));
}

@Override
public Object afterMethod(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
final Object ret) {
EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) objInst.getSkyWalkingDynamicField();
final Object ret,
MethodInvocationContext context) {
EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) context.getContext();
Mono<HttpClientResponse> responseMono = (Mono<HttpClientResponse>) ret;
return responseMono.doAfterSuccessOrError(new BiConsumer<HttpClientResponse, Throwable>() {
@Override
public void accept(final HttpClientResponse httpClientResponse, final Throwable throwable) {
doAfterSuccessOrError(httpClientResponse, throwable, enhanceCacheObject);
}
});
}

AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
if (abstractSpan != null) {
if (throwable != null) {
abstractSpan.log(throwable);
} else if (httpClientResponse.status().code() > 400) {
abstractSpan.errorOccurred();
}
Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, httpClientResponse.status().code());
abstractSpan.asyncFinish();
}
void doAfterSuccessOrError(HttpClientResponse httpClientResponse, Throwable throwable, EnhanceCacheObject enhanceCacheObject) {
try {
//When executing the beforeMethod method, if the ContextManager is inactive, the enhanceCacheObject will be null.
if (enhanceCacheObject == null) {
return;
}

objInst.setSkyWalkingDynamicField(null);
enhanceCacheObject.getFilterSpan().asyncFinish();
//The doAfterSuccessOrError method may be executed multiple times.
if (enhanceCacheObject.isSpanFinish()) {
return;
}
});

AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
if (throwable != null) {
abstractSpan.log(throwable);
} else if (httpClientResponse.status().code() > 400) {
abstractSpan.errorOccurred();
}
Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, httpClientResponse.status().code());

abstractSpan.asyncFinish();
enhanceCacheObject.getFilterSpan().asyncFinish();

enhanceCacheObject.setSpanFinish(true);
} catch (Throwable e) {
//Catch unknown exceptions to avoid interrupting business processes.
}
}

private String getPeer(URL url) {
Expand All @@ -121,7 +140,8 @@ public void handleMethodException(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
final Throwable t) {
final Throwable t,
MethodInvocationContext context) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;

public abstract class AbstractGateway200EnhancePluginDefineV2 extends ClassInstanceMethodsEnhancePluginDefineV2 {

@Override
protected String[] witnessClasses() {
return new String[] {
"org.springframework.cloud.gateway.config.GatewayAutoConfiguration$1"
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class EnhanceCacheObject {
private final AbstractSpan filterSpan;
private final AbstractSpan sendSpan;
private volatile boolean spanFinish = false;

public EnhanceCacheObject(final AbstractSpan filterSpan, final AbstractSpan sendSpan) {
this.filterSpan = filterSpan;
Expand All @@ -35,4 +36,12 @@ public AbstractSpan getFilterSpan() {
public AbstractSpan getSendSpan() {
return sendSpan;
}

public boolean isSpanFinish() {
return spanFinish;
}

public void setSpanFinish(boolean spanFinish) {
this.spanFinish = spanFinish;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

public class HttpClientInstrumentation extends AbstractGateway200EnhancePluginDefine {
public class HttpClientInstrumentation extends AbstractGateway200EnhancePluginDefineV2 {

@Override
protected ClassMatch enhanceClass() {
Expand All @@ -39,16 +39,16 @@ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
return new InstanceMethodsInterceptV2Point[] {
new InstanceMethodsInterceptV2Point() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("request");
}

@Override
public String getMethodsInterceptor() {
public String getMethodsInterceptorV2() {
return Constants.REQUEST_INTERCEPTOR;
}

Expand All @@ -59,4 +59,5 @@ public boolean isOverrideArgs() {
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x;

import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import reactor.ipc.netty.http.client.HttpClientResponse;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

@RunWith(TracingSegmentRunner.class)
public class HttpClientRequestInterceptorTest {

private HttpClientRequestInterceptor httpClientRequestInterceptor = new HttpClientRequestInterceptor();

@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
@Rule
public MockitoRule rule = MockitoJUnit.rule();

@SegmentStoragePoint
private SegmentStorage segmentStorage;

private HttpClientResponse httpClientResponse;

@Before
public void setUp() throws Exception {

httpClientResponse = Mockito.mock(HttpClientResponse.class);
HttpResponseStatus httpResponseStatus = Mockito.mock(HttpResponseStatus.class);

Mockito.when(httpResponseStatus.code()).thenReturn(200);
Mockito.when(httpClientResponse.status()).thenReturn(httpResponseStatus);
}

@Test
public void testDoAfterSuccessOrError() {
AbstractSpan filterSpan = ContextManager.createLocalSpan("mockFilterSpan");
filterSpan.prepareForAsync();
ContextManager.stopSpan(filterSpan);

AbstractSpan sendSpan = ContextManager.createExitSpan("SpringCloudGateway/sendRequest", "http://127.0.0.1:80");
sendSpan.prepareForAsync();
ContextManager.stopSpan(sendSpan);

EnhanceCacheObject enhanceCacheObject = new EnhanceCacheObject(filterSpan, sendSpan);
enhanceCacheObject = spy(enhanceCacheObject);

//Test the ContextManager is inactive.
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, null);
verify(enhanceCacheObject, Mockito.times(0)).setSpanFinish(true);

//Test normal scenario.
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, enhanceCacheObject);
verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);

//Test the doAfterSuccessOrError method is executed multiple times.
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, enhanceCacheObject);
verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
}

}

0 comments on commit 7af3f06

Please sign in to comment.