Skip to content

Commit

Permalink
1. Optimize plugin load, support load plugin by plugin instance name
Browse files Browse the repository at this point in the history
2. Optimize install plugin, suppory install plugin by ./gradlew jar dist
3. Make UrlClassLoader siglenton
  • Loading branch information
ruanwenjun committed Sep 10, 2021
1 parent 16ae31a commit b3ecf54
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 113 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ dist
classes
package-lock.json
node_modules
.DS_Store
.DS_Store
.run
35 changes: 35 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ subprojects {
}
}

// pluginModuleName -> pluginInstanceName
Map<String, String> pluginInstanceMap = [
"eventmesh-connector-rocketmq": "rocketmq",
"eventmesh-security-acl": "acl",
"eventmesh-registry-rocketmq-namesrv": "namesrv"
]
Map<String, String> pluginTypeMap = [
"eventmesh-connector-rocketmq": "connector",
"eventmesh-security-acl": "security",
"eventmesh-registry-rocketmq-namesrv": "registry"
]

task dist(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../dist/bin').mkdirs()
Expand All @@ -196,12 +208,35 @@ subprojects {
}

doLast {
if (pluginInstanceMap.containsKey(project.name)) {
var pluginInstanceName = pluginInstanceMap.get(project.name)
var pluginTypeName = pluginTypeMap.get(project.name)
println "install connector plugin" + project.name
new File("${rootDir}/dist/plugin/${pluginTypeName}/${pluginInstanceName}").mkdirs()
copy {
into "${rootDir}/dist/plugin/${pluginTypeName}/${pluginInstanceName}"
from project.jar.getArchivePath()
}
copy {
into "${rootDir}/dist/plugin/${pluginTypeName}/${pluginInstanceName}"
from project.configurations.runtimeClasspath
}
copy {
into "${rootDir}/dist/conf"
from sourceSets.main.resources.srcDirs
exclude 'META-INF'
}
}

copy {
into('../dist/apps/')
from project.jar.getArchivePath()
exclude 'eventmesh-common*.jar'
exclude 'eventmesh-connector-api*.jar'
exclude 'eventmesh-connector-plugin*.jar'
exclude 'eventmesh-registry-plugin*.jar'
exclude 'eventmesh-security-plugin*.jar'
exclude 'eventmesh-spi-*.jar'
exclude 'eventmesh-starter*.jar'
exclude 'eventmesh-test*.jar'
exclude 'eventmesh-sdk*.jar'
Expand Down
23 changes: 1 addition & 22 deletions eventmesh-connector-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyConnectorPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-connector-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/connector').mkdirs()
}
doLast {
copy {
into('../eventmesh-connector-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-connector-plugin-${version}.jar"
"eventmesh-connector-api-${version}.jar"
}
}
copy {
into '../dist/plugin/connector'
from "../eventmesh-connector-plugin/dist/apps/eventmesh-connector-rocketmq-${version}.jar"
}
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.eventmesh.api.connector;

import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface ConnectorResourceService {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import io.openmessaging.api.Message;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = false)
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import io.openmessaging.api.SendCallback;

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI(isSingleton = false)
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl;
import org.apache.eventmesh.connector.rocketmq.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
Expand All @@ -53,8 +52,6 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {

public Logger messageLogger = LoggerFactory.getLogger("message");

public final String DEFAULT_ACCESS_DRIVER = "org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl";

private PushConsumerImpl pushConsumer;

@Override
Expand All @@ -75,9 +72,7 @@ public synchronized void init(Properties keyValue) throws Exception {
}

String omsNamesrv = clientConfiguration.namesrvAddr;
// KeyValue properties = OMS.newKeyValue().put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
Properties properties = new Properties();
properties.put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
properties.put("ACCESS_POINTS", omsNamesrv);
properties.put("REGION", "namespace");
properties.put("instanceName", instanceName);
Expand All @@ -87,7 +82,7 @@ public synchronized void init(Properties keyValue) throws Exception {
} else {
properties.put("MESSAGE_MODEL", MessageModel.CLUSTERING.name());
}
MessagingAccessPoint messagingAccessPoint = OMS.builder().build(properties);
MessagingAccessPoint messagingAccessPoint = new MessagingAccessPointImpl(properties);
pushConsumer = (PushConsumerImpl) messagingAccessPoint.createConsumer(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
Expand All @@ -48,8 +49,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {

private ProducerImpl producer;

public final String DEFAULT_ACCESS_DRIVER = "org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl";

@Override
public synchronized void init(Properties keyValue) {
ConfigurationWrapper configurationWrapper =
Expand All @@ -62,14 +61,13 @@ public synchronized void init(Properties keyValue) {

String omsNamesrv = clientConfiguration.namesrvAddr;
Properties properties = new Properties();
properties.put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
properties.put("ACCESS_POINTS", omsNamesrv);
properties.put("REGION", "namespace");
properties.put("RMQ_PRODUCER_GROUP", producerGroup);
properties.put("OPERATION_TIMEOUT", 3000);
properties.put("PRODUCER_ID", producerGroup);

MessagingAccessPoint messagingAccessPoint = OMS.builder().build(properties);
MessagingAccessPoint messagingAccessPoint = new MessagingAccessPointImpl(properties);
producer = (ProducerImpl) messagingAccessPoint.createProducer(properties);

}
Expand Down
21 changes: 0 additions & 21 deletions eventmesh-registry-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyRegistryPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-registry-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/registry').mkdirs()
}
doLast {
copy {
into('../eventmesh-registry-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-registry-plugin-${version}.jar"
"eventmesh-registry-api-${version}.jar"
}
}
copy {
into '../dist/plugin/registry'
from "../eventmesh-registry-plugin/dist/apps/eventmesh-registry-rocketmq-namesrv-${version}.jar"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import java.util.List;
import java.util.Map;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
public interface RegistryService {
void init() throws RegistryException;

Expand Down
23 changes: 1 addition & 22 deletions eventmesh-security-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

task copyAclPlugin(dependsOn: ['jar']) {
doFirst {
new File(projectDir, '../eventmesh-security-plugin/dist/apps').mkdir()
new File(projectDir, '../dist/plugin/security').mkdirs()
}
doLast {
copy {
into('../eventmesh-security-plugin/dist/apps/')
from project.jar.getArchivePath()
exclude {
"eventmesh-security-plugin-${version}.jar"
"eventmesh-security-api-${version}.jar"
}
}
copy {
into '../dist/plugin/security'
from "../eventmesh-security-plugin/dist/apps/eventmesh-security-acl-${version}.jar"
}
}
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package org.apache.eventmesh.api.acl;

import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

import java.util.Properties;

@EventMeshSPI(isSingleton = true)
@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.SECURITY)
public interface AclService {
void init() throws AclException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,43 @@ public static <T> T getExtension(Class<T> extensionType, String extensionName) {
}

@SuppressWarnings("unchecked")
private static <T> T getSingletonExtension(Class<T> extensionType, String extensionName) {
return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(extensionName, name -> {
Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
private static <T> T getSingletonExtension(Class<T> extensionType, String extensionInstanceName) {
return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(extensionInstanceName, name -> {
Class<T> extensionInstanceClass = getExtensionInstanceClass(extensionType, extensionInstanceName);
try {
if (extensionInstanceClass == null) {
return null;
}
T extensionInstance = extensionInstanceClass.newInstance();
logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
extensionType, extensionName);
logger.info("initialize extension instance success, extensionType: {}, extensionInstanceName: {}",
extensionType, extensionInstanceName);
return extensionInstance;
} catch (InstantiationException | IllegalAccessException e) {
throw new ExtensionException("Extension initialize error", e);
}
});
}

private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionName) {
Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionInstanceName) {
Class<T> extensionInstanceClass = getExtensionInstanceClass(extensionType, extensionInstanceName);
try {
if (extensionInstanceClass == null) {
return null;
}
T extensionInstance = extensionInstanceClass.newInstance();
logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
extensionType, extensionName);
extensionType, extensionInstanceName);
return extensionInstance;
} catch (InstantiationException | IllegalAccessException e) {
throw new ExtensionException("Extension initialize error", e);
}
}

@SuppressWarnings("unchecked")
private static <T> Class<T> getExtensionClass(Class<T> extensionType, String extensionName) {
private static <T> Class<T> getExtensionInstanceClass(Class<T> extensionType, String extensionInstanceName) {
for (ExtensionClassLoader extensionClassLoader : extensionClassLoaders) {
Map<String, Class<?>> extensionClassMap = extensionClassLoader.loadExtensionClass(extensionType);
Class<?> instanceClass = extensionClassMap.get(extensionName);
Map<String, Class<?>> extensionInstanceClassMap = extensionClassLoader.loadExtensionClass(extensionType, extensionInstanceName);
Class<?> instanceClass = extensionInstanceClassMap.get(extensionInstanceName);
if (instanceClass != null) {
return (Class<T>) instanceClass;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.eventmesh.spi;

/**
* An Extension can be defined by extensionTypeName and extensionInstanceName
*/
public enum EventMeshExtensionType {
UNKNOWN("unknown"),
CONNECTOR("connector"),
REGISTRY("registry"),
SECURITY("security"),
;

private final String extensionTypeName;

EventMeshExtensionType(String extensionTypeName) {
this.extensionTypeName = extensionTypeName;
}

public String getExtensionTypeName() {
return extensionTypeName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,11 @@
*/
boolean isSingleton() default false;

/**
* {@link EventMeshExtensionType}
* @return extension type
*/
EventMeshExtensionType eventMeshExtensionType();

}

Loading

0 comments on commit b3ecf54

Please sign in to comment.