Skip to content

Commit

Permalink
Improve configuration management of Knative SPI Impl
Browse files Browse the repository at this point in the history
[1] KnativeConsumerImpl and KnativeProducerImpl config improve
[2] KnativeConsumerImpl and KnativeProducerImpl config UT
  • Loading branch information
eight-nines committed Jan 1, 2023
1 parent 2812bf7 commit b3b098c
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,56 @@

package org.apache.eventmesh.connector.knative.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.convert.converter.ListConverter.ListConverterSemi;

import com.google.common.base.Preconditions;
import java.util.List;

@Config(prefix = "eventMesh.server.knative", path = "classPath://knative-client.properties")
public class ClientConfiguration {

@ConfigFiled(field = "service", converter = ListConverterSemi.class)
public List<String> service;

/**
* In keeping with the old way of configuration parsing, the value is taken from the service field [0]
*/
@ConfigFiled(reload = true)
public String emurl = "";

/**
* In keeping with the old way of configuration parsing, the value is taken from the service field [1]
*/
@ConfigFiled(reload = true)
public String serviceAddr = "";

public void init() {
String serviceAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_KNATIVE_SERVICE_ADDR);
Preconditions.checkState(StringUtils.isNotEmpty(serviceAddrStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_KNATIVE_SERVICE_ADDR));
serviceAddr = StringUtils.trim(serviceAddrStr);
String[] temp = serviceAddr.split(";");
emurl = temp[0];
serviceAddr = temp[1];
public void reload() {
emurl = this.service.get(0);
serviceAddr = this.service.get(1);
}

public List<String> getService() {
return service;
}

public void setService(List<String> service) {
this.service = service;
}

static class ConfKeys {
public String getEmurl() {
return emurl;
}

public static final String KEYS_EVENTMESH_KNATIVE_SERVICE_ADDR = "eventMesh.server.knative.service";
public void setEmurl(String emurl) {
this.emurl = emurl;
}

public String getServiceAddr() {
return serviceAddr;
}

public void setServiceAddr(String serviceAddr) {
this.serviceAddr = serviceAddr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.connector.knative.config.ClientConfiguration;

import java.util.List;
Expand All @@ -30,17 +31,21 @@

import io.cloudevents.CloudEvent;

@Config(field = "clientConfiguration")
public class KnativeConsumerImpl implements Consumer {

private PullConsumerImpl pullConsumer;

/**
* Unified configuration class corresponding to knative-client.properties
*/
private ClientConfiguration clientConfiguration;

private static final Logger logger = LoggerFactory.getLogger(KnativeConsumerImpl.class);

@Override
public synchronized void init(Properties properties) throws Exception {
// Load parameters from properties file:
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.init();
properties.put("emUrl", clientConfiguration.emurl);
properties.put("serviceAddr", clientConfiguration.serviceAddr);

Expand Down Expand Up @@ -90,4 +95,8 @@ public void start() {
public void shutdown() {
pullConsumer.shutdown();
}

public ClientConfiguration getClientConfiguration() {
return this.clientConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.producer.Producer;
import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.connector.knative.config.ClientConfiguration;

import java.util.Properties;

import io.cloudevents.CloudEvent;

@Config(field = "clientConfiguration")
public class KnativeProducerImpl implements Producer {

private ProducerImpl producer;

/**
* Unified configuration class corresponding to knative-client.properties
*/
private ClientConfiguration clientConfiguration;

@Override
public synchronized void init(Properties properties) throws Exception {
// Load parameters from properties file:
final ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.init();

properties.put("url", clientConfiguration.serviceAddr);
producer = new ProducerImpl(properties);
}
Expand Down Expand Up @@ -90,4 +94,8 @@ public void checkTopicExist(String topic) throws Exception {
public void setExtFields() {
throw new ConnectorRuntimeException("SetExtFields is not supported");
}

public ClientConfiguration getClientConfiguration() {
return this.clientConfiguration;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.knative.config;

import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
import org.apache.eventmesh.connector.knative.consumer.KnativeConsumerImpl;
import org.apache.eventmesh.connector.knative.producer.KnativeProducerImpl;

import org.junit.Assert;
import org.junit.Test;

public class ClientConfigurationTest {

@Test
public void getConfigWhenRocketMQConsumerInit() {
KnativeConsumerImpl consumer =
(KnativeConsumerImpl) ConnectorPluginFactory.getMeshMQPushConsumer("knative");

ClientConfiguration config = consumer.getClientConfiguration();
assertConfig(config);
}

@Test
public void getConfigWhenRocketMQProducerInit() {
KnativeProducerImpl producer =
(KnativeProducerImpl) ConnectorPluginFactory.getMeshMQProducer("knative");

ClientConfiguration config = producer.getClientConfiguration();
assertConfig(config);
}

private void assertConfig(ClientConfiguration config) {
Assert.assertEquals(config.getEmurl(), "127.0.0.1");
Assert.assertEquals(config.getServiceAddr(), "cloudevents-player.default.127.0.0.1.sslip.io");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.connector.knative.consumer;

import org.apache.eventmesh.api.factory.ConnectorPluginFactory;

import java.util.Properties;

import org.junit.Assert;
Expand All @@ -31,7 +33,8 @@ public void testSubscribe() throws Exception {
properties.put("topic", topic);

// Create a Knative consumer:
KnativeConsumerImpl knativeConsumer = new KnativeConsumerImpl();
KnativeConsumerImpl knativeConsumer =
(KnativeConsumerImpl) ConnectorPluginFactory.getMeshMQPushConsumer("knative");

try {
knativeConsumer.init(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
import org.apache.eventmesh.connector.knative.cloudevent.impl.KnativeHeaders;

Expand All @@ -42,7 +43,8 @@ public void testPublish() throws Exception {
properties.put("data", "Hello Knative from EventMesh!");

// Create a Knative producer:
KnativeProducerImpl knativehProducer = new KnativeProducerImpl();
KnativeProducerImpl knativehProducer =
(KnativeProducerImpl) ConnectorPluginFactory.getMeshMQProducer("knative");

try {
knativehProducer.init(properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#######################knative-client##################
eventMesh.server.knative.service=127.0.0.1;cloudevents-player.default.127.0.0.1.sslip.io

0 comments on commit b3b098c

Please sign in to comment.