Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature : RocketMQ transaction are supported #6230

Merged
merged 59 commits into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8edf4d2
license
Bughue Dec 27, 2023
5397478
rocket
Bughue Dec 28, 2023
f67c651
style
Bughue Jan 2, 2024
304df90
check listener
Bughue Jan 2, 2024
93b96f3
bean holder
Bughue Jan 2, 2024
052532b
bean holder
Bughue Jan 2, 2024
b90978b
TCCRocketMQ
Bughue Jan 22, 2024
8803a39
rocketmq
Bughue Jan 25, 2024
0c335bc
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Jan 26, 2024
625ff48
rocketmq
Bughue Jan 26, 2024
e11a818
rocketmq
Bughue Jan 26, 2024
1d76507
rocketmq
Bughue Jan 26, 2024
4f74db2
rocketmq
Bughue Jan 26, 2024
4f10e9e
rocketmq
Bughue Jan 26, 2024
bafee71
independent module
Bughue Jan 26, 2024
7fb2ee1
style
Bughue Jan 29, 2024
cc7cec4
style
Bughue Jan 29, 2024
daf0888
style bom
Bughue Jan 29, 2024
5f42ad9
module
Bughue Jan 29, 2024
b85c799
style
Bughue Jan 29, 2024
87643f7
style
Bughue Jan 29, 2024
0a16810
XID
Bughue Feb 1, 2024
768f862
rocket mq factory
Bughue Feb 4, 2024
6efaa5c
optimize producer
Bughue Feb 6, 2024
2e9d615
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Feb 6, 2024
1580478
style
Bughue Feb 6, 2024
f1f37c0
fix configuration
Bughue Feb 7, 2024
748e733
getGlobalStatus in AbstractResourceManager
Bughue Feb 19, 2024
d31fc70
test
Bughue Feb 19, 2024
fc71d14
test
Bughue Feb 20, 2024
2b700ab
remove configuration
Bughue Feb 20, 2024
bb41a20
Merge branch '2.x' of https://github.com/seata/seata into dev-rocketmq
Bughue Feb 20, 2024
b8cc88f
tcc api
Bughue Feb 20, 2024
dc9e7c5
tcc api
Bughue Feb 20, 2024
e5a4790
style
Bughue Feb 20, 2024
4faffc7
to many getXX
Bughue Feb 26, 2024
2554991
unit test
Bughue Feb 26, 2024
2e53a3a
unit test
Bughue Feb 26, 2024
3b57588
style
Bughue Feb 27, 2024
547df7e
style
Bughue Feb 27, 2024
cfbb664
style
Bughue Feb 28, 2024
1be3c83
style
Bughue Mar 1, 2024
345baad
mock test
Bughue Mar 1, 2024
30bef40
mock test
Bughue Mar 1, 2024
c91c58d
includeFromCI
Bughue Mar 1, 2024
343c586
excludeCI
Bughue Mar 1, 2024
d17c95e
excludeCI
Bughue Mar 2, 2024
b7af430
excludeCI
Bughue Mar 2, 2024
1318eb0
test
Bughue Mar 2, 2024
de54524
test
Bughue Mar 2, 2024
5b16241
test
Bughue Mar 2, 2024
4d74963
test
Bughue Mar 2, 2024
96e91c4
update
funky-eyes Mar 3, 2024
e273e87
Merge branch '2.x' of github.com:seata/seata into dev-rocketmq
funky-eyes Mar 3, 2024
1de6fd8
update
funky-eyes Mar 3, 2024
c67ad75
update
funky-eyes Mar 3, 2024
eef9e66
update
funky-eyes Mar 3, 2024
8fd38b7
update
funky-eyes Mar 3, 2024
0c6e901
update
funky-eyes Mar 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-sqlparser-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@
<artifactId>seata-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rm</artifactId>
Expand Down
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture.
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ transaction are supported

### bugfix:
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务

