Skip to content

Commit

Permalink
Fix retry loop issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhiguo committed May 9, 2024
1 parent 2e0edf9 commit 633eb46
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ protected Supplier<Response> createRetrySupplier(MethodContext ctx) {
* attempt. Returns null if the response is null or if no retry is
* performed and the initial attempt fails.
*/
protected Object invokeWithRetry(O invocation, MethodContext ctx) {
protected Response invokeWithRetry(O invocation, MethodContext ctx) {
Supplier<Response> retrySupplier = createRetrySupplier(ctx);
ServicePolicy servicePolicy = invocation == null ? null : invocation.getServiceMetadata().getServicePolicy();
RetryPolicy retryPolicy = servicePolicy == null ? null : servicePolicy.getRetryPolicy();
Response response = null;
if (retryPolicy != null && retryPolicy.isEnabled()) {
RetrierFactory retrierFactory = context.getOrDefaultRetrierFactory(retryPolicy.getType());
Retrier retrier = retrierFactory == null ? null : retrierFactory.get(retryPolicy);
Expand All @@ -221,11 +222,12 @@ protected Object invokeWithRetry(O invocation, MethodContext ctx) {
} else {
RequestContext.removeAttribute(Carrier.ATTRIBUTE_DEADLINE);
}
return retrier.execute(retrySupplier);
response = retrier.execute(retrySupplier);
}
} else {
response = retrySupplier.get();
}
Response response = retrySupplier.get();
return response == null ? null : response.getResponse();
return response;
}
}

