diff --git a/examples/cluster-example/pom.xml b/examples/cluster-example/pom.xml index a17005de..aa3709c3 100644 --- a/examples/cluster-example/pom.xml +++ b/examples/cluster-example/pom.xml @@ -48,6 +48,10 @@ group.idealworld.dew cluster-spi-redis + + group.idealworld.dew + cluster-spi-rocket + + + + 4.0.0 + + group.idealworld.dew + parent-starter + 3.0.0-Beta3 + ../parent-starter + + + cluster-spi-rocket + 1.1.4 Dew Cluster Rocket + Dew 集群 Rocket 实现 + jar + + + + + + + group.idealworld.dew + cluster-common + + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rocket.version} + + + group.idealworld.dew + cluster-common-test + + + group.idealworld.dew + test-starter + + + org.springframework.boot + spring-boot-autoconfigure + + + group.idealworld.dew + test-starter + + + + diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveBeforeFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveBeforeFun.java new file mode 100644 index 00000000..01b6a22b --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveBeforeFun.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +import java.util.Map; + +@FunctionalInterface +public interface ReceiveBeforeFun { + + + Object invoke(String topic, Map properties); + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveErrorFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveErrorFun.java new file mode 100644 index 00000000..2961bd6b --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveErrorFun.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +@FunctionalInterface +public interface ReceiveErrorFun { + + /** + * Invoke. + * + * @param ex the ex + * @param beforeResult the before result + */ + void invoke(Exception ex, Object beforeResult); + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveFinishFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveFinishFun.java new file mode 100644 index 00000000..c8cc4e13 --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/ReceiveFinishFun.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +@FunctionalInterface +public interface ReceiveFinishFun { + + /** + * Invoke. + * + * @param beforeResult the before result + */ + void invoke(Object beforeResult); + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAdapter.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAdapter.java new file mode 100644 index 00000000..12f93079 --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * Rocket adapter. + * + * @author nipeixuan + */ +public class RocketAdapter { + + private RocketMQTemplate rocketMQTemplate; + + /** + * Instantiates a new Rabbit adapter. + * + * @param rocketMQTemplate the rabbit template + */ + public RocketAdapter(RocketMQTemplate rocketMQTemplate) { + this.rocketMQTemplate = rocketMQTemplate; + } + + DefaultMQProducer getProducer(){ + return rocketMQTemplate.getProducer(); + } + + RocketMQTemplate getRocketMQTemplate(){ + return this.rocketMQTemplate; + } + + + + + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAutoConfiguration.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAutoConfiguration.java new file mode 100644 index 00000000..0f1cfbfe --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketAutoConfiguration.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import javax.validation.Valid; + +/** + * rocket auto configuration. + * + * @author nipeixuan + */ +@Configuration +@ConditionalOnClass(RocketMQTemplate.class) +@ConditionalOnExpression("#{'${dew.cluster.mq}'=='rocket'}") +public class RocketAutoConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(RocketAutoConfiguration.class); + + @Value("${rocketmq.producer.group}") + private String groupName; + + @Value("${rocketmq.name-server}") + private String nameServer; + + + @PostConstruct + public void init() { + logger.info("Load Auto Configuration : {}", this.getClass().getName()); + } + + /** + * Rabbit adapter. + * + * @param rocketMQTemplate the rocket template + * @return the rocket adapter + */ + @Bean + public RocketAdapter rocketAdapter(RocketMQTemplate rocketMQTemplate) { + return new RocketAdapter(rocketMQTemplate); + } + + /** + * Rocket cluster mq. + * + * @param rocketAdapter the rocket adapter + * @return the rocket cluster mq + */ + @Bean + @ConditionalOnExpression("'${dew.cluster.mq}'=='rocket'") + public RocketClusterMQ rocketClusterMQ(RocketAdapter rocketAdapter) { + return new RocketClusterMQ(rocketAdapter, nameServer, groupName); + } + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketClusterMQ.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketClusterMQ.java new file mode 100644 index 00000000..195e1751 --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/RocketClusterMQ.java @@ -0,0 +1,212 @@ +package group.idealworld.dew.core.cluster.spi.rocket; + +import com.ecfront.dew.common.exception.RTUnsupportedEncodingException; +import group.idealworld.dew.core.cluster.AbsClusterMQ; +import group.idealworld.dew.core.cluster.dto.MessageWrap; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class RocketClusterMQ extends AbsClusterMQ { + + private static SendBeforeFun sendBeforeFun = (topic, messageProperties) -> null; + private static SendErrorFun sendErrorFun = (ex, beforeResult) -> { + }; + private static SendFinishFun sendFinishFun = beforeResult -> { + }; + private static ReceiveBeforeFun receiveBeforeFun = (topic, messageProperties) -> null; + private static ReceiveErrorFun receiveErrorFun = (ex, beforeResult) -> { + }; + private static ReceiveFinishFun receiveFinishFun = beforeResult -> { + }; + + private RocketAdapter rocketAdapter; + + private String nameServer; + + private String groupName; + + public RocketClusterMQ(RocketAdapter rocketAdapter, String nameServer, String groupName) { + this.rocketAdapter = rocketAdapter; + this.nameServer = nameServer; + this.groupName = groupName; + } + + /** + * Sets send before fun. + * + * @param sendBeforeFun the send before fun + */ + public static void setSendBeforeFun(SendBeforeFun sendBeforeFun) { + RocketClusterMQ.sendBeforeFun = sendBeforeFun; + } + + /** + * Sets send error fun. + * + * @param sendErrorFun the send error fun + */ + public static void setSendErrorFun(SendErrorFun sendErrorFun) { + RocketClusterMQ.sendErrorFun = sendErrorFun; + } + + /** + * Sets send finish fun. + * + * @param sendFinishFun the send finish fun + */ + public static void setSendFinishFun(SendFinishFun sendFinishFun) { + RocketClusterMQ.sendFinishFun = sendFinishFun; + } + + /** + * Sets receive before fun. + * + * @param receiveBeforeFun the receive before fun + */ + public static void setReceiveBeforeFun(ReceiveBeforeFun receiveBeforeFun) { + RocketClusterMQ.receiveBeforeFun = receiveBeforeFun; + } + + /** + * Sets receive error fun. + * + * @param receiveErrorFun the receive error fun + */ + public static void setReceiveErrorFun(ReceiveErrorFun receiveErrorFun) { + RocketClusterMQ.receiveErrorFun = receiveErrorFun; + } + + /** + * Sets receive finish fun. + * + * @param receiveFinishFun the receive finish fun + */ + public static void setReceiveFinishFun(ReceiveFinishFun receiveFinishFun) { + RocketClusterMQ.receiveFinishFun = receiveFinishFun; + } + + @Override + protected boolean doPublish(String topic, String message, Optional> header, boolean confirm) { + RocketMQTemplate rocketMQTemplate = rocketAdapter.getRocketMQTemplate(); + Object funResult = null; + if (confirm) { + throw new RTUnsupportedEncodingException("Rocket doesn't support confirm mode"); + } + try { + Map sendHeader = getMQHeader(topic); + header.ifPresent(sendHeader::putAll); + Message msg = MessageBuilder.withPayload(message).copyHeaders(sendHeader) + .build(); + funResult = sendBeforeFun.invoke(topic, sendHeader); + rocketMQTemplate.syncSend(topic, msg); + return true; + } catch (Exception e) { + logger.error("[MQ] Rocket publish error.", e); + sendErrorFun.invoke(e, funResult); + return false; + }finally { + sendFinishFun.invoke(funResult); + } + } + + @Override + protected void doSubscribe(String topic, Consumer consumer) { + DefaultMQPushConsumer mqConsumer = new DefaultMQPushConsumer(groupName); + mqConsumer.setNamesrvAddr(nameServer); + mqConsumer.setInstanceName(UUID.randomUUID().toString()); + + try { + mqConsumer.subscribe(topic, "*"); + mqConsumer.setMessageModel(MessageModel.BROADCASTING); + mqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + receiveMessage(topic, mqConsumer, consumer); + mqConsumer.start(); + } catch (MQClientException e) { + logger.error("[MQ] Rocket response error.", e); + } + } + + @Override + protected boolean doRequest(String address, String message, Optional> header, boolean confirm) { + RocketMQTemplate rocketMQTemplate = rocketAdapter.getRocketMQTemplate(); + Object funResult = null; + if (confirm) { + throw new RTUnsupportedEncodingException("Rocket doesn't support confirm mode"); + } + try { + Map sendHeader = getMQHeader(address); + header.ifPresent(sendHeader::putAll); + funResult = sendBeforeFun.invoke(address, sendHeader); + Message msg = MessageBuilder.withPayload(message).copyHeaders(sendHeader) + .build(); + rocketMQTemplate.syncSend(address, msg); + return true; + } catch (Exception e) { + logger.error("[MQ] Rocket publish error.", e); + sendErrorFun.invoke(e, funResult); + return false; + }finally { + sendFinishFun.invoke(funResult); + } + } + + @Override + protected void doResponse(String address, Consumer consumer) { + DefaultMQPushConsumer mqConsumer = new DefaultMQPushConsumer(groupName); + mqConsumer.setNamesrvAddr(nameServer); + mqConsumer.setInstanceName(UUID.randomUUID().toString()); + try { + mqConsumer.subscribe(address, "*"); + mqConsumer.setMessageModel(MessageModel.CLUSTERING); + mqConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + receiveMessage(address, mqConsumer, consumer); + mqConsumer.start(); + } catch (MQClientException e) { + logger.error("[MQ] Rocket response error.", e); + } + } + + private void receiveMessage(String topic, DefaultMQPushConsumer mqConsumer, Consumer consumer){ + mqConsumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { + AtomicReference funResult = null; + try { + list.parallelStream().forEach((messageExt)->{ + Map headers = messageExt.getProperties() + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map receiveHeader = setMQHeader(topic, headers); + funResult.set(receiveBeforeFun.invoke(topic, receiveHeader)); + consumer.accept(new MessageWrap(topic, Optional.of(receiveHeader), Arrays.toString(messageExt.getBody()))); + }); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } catch (Exception e) { + receiveErrorFun.invoke(e, funResult); + logger.error("[MQ] Rocket response error.", e); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }finally { + receiveFinishFun.invoke(funResult); + } + }); + } + + @Override + public boolean supportHeader() { + return true; + } +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendBeforeFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendBeforeFun.java new file mode 100644 index 00000000..b1767dda --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendBeforeFun.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +import java.util.Map; + +@FunctionalInterface +public interface SendBeforeFun { + + + Object invoke(String topic, Map properties); + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendErrorFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendErrorFun.java new file mode 100644 index 00000000..995f5c0c --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendErrorFun.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +@FunctionalInterface +public interface SendErrorFun { + + /** + * Invoke. + * + * @param ex the ex + * @param beforeResult the before result + */ + void invoke(Exception ex, Object beforeResult); + +} diff --git a/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendFinishFun.java b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendFinishFun.java new file mode 100644 index 00000000..09b2d3f0 --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/java/group/idealworld/dew/core/cluster/spi/rocket/SendFinishFun.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster.spi.rocket; + + +@FunctionalInterface +public interface SendFinishFun { + + /** + * Invoke. + * + * @param beforeResult the before result + */ + void invoke(Object beforeResult); + +} diff --git a/framework/modules/cluster-rocket/src/main/resources/META-INF/spring.factories b/framework/modules/cluster-rocket/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..4540f7d5 --- /dev/null +++ b/framework/modules/cluster-rocket/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + group.idealworld.dew.core.cluster.spi.rocket.RocketAutoConfiguration diff --git a/framework/modules/cluster-rocket/src/test/java/group/idealworld/dew/core/cluster/ClusterTest.java b/framework/modules/cluster-rocket/src/test/java/group/idealworld/dew/core/cluster/ClusterTest.java new file mode 100644 index 00000000..80e8e713 --- /dev/null +++ b/framework/modules/cluster-rocket/src/test/java/group/idealworld/dew/core/cluster/ClusterTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.core.cluster; + +import group.idealworld.dew.core.cluster.spi.rocket.RocketClusterMQ; +import group.idealworld.dew.core.cluster.test.ClusterMQTest; +import group.idealworld.dew.test.RocketMQExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * Cluster test. + * + * @author nipeixuan + */ +@ExtendWith({SpringExtension.class, RocketMQExtension.class}) +@ContextConfiguration(initializers = RocketMQExtension.Initializer.class) +@SpringBootApplication +@SpringBootTest +@Testcontainers +public class ClusterTest { + + @Autowired + private RocketClusterMQ rocketClusterMQ; + + /** + * Test mq. + * + * @throws InterruptedException the interrupted exception + */ + @Test + public void testMQ() throws InterruptedException { + new ClusterMQTest().test(rocketClusterMQ); + } + +} diff --git a/framework/modules/cluster-rocket/src/test/java/resources/application.yml b/framework/modules/cluster-rocket/src/test/java/resources/application.yml new file mode 100644 index 00000000..290602c3 --- /dev/null +++ b/framework/modules/cluster-rocket/src/test/java/resources/application.yml @@ -0,0 +1,4 @@ +dew: + cluster: + mq: rocket + diff --git a/framework/modules/parent-starter/pom.xml b/framework/modules/parent-starter/pom.xml index 546a0764..93b80a17 100644 --- a/framework/modules/parent-starter/pom.xml +++ b/framework/modules/parent-starter/pom.xml @@ -67,6 +67,7 @@ 3.12.12 1.2.5 + 2.1.1 14.0.0 3.2.12 @@ -141,6 +142,11 @@ cluster-spi-rabbit ${dew.version} + + group.idealworld.dew + cluster-spi-rocket + ${dew.version} + group.idealworld.dew cluster-spi-hazelcast diff --git a/framework/modules/test-starter/src/main/java/group/idealworld/dew/test/RocketMQExtension.java b/framework/modules/test-starter/src/main/java/group/idealworld/dew/test/RocketMQExtension.java new file mode 100644 index 00000000..c6231fd0 --- /dev/null +++ b/framework/modules/test-starter/src/main/java/group/idealworld/dew/test/RocketMQExtension.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021. the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package group.idealworld.dew.test; + +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.util.TestPropertyValues; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; + +import java.time.Duration; + +public class RocketMQExtension implements BeforeAllCallback { + + private static final Logger logger = LoggerFactory.getLogger(RocketMQExtension.class); + +// private static GenericContainer rocketmqContainer = new GenericContainer("apache/rocketmq") +// .withExposedPorts(10909); + + @Override + public void beforeAll(ExtensionContext extensionContext) { + //rocketmqContainer.start(); + //rocketmqContainer.waitingFor((new LogMessageWaitStrategy()).withRegEx("Ready to accept connections").withTimes(1)) + // .withStartupTimeout(Duration.ofSeconds(60L)); +// logger.info("Test Redis port: " + rocketmqContainer.getFirstMappedPort()); + } + + public static class Initializer + implements ApplicationContextInitializer { + public void initialize(ConfigurableApplicationContext configurableApplicationContext) { + TestPropertyValues.of( + "rocketmq.name-server=172.30.107.2:9876", + "rocketmq.producer.group=rocketmq-group", + "dew.cluster.mq=rocket" + ).applyTo(configurableApplicationContext.getEnvironment()); + } + } + +} diff --git a/framework/pom.xml b/framework/pom.xml index 16b527a6..e1bbd052 100644 --- a/framework/pom.xml +++ b/framework/pom.xml @@ -45,6 +45,7 @@ modules/idempotent-starter modules/test-starter assists/sdkgen-maven-plugin + modules/cluster-rocket