Skip to content

Commit

Permalink
Merge pull request #2680 from eight-nines/MR1
Browse files Browse the repository at this point in the history
[ISSUE #2576] Improve configuration management of Kafka and Redis SPI Impl
  • Loading branch information
xwm1992 authored Dec 31, 2022
2 parents 9d2ef64 + 8e006df commit 027ebd5
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Locale;
import java.util.Objects;
Expand Down Expand Up @@ -73,8 +74,10 @@ public <T> T getConfig(Class<?> clazz) {

Config config = configArray[0];
try {
// todo Complete all attributes
ConfigInfo configInfo = new ConfigInfo();
configInfo.setClazz(clazz);
configInfo.setPath(config.path());
configInfo.setHump(config.hump());
configInfo.setPrefix(config.prefix());
configInfo.setMonitor(config.monitor());
Expand All @@ -85,11 +88,11 @@ public <T> T getConfig(Class<?> clazz) {
}
}

public void getConfig(Object object) throws Exception {
public void getConfig(Object object) throws IllegalAccessException, NoSuchFieldException, IOException {
this.getConfig(object, object.getClass());
}

public void getConfig(Object object, Class<?> clazz) throws Exception {
public void getConfig(Object object, Class<?> clazz) throws NoSuchFieldException, IOException, IllegalAccessException {
Config[] configArray = clazz.getAnnotationsByType(Config.class);
if (configArray.length == 0) {
return;
Expand Down Expand Up @@ -127,7 +130,7 @@ public void getConfig(Object object, Class<?> clazz) throws Exception {


@SuppressWarnings("unchecked")
public <T> T getConfig(ConfigInfo configInfo) throws Exception {
public <T> T getConfig(ConfigInfo configInfo) throws IOException {
Object object;

if (Objects.isNull(configInfo.getPath()) || StringUtils.isEmpty(configInfo.getPath().trim())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;

Expand Down Expand Up @@ -54,14 +56,14 @@ public static YamlFileLoad getYamlFileLoad() {
return YAML_FILE_LOAD;
}

public <T> T getConfig(ConfigInfo configInfo) throws Exception;
public <T> T getConfig(ConfigInfo configInfo) throws IOException;

class PropertiesFileLoad implements FileLoad {

private final Convert convert = new Convert();

@SuppressWarnings("unchecked")
public <T> T getConfig(ConfigInfo configInfo) throws Exception {
public <T> T getConfig(ConfigInfo configInfo) throws IOException {
Properties properties = new Properties();
properties.load(new BufferedReader(new FileReader(configInfo.getFilePath())));
if (Objects.isNull(configInfo.getClazz())) {
Expand All @@ -72,7 +74,7 @@ public <T> T getConfig(ConfigInfo configInfo) throws Exception {
}

@SuppressWarnings("unchecked")
public <T> T getConfig(Properties properties, ConfigInfo configInfo) throws Exception {
public <T> T getConfig(Properties properties, ConfigInfo configInfo) {
return (T) convert.createObject(configInfo, properties);
}
}
Expand All @@ -81,7 +83,7 @@ class YamlFileLoad implements FileLoad {

@SuppressWarnings("unchecked")
@Override
public <T> T getConfig(ConfigInfo configInfo) throws Exception {
public <T> T getConfig(ConfigInfo configInfo) throws IOException {
Yaml yaml = new Yaml();
return (T) yaml.loadAs(new BufferedInputStream(new FileInputStream(configInfo.getFilePath())), configInfo.getClazz());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.eventmesh.common.config.ConfigInfo;
import org.apache.eventmesh.common.config.NotNull;

import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
Expand Down Expand Up @@ -79,6 +81,7 @@ public class Convert {
this.register(new ConvertList(), List.class, ArrayList.class, LinkedList.class, Vector.class);
this.register(new ConvertMap(), Map.class, HashMap.class, TreeMap.class, LinkedHashMap.class);
this.register(new ConvertIPAddress(), IPAddress.class);
this.register(new ConvertProperties(), Properties.class);
}


Expand Down Expand Up @@ -192,16 +195,21 @@ private void setValue() throws Exception {
} else {
key = keyPrefix.append(configFiled.field()).toString();
}
// todo configFiled.reload() verify
if (!needReload && configFiled != null && configFiled.reload()) {
needReload = Boolean.TRUE;
}

Class<?> clazz = field.getType();
ConvertValue<?> convertValue = classToConvert.get(clazz);
Properties properties = convertInfo.getProperties();
if (clazz.isEnum()) {
String value = convertInfo.getProperties().getProperty(key);
String value = properties.getProperty(key);
convertInfo.setValue(value);
convertValue = convertEnum;
} else if (convertValue instanceof ConvertProperties) {
Properties value = getPropertiesByPrefix(properties, key);
convertInfo.setValue(value);
} else if (Objects.isNull(convertValue)) {
if (Objects.equals("ConfigurationWrapper", clazz.getSimpleName())) {
continue;
Expand All @@ -213,10 +221,10 @@ private void setValue() throws Exception {
} else {
convertInfo.setClazz(field.getType());
}
convertInfo.setProperties(this.convertInfo.getProperties());
convertInfo.setProperties(properties);
convertInfo.setConfigInfo(this.convertInfo.getConfigInfo());
} else {
String value = convertInfo.getProperties().getProperty(key);
String value = properties.getProperty(key);
if (Objects.isNull(value) && convertValue.isNotHandleNullValue()) {
NotNull notNull = field.getAnnotation(NotNull.class);
if (Objects.nonNull(notNull)) {
Expand All @@ -226,6 +234,11 @@ private void setValue() throws Exception {
}
convertInfo.setValue(value);
}

if (Objects.isNull(convertInfo.getValue())) {
continue;
}

convertInfo.setField(field);
convertInfo.setKey(key);
Object value = convertValue.convert(convertInfo);
Expand Down Expand Up @@ -286,74 +299,76 @@ private static class ConvertCharacter implements ConvertValue<Character> {

@Override
public Character convert(ConvertInfo convertInfo) {
return convertInfo.getValue().charAt(0);
String value = (String) convertInfo.getValue();
return value.charAt(0);
}
}

private static class ConvertBoolean implements ConvertValue<Boolean> {

@Override
public Boolean convert(ConvertInfo convertInfo) {
if (Objects.equals(convertInfo.getValue().length(), 1)) {
String value = (String) convertInfo.getValue();
if (Objects.equals(value.length(), 1)) {
return Objects.equals(convertInfo.getValue(), "1") ? Boolean.TRUE : Boolean.FALSE;
}
return Boolean.valueOf(convertInfo.getValue());
return Boolean.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertByte implements ConvertValue<Byte> {

@Override
public Byte convert(ConvertInfo convertInfo) {
return Byte.valueOf(convertInfo.getValue());
return Byte.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertShort implements ConvertValue<Short> {

@Override
public Short convert(ConvertInfo convertInfo) {
return Short.valueOf(convertInfo.getValue());
return Short.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertInteger implements ConvertValue<Integer> {

@Override
public Integer convert(ConvertInfo convertInfo) {
return Integer.valueOf(convertInfo.getValue());
return Integer.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertLong implements ConvertValue<Long> {

@Override
public Long convert(ConvertInfo convertInfo) {
return Long.valueOf(convertInfo.getValue());
return Long.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertFloat implements ConvertValue<Float> {

@Override
public Float convert(ConvertInfo convertInfo) {
return Float.valueOf(convertInfo.getValue());
return Float.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertDouble implements ConvertValue<Double> {

@Override
public Double convert(ConvertInfo convertInfo) {
return Double.valueOf(convertInfo.getValue());
return Double.valueOf((String) convertInfo.getValue());
}
}

private static class ConvertString implements ConvertValue<String> {

@Override
public String convert(ConvertInfo convertInfo) {
return convertInfo.getValue();
return (String) convertInfo.getValue();
}
}

Expand All @@ -363,7 +378,7 @@ private static class ConvertDate implements ConvertValue<Date> {
public Date convert(ConvertInfo convertInfo) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return sdf.parse(convertInfo.getValue());
return sdf.parse((String) convertInfo.getValue());
} catch (ParseException e) {
throw new RuntimeException(e);
}
Expand All @@ -374,7 +389,7 @@ private static class ConvertLocalDate implements ConvertValue<LocalDate> {

@Override
public LocalDate convert(ConvertInfo convertInfo) {
return LocalDate.parse(convertInfo.getValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd"));
return LocalDate.parse((String) convertInfo.getValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd"));
}

}
Expand All @@ -383,7 +398,7 @@ private static class ConvertLocalDateTime implements ConvertValue<LocalDateTime>

@Override
public LocalDateTime convert(ConvertInfo convertInfo) {
return LocalDateTime.parse(convertInfo.getValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
return LocalDateTime.parse((String) convertInfo.getValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}

}
Expand All @@ -393,7 +408,7 @@ private static class ConvertEnum implements ConvertValue<Enum<?>> {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Enum<?> convert(ConvertInfo convertInfo) {
return Enum.valueOf((Class<Enum>) convertInfo.getField().getType(), convertInfo.getValue());
return Enum.valueOf((Class<Enum>) convertInfo.getField().getType(), (String) convertInfo.getValue());
}

}
Expand All @@ -411,7 +426,7 @@ public List<Object> convert(ConvertInfo convertInfo) {
if (convertInfo.getValue() == null) {
return new ArrayList<>();
}
List<String> values = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(convertInfo.getValue());
List<String> values = Splitter.on(",").omitEmptyStrings().trimResults().splitToList((String) convertInfo.getValue());
List<Object> list;
if (Objects.equals(convertInfo.getField().getType(), List.class)) {
list = new ArrayList<>();
Expand Down Expand Up @@ -479,19 +494,40 @@ private static class ConvertIPAddress implements ConvertValue<IPAddress> {
@Override
public IPAddress convert(ConvertInfo convertInfo) {
try {
return new IPAddressString(convertInfo.getValue()).toAddress();
return new IPAddressString((String) convertInfo.getValue()).toAddress();
} catch (AddressStringException e) {
throw new RuntimeException(e);
}
}
}

private static class ConvertProperties implements ConvertValue<Properties> {

@Override
public Properties convert(ConvertInfo convertInfo) {

try {
return (Properties) convertInfo.getValue();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private static Properties getPropertiesByPrefix(Properties properties, String prefix) {
if (StringUtils.isBlank(prefix)) {
return null;
}
Properties to = new Properties();
return PropertiesUtils.getPropertiesByPrefix(properties, to, prefix);
}

@Data
static class ConvertInfo {
char hump;
String key;
Field field;
String value;
Object value;
Class<?> clazz;
Properties properties;
ConfigInfo configInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,62 @@

package org.apache.eventmesh.connector.kafka.config;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigFiled;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;

@Config(prefix = "eventMesh.server.kafka", path = "classPath://kafka-client.properties")
public class ClientConfiguration {

@ConfigFiled(field = "namesrvAddr")
public String namesrvAddr = "";

@ConfigFiled(field = "username")
public String clientUserName = "username";

@ConfigFiled(field = "password")
public String clientPass = "password";

@ConfigFiled(field = "client.consumeThreadMin")
public Integer consumeThreadMin = 2;

@ConfigFiled(field = "client.consumeThreadMax")
public Integer consumeThreadMax = 2;

@ConfigFiled(field = "client.consumeThreadPoolQueueSize")
public Integer consumeQueueSize = 10000;

@ConfigFiled(field = "client.pullBatchSize")
public Integer pullBatchSize = 32;

@ConfigFiled(field = "client.ackwindow")
public Integer ackWindow = 1000;

@ConfigFiled(field = "client.pubwindow")
public Integer pubWindow = 100;

@ConfigFiled(field = "client.comsumeTimeoutInMin")
public long consumeTimeout = 0L;

@ConfigFiled(field = "client.pollNameServerInterval")
public Integer pollNameServerInterval = 10 * 1000;

@ConfigFiled(field = "client.heartbeatBrokerInterval")
public Integer heartbeatBrokerInterval = 30 * 1000;

@ConfigFiled(field = "client.rebalanceInterval")
public Integer rebalanceInterval = 20 * 1000;

@ConfigFiled(field = "cluster")
public String clusterName = "";

@ConfigFiled(field = "accessKey")
public String accessKey = "";

@ConfigFiled(field = "secretKey")
public String secretKey = "";

public void init() {
Expand All @@ -49,7 +84,7 @@ public void init() {

static class ConfKeys {

public static String KEYS_EVENTMESH_KAFKA_SERVER_PORT = "eventMesh.server.kafka.port";
public static String KEYS_EVENTMESH_KAFKA_SERVER_PORT = "eventMesh.server.kafka.namesrvAddr";

}
}
Loading

0 comments on commit 027ebd5

Please sign in to comment.