Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Integrate RocketMQ into Seata #3974

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
22f069c
feature-透过Spring的后置代理器实现了RocketMQ Bean的拦截,并返回了静态代理类
GasolLY Aug 7, 2021
9250bf4
Merge branch 'develop' into feature-tcc-rocketmq
slievrly Aug 24, 2021
65b1d0d
feature-初步实现TCC集成RocketMQ的目标
GasolLY Sep 23, 2021
b0503b4
初步完成tcc 集成 rocketmq
GasolLY Sep 23, 2021
44ada44
reformat code
GasolLY Sep 23, 2021
1a48e4a
Merge branch 'develop' into feature-tcc-rocketmq
wangliang181230 Sep 24, 2021
d9b01f4
添加了RocketMQAspect的自动配置,并对代码进行了优化
GasolLY Sep 24, 2021
077f473
将rocketmq自动配置类从seata-spring模块移动到seata-spring-boot-starter模块
GasolLY Sep 25, 2021
f18251e
Merge branch 'develop' into feature-tcc-rocketmq
wangliang181230 Sep 28, 2021
4c04890
代码优化
GasolLY Sep 28, 2021
9475bc3
Merge remote-tracking branch 'gitlab/feature-tcc-rocketmq' into featu…
GasolLY Sep 28, 2021
a2a0643
merge修复
GasolLY Sep 28, 2021
e1f1f8a
bugfix
GasolLY Sep 28, 2021
2f878ae
bugfix
GasolLY Sep 28, 2021
618bac2
修改seata.rocketmq-enabled默认值为false
GasolLY Sep 29, 2021
0ecce21
Merge remote-tracking branch 'origin/feature-tcc-rocketmq' into featu…
GasolLY Sep 29, 2021
69909f2
Merge remote-tracking branch 'origin/develop' into feature-tcc-rocketmq
GasolLY Oct 20, 2021
7528426
使用静态代理重构代码
GasolLY Oct 20, 2021
62cfc33
Merge branch 'develop' into feature-tcc-rocketmq
funky-eyes Mar 1, 2022
038e7ea
bug fix
GasolLY May 27, 2022
cc4b655
Merge branch '2.x' into feature-tcc-rocketmq
funky-eyes Jan 30, 2023
df29716
opt
funky-eyes Feb 1, 2023
74aaf43
opt
funky-eyes Feb 1, 2023
c22aea7
opt
funky-eyes Feb 1, 2023
6a85838
opt
funky-eyes Feb 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
<mockito.version>2.23.4</mockito.version>
<assertj-core.version>3.12.2</assertj-core.version>
<jetty-version>9.4.38.v20210224</jetty-version>
<rocketmq-version>4.9.0</rocketmq-version>
<bytebuddy.version>1.12.17</bytebuddy.version>
</properties>

