Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Integrate RocketMQ into Seata #3974

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
22f069c
feature-透过Spring的后置代理器实现了RocketMQ Bean的拦截,并返回了静态代理类
GasolLY Aug 7, 2021
9250bf4
Merge branch 'develop' into feature-tcc-rocketmq
slievrly Aug 24, 2021
65b1d0d
feature-初步实现TCC集成RocketMQ的目标
GasolLY Sep 23, 2021
b0503b4
初步完成tcc 集成 rocketmq
GasolLY Sep 23, 2021
44ada44
reformat code
GasolLY Sep 23, 2021
1a48e4a
Merge branch 'develop' into feature-tcc-rocketmq
wangliang181230 Sep 24, 2021
d9b01f4
添加了RocketMQAspect的自动配置,并对代码进行了优化
GasolLY Sep 24, 2021
077f473
将rocketmq自动配置类从seata-spring模块移动到seata-spring-boot-starter模块
GasolLY Sep 25, 2021
f18251e
Merge branch 'develop' into feature-tcc-rocketmq
wangliang181230 Sep 28, 2021
4c04890
代码优化
GasolLY Sep 28, 2021
9475bc3
Merge remote-tracking branch 'gitlab/feature-tcc-rocketmq' into featu…
GasolLY Sep 28, 2021
a2a0643
merge修复
GasolLY Sep 28, 2021
e1f1f8a
bugfix
GasolLY Sep 28, 2021
2f878ae
bugfix
GasolLY Sep 28, 2021
618bac2
修改seata.rocketmq-enabled默认值为false
GasolLY Sep 29, 2021
0ecce21
Merge remote-tracking branch 'origin/feature-tcc-rocketmq' into featu…
GasolLY Sep 29, 2021
69909f2
Merge remote-tracking branch 'origin/develop' into feature-tcc-rocketmq
GasolLY Oct 20, 2021
7528426
使用静态代理重构代码
GasolLY Oct 20, 2021
62cfc33
Merge branch 'develop' into feature-tcc-rocketmq
funky-eyes Mar 1, 2022
038e7ea
bug fix
GasolLY May 27, 2022
cc4b655
Merge branch '2.x' into feature-tcc-rocketmq
funky-eyes Jan 30, 2023
df29716
opt
funky-eyes Feb 1, 2023
74aaf43
opt
funky-eyes Feb 1, 2023
c22aea7
opt
funky-eyes Feb 1, 2023
6a85838
opt
funky-eyes Feb 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

seata.enabled=true
seata.rocketmq-enabled=false
seata.scan-packages=firstPackage,secondPackage
seata.excludes-for-scanning=firstBeanNameForExclude,secondBeanNameForExclude
seata.excludes-for-auto-proxying=firstClassNameForExclude,secondClassNameForExclude
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
seata:
enabled: true
rocketmq-enabled: false
application-id: applicationName
tx-service-group: default_tx_group
enable-auto-data-source-proxy: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public class SeataProperties {
* whether enable auto configuration
*/
private boolean enabled = true;
/**
* whether enable rocketmq integrate
*/
private boolean rocketmqEnabled = false;
/**
* application id
*/
Expand Down Expand Up @@ -77,6 +81,14 @@ public SeataProperties setEnabled(boolean enabled) {
return this;
}

public boolean isRocketmqEnabled() {
return rocketmqEnabled;
}

public void setRocketmqEnabled(boolean rocketmqEnabled) {
this.rocketmqEnabled = rocketmqEnabled;
}

public String getApplicationId() {
if (applicationId == null) {
applicationId = springCloudAlibabaConfiguration.getApplicationId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
"sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties",
"defaultValue": "AT"
},
{
"name": "seata.rocketmq-enabled",
"type": "java.lang.Boolean",
"sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties",
"description": "Enable seata integrate rocketmq",
"defaultValue": false
},
{
"name": "spring.cloud.alibaba.seata.application-id",
"type": "java.lang.String",
Expand Down
7 changes: 7 additions & 0 deletions seata-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
<optional>true</optional>
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.spring.boot.autoconfigure;

import io.seata.rm.tcc.rocketmq.RocketMQAspect;
import io.seata.rm.tcc.rocketmq.TCCRocketMQ;
import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;

@ConditionalOnClass(DefaultMQProducer.class)
@ConditionalOnBean(DefaultMQProducer.class)
@ConditionalOnExpression("${seata.enabled:true} && ${seata.rocketmq-enabled:false}")
public class RocketMQAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public TCCRocketMQ tccRocketMQ() {
return new TCCRocketMQImpl();
}

@Bean
public RocketMQAspect rocketMQAspect(TCCRocketMQ tccRocketMQ) {
return new RocketMQAspect(tccRocketMQ);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration,\
io.seata.spring.boot.autoconfigure.RocketMQAutoConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.spring.annotation;

import io.seata.rm.tcc.rocketmq.RocketMQAspect;
import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({RocketMQAspect.class, TCCRocketMQImpl.class})
public @interface EnableRocketMQAspect {
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void addScannablePackages(String... packages) {

@Override
public boolean check(Object bean, String beanName, @Nullable ConfigurableListableBeanFactory beanFactory) throws Exception {
if (SCANNABLE_PACKAGE_SET.isEmpty()) {
if (SCANNABLE_PACKAGE_SET.isEmpty() || bean.getClass().getName().startsWith("io.seata")) {
// if empty, pass this checker
return true;
}
Expand Down
12 changes: 12 additions & 0 deletions tcc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

<!-- tcc rocketmq -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
<optional>true</optional>
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
</dependencies>


Expand Down
47 changes: 47 additions & 0 deletions tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.tcc.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class RocketMQAspect implements BeanPostProcessor {
public static Logger LOGGER = LoggerFactory.getLogger(RocketMQAspect.class);

private final TCCRocketMQ tccRocketMQ;

public RocketMQAspect(TCCRocketMQ tccRocketMQ) {
this.tccRocketMQ = tccRocketMQ;
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultMQProducer) {
LOGGER.info("Generate RocketMQ Producer Proxy");
tccRocketMQ.setDefaultMQProducer((DefaultMQProducer) bean);
return new SeataMQProducer((DefaultMQProducer) bean, tccRocketMQ);
}
return bean;
}
}
84 changes: 84 additions & 0 deletions tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.tcc.rocketmq;

import java.net.UnknownHostException;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMQUtils {

public static SendResult halfSend(DefaultMQProducer defaultMQProducer,
Message msg) throws MQClientException {
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, defaultMQProducer);

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, defaultMQProducer.getProducerGroup());
DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl();
SendResult sendResult = null;
try {
sendResult = defaultMQProducerImpl.send(msg);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
default:
throw new RuntimeException("Message send fail.");
}
return sendResult;
}

public static void confirm(DefaultMQProducer defaultMQProducer, Message msg,
SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl();
defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.COMMIT_MESSAGE, null);
}

public static void cancel(DefaultMQProducer defaultMQProducer, Message msg,
SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl();
defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null);
}
}
Loading