diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java index 0442ef1286..d18ec5a048 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java @@ -67,12 +67,12 @@ public EventMeshThreadFactory(final String threadNamePrefix) { public Thread newThread(@Nonnull final Runnable runnable) { StringBuilder threadName = new StringBuilder(threadNamePrefix); - if (null != threadIndex) { + if (threadIndex != null) { threadName.append("-").append(threadIndex.incrementAndGet()); } Thread thread = new Thread(runnable, threadName.toString()); thread.setDaemon(daemon); - if (null != priority) { + if (priority != null) { thread.setPriority(priority); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index c7465d32c8..2f38a372ce 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -67,12 +67,6 @@ public class CommonConfiguration { @ConfigField(field = "metaStorage.plugin.password") private String eventMeshMetaStoragePluginPassword = ""; - @ConfigField(field = "metaStorage.plugin.metaStorageIntervalInMills") - private Integer eventMeshMetaStorageIntervalInMills = 10 * 1000; - - @ConfigField(field = "metaStorage.plugin.fetchMetaStorageAddrIntervalInMills") - private Integer eventMeshFetchMetaStorageAddrInterval = 10 * 1000; - @ConfigField(field = "metaStorage.plugin.enabled") private boolean eventMeshServerMetaStorageEnable = false; @@ -85,11 +79,8 @@ public class CommonConfiguration { @ConfigField(field = "security.plugin.type", notEmpty = true) private String eventMeshSecurityPluginType = "security"; - @ConfigField(field = "connector.plugin.type", notEmpty = true) - private String eventMeshConnectorPluginType = "rocketmq"; - @ConfigField(field = "storage.plugin.type", notEmpty = true) - private String eventMeshStoragePluginType = "rocketmq"; + private String eventMeshStoragePluginType = "standalone"; @ConfigField(field = "security.validation.type.token", notEmpty = true) private boolean eventMeshSecurityValidateTypeToken = false; diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java index 11265522a7..fb1ebc1635 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java @@ -45,7 +45,6 @@ public void testGetCommonConfiguration() { Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster()); Assertions.assertEquals("name-succeed!!!", config.getEventMeshName()); Assertions.assertEquals("816", config.getSysID()); - // Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType()); @@ -55,9 +54,6 @@ public void testGetCommonConfiguration() { Assertions.assertEquals("username-succeed!!!", config.getEventMeshMetaStoragePluginUsername()); Assertions.assertEquals("password-succeed!!!", config.getEventMeshMetaStoragePluginPassword()); - Assertions.assertEquals(Integer.valueOf(816), config.getEventMeshMetaStorageIntervalInMills()); - Assertions.assertEquals(Integer.valueOf(1816), config.getEventMeshFetchMetaStorageAddrInterval()); - List list = new ArrayList<>(); list.add("metrics-succeed1!!!"); list.add("metrics-succeed2!!!"); diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java index b941fee1d4..32f4ae946e 100644 --- a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java @@ -24,7 +24,7 @@ public class SystemUtilsTest { @Test public void isLinuxPlatform() { - if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) { + if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("linux")) { Assertions.assertTrue(SystemUtils.isLinuxPlatform()); Assertions.assertFalse(SystemUtils.isWindowsPlatform()); } @@ -32,7 +32,7 @@ public void isLinuxPlatform() { @Test public void isWindowsPlatform() { - if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) { + if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("windows")) { Assertions.assertFalse(SystemUtils.isLinuxPlatform()); Assertions.assertTrue(SystemUtils.isWindowsPlatform()); } diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties index f80bb7db01..f53a7680f0 100644 --- a/eventmesh-common/src/test/resources/configuration.properties +++ b/eventmesh-common/src/test/resources/configuration.properties @@ -20,13 +20,10 @@ eventMesh.sysid=816 eventMesh.server.cluster=cluster-succeed!!! eventMesh.server.name=name-succeed!!! eventMesh.server.hostIp=hostIp-succeed!!! -eventMesh.connector.plugin.type=connector-succeed!!! eventMesh.storage.plugin.type=storage-succeed!!! eventMesh.security.plugin.type=security-succeed!!! eventMesh.metaStorage.plugin.type=metaStorage-succeed!!! eventMesh.trace.plugin=trace-succeed!!! -eventMesh.metaStorage.plugin.metaStorageIntervalInMills=816 -eventMesh.metaStorage.plugin.fetchMetaStorageAddrIntervalInMills=1816 eventMesh.metrics.plugin=metrics-succeed1!!!,metrics-succeed2!!!,metrics-succeed3!!! eventMesh.metaStorage.plugin.server-addr=server-addr-succeed1!!! diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/dialect/AbstractGeneralDatabaseDialect.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/dialect/AbstractGeneralDatabaseDialect.java index 64bde49935..0a7463a187 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/dialect/AbstractGeneralDatabaseDialect.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/dialect/AbstractGeneralDatabaseDialect.java @@ -146,7 +146,7 @@ protected void registerType(Type type) { @Override public String getTypeName(Dialect hibernateDialect, Column column) { Type type = this.getType(column); - if (null != type) { + if (type != null) { return type.getTypeName(column); } Long length = Optional.ofNullable(column.getColumnLength()).orElse(0L); diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java index c24f885d75..4e30da93f9 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java @@ -166,7 +166,7 @@ public void enterTableOptionCharset(TableOptionCharsetContext ctx) { @Override public void enterTableOptionAutoIncrement(TableOptionAutoIncrementContext ctx) { DecimalLiteralContext decimalLiteralContext = ctx.decimalLiteral(); - if (null != decimalLiteralContext) { + if (decimalLiteralContext != null) { String autoIncrementNumber = Antlr4Utils.getText(decimalLiteralContext); this.tableEditor.withOption(MysqlTableOptions.AUTO_INCREMENT, autoIncrementNumber); } diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java index 645e5c9eca..5650c3d0cc 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java @@ -392,7 +392,7 @@ private void enableGtidHandle() { EventMeshGtidSet purgedServerEventMeshGtidSet = new EventMeshGtidSet(purgedServerGtid); EventMeshGtidSet filteredEventMeshGtidSet = filterGtidSet(context, executedEventMeshGtidSet, purgedServerEventMeshGtidSet); - if (null != filteredEventMeshGtidSet) { + if (filteredEventMeshGtidSet != null) { client.setGtidSet(filteredEventMeshGtidSet.toString()); this.context.completedGtidSet(filteredEventMeshGtidSet.toString()); localGtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredEventMeshGtidSet.toString()); @@ -645,7 +645,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour schema.addKeys(tableSchema.getPrimaryKey().getColumnNames()); Pair beforePair = Optional.ofNullable(pair.getLeft()).orElse(new Pair<>()); Serializable[] beforeRows = beforePair.getLeft(); - if (null != beforeRows && beforeRows.length != 0) { + if (beforeRows != null && beforeRows.length != 0) { BitSet includedColumns = beforePair.getRight(); Map beforeValues = new HashMap<>(beforeRows.length); for (int index = 0; index < columnsSize; ++index) { @@ -663,7 +663,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour Pair afterPair = Optional.ofNullable(pair.getRight()).orElse(new Pair<>()); Serializable[] afterRows = afterPair.getLeft(); - if (null != afterRows && afterRows.length != 0) { + if (afterRows != null && afterRows.length != 0) { BitSet includedColumns = afterPair.getRight(); Map afterValues = new HashMap<>(afterRows.length); for (int index = 0; index < columnsSize; ++index) { diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java index c475227248..158ffc81da 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java @@ -48,7 +48,7 @@ public Table(TableId tableId, PrimaryKey primaryKey, List uniqueKeys, this.primaryKey = primaryKey; this.uniqueKeys = uniqueKeys; this.comment = comment; - if (null != options) { + if (options != null) { this.options.putAll(options); } } diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java index 4797d58ef9..75e51e690b 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java @@ -313,12 +313,12 @@ private String createInteractiveContent(ConnectRecord connectRecord, String titl private boolean needAtAll(ConnectRecord connectRecord) { String atAll = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_ALL_4_LARK); - return null != atAll && !"null".equals(atAll) && Boolean.parseBoolean(atAll); + return atAll != null && !"null".equals(atAll) && Boolean.parseBoolean(atAll); } private String needAtUser(ConnectRecord connectRecord) { String atUsers = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_USERS_4_LARK); - return null != atUsers && !"null".equals(atUsers) ? atUsers : ""; + return atUsers != null && !"null".equals(atUsers) ? atUsers : ""; } /** diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/client/RabbitmqClient.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/client/RabbitmqClient.java index 34cda9620e..1f324b5847 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/client/RabbitmqClient.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/client/RabbitmqClient.java @@ -120,7 +120,7 @@ public void unbinding(Channel channel, String exchangeName, String routingKey, S * @param connection connection */ public void closeConnection(Connection connection) { - if (null != connection) { + if (connection != null) { try { connection.close(); } catch (Exception ex) { @@ -135,7 +135,7 @@ public void closeConnection(Connection connection) { * @param channel channel */ public void closeChannel(Channel channel) { - if (null != channel) { + if (channel != null) { try { channel.close(); } catch (Exception ex) { diff --git a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java index 2d9b921efb..a3849d2cc8 100644 --- a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java +++ b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java @@ -72,7 +72,7 @@ public void init() throws MetaException { if (initStatus.compareAndSet(false, true)) { for (String key : ConfigurationContextUtil.KEYS) { CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key); - if (null != commonConfiguration) { + if (commonConfiguration != null) { String metaStorageAddr = commonConfiguration.getMetaStorageAddr(); if (StringUtils.isBlank(metaStorageAddr)) { throw new MetaException("namesrvAddr cannot be null"); diff --git a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java index 359e7427f0..18520feb4d 100644 --- a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java +++ b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java @@ -175,7 +175,7 @@ public void shutdown() throws MetaException { if (!startStatus.compareAndSet(true, false)) { return; } - if (null != zkClient) { + if (zkClient != null) { zkClient.close(); } log.info("ZookeeperRegistryService closed"); diff --git a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml index ee94d606fb..fc640aa639 100644 --- a/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml +++ b/eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml @@ -35,11 +35,13 @@ data: eventMesh.server.cluster=COMMON eventMesh.server.name=EVENTMESH-runtime eventMesh.sysid=0000 + eventMesh.server.tcp.port=10000 eventMesh.server.http.port=10105 eventMesh.server.grpc.port=10205 + # HTTP Admin Server + eventMesh.server.admin.http.port=10106 ########################## eventMesh tcp configuration ############################ eventMesh.server.tcp.enabled=true - eventMesh.server.tcp.port=10000 eventMesh.server.tcp.readerIdleSeconds=120 eventMesh.server.tcp.writerIdleSeconds=120 eventMesh.server.tcp.allIdleSeconds=120 @@ -69,13 +71,6 @@ data: eventMesh.server.retry.async.pushRetryDelayInMills=500 eventMesh.server.retry.sync.pushRetryDelayInMills=500 eventMesh.server.retry.pushRetryQueueSize=10000 - #admin - eventMesh.server.admin.http.port=10106 - #metaStorage - eventMesh.server.metaStorage.metaStorageIntervalInMills=10000 - eventMesh.server.metaStorage.fetchMetaStorageAddrIntervalInMills=20000 - #auto-ack - #eventMesh.server.defibus.client.comsumeTimeoutInMin=5 #sleep interval between closing client of different group in server graceful shutdown eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000 @@ -85,9 +80,6 @@ data: eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 - #connector plugin - eventMesh.connector.plugin.type=standalone - #storage plugin eventMesh.storage.plugin.type=standalone diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolAdaptor.java index a232781ff6..08c4718825 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolAdaptor.java @@ -97,7 +97,7 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot } httpEventWrapper.setSysHeaderMap(sysHeaderMap); // ce data - if (null != cloudEvent.getData()) { + if (cloudEvent.getData() != null) { Map dataContentMap = JsonUtils.parseTypeReferenceObject( new String(Objects.requireNonNull(cloudEvent.getData()).toBytes(), Constants.DEFAULT_CHARSET), new TypeReference>() { diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index cabe3f9bc5..91a422fa4c 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -21,11 +21,14 @@ eventMesh.server.provide.protocols=HTTP,TCP,GRPC eventMesh.server.cluster=COMMON eventMesh.server.name=EVENTMESH-runtime eventMesh.sysid=0000 +eventMesh.server.tcp.port=10000 eventMesh.server.http.port=10105 eventMesh.server.grpc.port=10205 +# HTTP Admin Server +eventMesh.server.admin.http.port=10106 + ########################## EventMesh TCP Configuration ########################## eventMesh.server.tcp.enabled=true -eventMesh.server.tcp.port=10000 eventMesh.server.tcp.readerIdleSeconds=120 eventMesh.server.tcp.writerIdleSeconds=120 eventMesh.server.tcp.allIdleSeconds=120 @@ -57,14 +60,6 @@ eventMesh.server.retry.sync.pushRetryDelayInMills=500 eventMesh.server.retry.pushRetryQueueSize=10000 eventMesh.server.retry.plugin.type=default -# runtime admin -eventMesh.server.admin.http.port=10106 -# metaStorage -eventMesh.server.metaStorage.metaStorageIntervalInMills=10000 -eventMesh.server.metaStorage.fetchMetaStorageAddrIntervalInMills=20000 -# auto-ack -#eventMesh.server.defibus.client.comsumeTimeoutInMin=5 - # sleep interval between closing client of different group in server graceful shutdown eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000 eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200 @@ -73,9 +68,7 @@ eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200 eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 -# connector plugin -eventMesh.connector.plugin.type=standalone - +########################## EventMesh Plugin Configuration ########################## # storage plugin eventMesh.storage.plugin.type=standalone @@ -91,6 +84,7 @@ eventMesh.metaStorage.plugin.type=nacos eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848 eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos + # metaStorage plugin: nacos #eventMesh.metaStorage.nacos.endpoint= #eventMesh.metaStorage.nacos.accessKey= @@ -137,9 +131,9 @@ eventMesh.trace.plugin=zipkin eventMesh.webHook.admin.start=true # Webhook event configuration storage mode. Currently, only file and nacos are supported eventMesh.webHook.operationMode=file -# The file storage path of the file storage mode. If #{eventmeshhome} is written, it is in the eventmesh root directory +# The file storage path of the file storage mode. If #{eventMeshHome} is written, it is in the EventMesh root directory eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook -# Nacos storage mode, and the configuration naming rule is eventmesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java) +# Nacos storage mode, and the configuration naming rule is EventMesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java) ## Address of Nacos eventMesh.webHook.nacosMode.serverAddr=127.0.0.1:8848 # Webhook CloudEvent sending mode. This property is the same as the eventMesh.storage.plugin.type configuration. diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 8dd9525170..5065b73a01 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -451,11 +451,11 @@ public void channelReadComplete(final ChannelHandlerContext ctx) throws Exceptio @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - if (null != cause) { + if (cause != null) { log.error("", cause); } - if (null != ctx) { + if (ctx != null) { ctx.close(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java index 26e727406b..3b33a71b8f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java @@ -27,7 +27,6 @@ import org.apache.eventmesh.common.utils.ConfigurationContextUtil; import org.apache.eventmesh.runtime.acl.Acl; import org.apache.eventmesh.runtime.common.ServiceState; -import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerTopicManager; import org.apache.eventmesh.runtime.meta.MetaStorage; import org.apache.eventmesh.runtime.storage.StorageResource; @@ -139,13 +138,10 @@ public void init() throws Exception { adminBootstrap.init(); } - final String eventStore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV)); - - log.info("eventStore : {}", eventStore); producerTopicManager = new ProducerTopicManager(this); producerTopicManager.init(); - serviceState = ServiceState.INITED; + serviceState = ServiceState.INITED; log.info(SERVER_STATE_MSG, serviceState); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java index 4196572ccb..2830ae9596 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java @@ -21,10 +21,6 @@ public class EventMeshConstants { - public static final String EVENT_STORE_PROPERTIES = "eventstore"; - - public static final String EVENT_STORE_ENV = "EVENT_STORE"; - public static final String PROTOCOL_HTTP = "http"; public static final String PROTOCOL_TCP = "tcp"; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index f0a50dcc05..84b0079f4b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -91,5 +91,4 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac public Producer getMeshMQProducer() { return meshMQProducer; } - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQWrapper.java index 6edf93f530..e0828d460f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQWrapper.java @@ -17,29 +17,11 @@ package org.apache.eventmesh.runtime.core.plugin; -import org.apache.eventmesh.runtime.constants.EventMeshConstants; - -import org.apache.commons.lang3.StringUtils; - import java.util.concurrent.atomic.AtomicBoolean; public abstract class MQWrapper { - public static final String EVENT_STORE_DEFIBUS = "defibus"; - - public static String CURRENT_EVENT_STORE = EVENT_STORE_DEFIBUS; - - public static final String EVENT_STORE_CONF = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, - System.getenv(EventMeshConstants.EVENT_STORE_ENV)); - - static { - if (StringUtils.isNotBlank(EVENT_STORE_CONF)) { - CURRENT_EVENT_STORE = EVENT_STORE_CONF; - } - } - public AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); public AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE); - } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java index bd27d6933d..749695d4c7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/producer/EventMeshProducer.java @@ -65,7 +65,6 @@ public synchronized void init(CommonConfiguration configuration, keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID( producerGroupConfig.getGroupName(), configuration.getEventMeshCluster())); - // TODO for defibus keyValue.put(EventMeshConstants.EVENT_MESH_IDC, configuration.getEventMeshIDC()); mqProducerWrapper = new MQProducerWrapper( configuration.getEventMeshStoragePluginType()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index f5a6f748e4..8b4ce0a4e1 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -289,7 +289,6 @@ public synchronized void startClientGroupProducer() throws Exception { .buildMeshTcpClientID(sysId, EventMeshConstants.PURPOSE_PUB_UPPER_CASE, eventMeshTCPConfiguration.getEventMeshCluster())); - // TODO for defibus keyValue.put(EventMeshConstants.EVENT_MESH_IDC, eventMeshTCPConfiguration.getEventMeshIDC()); mqProducerWrapper.init(keyValue); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java index 37c5a1e372..e2e36690a6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java @@ -238,15 +238,15 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = 1001; // primeNumber - if (null != client) { + if (client != null) { result += 31 * result + Objects.hash(client); } - if (null != context) { + if (context != null) { result += 31 * result + Objects.hash(context); } - if (null != sessionState) { + if (sessionState != null) { result += 31 * result + Objects.hash(sessionState); } return result; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java index 38d3019de3..24142a6821 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java @@ -80,7 +80,7 @@ private static String encodingParams(Collection paramValues, String enco } private static void setHeaders(HttpURLConnection conn, Collection headers, String encoding) { - if (null != headers) { + if (headers != null) { for (Iterator iter = headers.iterator(); iter.hasNext();) { conn.addRequestProperty(iter.next(), iter.next()); } @@ -116,7 +116,7 @@ public static HttpResult httpPost(String url, List headers, List return new HttpResult(respCode, resp); } finally { - if (null != conn) { + if (conn != null) { conn.disconnect(); } } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java index 3d7cfdf877..c546d38e3b 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/boot/EventMeshServerTest.java @@ -93,7 +93,6 @@ private void assertCommonConfig(CommonConfiguration config) { Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster()); Assertions.assertEquals("name-succeed!!!", config.getEventMeshName()); Assertions.assertEquals("816", config.getSysID()); - Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType()); Assertions.assertEquals("metaStorage-succeed!!!", config.getEventMeshMetaStoragePluginType()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/UserAgentUtils.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/UserAgentUtils.java index 2602c85384..500008dd7e 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/UserAgentUtils.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/client/common/UserAgentUtils.java @@ -43,8 +43,6 @@ public static UserAgent createPubUserAgent() { public static UserAgent createUserAgent() { UserAgent userAgent = new UserAgent(); userAgent.setSubsystem("5123"); - // userAgent.setPid(UtilAll.getPid()); - // userAgent.setHost(RemotingUtil.getLocalAddress()); userAgent.setVersion("2.0.8"); userAgent.setUsername("username"); userAgent.setPassword("1234"); @@ -54,13 +52,11 @@ public static UserAgent createUserAgent() { public static UserAgent createSubUserAgent() { UserAgent userAgent = new UserAgent(); userAgent.setSubsystem("5243"); - // userAgent.setPid(UtilAll.getPid()); - // userAgent.setHost(RemotingUtil.getLocalAddress()); userAgent.setPort(8888); userAgent.setVersion("2.0.8"); userAgent.setUsername("username"); userAgent.setPassword("1234"); - userAgent.setPath("/data/app/defibus-acl/"); + userAgent.setPath("/data/app/acl/"); userAgent.setPurpose(EventMeshConstants.PURPOSE_SUB); return userAgent; } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java index b27b7c5f9f..6b206167dc 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshGrpcConfigurationTest.java @@ -72,7 +72,6 @@ private void assertCommonConfig(CommonConfiguration config) { Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster()); Assertions.assertEquals("name-succeed!!!", config.getEventMeshName()); Assertions.assertEquals("816", config.getSysID()); - Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType()); Assertions.assertEquals("metaStorage-succeed!!!", config.getEventMeshMetaStoragePluginType()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java index 0965744121..d522ff5519 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfigurationTest.java @@ -87,7 +87,6 @@ private void assertCommonConfig(CommonConfiguration config) { Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster()); Assertions.assertEquals("name-succeed!!!", config.getEventMeshName()); Assertions.assertEquals("816", config.getSysID()); - Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType()); Assertions.assertEquals("metaStorage-succeed!!!", config.getEventMeshMetaStoragePluginType()); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java index c16a073d56..1501cf1b5d 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfigurationTest.java @@ -74,7 +74,6 @@ private void assertCommonConfig(CommonConfiguration config) { Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster()); Assertions.assertEquals("name-succeed!!!", config.getEventMeshName()); Assertions.assertEquals("816", config.getSysID()); - Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType()); Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType()); Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType()); Assertions.assertEquals("metaStorage-succeed!!!", config.getEventMeshMetaStoragePluginType()); diff --git a/eventmesh-runtime/src/test/resources/configuration.properties b/eventmesh-runtime/src/test/resources/configuration.properties index 71ad34e19f..70ff82e05c 100644 --- a/eventmesh-runtime/src/test/resources/configuration.properties +++ b/eventmesh-runtime/src/test/resources/configuration.properties @@ -22,7 +22,6 @@ eventMesh.sysid=816 eventMesh.server.cluster=cluster-succeed!!! eventMesh.server.name=name-succeed!!! eventMesh.server.hostIp=hostIp-succeed!!! -eventMesh.connector.plugin.type=connector-succeed!!! eventMesh.storage.plugin.type=storage-succeed!!! eventMesh.security.plugin.type=security-succeed!!! eventMesh.trace.plugin=trace-succeed!!! @@ -33,8 +32,6 @@ eventMesh.metaStorage.plugin.server-addr=server-addr-succeed1!!! eventMesh.metaStorage.plugin.enabled=true eventMesh.metaStorage.plugin.username=username-succeed!!! eventMesh.metaStorage.plugin.password=password-succeed!!! -eventMesh.metaStorage.plugin.registerIntervalInMills=816 -eventMesh.metaStorage.plugin.fetchRegistryAddrIntervalInMills=1816 eventMesh.server.security.enabled=true eventMesh.server.trace.enabled=true diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshCloudEventBuilder.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshCloudEventBuilder.java index f7ab40c20a..5621b41e0f 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshCloudEventBuilder.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshCloudEventBuilder.java @@ -157,7 +157,7 @@ private static CloudEvent switchEventMeshMessage2EventMeshCloudEvent(EventMeshMe CloudEventAttributeValue.newBuilder().setCeString(Constants.PROTOCOL_DESC_GRPC_CLOUD_EVENT).build()); attributeValueMap.put(ProtocolKey.PRODUCERGROUP, CloudEventAttributeValue.newBuilder().setCeString(clientConfig.getProducerGroup()).build()); - if (null != message.getTopic()) { + if (message.getTopic() != null) { attributeValueMap.put(ProtocolKey.SUBJECT, CloudEventAttributeValue.newBuilder().setCeString(message.getTopic()).build()); } attributeValueMap.put(ProtocolKey.DATA_CONTENT_TYPE, CloudEventAttributeValue.newBuilder().setCeString("text/plain").build()); diff --git a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/client/RabbitmqClient.java b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/client/RabbitmqClient.java index 4e44c77fe3..678ba0884d 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/client/RabbitmqClient.java +++ b/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/client/RabbitmqClient.java @@ -120,7 +120,7 @@ public void unbinding(Channel channel, String exchangeName, String routingKey, S * @param connection connection */ public void closeConnection(Connection connection) { - if (null != connection) { + if (connection != null) { try { connection.close(); } catch (Exception ex) { @@ -135,7 +135,7 @@ public void closeConnection(Connection connection) { * @param channel channel */ public void closeChannel(Channel channel) { - if (null != channel) { + if (channel != null) { try { channel.close(); } catch (Exception ex) {