Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4838] Deprecate unused eventMesh.connector.plugin.type etc. properties #4839

Merged
merged 5 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public EventMeshThreadFactory(final String threadNamePrefix) {
public Thread newThread(@Nonnull final Runnable runnable) {

StringBuilder threadName = new StringBuilder(threadNamePrefix);
if (null != threadIndex) {
if (threadIndex != null) {
threadName.append("-").append(threadIndex.incrementAndGet());
}
Thread thread = new Thread(runnable, threadName.toString());
thread.setDaemon(daemon);
if (null != priority) {
if (priority != null) {
thread.setPriority(priority);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ public class CommonConfiguration {
@ConfigField(field = "metaStorage.plugin.password")
private String eventMeshMetaStoragePluginPassword = "";

@ConfigField(field = "metaStorage.plugin.metaStorageIntervalInMills")
private Integer eventMeshMetaStorageIntervalInMills = 10 * 1000;

@ConfigField(field = "metaStorage.plugin.fetchMetaStorageAddrIntervalInMills")
private Integer eventMeshFetchMetaStorageAddrInterval = 10 * 1000;

@ConfigField(field = "metaStorage.plugin.enabled")
private boolean eventMeshServerMetaStorageEnable = false;

Expand All @@ -85,11 +79,8 @@ public class CommonConfiguration {
@ConfigField(field = "security.plugin.type", notEmpty = true)
private String eventMeshSecurityPluginType = "security";

@ConfigField(field = "connector.plugin.type", notEmpty = true)
private String eventMeshConnectorPluginType = "rocketmq";

@ConfigField(field = "storage.plugin.type", notEmpty = true)
private String eventMeshStoragePluginType = "rocketmq";
private String eventMeshStoragePluginType = "standalone";

@ConfigField(field = "security.validation.type.token", notEmpty = true)
private boolean eventMeshSecurityValidateTypeToken = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster());
Assertions.assertEquals("name-succeed!!!", config.getEventMeshName());
Assertions.assertEquals("816", config.getSysID());
// Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType());
Expand All @@ -55,9 +54,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("username-succeed!!!", config.getEventMeshMetaStoragePluginUsername());
Assertions.assertEquals("password-succeed!!!", config.getEventMeshMetaStoragePluginPassword());

Assertions.assertEquals(Integer.valueOf(816), config.getEventMeshMetaStorageIntervalInMills());
Assertions.assertEquals(Integer.valueOf(1816), config.getEventMeshFetchMetaStorageAddrInterval());

List<String> list = new ArrayList<>();
list.add("metrics-succeed1!!!");
list.add("metrics-succeed2!!!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public class SystemUtilsTest {

@Test
public void isLinuxPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
Assertions.assertTrue(SystemUtils.isLinuxPlatform());
Assertions.assertFalse(SystemUtils.isWindowsPlatform());
}
}

@Test
public void isWindowsPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
Assertions.assertFalse(SystemUtils.isLinuxPlatform());
Assertions.assertTrue(SystemUtils.isWindowsPlatform());
}
Expand Down
3 changes: 0 additions & 3 deletions eventmesh-common/src/test/resources/configuration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ eventMesh.sysid=816
eventMesh.server.cluster=cluster-succeed!!!
eventMesh.server.name=name-succeed!!!
eventMesh.server.hostIp=hostIp-succeed!!!
eventMesh.connector.plugin.type=connector-succeed!!!
eventMesh.storage.plugin.type=storage-succeed!!!
eventMesh.security.plugin.type=security-succeed!!!
eventMesh.metaStorage.plugin.type=metaStorage-succeed!!!
eventMesh.trace.plugin=trace-succeed!!!
eventMesh.metaStorage.plugin.metaStorageIntervalInMills=816
eventMesh.metaStorage.plugin.fetchMetaStorageAddrIntervalInMills=1816
eventMesh.metrics.plugin=metrics-succeed1!!!,metrics-succeed2!!!,metrics-succeed3!!!
eventMesh.metaStorage.plugin.server-addr=server-addr-succeed1!!!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void registerType(Type type) {
@Override
public String getTypeName(Dialect hibernateDialect, Column<?> column) {
Type type = this.getType(column);
if (null != type) {
if (type != null) {
return type.getTypeName(column);
}
Long length = Optional.ofNullable(column.getColumnLength()).orElse(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void enterTableOptionCharset(TableOptionCharsetContext ctx) {
@Override
public void enterTableOptionAutoIncrement(TableOptionAutoIncrementContext ctx) {
DecimalLiteralContext decimalLiteralContext = ctx.decimalLiteral();
if (null != decimalLiteralContext) {
if (decimalLiteralContext != null) {
String autoIncrementNumber = Antlr4Utils.getText(decimalLiteralContext);
this.tableEditor.withOption(MysqlTableOptions.AUTO_INCREMENT, autoIncrementNumber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ private void enableGtidHandle() {
EventMeshGtidSet purgedServerEventMeshGtidSet = new EventMeshGtidSet(purgedServerGtid);

EventMeshGtidSet filteredEventMeshGtidSet = filterGtidSet(context, executedEventMeshGtidSet, purgedServerEventMeshGtidSet);
if (null != filteredEventMeshGtidSet) {
if (filteredEventMeshGtidSet != null) {
client.setGtidSet(filteredEventMeshGtidSet.toString());
this.context.completedGtidSet(filteredEventMeshGtidSet.toString());
localGtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredEventMeshGtidSet.toString());
Expand Down Expand Up @@ -645,7 +645,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour
schema.addKeys(tableSchema.getPrimaryKey().getColumnNames());
Pair<Serializable[], BitSet> beforePair = Optional.ofNullable(pair.getLeft()).orElse(new Pair<>());
Serializable[] beforeRows = beforePair.getLeft();
if (null != beforeRows && beforeRows.length != 0) {
if (beforeRows != null && beforeRows.length != 0) {
BitSet includedColumns = beforePair.getRight();
Map<String, Object> beforeValues = new HashMap<>(beforeRows.length);
for (int index = 0; index < columnsSize; ++index) {
Expand All @@ -663,7 +663,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour

Pair<Serializable[], BitSet> afterPair = Optional.ofNullable(pair.getRight()).orElse(new Pair<>());
Serializable[] afterRows = afterPair.getLeft();
if (null != afterRows && afterRows.length != 0) {
if (afterRows != null && afterRows.length != 0) {
BitSet includedColumns = afterPair.getRight();
Map<String, Object> afterValues = new HashMap<>(afterRows.length);
for (int index = 0; index < columnsSize; ++index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Table(TableId tableId, PrimaryKey primaryKey, List<UniqueKey> uniqueKeys,
this.primaryKey = primaryKey;
this.uniqueKeys = uniqueKeys;
this.comment = comment;
if (null != options) {
if (options != null) {
this.options.putAll(options);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ private String createInteractiveContent(ConnectRecord connectRecord, String titl

private boolean needAtAll(ConnectRecord connectRecord) {
String atAll = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_ALL_4_LARK);
return null != atAll && !"null".equals(atAll) && Boolean.parseBoolean(atAll);
return atAll != null && !"null".equals(atAll) && Boolean.parseBoolean(atAll);
}

private String needAtUser(ConnectRecord connectRecord) {
String atUsers = connectRecord.getExtension(ConnectRecordExtensionKeys.AT_USERS_4_LARK);
return null != atUsers && !"null".equals(atUsers) ? atUsers : "";
return atUsers != null && !"null".equals(atUsers) ? atUsers : "";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void unbinding(Channel channel, String exchangeName, String routingKey, S
* @param connection connection
*/
public void closeConnection(Connection connection) {
if (null != connection) {
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
Expand All @@ -135,7 +135,7 @@ public void closeConnection(Connection connection) {
* @param channel channel
*/
public void closeChannel(Channel channel) {
if (null != channel) {
if (channel != null) {
try {
channel.close();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void init() throws MetaException {
if (initStatus.compareAndSet(false, true)) {
for (String key : ConfigurationContextUtil.KEYS) {
CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key);
if (null != commonConfiguration) {
if (commonConfiguration != null) {
String metaStorageAddr = commonConfiguration.getMetaStorageAddr();
if (StringUtils.isBlank(metaStorageAddr)) {
throw new MetaException("namesrvAddr cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void shutdown() throws MetaException {
if (!startStatus.compareAndSet(true, false)) {
return;
}
if (null != zkClient) {
if (zkClient != null) {
zkClient.close();
}
log.info("ZookeeperRegistryService closed");
Expand Down
14 changes: 3 additions & 11 deletions eventmesh-operator/config/samples/eventmesh_v1_runtime.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ data:
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=0000
eventMesh.server.tcp.port=10000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
# HTTP Admin Server
eventMesh.server.admin.http.port=10106
########################## eventMesh tcp configuration ############################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=10000
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
Expand Down Expand Up @@ -69,13 +71,6 @@ data:
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
#admin
eventMesh.server.admin.http.port=10106
#metaStorage
eventMesh.server.metaStorage.metaStorageIntervalInMills=10000
eventMesh.server.metaStorage.fetchMetaStorageAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

#sleep interval between closing client of different group in server graceful shutdown
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
Expand All @@ -85,9 +80,6 @@ data:
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=standalone

#storage plugin
eventMesh.storage.plugin.type=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ProtocolTransportObject fromCloudEvent(CloudEvent cloudEvent) throws Prot
}
httpEventWrapper.setSysHeaderMap(sysHeaderMap);
// ce data
if (null != cloudEvent.getData()) {
if (cloudEvent.getData() != null) {
Map<String, Object> dataContentMap = JsonUtils.parseTypeReferenceObject(
new String(Objects.requireNonNull(cloudEvent.getData()).toBytes(), Constants.DEFAULT_CHARSET),
new TypeReference<Map<String, Object>>() {
Expand Down
22 changes: 8 additions & 14 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=0000
eventMesh.server.tcp.port=10000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
# HTTP Admin Server
eventMesh.server.admin.http.port=10106

########################## EventMesh TCP Configuration ##########################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=10000
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
Expand Down Expand Up @@ -57,14 +60,6 @@ eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
eventMesh.server.retry.plugin.type=default

# runtime admin
eventMesh.server.admin.http.port=10106
# metaStorage
eventMesh.server.metaStorage.metaStorageIntervalInMills=10000
eventMesh.server.metaStorage.fetchMetaStorageAddrIntervalInMills=20000
# auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

# sleep interval between closing client of different group in server graceful shutdown
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
Expand All @@ -73,9 +68,7 @@ eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

# connector plugin
eventMesh.connector.plugin.type=standalone

########################## EventMesh Plugin Configuration ##########################
# storage plugin
eventMesh.storage.plugin.type=standalone

Expand All @@ -91,6 +84,7 @@ eventMesh.metaStorage.plugin.type=nacos
eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848
eventMesh.metaStorage.plugin.username=nacos
eventMesh.metaStorage.plugin.password=nacos

# metaStorage plugin: nacos
#eventMesh.metaStorage.nacos.endpoint=
#eventMesh.metaStorage.nacos.accessKey=
Expand Down Expand Up @@ -137,9 +131,9 @@ eventMesh.trace.plugin=zipkin
eventMesh.webHook.admin.start=true
# Webhook event configuration storage mode. Currently, only file and nacos are supported
eventMesh.webHook.operationMode=file
# The file storage path of the file storage mode. If #{eventmeshhome} is written, it is in the eventmesh root directory
# The file storage path of the file storage mode. If #{eventMeshHome} is written, it is in the EventMesh root directory
eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
# Nacos storage mode, and the configuration naming rule is eventmesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
# Nacos storage mode, and the configuration naming rule is EventMesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
## Address of Nacos
eventMesh.webHook.nacosMode.serverAddr=127.0.0.1:8848
# Webhook CloudEvent sending mode. This property is the same as the eventMesh.storage.plugin.type configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,11 @@ public void channelReadComplete(final ChannelHandlerContext ctx) throws Exceptio

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
if (null != cause) {
if (cause != null) {
log.error("", cause);
}

if (null != ctx) {
if (ctx != null) {
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerTopicManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.runtime.storage.StorageResource;
Expand Down Expand Up @@ -139,13 +138,10 @@ public void init() throws Exception {
adminBootstrap.init();
}

final String eventStore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));

log.info("eventStore : {}", eventStore);
producerTopicManager = new ProducerTopicManager(this);
producerTopicManager.init();
serviceState = ServiceState.INITED;

serviceState = ServiceState.INITED;
log.info(SERVER_STATE_MSG, serviceState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@

public class EventMeshConstants {

public static final String EVENT_STORE_PROPERTIES = "eventstore";

public static final String EVENT_STORE_ENV = "EVENT_STORE";

public static final String PROTOCOL_HTTP = "http";

public static final String PROTOCOL_TCP = "tcp";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,4 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac
public Producer getMeshMQProducer() {
return meshMQProducer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,11 @@

package org.apache.eventmesh.runtime.core.plugin;

import org.apache.eventmesh.runtime.constants.EventMeshConstants;

import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.atomic.AtomicBoolean;

public abstract class MQWrapper {

public static final String EVENT_STORE_DEFIBUS = "defibus";

public static String CURRENT_EVENT_STORE = EVENT_STORE_DEFIBUS;

public static final String EVENT_STORE_CONF = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES,
System.getenv(EventMeshConstants.EVENT_STORE_ENV));

static {
if (StringUtils.isNotBlank(EVENT_STORE_CONF)) {
CURRENT_EVENT_STORE = EVENT_STORE_CONF;
}
}

public AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);

public AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public synchronized void init(CommonConfiguration configuration,
keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshClientID(
producerGroupConfig.getGroupName(), configuration.getEventMeshCluster()));

// TODO for defibus
keyValue.put(EventMeshConstants.EVENT_MESH_IDC, configuration.getEventMeshIDC());
mqProducerWrapper = new MQProducerWrapper(
configuration.getEventMeshStoragePluginType());
Expand Down
Loading
Loading