### bugfix:
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,4 +1006,9 @@ public interface ConfigurationKeys {
* The constant SERVER_APPLICATION_DATA_SIZE_CHECK
*/
String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck";

/**
* The constant ROCKET_MQ_MSG_TIMEOUT
*/
String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,6 @@ public interface DefaultValues {
* Default druid location in classpath
*/
String DRUID_LOCATION = "lib/sqlparser/druid.jar";

int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ private RootContext() {
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_BRANCHID = "TX_BRANCHID";

/**
* The constant HIDDEN_KEY_XID for sofa-rpc integration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ public interface ResourceManager extends ResourceManagerInbound, ResourceManager
* @return The BranchType of ResourceManager.
*/
BranchType getBranchType();

/**
* Get the GlobalStatus.
*
* @param branchType The BranchType of ResourceManager.
* @param xid The xid of transaction.
* @return The GlobalStatus of transaction.
*/
GlobalStatus getGlobalStatus(BranchType branchType, String xid);
}
7 changes: 7 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
<!-- for jdbc driver when package -->
<mysql5.version>${mysql.version}</mysql5.version>
<mysql8.version>8.0.27</mysql8.version>
<!-- rocketmq -->
<rocketmq-version>5.0.0</rocketmq-version>

<!-- # for kotlin -->
<kotlin.version>1.4.32</kotlin.version>
Expand Down Expand Up @@ -781,6 +783,11 @@
<artifactId>janino</artifactId>
<version>${janino-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-version}</version>
</dependency>

<!-- web -->
<dependency>
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<module>integration/brpc</module>
<module>rm</module>
<module>rm-datasource</module>
<module>rocketmq</module>
<module>spring</module>
<module>tcc</module>
<module>test</module>
Expand Down
15 changes: 15 additions & 0 deletions rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.seata.core.exception.TransactionExceptionCode;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.core.protocol.transaction.BranchReportRequest;
import org.apache.seata.core.protocol.transaction.BranchReportResponse;
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,4 +143,16 @@ public void unregisterResource(Resource resource) {
public void registerResource(Resource resource) {
RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

@Override
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
try {
GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus);
return response.getGlobalStatus();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;

Expand Down Expand Up @@ -150,6 +151,11 @@ public BranchType getBranchType() {
throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
}

@Override
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
return getResourceManager(branchType).getGlobalStatus(branchType, xid);
}

private static class SingletonHolder {
private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
}
Expand Down
49 changes: 49 additions & 0 deletions rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-rocketmq</artifactId>
<packaging>jar</packaging>
<name>seata-rocketmq ${project.version}</name>
<description>rocketmq integration for Seata built with Maven</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.rocketmq;

import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
* Seata MQ Producer
**/
public class SeataMQProducer extends TransactionMQProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);

private static final List<GlobalStatus> COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
private static final List<GlobalStatus> ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);

public static String PROPERTY_SEATA_XID = RootContext.KEY_XID;
public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
private TransactionListener transactionListener;

private TCCRocketMQ tccRocketMQ;

SeataMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}

SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
this.transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String xid = msg.getProperty(PROPERTY_SEATA_XID);
if (StringUtils.isBlank(xid)) {
LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);
if (COMMIT_STATUSES.contains(globalStatus)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (GlobalStatus.Finished.equals(globalStatus)) {
LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
};
}

@Override
public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
Bughue marked this conversation as resolved.
Show resolved Hide resolved
return send(msg, this.getSendMsgTimeout());
}

@Override
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
if (RootContext.inGlobalTransaction()) {
if (tccRocketMQ == null) {
throw new RuntimeException("TCCRocketMQ is not initialized");
}
return tccRocketMQ.prepare(msg, timeout);
} else {
return super.send(msg, timeout);
}
}

public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup());
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
try {
sendResult = super.send(msg, timeout);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus());
}
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
return sendResult;
}


@Override
public TransactionListener getTransactionListener() {
return transactionListener;
}

public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) {
this.tccRocketMQ = tccRocketMQ;
}
}
Loading
Loading