Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR] Fix plugin cannot load properties from classpath #763

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ public class Constants {

public static final String MESSAGE_PROP_SEPARATOR = "99";

public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.rocketmq.config;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -25,7 +26,7 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.io.InputStream;
import java.util.Properties;

import lombok.experimental.UtilityClass;
Expand All @@ -38,27 +39,33 @@ public class ConfigurationWrapper {
private static final Properties properties = new Properties();

static {
String configFile = getConfigFilePath();
log.info("loading config: {}", configFile);
try {
properties.load(new BufferedReader(new FileReader(configFile)));
} catch (IOException e) {
throw new IllegalArgumentException(
String.format("Cannot load RocketMQ configuration file from :%s", configFile));
}
loadProperties();
}

public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}

private static String getConfigFilePath() {
// get from classpath
URL resource = ConfigurationWrapper.class.getClassLoader().getResource(EventMeshConstants.EVENTMESH_CONF_FILE);
if (resource != null && new File(resource.getPath()).exists()) {
return resource.getPath();
/**
* Load rocketmq properties file from classpath and conf home.
* The properties defined in conf home will override classpath.
*/
private void loadProperties() {
try (InputStream resourceAsStream = ConfigurationWrapper.class.getResourceAsStream(
File.separator + EventMeshConstants.EVENTMESH_CONF_FILE)) {
if (resourceAsStream != null) {
properties.load(resourceAsStream);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Load %s.properties file from classpath error", EventMeshConstants.EVENTMESH_CONF_FILE));
}
try {
String configPath = Constants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
if (new File(configPath).exists()) {
properties.load(new BufferedReader(new FileReader(configPath)));
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Cannot load %s file from conf", EventMeshConstants.EVENTMESH_CONF_FILE));
}
// get from config home
return EventMeshConstants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.eventmesh.metrics.opentelemetry.config;

import org.apache.eventmesh.common.Constants;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;

import lombok.experimental.UtilityClass;
Expand Down Expand Up @@ -56,25 +56,25 @@ private void initializeConfig() {
}
}

/**
* Load properties file from classpath and conf home.
* The properties defined in conf home will override classpath.
*/
private void loadProperties() {
URL resource = OpenTelemetryConfiguration.class.getClassLoader().getResource(CONFIG_FILE);
if (resource != null) {
try (InputStream inputStream = resource.openStream()) {
if (inputStream.available() > 0) {
properties.load(new BufferedReader(new InputStreamReader(inputStream)));
}
} catch (IOException e) {
throw new RuntimeException("Load opentelemetry.properties file from classpath error");
try (InputStream resourceAsStream = OpenTelemetryConfiguration.class.getResourceAsStream(File.separator + CONFIG_FILE)) {
if (resourceAsStream != null) {
properties.load(resourceAsStream);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Load %s file from classpath error", CONFIG_FILE));
}
// get from config home
try {
String configPath = System.getProperty("confPath", System.getenv("confPath")) + File.separator + CONFIG_FILE;
String configPath = Constants.EVENTMESH_CONF_HOME + File.separator + CONFIG_FILE;
if (new File(configPath).exists()) {
properties.load(new BufferedReader(new FileReader(configPath)));
}
} catch (IOException e) {
throw new IllegalArgumentException("Cannot load opentelemetry.properties file from conf");
throw new IllegalArgumentException(String.format("Cannot load %s file from conf", CONFIG_FILE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
# limitations under the License.
#

#prometheusPort
# You can get the export metrics by this port
eventMesh.metrics.prometheus.port=19090
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.metrics.opentelemetry.config;

import org.junit.Assert;
import org.junit.Test;

public class OpenTelemetryConfigurationTest {

@Test
public void getEventMeshPrometheusPort() {
int eventMeshPrometheusPort = OpenTelemetryConfiguration.getEventMeshPrometheusPort();
Assert.assertEquals(19091, eventMeshPrometheusPort);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

eventMesh.metrics.prometheus.port=19091
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public RRCallbackResponseHandlerAdapter(ProtocolMessage protocolMessage, RRCallb
long timeout) {
Preconditions.checkNotNull(rrCallback, "rrCallback invalid");
Preconditions.checkNotNull(protocolMessage, "message invalid");
if (!(protocolMessage instanceof EventMeshMessage) && !(protocolMessage instanceof CloudEvent)
&& !(protocolMessage instanceof Message)) {
if (!(protocolMessage instanceof EventMeshMessage)
&& !(protocolMessage instanceof CloudEvent)
&& !(protocolMessage instanceof Message)) {
throw new IllegalArgumentException(String.format("ProtocolMessage: %s is not supported", protocolMessage));
}
this.protocolMessage = protocolMessage;
Expand Down Expand Up @@ -95,15 +96,18 @@ public String handleResponse(HttpResponse response) throws IOException {
return protocolMessage.toString();
}

@SuppressWarnings("unchecked")
private ProtocolMessage transformToProtocolMessage(EventMeshRetObj ret) {
// todo: constructor other protocol message, can judge by protocol type in properties
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(ret.getRetMsg(),
SendMessageResponseBody.ReplyMessage.class);
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
if (protocolMessage instanceof EventMeshMessage) {
EventMeshMessage eventMeshMessage = EventMeshMessage.builder()
.content(replyMessage.body)
.prop(replyMessage.properties)
.topic(replyMessage.topic)
.build();
return (ProtocolMessage) eventMeshMessage;
return (ProtocolMessage) eventMeshMessage;
}
// todo: constructor other protocol message
throw new RuntimeException("Unsupported callback message type");
}
}