From f51aa70810baf4640ea7aa98e2d40b19f8d27380 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 19 Oct 2020 14:33:12 +0800 Subject: [PATCH] Add kafka-client docker images (#195) Add Kafka client docker images that cover version 1.x.0 to 2.y.0 (x = 0 to 1 y = 0 to 6) for integration tests. --- integrations/build.sh | 21 +++- integrations/kafka-client/Dockerfile | 38 ++++++++ integrations/kafka-client/pom.xml | 95 ++++++++++++++++++ .../kafka-client/src/main/java/Main.java | 97 +++++++++++++++++++ .../main/resources/simplelogger.properties | 17 ++++ integrations/publish.sh | 19 +++- 6 files changed, 279 insertions(+), 8 deletions(-) create mode 100644 integrations/kafka-client/Dockerfile create mode 100644 integrations/kafka-client/pom.xml create mode 100644 integrations/kafka-client/src/main/java/Main.java create mode 100644 integrations/kafka-client/src/main/resources/simplelogger.properties diff --git a/integrations/build.sh b/integrations/build.sh index d0aa926c77a39..64cfe8ac82887 100755 --- a/integrations/build.sh +++ b/integrations/build.sh @@ -20,11 +20,24 @@ VERSION=$(${INTR_HOME}/scripts/dev/get-project-version.py) TAG=${VERSION%"-SNAPSHOT"} IMAGE_NAME_PREFIX="kop-test-" -for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do - BASE_NAME=$(basename ${img_dir}) - cd ${img_dir} - IMAGE="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}" +build_image() { + IMAGE=$1 echo "Building test image : ${IMAGE}" docker build . -t ${IMAGE} echo "Successfully built test image : ${IMAGE}" +} + +for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do + BASE_NAME=$(basename ${img_dir}) + cd ${img_dir} + if [[ $BASE_NAME == "kafka-client" ]]; then + VERSIONS=(1.0.0 1.1.0 2.0.0 2.1.0 2.2.0 2.3.0 2.4.0 2.5.0 2.6.0) + for VERSION in ${VERSIONS[@]}; do + sed -i '' "s/.*<\/kafka\.version>/$VERSION<\/kafka.version>/" pom.xml + build_image "streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}-${VERSION}:${TAG}" + sed -i '' "s/.*<\/kafka\.version>/2.6.0<\/kafka.version>/" pom.xml + done + else + build_image "streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}" + fi done diff --git a/integrations/kafka-client/Dockerfile b/integrations/kafka-client/Dockerfile new file mode 100644 index 0000000000000..0e39db716646b --- /dev/null +++ b/integrations/kafka-client/Dockerfile @@ -0,0 +1,38 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +FROM maven:3.6.3-openjdk-8 AS build +WORKDIR /app +COPY pom.xml . +COPY src ./src +RUN mvn clean package + +FROM openjdk:8-jre +WORKDIR /app +COPY --from=build /app/target/*jar-with-dependencies.jar /app/kafka-client.jar +CMD exec java -cp /app/kafka-client.jar Main +CMD sh -c 'java -cp /app/kafka-client.jar Main; echo "ExitCode=$?"' diff --git a/integrations/kafka-client/pom.xml b/integrations/kafka-client/pom.xml new file mode 100644 index 0000000000000..a8db144d9e1ee --- /dev/null +++ b/integrations/kafka-client/pom.xml @@ -0,0 +1,95 @@ + + + + 4.0.0 + + org.example + kafka-client + 1.0 + + + 2.6.0 + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + org.slf4j + slf4j-simple + 1.7.30 + runtime + + + + + com.fasterxml.jackson.core + jackson-databind + 2.10.5 + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + UTF-8 + true + true + true + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + src/main/resources + + *.properties + + + + + + diff --git a/integrations/kafka-client/src/main/java/Main.java b/integrations/kafka-client/src/main/java/Main.java new file mode 100644 index 0000000000000..e0b370928f385 --- /dev/null +++ b/integrations/kafka-client/src/main/java/Main.java @@ -0,0 +1,97 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +public class Main { + public static void main(String[] args) { + Map map = System.getenv(); + final String broker = map.getOrDefault("KOP_BROKER", "localhost:9092"); + final String topic = map.getOrDefault("KOP_TOPIC", "kafka-client"); + final int limit = Integer.parseInt(map.getOrDefault("KOP_LIMIT", "10")); + final boolean shouldProduce = Boolean.parseBoolean(map.getOrDefault("KOP_PRODUCE", "false")); + final boolean shouldConsume = Boolean.parseBoolean(map.getOrDefault("KOP_CONSUME", "false")); + final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer"; + final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; + final String group = "Subscription"; + + if (shouldProduce) { + System.out.println("starting to produce"); + + final Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer); + + final KafkaProducer producer = new KafkaProducer<>(props); + + AtomicInteger numMessagesSent = new AtomicInteger(0); + for (int i = 0; i < limit; i++) { + producer.send(new ProducerRecord<>(topic, "hello from kafka-client"), + (recordMetadata, e) -> { + if (e == null) { + System.out.println("Send to " + recordMetadata); + numMessagesSent.incrementAndGet(); + } else { + System.out.println("Failed to send: " + e.getMessage()); + } + }); + } + + producer.flush(); + producer.close(); + if (numMessagesSent.get() == limit) { + System.out.println("produced all messages successfully"); + } + } + + if (shouldConsume) { + System.out.println("starting to consume"); + + final Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singleton(topic)); + + int i = 0; + while (i < limit) { + ConsumerRecords records = consumer.poll(3000); + for (ConsumerRecord record : records) { + System.out.println("Receive " + record); + } + i += records.count(); + } + + consumer.close(); + System.out.println("consumed all messages successfully"); + } + + System.out.println("exiting normally"); + } +} diff --git a/integrations/kafka-client/src/main/resources/simplelogger.properties b/integrations/kafka-client/src/main/resources/simplelogger.properties new file mode 100644 index 0000000000000..8c7396e9be030 --- /dev/null +++ b/integrations/kafka-client/src/main/resources/simplelogger.properties @@ -0,0 +1,17 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS diff --git a/integrations/publish.sh b/integrations/publish.sh index 10ea9c56653b2..557c927530984 100755 --- a/integrations/publish.sh +++ b/integrations/publish.sh @@ -36,11 +36,22 @@ if [ $? -ne 0 ]; then exit 1 fi -for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do - BASE_NAME=$(basename ${img_dir}) - IMAGE="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}" - IMAGE_LATEST="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:latest" +push_image() { + IMAGE="streamnative/${IMAGE_NAME_PREFIX}$1:${TAG}" + IMAGE_LATEST="streamnative/${IMAGE_NAME_PREFIX}$1:latest" docker tag ${IMAGE} ${IMAGE_LATEST} docker push ${IMAGE_LATEST} docker push ${IMAGE} +} + +for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do + BASE_NAME=$(basename ${img_dir}) + if [[ $BASE_NAME == "kafka-client" ]]; then + KAFKA_VERSIONS=(1.0.0 1.1.0 2.0.0 2.1.0 2.2.0 2.3.0 2.4.0 2.5.0 2.6.0) + for KAFKA_VERSION in ${KAFKA_VERSIONS[@]}; do + push_image "${BASE_NAME}-${KAFKA_VERSION}" + done + else + push_image $BASE_NAME + fi done