diff --git a/build.gradle b/build.gradle index 12a2b005e9..310f24eb67 100644 --- a/build.gradle +++ b/build.gradle @@ -305,6 +305,7 @@ subprojects { new File(projectDir, '../dist/apps').mkdirs() new File(projectDir, '../dist/conf').mkdirs() new File(projectDir, '../dist/lib').mkdirs() + new File(projectDir, '../dist/plugin/connector').mkdirs() } doLast { @@ -313,6 +314,7 @@ subprojects { from project.jar.getArchivePath() exclude 'eventmesh-common*.jar' exclude 'eventmesh-connector-api*.jar' + exclude 'eventmesh-connector-plugin*.jar' exclude 'eventmesh-starter*.jar' exclude 'eventmesh-test*.jar' exclude 'eventmesh-sdk*.jar' @@ -335,6 +337,11 @@ subprojects { exclude 'commons-collections-3.2.2.jar' } + copy { + into '../dist/plugin/connector' + from '../eventmesh-connector-plugin/dist/apps' + } + copy { into '../dist/bin' from '../eventmesh-runtime/bin' diff --git a/eventmesh-connector-plugin/build.gradle b/eventmesh-connector-plugin/build.gradle new file mode 100644 index 0000000000..d973dcedae --- /dev/null +++ b/eventmesh-connector-plugin/build.gradle @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ \ No newline at end of file diff --git a/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle similarity index 100% rename from eventmesh-connector-rocketmq/build.gradle rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle diff --git a/eventmesh-connector-rocketmq/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties similarity index 100% rename from eventmesh-connector-rocketmq/gradle.properties rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java similarity index 99% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java index b1c2732de4..9334b5fc98 100644 --- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java @@ -73,4 +73,4 @@ private void load() { public String getProp(String key) { return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null); } -} \ No newline at end of file +} diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java similarity index 100% rename from eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer similarity index 100% rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer similarity index 100% rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java similarity index 99% rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java index 114053520e..86a5ffda22 100644 --- a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.Properties; -import io.openmessaging.api.Action; import io.openmessaging.api.AsyncConsumeContext; import io.openmessaging.api.AsyncMessageListener; import io.openmessaging.api.Consumer; diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java similarity index 100% rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java similarity index 100% rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java similarity index 100% rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint similarity index 100% rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer similarity index 100% rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/gradle.properties similarity index 89% rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer rename to eventmesh-connector-plugin/gradle.properties index c98880a841..9d1744e07a 100644 --- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer +++ b/eventmesh-connector-plugin/gradle.properties @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,5 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl \ No newline at end of file +# +group=org.apache.eventmesh +version=1.2.0-SNAPSHOT +jdk=1.8 +mavenUserName= +mavenPassword= \ No newline at end of file diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer deleted file mode 100644 index 28907ca176..0000000000 --- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java index b09f9eda11..0b020a4612 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Properties; -import java.util.ServiceLoader; import io.openmessaging.api.AsyncMessageListener; import io.openmessaging.api.Message; @@ -52,24 +51,11 @@ public void unsubscribe(String topic) throws Exception { } public synchronized void init(Properties keyValue) throws Exception { - meshMQPushConsumer = getMeshMQPushConsumer(); - if (meshMQPushConsumer == null) { - logger.error("can't load the meshMQPushConsumer plugin, please check."); - throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check."); - } meshMQPushConsumer.init(keyValue); inited.compareAndSet(false, true); } - private MeshMQPushConsumer getMeshMQPushConsumer() { - ServiceLoader meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class); - if (meshMQPushConsumerServiceLoader.iterator().hasNext()) { - return meshMQPushConsumerServiceLoader.iterator().next(); - } - return null; - } - public synchronized void start() throws Exception { meshMQPushConsumer.start(); started.compareAndSet(false, true); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index 2e55fbfd78..7da9d1da39 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -18,7 +18,6 @@ package org.apache.eventmesh.runtime.core.plugin; import java.util.Properties; -import java.util.ServiceLoader; import io.openmessaging.api.Message; import io.openmessaging.api.SendCallback; @@ -47,24 +46,10 @@ public synchronized void init(Properties keyValue) throws Exception { return; } - meshMQProducer = getSpiMeshMQProducer(); - if (meshMQProducer == null) { - logger.error("can't load the meshMQProducer plugin, please check."); - throw new RuntimeException("doesn't load the meshMQProducer plugin, please check."); - } - meshMQProducer.init(keyValue); inited.compareAndSet(false, true); } - private MeshMQProducer getSpiMeshMQProducer() { - ServiceLoader meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class); - if (meshMQProducerServiceLoader.iterator().hasNext()) { - return meshMQProducerServiceLoader.iterator().next(); - } - return null; - } - public synchronized void start() throws Exception { if (started.get()) { return; diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java index 6aea9db11d..04426d5fde 100644 --- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java @@ -18,11 +18,43 @@ package org.apache.eventmesh.spi; import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.spi.loader.ExtensionClassLoader; +import org.apache.eventmesh.spi.loader.JarExtensionClassLoader; +import org.apache.eventmesh.spi.loader.MetaInfExtensionClassLoader; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The extension fetching factory, all extension plugins should be fetched by this factory. + * And all the extension plugins defined in eventmesh should have {@link EventMeshSPI} annotation. + */ public enum EventMeshExtensionFactory { ; + private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionFactory.class); + + private static final List extensionClassLoaders = new ArrayList<>(); + + static { + extensionClassLoaders.add(new MetaInfExtensionClassLoader()); + extensionClassLoaders.add(new JarExtensionClassLoader()); + } + + private static final ConcurrentHashMap EXTENSION_INSTANCE_CACHE = + new ConcurrentHashMap<>(16); + + /** + * @param extensionType extension plugin class type + * @param extensionName extension instance name + * @param the type of the plugin + * @return plugin instance + */ + @SuppressWarnings("unchecked") public static T getExtension(Class extensionType, String extensionName) { if (extensionType == null) { throw new ExtensionException("extensionType is null"); @@ -33,6 +65,29 @@ public static T getExtension(Class extensionType, String extensionName) { if (!extensionType.isInterface() || !extensionType.isAnnotationPresent(EventMeshSPI.class)) { throw new ExtensionException(String.format("extensionType:%s is invalided", extensionType)); } - return EventMeshExtensionLoader.getExtension(extensionType, extensionName); + if (!EXTENSION_INSTANCE_CACHE.containsKey(extensionName)) { + synchronized (EventMeshExtensionFactory.class) { + initializeExtension(extensionType, extensionName); + } + } + return (T) EXTENSION_INSTANCE_CACHE.get(extensionName); + } + + private static void initializeExtension(Class extensionType, String extensionName) { + for (ExtensionClassLoader extensionClassLoader : extensionClassLoaders) { + Map> extensionClassMap = extensionClassLoader.loadExtensionClass(extensionType); + Class instanceClass = extensionClassMap.get(extensionName); + if (instanceClass != null) { + try { + Object extensionObj = instanceClass.newInstance(); + logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", + extensionType, extensionName); + EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj); + } catch (InstantiationException | IllegalAccessException e) { + throw new ExtensionException("Extension initialize error", e); + } + } + } } + } diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java deleted file mode 100644 index 89696e04da..0000000000 --- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.spi; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.util.Enumeration; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - -public enum EventMeshExtensionLoader { - ; - - private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class); - - private static final ConcurrentHashMap, ConcurrentHashMap>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16); - - private static final ConcurrentHashMap EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16); - - private static final String EVENTMESH_EXTENSION_DIR = "META-INF/eventmesh/"; - - @SuppressWarnings("unchecked") - public static T getExtension(Class extensionType, String extensionName) { - if (!hasLoadExtensionClass(extensionType)) { - loadExtensionClass(extensionType); - } - if (!hasInitializeExtension(extensionName)) { - initializeExtension(extensionType, extensionName); - } - return (T) EXTENSION_INSTANCE_CACHE.get(extensionName); - } - - private static void initializeExtension(Class extensionType, String extensionName) { - ConcurrentHashMap> extensionClassMap = EXTENSION_CLASS_LOAD_CACHE.get(extensionType); - if (extensionClassMap == null) { - throw new ExtensionException(String.format("Extension type:%s has not been loaded", extensionType)); - } - if (!extensionClassMap.containsKey(extensionName)) { - throw new ExtensionException(String.format("Extension name:%s has not been loaded", extensionName)); - } - Class aClass = extensionClassMap.get(extensionName); - try { - Object extensionObj = aClass.newInstance(); - logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName); - EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj); - } catch (InstantiationException | IllegalAccessException e) { - throw new ExtensionException("Extension initialize error", e); - } - } - - public static void loadExtensionClass(Class extensionType) { - String extensionFileName = EVENTMESH_EXTENSION_DIR + extensionType.getName(); - ClassLoader classLoader = EventMeshExtensionLoader.class.getClassLoader(); - try { - Enumeration extensionUrls = classLoader.getResources(extensionFileName); - if (extensionUrls != null) { - while (extensionUrls.hasMoreElements()) { - URL url = extensionUrls.nextElement(); - loadResources(url, extensionType); - } - } - } catch (IOException e) { - throw new ExtensionException("load extension class error", e); - } - - - } - - private static void loadResources(URL url, Class extensionType) throws IOException { - try (InputStream inputStream = url.openStream()) { - Properties properties = new Properties(); - properties.load(inputStream); - properties.forEach((extensionName, extensionClass) -> { - String extensionNameStr = (String) extensionName; - String extensionClassStr = (String) extensionClass; - try { - Class targetClass = Class.forName(extensionClassStr); - logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass); - if (!extensionType.isAssignableFrom(targetClass)) { - throw new ExtensionException( - String.format("class: %s is not subClass of %s", targetClass, extensionType)); - } - EXTENSION_CLASS_LOAD_CACHE.computeIfAbsent(extensionType, k -> new ConcurrentHashMap<>()) - .put(extensionNameStr, targetClass); - } catch (ClassNotFoundException e) { - throw new ExtensionException("load extension class error", e); - } - }); - } - } - - private static boolean hasLoadExtensionClass(Class extensionType) { - return EXTENSION_CLASS_LOAD_CACHE.containsKey(extensionType); - } - - private static boolean hasInitializeExtension(String extensionName) { - return EXTENSION_INSTANCE_CACHE.containsKey(extensionName); - } -} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java new file mode 100644 index 0000000000..feca359ee4 --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi.loader; + +import java.util.Map; + +/** + * Load extension class + *
    + *
  • {@link MetaInfExtensionClassLoader}
  • + *
  • {@link JarExtensionClassLoader}
  • + *
+ */ +public interface ExtensionClassLoader { + + /** + * load + * + * @param extensionType extension type class + * @param extension type + * @return extension instance name to extension instance class + */ + Map> loadExtensionClass(Class extensionType); +} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java new file mode 100644 index 0000000000..a1d1fa955f --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi.loader; + +import com.google.common.collect.Lists; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.eventmesh.spi.ExtensionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Load extension from '${pluginConf}/plugin' + */ +public class JarExtensionClassLoader implements ExtensionClassLoader { + + private static final Logger logger = LoggerFactory.getLogger(JarExtensionClassLoader.class); + + private static final ConcurrentHashMap, Map>> EXTENSION_CLASS_CACHE = + new ConcurrentHashMap<>(16); + + private static final String EVENTMESH_EXTENSION_PLUGIN_DIR = System.getProperty("eventMeshPluginDir", + "../plugin/connector"); + + private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/"; + + @Override + public Map> loadExtensionClass(Class extensionType) { + return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass); + } + + private Map> doLoadExtensionClass(Class extensionType) { + Map> extensionMap = new HashMap<>(); + + List pluginJarPaths = loadJarPathFromResource(EVENTMESH_EXTENSION_PLUGIN_DIR); + if (CollectionUtils.isEmpty(pluginJarPaths)) { + return extensionMap; + } + + String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName(); + URLClassLoader urlClassLoader = URLClassLoader.newInstance(pluginJarPaths.toArray(new URL[0])); + try { + Enumeration extensionUrls = urlClassLoader.getResources(extensionFileName); + if (extensionUrls != null) { + while (extensionUrls.hasMoreElements()) { + URL url = extensionUrls.nextElement(); + extensionMap.putAll(loadResources(urlClassLoader, url, extensionType)); + } + } + } catch (IOException e) { + throw new ExtensionException("load extension class error", e); + } + return extensionMap; + } + + private List loadJarPathFromResource(String pluginPath) { + File plugin = new File(pluginPath); + if (!plugin.exists()) { + logger.warn("plugin dir:{} is not exist", pluginPath); + return Lists.newArrayList(); + } + if (plugin.isFile() && plugin.getName().endsWith(".jar")) { + try { + return Lists.newArrayList(plugin.toURI().toURL()); + } catch (Exception e) { + throw new ExtensionException(e); + } + } + File[] files = plugin.listFiles(); + List pluginUrls = new ArrayList<>(); + if (files != null) { + for (File file : files) { + pluginUrls.addAll(loadJarPathFromResource(file.getPath())); + } + } + return pluginUrls; + } + + private static Map> loadResources(URLClassLoader urlClassLoader, URL url, Class extensionType) throws IOException { + Map> extensionMap = new HashMap<>(); + try (InputStream inputStream = url.openStream()) { + Properties properties = new Properties(); + properties.load(inputStream); + properties.forEach((extensionName, extensionClass) -> { + String extensionNameStr = (String) extensionName; + String extensionClassStr = (String) extensionClass; + try { + Class targetClass = urlClassLoader.loadClass(extensionClassStr); + logger.info("load extension class success, extensionType: {}, extensionClass: {}", + extensionType, targetClass); + if (!extensionType.isAssignableFrom(targetClass)) { + throw new ExtensionException( + String.format("class: %s is not subClass of %s", targetClass, extensionType)); + } + extensionMap.put(extensionNameStr, targetClass); + } catch (ClassNotFoundException e) { + throw new ExtensionException("load extension class error", e); + } + }); + } + return extensionMap; + } +} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java new file mode 100644 index 0000000000..36766acf53 --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi.loader; + +import org.apache.eventmesh.spi.ExtensionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Load extension from META-INF/eventmesh + */ +public class MetaInfExtensionClassLoader implements ExtensionClassLoader { + + private static final Logger logger = LoggerFactory.getLogger(MetaInfExtensionClassLoader.class); + + private static final ConcurrentHashMap, Map>> EXTENSION_CLASS_CACHE = + new ConcurrentHashMap<>(16); + + private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/"; + + @Override + public Map> loadExtensionClass(Class extensionType) { + return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass); + } + + private Map> doLoadExtensionClass(Class extensionType) { + Map> extensionMap = new HashMap<>(); + String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName(); + ClassLoader classLoader = MetaInfExtensionClassLoader.class.getClassLoader(); + try { + Enumeration extensionUrls = classLoader.getResources(extensionFileName); + if (extensionUrls != null) { + while (extensionUrls.hasMoreElements()) { + URL url = extensionUrls.nextElement(); + extensionMap.putAll(loadResources(url, extensionType)); + } + } + } catch (IOException e) { + throw new ExtensionException("load extension class error", e); + } + return extensionMap; + } + + private static Map> loadResources(URL url, Class extensionType) throws IOException { + Map> extensionMap = new HashMap<>(); + try (InputStream inputStream = url.openStream()) { + Properties properties = new Properties(); + properties.load(inputStream); + properties.forEach((extensionName, extensionClass) -> { + String extensionNameStr = (String) extensionName; + String extensionClassStr = (String) extensionClass; + try { + Class targetClass = Class.forName(extensionClassStr); + logger.info("load extension class success, extensionType: {}, extensionClass: {}", + extensionType, targetClass); + if (!extensionType.isAssignableFrom(targetClass)) { + throw new ExtensionException( + String.format("class: %s is not subClass of %s", targetClass, extensionType)); + } + extensionMap.put(extensionNameStr, targetClass); + } catch (ClassNotFoundException e) { + throw new ExtensionException("load extension class error", e); + } + }); + } + return extensionMap; + } +} diff --git a/eventmesh-starter/build.gradle b/eventmesh-starter/build.gradle index 7b81c6bf08..48aafcb72d 100644 --- a/eventmesh-starter/build.gradle +++ b/eventmesh-starter/build.gradle @@ -16,7 +16,7 @@ */ dependencies { - implementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") - testImplementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") + implementation project(":eventmesh-runtime") + testImplementation project(":eventmesh-runtime") //testImplementation group: 'junit', name: 'junit', version: '4.12' } \ No newline at end of file diff --git a/install.sh b/install.sh index 19397ac752..7acf43d904 100644 --- a/install.sh +++ b/install.sh @@ -26,7 +26,7 @@ # jar : produce jar # package tar.gz/zip -gradle clean -Pdev=true -Pjdk=1.7 dist tar zip +gradle clean -Pdev=true -Pjdk=1.8 dist tar zip # package jar -gradle clean -Pdev=true -Pjdk=1.7 jar \ No newline at end of file +gradle clean -Pdev=true -Pjdk=1.8 jar \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 2b5e0af662..233f213842 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,11 +18,12 @@ rootProject.name = 'EventMesh' String jdkVersion = "${jdk}" include 'eventmesh-runtime' -include 'eventmesh-connector-rocketmq' include 'eventmesh-sdk-java' include 'eventmesh-common' include 'eventmesh-connector-api' include 'eventmesh-starter' include 'eventmesh-test' include 'eventmesh-spi' +include 'eventmesh-connector-plugin' +include 'eventmesh-connector-plugin:eventmesh-connector-rocketmq'