Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XDS retry and circuit-breaking functionality to the flow control plugin #1698

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions sermant-plugins/sermant-flowcontrol/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# FlowControl configuration
flow.control.plugin:
useCseRule: true # whether to configure cse rules
enable-start-monitor: false # whether to enable indicator monitoring
enable-system-adaptive: false # whether to enable system adaptive flow control
enable-system-rule: false # whether to enable system rule flow control
# whether to configure cse rules
useCseRule: true
# whether to enable indicator monitoring
enable-start-monitor: false
# whether to enable system adaptive flow control
enable-system-adaptive: false
# whether to enable system rule flow control
enable-system-rule: false
xds.flow.control.config:
# Whether to enable Xds flow control
enable: false
# whether to use secure protocol to invoke spring cloud downstream service with xds route, example: http or https
enabled-springcloud-xds-route-secure: false
retry:
retryHostPredicate: PreviousHostsPredicate
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment

x-sermant-retriable-status-codes:
x-sermant-retriable-header-names:
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<netflix-core.version>1.4.7.RELEASE</netflix-core.version>
<spring.cloud.context.version>2.2.0.RELEASE</spring.cloud.context.version>
<google.guava>31.1-jre</google.guava>
<ribbon.version>2.2.5</ribbon.version>
</properties>
<dependencies>
<!--compile-->
Expand Down Expand Up @@ -99,6 +100,12 @@
<version>${netflix-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-loadbalancer</artifactId>
<version>${ribbon.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.flowcontrol.common.config.XdsFlowControlConfig;

/**
* okhttp request declarer
*
* @author zhp
* @since 2024-11-30
*/
public abstract class AbstractXdsDeclarer extends AbstractPluginDeclarer {
private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class);

@Override
public boolean isEnabled() {
return config.isEnable();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.circuit;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.Interceptor;
import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers;
import io.sermant.flowcontrol.common.entity.BusinessEntity;
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
import io.sermant.flowcontrol.common.xds.circuit.CircuitBreakerManager;
import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler;

import java.util.Optional;

/**
* Enhance the request header sending method to include an indicator of whether the request byte stream has been sent
* to the server
*
* @param <R> request param
* @author zhp
* @since 2024-11-30
*/
public abstract class AbstractCircuitInterceptor<R> implements Interceptor {
protected static final String MESSAGE = "CircuitBreaker has forced open and deny any requests";

/**
* Perform circuit breaker judgment and handling
*
* @param context context
* @param requestParam request param
*/
public void handlerCircuitBreaker(ExecuteContext context, R requestParam) {
BusinessEntity businessEntity = XdsThreadLocalUtil.getBusinessEntity();
if (businessEntity == null) {
return;
}
Optional<XdsRequestCircuitBreakers> circuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
getRequestCircuitBreakers(businessEntity.getServiceName(), businessEntity.getClusterName());
if (!circuitBreakersOptional.isPresent()) {
return;
}
String businessName = buildBusinessName(requestParam, businessEntity);
int activeRequestNum = CircuitBreakerManager.incrementActiveRequest(businessName);
int maxRequest = circuitBreakersOptional.get().getMaxRequests();
if (maxRequest != 0 && activeRequestNum > maxRequest) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When maxrequest is 0, is maxrequest not configured?

context.skip(buildResult(requestParam, context));
}
businessEntity.setCircuitBusinessName(businessName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setCircuitBreakerBusinessName

}

@Override
public ExecuteContext after(ExecuteContext context) throws Exception {
BusinessEntity businessEntity = XdsThreadLocalUtil.getBusinessEntity();
if (context.getThrowable() != null || businessEntity == null
|| StringUtils.isEmpty(businessEntity.getCircuitBusinessName())) {
return context;
}
handlerFailureRequest(context, businessEntity);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the request also a failed request triggered by concurrent circuit breaker? This method is also executed when the circuit breaker is not triggered.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there is no trigger, operations such as counting the number of failed requests will be performed.

return context;
}

@Override
public ExecuteContext onThrow(ExecuteContext context) throws Exception {
BusinessEntity businessEntity = XdsThreadLocalUtil.getBusinessEntity();
if (businessEntity == null || StringUtils.isEmpty(businessEntity.getCircuitBusinessName())) {
return context;
}
handlerFailureRequest(context, businessEntity);
return context;
}

/**
* handler failure request
*
* @param context context
* @param businessEntity business information
*/
protected abstract void handlerFailureRequest(ExecuteContext context, BusinessEntity businessEntity);

/**
* build result
*
* @param context context
* @param requestParam request param
* @return response
*/
protected abstract Object buildResult(R requestParam, ExecuteContext context);

/**
* build result
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment

*
* @param businessEntity business information
* @param requestParam request param
* @return business name
*/
protected abstract String buildBusinessName(R requestParam, BusinessEntity businessEntity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.circuit;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.flowcontrol.AbstractXdsDeclarer;
import io.sermant.flowcontrol.common.config.XdsFlowControlConfig;

/**
* okhttp request declarer
*
* @author zhp
* @since 2024-11-30
*/
public class FeignClientDeclarer extends AbstractXdsDeclarer {
/**
* the fully qualified name of the enhanced class
*/
private static final String ENHANCE_CLASS = "feign.client";

private static final int PARAM_COUNT = 2;

private static final String METHOD_NAME = "execute";

private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class);

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.isExtendedFrom(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME).
and(MethodMatcher.paramCountEquals(PARAM_COUNT)), new FeignClientInterceptor())
};
}

@Override
public boolean isEnabled() {
return config.isEnable();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.circuit;

import feign.Request;
import feign.Response;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.entity.BusinessEntity;
import io.sermant.flowcontrol.common.xds.circuit.CircuitBreakerManager;
import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler;

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Enhance the request header sending method to include an indicator of whether the request byte stream has been sent
* to the server
*
* @author zhp
* @since 2024-11-30
*/
public class FeignClientInterceptor extends AbstractCircuitInterceptor<Request> {
private static final Logger LOGGER = LoggerFactory.getLogger();

@Override
public ExecuteContext before(ExecuteContext context) throws Exception {
Object params = context.getArguments()[1];
if (!(params instanceof Request)) {
return context;
}
Request request = (Request) params;
handlerCircuitBreaker(context, request);
return context;
}

@Override
public void handlerFailureRequest(ExecuteContext context, BusinessEntity business) {
CircuitBreakerManager.decrementActiveRequest(business.getCircuitBusinessName());
Object result = context.getResult();
if (!(result instanceof Response)) {
return;
}
Optional<XdsInstanceCircuitBreakers> instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
getInstanceCircuitBreakers(business.getServiceName(), business.getClusterName());
if (!instanceCircuitBreakersOptional.isPresent()) {
return;
}
XdsInstanceCircuitBreakers circuitBreakers = instanceCircuitBreakersOptional.get();
Response response = (Response) context.getResult();
if (response == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is difficult to understand. Can you add comments to the following logic?

CircuitBreakerManager.addFailureRequest(business.getCircuitBusinessName(), 0, circuitBreakers);
return;
}
CircuitBreakerManager.addFailureRequest(business.getCircuitBusinessName(), response.status(), circuitBreakers);
}

@Override
protected Object buildResult(Request request, ExecuteContext context) {
return Response.builder().status(CommonConst.TOO_MANY_REQUEST_CODE)
.body(MESSAGE, StandardCharsets.UTF_8).headers(Collections.emptyMap())
.reason(MESSAGE).request(request).build();
}

@Override
protected String buildBusinessName(Request request, BusinessEntity businessEntity) {
try {
URL url = new URL(request.url());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the entire URL can be directly used as a part of the business, it is not recommended to parse the URL.

return businessEntity.getClusterName() + CommonConst.CONNECT + businessEntity.getServiceName()
+ CommonConst.CONNECT + url.getHost() + CommonConst.CONNECT + url.getPort();
} catch (MalformedURLException ex) {
LOGGER.log(Level.SEVERE, "URL parsing failed.", ex);
}
return businessEntity.getClusterName() + CommonConst.CONNECT + businessEntity.getServiceName()
+ CommonConst.CONNECT + request.url();
}
}
Loading
Loading