Expand Down Expand Up @@ -285,7 +287,7 @@ protected O routing(R request, List<? extends Endpoint> instances) {
*
* @param invocation the OutboundInvocation to which the filters will be applied
* @return A list of endpoints that have been determined as suitable targets for the invocation after
* applying the route filters.
* applying the route filters.
*/
protected List<? extends Endpoint> routing(O invocation) {
RouteFilterChain.Chain chain = new RouteFilterChain.Chain(routeFilters);
Expand Down Expand Up @@ -545,7 +547,7 @@ protected OutboundInvocation<? extends O> routing(I request) {
*
* @param invocation The OutboundInvocation to which the route filters will be applied.
* @return A list of endpoints that have been determined as suitable targets for the invocation after applying the
* route filters.
* route filters.
*/
protected List<? extends Endpoint> routing(OutboundInvocation<? extends O> invocation) {
RouteFilterChain.Chain chain = new RouteFilterChain.Chain(routeFilters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public OutboundInvocation(T request, InvocationContext context) {
/**
* Constructs an OutboundInvocation with a request and a base invocation.
*
* @param request the request associated with this invocation
* @param invocation the base invocation from which to derive properties
* @param request the request associated with this invocation
* @param invocation the base invocation from which to derive properties
*/
public OutboundInvocation(T request, Invocation<?> invocation) {
this.request = request;
Expand Down Expand Up @@ -103,6 +103,18 @@ public List<? extends Endpoint> getEndpoints() {
return routeTarget == null ? new ArrayList<>(0) : routeTarget.getEndpoints();
}

/**
* Get the routing target. When the routing target is empty, the default routing target is returned.
*
* @return RouteTarget
*/
public RouteTarget getRouteTarget() {
if (null == routeTarget) {
routeTarget = RouteTarget.forward(getInstances());
}
return routeTarget;
}

@Override
protected TrafficEventBuilder configure(TrafficEventBuilder builder) {
Unit targetUnit = routeTarget == null ? null : routeTarget.getUnit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
public interface Retrier {

String RETRY_MARK = "retryMark";

/**
* Execute retry logic
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.bootstrap.exception.RejectException;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.interceptor.AbstractInterceptor.AbstractOutboundInterceptor;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.filter.OutboundFilter;
import com.jd.live.agent.governance.invoke.filter.OutboundFilterChain;
import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.response.Response;
import com.jd.live.agent.plugin.router.dubbo.v2_6.request.DubboRequest.DubboOutboundRequest;
import com.jd.live.agent.plugin.router.dubbo.v2_6.request.invoke.DubboInvocation.DubboOutboundInvocation;
import com.jd.live.agent.plugin.router.dubbo.v2_6.response.DubboResponse.DubboOutboundResponse;

import java.lang.reflect.InvocationTargetException;
import java.util.List;

/**
Expand All @@ -49,16 +52,32 @@ public MonitorFilterInterceptor(InvocationContext context, List<OutboundFilter>
*/
@Override
public void onEnter(ExecutableContext ctx) {
if (RequestContext.getAttribute(Retrier.RETRY_MARK) != null) {
return;
} else {
RequestContext.setAttribute(Retrier.RETRY_MARK, Boolean.TRUE);
}
MethodContext mc = (MethodContext) ctx;
Invocation invocation = (Invocation) mc.getArguments()[1];
Object result;
try {
DubboOutboundInvocation outboundInvocation = process(new DubboOutboundRequest(invocation));
result = invokeWithRetry(outboundInvocation, mc);
Response response = invokeWithRetry(outboundInvocation, mc);
if (response.getThrowable() != null) {
if (response.getThrowable() instanceof InvocationTargetException) {
mc.setThrowable(((InvocationTargetException) response.getThrowable()).getTargetException());
} else {
mc.setThrowable(response.getThrowable());
}
} else {
mc.setResult(response.getResponse());
}
} catch (RejectException e) {
result = new RpcResult(new RpcException(RpcException.FORBIDDEN_EXCEPTION, e.getMessage()));
mc.setResult(result);
} finally {
RequestContext.remove();
}
mc.setResult(result);
mc.setSkip(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.bootstrap.exception.RejectException;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.interceptor.AbstractInterceptor.AbstractOutboundInterceptor;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.filter.OutboundFilter;
import com.jd.live.agent.governance.invoke.filter.OutboundFilterChain;
import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.response.Response;
import com.jd.live.agent.plugin.router.dubbo.v2_7.request.DubboRequest.DubboOutboundRequest;
import com.jd.live.agent.plugin.router.dubbo.v2_7.request.invoke.DubboInvocation.DubboOutboundInvocation;
import com.jd.live.agent.plugin.router.dubbo.v2_7.response.DubboResponse.DubboOutboundResponse;
import org.apache.dubbo.monitor.support.MonitorFilter;
import org.apache.dubbo.rpc.*;

import java.lang.reflect.InvocationTargetException;
import java.util.List;

/**
Expand All @@ -50,17 +53,33 @@ public MonitorFilterInterceptor(InvocationContext context, List<OutboundFilter>
*/
@Override
public void onEnter(ExecutableContext ctx) {
if (RequestContext.getAttribute(Retrier.RETRY_MARK) != null) {
return;
} else {
RequestContext.setAttribute(Retrier.RETRY_MARK, Boolean.TRUE);
}
MethodContext mc = (MethodContext) ctx;
Object[] arguments = mc.getArguments();
Invocation invocation = (Invocation) arguments[1];
Object result;
try {
DubboOutboundInvocation outboundInvocation = process(new DubboOutboundRequest(invocation));
result = invokeWithRetry(outboundInvocation, mc);
Response response = invokeWithRetry(outboundInvocation, mc);
if (response.getThrowable() != null) {
if (response.getThrowable() instanceof InvocationTargetException) {
mc.setThrowable(((InvocationTargetException) response.getThrowable()).getTargetException());
} else {
mc.setThrowable(response.getThrowable());
}
} else {
mc.setResult(response.getResponse());
}
} catch (RejectException e) {
result = new AppResponse(new RpcException(RpcException.FORBIDDEN_EXCEPTION, e.getMessage()));
mc.setResult(result);
} finally {
RequestContext.remove();
}
mc.setResult(result);
mc.setSkip(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.bytekit.context.MethodContext;
import com.jd.live.agent.bootstrap.exception.RejectException;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.interceptor.AbstractInterceptor.AbstractOutboundInterceptor;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.filter.OutboundFilter;
import com.jd.live.agent.governance.invoke.filter.OutboundFilterChain;
import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.response.Response;
import com.jd.live.agent.plugin.router.dubbo.v3.request.DubboRequest.DubboOutboundRequest;
import com.jd.live.agent.plugin.router.dubbo.v3.request.invoke.DubboInvocation.DubboOutboundInvocation;
import com.jd.live.agent.plugin.router.dubbo.v3.response.DubboResponse.DubboOutboundResponse;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.cluster.filter.support.ConsumerClassLoaderFilter;

import java.lang.reflect.InvocationTargetException;
import java.util.List;

/**
Expand All @@ -50,17 +53,33 @@ public ConsumerClassLoaderFilterInterceptor(InvocationContext context, List<Outb
*/
@Override
public void onEnter(ExecutableContext ctx) {
if (RequestContext.getAttribute(Retrier.RETRY_MARK) != null) {
return;
} else {
RequestContext.setAttribute(Retrier.RETRY_MARK, Boolean.TRUE);
}
MethodContext mc = (MethodContext) ctx;
Object[] arguments = mc.getArguments();
Invocation invocation = (Invocation) arguments[1];
Object result;
try {
DubboOutboundInvocation outboundInvocation = process(new DubboOutboundRequest(invocation));
result = invokeWithRetry(outboundInvocation, mc);
Response response = invokeWithRetry(outboundInvocation, mc);
if (response.getThrowable() != null) {
if (response.getThrowable() instanceof InvocationTargetException) {
mc.setThrowable(((InvocationTargetException) response.getThrowable()).getTargetException());
} else {
mc.setThrowable(response.getThrowable());
}
} else {
mc.setResult(response.getResponse());
}
} catch (RejectException e) {
result = new AppResponse(new RpcException(RpcException.FORBIDDEN_EXCEPTION, e.getMessage()));
mc.setResult(result);
} finally {
RequestContext.remove();
}
mc.setResult(result);
mc.setSkip(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ protected SofaRpcOutboundInvocation createOutlet(SofaRpcOutboundRequest request)
/**
* Determines if the provided response is retryable based on the nature of the response
* and the type of exception it contains.
*
* <p>
* This method considers a response retryable under two main conditions:
* 1. The response object is null, indicating that no successful response was received.
* 2. The throwable within the response is an instance of {@code SofaRpcException} with
* specific error types such as SERVER_BUSY or CLIENT_TIMEOUT, which are typically
* transient errors and might be resolved upon retrying.
* specific error types such as SERVER_BUSY or CLIENT_TIMEOUT, which are typically
* transient errors and might be resolved upon retrying.
*
* @param response The RPC response to evaluate for retryability.
* @return {@code true} if the response is considered retryable; {@code false} otherwise.
Expand Down Expand Up @@ -175,7 +175,7 @@ private boolean isRetryable(Response response) {
*/
private static class SofaRpcInvocationContext extends InvocationContext.InvocationContextDelegate {

public SofaRpcInvocationContext(InvocationContext delegate) {
SofaRpcInvocationContext(InvocationContext delegate) {
super(delegate);
}

Expand All @@ -200,7 +200,7 @@ public LoadBalancer getOrDefaultLoadBalancer(String name) {
*/
private static class SofaRpcLoadBalancer extends LoadBalancer.LoadBalancerDelegate {

public SofaRpcLoadBalancer(LoadBalancer delegate) {
SofaRpcLoadBalancer(LoadBalancer delegate) {
super(delegate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.filter.OutboundFilter;
import com.jd.live.agent.plugin.router.springcloud.v3.interceptor.FeignRequestInterceptor;
import com.jd.live.agent.plugin.router.springcloud.v3.interceptor.FeignClientRequestInterceptor;

import java.util.List;

Expand All @@ -52,7 +52,7 @@ public class FeignClientRequestDefinition extends PluginDefinitionAdapter {
private static final String METHOD_EXECUTE = "execute";

private static final String[] ARGUMENT_EXECUTE = new String[]{
"feign.Request", "feign.Request.Options"
"feign.Request", "feign.Request$Options"
};

@Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
Expand All @@ -66,8 +66,9 @@ public FeignClientRequestDefinition() {
this.matcher = () -> MatcherBuilder.isImplement(TYPE_FEIGN_CLIENT_CLASS);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_EXECUTE),
() -> new FeignRequestInterceptor(context, filters)
MatcherBuilder.named(METHOD_EXECUTE).
and(MatcherBuilder.arguments(ARGUMENT_EXECUTE)),
() -> new FeignClientRequestInterceptor(context, filters)
)
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,25 @@
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.OutboundInvocation.HttpOutboundInvocation;
import com.jd.live.agent.governance.invoke.filter.OutboundFilter;
import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.response.Response;
import com.jd.live.agent.plugin.router.springcloud.v3.request.FeignOutboundRequest;
import com.jd.live.agent.plugin.router.springcloud.v3.response.FeignOutboundResponse;
import feign.Request;
import org.springframework.http.HttpStatus;

import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* FeignRequestInterceptor
* FeignClientRequestInterceptor
*
* @since 1.0.0
*/
public class FeignRequestInterceptor extends AbstractHttpOutboundInterceptor<FeignOutboundRequest> {
public class FeignClientRequestInterceptor extends AbstractHttpOutboundInterceptor<FeignOutboundRequest> {

public FeignRequestInterceptor(InvocationContext context, List<OutboundFilter> filters) {
public FeignClientRequestInterceptor(InvocationContext context, List<OutboundFilter> filters) {
super(context, filters);
}

Expand All @@ -53,27 +55,35 @@ public FeignRequestInterceptor(InvocationContext context, List<OutboundFilter> f
*/
@Override
public void onEnter(ExecutableContext ctx) {
if (RequestContext.getAttribute(Retrier.RETRY_MARK) != null) {
return;
} else {
RequestContext.setAttribute(Retrier.RETRY_MARK, Boolean.TRUE);
}
MethodContext mc = (MethodContext) ctx;
Request request = (Request) mc.getArguments()[0];
HttpOutboundInvocation<FeignOutboundRequest> invocation;
try {
invocation = process(new FeignOutboundRequest(request, RequestContext.getAttribute(Carrier.ATTRIBUTE_SERVICE_ID)));
mc.setResult(invokeWithRetry(invocation, mc));
Response response = invokeWithRetry(invocation, mc);
if (response.getThrowable() != null) {
if (response.getThrowable() instanceof InvocationTargetException) {
mc.setThrowable(((InvocationTargetException) response.getThrowable()).getTargetException());
} else {
mc.setThrowable(response.getThrowable());
}
} else {
mc.setResult(response.getResponse());
}
} catch (RejectException e) {
mc.setResult(feign.Response.builder().status(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE.value())
.body(e.getMessage().getBytes(StandardCharsets.UTF_8)).build());
} finally {
RequestContext.remove();
}
mc.setSkip(true);
}

/**
* {@inheritDoc}
*/
@Override
public void onExit(ExecutableContext ctx) {
RequestContext.remove();
}

@Override
protected Response createResponse(Object result, Throwable throwable) {
return new FeignOutboundResponse((feign.Response) result, throwable);
Expand Down
Loading

0 comments on commit 633eb46

Please sign in to comment.