Expand Down Expand Up @@ -687,6 +688,12 @@
<artifactId>jetty-servlet</artifactId>
<version>${jetty-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</dependencyManagement>
</project>
4 changes: 2 additions & 2 deletions integration-tx-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public class DefaultCommonFenceHandler implements FenceHandler {

private FenceHandler fenceHandler;

private static class SingletonHolder {
private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler();
}

public static DefaultCommonFenceHandler get() {
return DefaultCommonFenceHandler.SingletonHolder.INSTANCE;
}
Expand Down Expand Up @@ -69,4 +65,8 @@ public int deleteFenceByDate(Date datetime) {
check();
return fenceHandler.deleteFenceByDate(datetime);
}

private static class SingletonHolder {
private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,20 @@ public class CommonFenceConfig implements Disposable {
* Common fence clean period max value. maximum interval is 68 years
*/
private static final Duration MAX_PERIOD = Duration.ofSeconds(Integer.MAX_VALUE);

/**
* Common fence clean scheduled thread pool
*/
private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("tccFenceClean", 1));
/**
* Common fence clean period. only duration type format are supported
*/
private Duration cleanPeriod = Duration.ofDays(DefaultValues.DEFAULT_COMMON_FENCE_CLEAN_PERIOD);

/**
* Common fence log table name
*/
private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME;


/**
* Common fence clean scheduled thread pool
*/
private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("tccFenceClean", 1));

public AtomicBoolean getInitialized() {
return initialized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@
*/
public class CommonFenceStoreDataBaseDAO implements CommonFenceStore {

private static volatile CommonFenceStoreDataBaseDAO instance = null;
/**
* Common fence log table name
*/
private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME;

private static volatile CommonFenceStoreDataBaseDAO instance = null;

private CommonFenceStoreDataBaseDAO() {}

public static CommonFenceStore getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
*/
public class CommonFenceStoreSqls {

private CommonFenceStoreSqls() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么夹杂这么多tccfence的改动,而且看起来好像都是调了位置而已

throw new IllegalStateException("Utility class");
}

/**
* The constant LOCAL_TCC_LOG_PLACEHOLD.
*/
Expand All @@ -46,21 +42,27 @@ private CommonFenceStoreSqls() {
+ " (xid, branch_id, action_name, status, gmt_create, gmt_modified) "
+ " values (?, ?, ?, ?, ?, ?) ";

/**
* The constant QUERY_END_STATUS_BY_DATE.
*/
protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";

/**
* The constant QUERY_BY_BRANCH_ID_AND_XID.
*/
protected static final String QUERY_BY_BRANCH_ID_AND_XID = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where xid = ? and branch_id = ? for update";

/**
* The constant QUERY_END_STATUS_BY_DATE.
* The constant DELETE_BY_DATE_AND_STATUS.
*/
protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";
protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")";

/**
* The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID.
Expand All @@ -79,12 +81,9 @@ private CommonFenceStoreSqls() {
protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")";


/**
* The constant DELETE_BY_DATE_AND_STATUS.
*/
protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")";
private CommonFenceStoreSqls() {
throw new IllegalStateException("Utility class");
}

public static String getInsertLocalTCCLogSQL(String localTccTable) {
return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,21 @@ public class GlobalTransactionalInterceptorHandler extends AbstractProxyInvocati

private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();

private Set<String> methodsToProxy;
private static int degradeCheckAllowTimes;

private volatile boolean disable;
private static final AtomicBoolean ATOMIC_DEGRADE_CHECK = new AtomicBoolean(false);
private static volatile Integer degradeNum = 0;
private static volatile Integer reachNum = 0;
private static int degradeCheckAllowTimes;
protected AspectTransactional aspectTransactional;
private static int degradeCheckPeriod;

private static int defaultGlobalTransactionTimeout = 0;

private final FailureHandler failureHandler;
protected AspectTransactional aspectTransactional;
private Set<String> methodsToProxy;

private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
private static volatile ScheduledThreadPoolExecutor executor;

private void initDefaultGlobalTransactionTimeout() {
if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) {
int defaultGlobalTransactionTimeout;
try {
defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
} catch (Exception e) {
LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
if (defaultGlobalTransactionTimeout <= 0) {
LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'",
defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
}
}

public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<String> methodsToProxy) {
this.failureHandler = failureHandler == null ? FailureHandlerHolder.getFailureHandler() : failureHandler;
this.methodsToProxy = methodsToProxy;
Expand All @@ -134,6 +112,25 @@ public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<
this.aspectTransactional = aspectTransactional;
}

private void initDefaultGlobalTransactionTimeout() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个没必要挪位置吧,看不出来改动了什么

if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) {
int defaultGlobalTransactionTimeout;
try {
defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
} catch (Exception e) {
LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
if (defaultGlobalTransactionTimeout <= 0) {
LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'",
defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
}
}

@Override
public Set<String> getMethodsToProxy() {
return methodsToProxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,14 @@ public class DefaultInterfaceParser implements InterfaceParser {
protected static final List<InterfaceParser> ALL_INTERFACE_PARSERS = new ArrayList<>();


private static class SingletonHolder {
private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser();
protected DefaultInterfaceParser() {
initInterfaceParser();
}

public static DefaultInterfaceParser get() {
return DefaultInterfaceParser.SingletonHolder.INSTANCE;
}

protected DefaultInterfaceParser() {
initInterfaceParser();
}

/**
* init parsers
*/
Expand All @@ -64,4 +60,8 @@ public ProxyInvocationHandler parserInterfaceToProxy(Object target) throws Excep
return null;
}

private static class SingletonHolder {
private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,18 @@ public class DefaultResourceRegisterParser {

protected static List<RegisterResourceParser> allRegisterResourceParsers = new ArrayList<>();

public void registerResource(Object target) {
for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) {
registerResourceParser.registerResource(target);
}
}

private static class SingletonHolder {
private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser();
protected DefaultResourceRegisterParser() {
initResourceRegisterParser();
}

public static DefaultResourceRegisterParser get() {
return DefaultResourceRegisterParser.SingletonHolder.INSTANCE;
}

protected DefaultResourceRegisterParser() {
initResourceRegisterParser();
public void registerResource(Object target) {
for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) {
registerResourceParser.registerResource(target);
}
}

/**
Expand All @@ -57,5 +53,9 @@ protected void initResourceRegisterParser() {
}
}

private static class SingletonHolder {
private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,14 @@ public class DefaultTargetClassParser implements TargetClassParser {
protected static List<TargetClassParser> allTargetClassParsers = new ArrayList<>();


private static class SingletonHolder {
private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser();
protected DefaultTargetClassParser() {
initTargetClassParser();
}

public static DefaultTargetClassParser get() {
return DefaultTargetClassParser.SingletonHolder.INSTANCE;
}

protected DefaultTargetClassParser() {
initTargetClassParser();
}

/**
* init parsers
*/
Expand Down Expand Up @@ -73,4 +69,8 @@ public Class<?>[] findInterfaces(Object target) throws Exception {
}
return target.getClass().getInterfaces();
}

private static class SingletonHolder {
private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ public class DefaultJsonParser implements JsonParser {

protected static List<JsonParser> allJsonParsers = new ArrayList<>();

private static class SingletonHolder {
private static final DefaultJsonParser INSTANCE = new DefaultJsonParser();
}

private DefaultJsonParser() {
initJsonParser();
}
Expand Down Expand Up @@ -73,4 +69,8 @@ public <T> T parseObject(String text, Class<T> clazz) {
return null;
}

private static class SingletonHolder {
private static final DefaultJsonParser INSTANCE = new DefaultJsonParser();
}

}
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

seata.enabled=true
seata.rocketmq-enabled=false
seata.scan-packages=firstPackage,secondPackage
seata.excludes-for-scanning=firstBeanNameForExclude,secondBeanNameForExclude
seata.excludes-for-auto-proxying=firstClassNameForExclude,secondClassNameForExclude
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
seata:
enabled: true
rocketmq-enabled: false
application-id: applicationName
tx-service-group: default_tx_group
access-key: aliyunAccessKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public class SeataProperties {
* whether enable auto configuration
*/
private boolean enabled = true;
/**
* whether enable rocketmq integrate
*/
private boolean rocketmqEnabled = false;
/**
* application id
*/
Expand Down Expand Up @@ -87,6 +91,14 @@ public SeataProperties setEnabled(boolean enabled) {
return this;
}

public boolean isRocketmqEnabled() {
return rocketmqEnabled;
}

public void setRocketmqEnabled(boolean rocketmqEnabled) {
this.rocketmqEnabled = rocketmqEnabled;
}

public String getApplicationId() {
if (applicationId == null) {
applicationId = springCloudAlibabaConfiguration.getApplicationId();
Expand Down
Loading