Skip to content

Commit

Permalink
Add kafka-client docker images (apache#195)
Browse files Browse the repository at this point in the history
Add Kafka client docker images that cover version 1.x.0 to 2.y.0 (x = 0 to 1 y = 0 to 6) for integration tests.
  • Loading branch information
BewareMyPower authored Oct 19, 2020
1 parent 41f274c commit f51aa70
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 8 deletions.
21 changes: 17 additions & 4 deletions integrations/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,24 @@ VERSION=$(${INTR_HOME}/scripts/dev/get-project-version.py)
TAG=${VERSION%"-SNAPSHOT"}
IMAGE_NAME_PREFIX="kop-test-"

for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do
BASE_NAME=$(basename ${img_dir})
cd ${img_dir}
IMAGE="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}"
build_image() {
IMAGE=$1
echo "Building test image : ${IMAGE}"
docker build . -t ${IMAGE}
echo "Successfully built test image : ${IMAGE}"
}

for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do
BASE_NAME=$(basename ${img_dir})
cd ${img_dir}
if [[ $BASE_NAME == "kafka-client" ]]; then
VERSIONS=(1.0.0 1.1.0 2.0.0 2.1.0 2.2.0 2.3.0 2.4.0 2.5.0 2.6.0)
for VERSION in ${VERSIONS[@]}; do
sed -i '' "s/<kafka\.version>.*<\/kafka\.version>/<kafka.version>$VERSION<\/kafka.version>/" pom.xml
build_image "streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}-${VERSION}:${TAG}"
sed -i '' "s/<kafka\.version>.*<\/kafka\.version>/<kafka.version>2.6.0<\/kafka.version>/" pom.xml
done
else
build_image "streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}"
fi
done
38 changes: 38 additions & 0 deletions integrations/kafka-client/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
FROM maven:3.6.3-openjdk-8 AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn clean package

FROM openjdk:8-jre
WORKDIR /app
COPY --from=build /app/target/*jar-with-dependencies.jar /app/kafka-client.jar
CMD exec java -cp /app/kafka-client.jar Main
CMD sh -c 'java -cp /app/kafka-client.jar Main; echo "ExitCode=$?"'
95 changes: 95 additions & 0 deletions integrations/kafka-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>kafka-client</artifactId>
<version>1.0</version>

<properties>
<kafka.version>2.6.0</kafka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>runtime</scope>
</dependency>

<!-- This dependency is required for kafka-clients 2.6.0 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.5</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
<optimize>true</optimize>
</configuration>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>*.properties</include>
</includes>
</resource>
</resources>
</build>

</project>
97 changes: 97 additions & 0 deletions integrations/kafka-client/src/main/java/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
public static void main(String[] args) {
Map<String, String> map = System.getenv();
final String broker = map.getOrDefault("KOP_BROKER", "localhost:9092");
final String topic = map.getOrDefault("KOP_TOPIC", "kafka-client");
final int limit = Integer.parseInt(map.getOrDefault("KOP_LIMIT", "10"));
final boolean shouldProduce = Boolean.parseBoolean(map.getOrDefault("KOP_PRODUCE", "false"));
final boolean shouldConsume = Boolean.parseBoolean(map.getOrDefault("KOP_CONSUME", "false"));
final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
final String group = "Subscription";

if (shouldProduce) {
System.out.println("starting to produce");

final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

AtomicInteger numMessagesSent = new AtomicInteger(0);
for (int i = 0; i < limit; i++) {
producer.send(new ProducerRecord<>(topic, "hello from kafka-client"),
(recordMetadata, e) -> {
if (e == null) {
System.out.println("Send to " + recordMetadata);
numMessagesSent.incrementAndGet();
} else {
System.out.println("Failed to send: " + e.getMessage());
}
});
}

producer.flush();
producer.close();
if (numMessagesSent.get() == limit) {
System.out.println("produced all messages successfully");
}
}

if (shouldConsume) {
System.out.println("starting to consume");

final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));

int i = 0;
while (i < limit) {
ConsumerRecords<String, String> records = consumer.poll(3000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Receive " + record);
}
i += records.count();
}

consumer.close();
System.out.println("consumed all messages successfully");
}

System.out.println("exiting normally");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.slf4j.simpleLogger.defaultLogLevel=info
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS
19 changes: 15 additions & 4 deletions integrations/publish.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,22 @@ if [ $? -ne 0 ]; then
exit 1
fi

for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do
BASE_NAME=$(basename ${img_dir})
IMAGE="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:${TAG}"
IMAGE_LATEST="streamnative/${IMAGE_NAME_PREFIX}${BASE_NAME}:latest"
push_image() {
IMAGE="streamnative/${IMAGE_NAME_PREFIX}$1:${TAG}"
IMAGE_LATEST="streamnative/${IMAGE_NAME_PREFIX}$1:latest"
docker tag ${IMAGE} ${IMAGE_LATEST}
docker push ${IMAGE_LATEST}
docker push ${IMAGE}
}

for img_dir in `ls -d ${INTR_HOME}/integrations/*/ | grep -v dev`; do
BASE_NAME=$(basename ${img_dir})
if [[ $BASE_NAME == "kafka-client" ]]; then
KAFKA_VERSIONS=(1.0.0 1.1.0 2.0.0 2.1.0 2.2.0 2.3.0 2.4.0 2.5.0 2.6.0)
for KAFKA_VERSION in ${KAFKA_VERSIONS[@]}; do
push_image "${BASE_NAME}-${KAFKA_VERSION}"
done
else
push_image $BASE_NAME
fi
done

0 comments on commit f51aa70

Please sign in to comment.