Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka connector native image updates #4

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<version.lib.jersey>3.1.3</version.lib.jersey>
<version.lib.jgit>6.7.0.202309050840-r</version.lib.jgit>
<version.lib.junit>5.9.3</version.lib.junit>
<version.lib.kafka>3.4.0</version.lib.kafka>
<version.lib.kafka>3.5.1</version.lib.kafka>
<version.lib.kotlin>1.8.0</version.lib.kotlin>
<version.lib.log4j>2.18.0</version.lib.log4j>
<version.lib.logback>1.4.0</version.lib.logback>
Expand Down Expand Up @@ -944,15 +944,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.lib.kafka}</version>
<!-- Snip transitive dependency on Snappy (which should be optional anyways). -->
<!-- This can be removed once kafka-clients is upgraded -->
<!-- to 3.4.2 or newer. See https://issues.apache.org/jira/browse/KAFKA-15096 -->
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- required transitively by okhttp (used in OpenTelemetry through Jaeger) -->
Expand Down
6 changes: 3 additions & 3 deletions examples/messaging/docker/kafka/Dockerfile.kafka
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2019, 2021 Oracle and/or its affiliates.
# Copyright (c) 2019, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,12 +14,12 @@
# limitations under the License.
#

FROM openjdk:17-jdk-slim-buster
FROM container-registry.oracle.com/java/openjdk:21

ENV VERSION=2.7.0
ENV SCALA_VERSION=2.13

RUN apt-get -qq update && apt-get -qq -y install bash curl wget netcat jq
RUN dnf update && dnf -y install wget jq nc

RUN REL_PATH=kafka/${VERSION}/kafka_${SCALA_VERSION}-${VERSION}.tgz \
&& BACKUP_ARCHIVE=https://archive.apache.org/dist/ \
Expand Down
34 changes: 32 additions & 2 deletions examples/messaging/docker/kafka/init_topics.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020, 2021 Oracle and/or its affiliates.
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,9 +40,39 @@ while sleep 2; do
--replication-factor 1 \
--partitions 10 \
--topic messaging-test-topic-2
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=snappy \
--topic messaging-test-topic-snappy-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=lz4 \
--topic messaging-test-topic-lz4-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=zstd \
--topic messaging-test-topic-zstd-compressed
bash $KAFKA_TOPICS \
--create \
--replication-factor 1 \
--partitions 10 \
--config compression.type=gzip \
--topic messaging-test-topic-gzip-compressed

echo
echo "Example topics messaging-test-topic-1 and messaging-test-topic-2 created"
echo "Example topics created:"
echo " messaging-test-topic-1"
echo " messaging-test-topic-2"
echo " messaging-test-topic-snappy-compressed"
echo " messaging-test-topic-lz4-compressed"
echo " messaging-test-topic-zstd-compressed"
echo " messaging-test-topic-gzip-compressed"
echo
echo "================== Kafka is ready, stop it with Ctrl+C =================="
exit 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ class SendingService implements HttpService {

String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
String compression = config.get("app.kafka.compression").asString().orElse("none");

// Prepare channel for connecting processor -> kafka connector with specific subscriber configuration,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Channel<String> toKafka = Channel.<String>builder()
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
.compressionType(compression)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void onOpen(WsSession session) {

String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
String compression = config.get("app.kafka.compression").asString().orElse("none");

// Prepare channel for connecting kafka connector with specific publisher configuration -> listener,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
Expand All @@ -60,6 +61,7 @@ public void onOpen(WsSession session) {
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.compressionType(compression)
.build()
)
.build();
Expand All @@ -72,7 +74,7 @@ public void onOpen(WsSession session) {
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
// Send message received from Kafka over websocket
session.send(payload, false);
session.send(payload, true);
})
.build()
.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020 Oracle and/or its affiliates.
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,11 @@
app:
kafka:
bootstrap.servers: localhost:9092
topic: messaging-test-topic-1
compression: snappy
# compression: lz4
# compression: zstd
# compression: gzip
topic: messaging-test-topic-${app.kafka.compression}-compressed

server:
port: 7001
Expand Down

This file was deleted.

This file was deleted.

Loading