diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index 0e63e4d086c..00321407574 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -94,7 +94,7 @@
3.1.3
6.7.0.202309050840-r
5.9.3
- 3.4.0
+ 3.5.1
1.8.0
2.18.0
1.4.0
@@ -944,15 +944,6 @@
org.apache.kafka
kafka-clients
${version.lib.kafka}
-
-
-
-
-
- org.xerial.snappy
- snappy-java
-
-
diff --git a/examples/messaging/docker/kafka/Dockerfile.kafka b/examples/messaging/docker/kafka/Dockerfile.kafka
index f78d62addc0..cae5ad7368d 100644
--- a/examples/messaging/docker/kafka/Dockerfile.kafka
+++ b/examples/messaging/docker/kafka/Dockerfile.kafka
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2019, 2021 Oracle and/or its affiliates.
+# Copyright (c) 2019, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,12 +14,12 @@
# limitations under the License.
#
-FROM openjdk:17-jdk-slim-buster
+FROM container-registry.oracle.com/java/openjdk:21
ENV VERSION=2.7.0
ENV SCALA_VERSION=2.13
-RUN apt-get -qq update && apt-get -qq -y install bash curl wget netcat jq
+RUN dnf update && dnf -y install wget jq nc
RUN REL_PATH=kafka/${VERSION}/kafka_${SCALA_VERSION}-${VERSION}.tgz \
&& BACKUP_ARCHIVE=https://archive.apache.org/dist/ \
diff --git a/examples/messaging/docker/kafka/init_topics.sh b/examples/messaging/docker/kafka/init_topics.sh
index bee6716faad..4957126f588 100644
--- a/examples/messaging/docker/kafka/init_topics.sh
+++ b/examples/messaging/docker/kafka/init_topics.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Copyright (c) 2020, 2021 Oracle and/or its affiliates.
+# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,9 +40,39 @@ while sleep 2; do
--replication-factor 1 \
--partitions 10 \
--topic messaging-test-topic-2
+ bash $KAFKA_TOPICS \
+ --create \
+ --replication-factor 1 \
+ --partitions 10 \
+ --config compression.type=snappy \
+ --topic messaging-test-topic-snappy-compressed
+ bash $KAFKA_TOPICS \
+ --create \
+ --replication-factor 1 \
+ --partitions 10 \
+ --config compression.type=lz4 \
+ --topic messaging-test-topic-lz4-compressed
+ bash $KAFKA_TOPICS \
+ --create \
+ --replication-factor 1 \
+ --partitions 10 \
+ --config compression.type=zstd \
+ --topic messaging-test-topic-zstd-compressed
+ bash $KAFKA_TOPICS \
+ --create \
+ --replication-factor 1 \
+ --partitions 10 \
+ --config compression.type=gzip \
+ --topic messaging-test-topic-gzip-compressed
echo
- echo "Example topics messaging-test-topic-1 and messaging-test-topic-2 created"
+ echo "Example topics created:"
+ echo " messaging-test-topic-1"
+ echo " messaging-test-topic-2"
+ echo " messaging-test-topic-snappy-compressed"
+ echo " messaging-test-topic-lz4-compressed"
+ echo " messaging-test-topic-zstd-compressed"
+ echo " messaging-test-topic-gzip-compressed"
echo
echo "================== Kafka is ready, stop it with Ctrl+C =================="
exit 0
diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java
index 8d8ef73a7a9..355c2ac33bb 100644
--- a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java
+++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java
@@ -35,6 +35,7 @@ class SendingService implements HttpService {
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
+ String compression = config.get("app.kafka.compression").asString().orElse("none");
// Prepare channel for connecting processor -> kafka connector with specific subscriber configuration,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
@@ -42,6 +43,7 @@ class SendingService implements HttpService {
.subscriberConfig(KafkaConnector.configBuilder()
.bootstrapServers(kafkaServer)
.topic(topic)
+ .compressionType(compression)
.keySerializer(StringSerializer.class)
.valueSerializer(StringSerializer.class)
.build())
diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java
index 3c62028ef9c..edf15419c88 100644
--- a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java
+++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java
@@ -47,6 +47,7 @@ public void onOpen(WsSession session) {
String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get();
String topic = config.get("app.kafka.topic").asString().get();
+ String compression = config.get("app.kafka.compression").asString().orElse("none");
// Prepare channel for connecting kafka connector with specific publisher configuration -> listener,
// channel -> connector mapping is automatic when using KafkaConnector.configBuilder()
@@ -60,6 +61,7 @@ public void onOpen(WsSession session) {
.enableAutoCommit(true)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
+ .compressionType(compression)
.build()
)
.build();
@@ -72,7 +74,7 @@ public void onOpen(WsSession session) {
.listener(fromKafka, payload -> {
System.out.println("Kafka says: " + payload);
// Send message received from Kafka over websocket
- session.send(payload, false);
+ session.send(payload, true);
})
.build()
.start();
diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml
index e5e6f96c0f7..949b803df73 100644
--- a/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml
+++ b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml
@@ -1,5 +1,5 @@
#
-# Copyright (c) 2020 Oracle and/or its affiliates.
+# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,7 +17,11 @@
app:
kafka:
bootstrap.servers: localhost:9092
- topic: messaging-test-topic-1
+ compression: snappy
+# compression: lz4
+# compression: zstd
+# compression: gzip
+ topic: messaging-test-topic-${app.kafka.compression}-compressed
server:
port: 7001
diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java
deleted file mode 100644
index d6bb2c570ea..00000000000
--- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.messaging.connectors.kafka;
-
-import com.oracle.svm.core.annotate.Substitute;
-import com.oracle.svm.core.annotate.TargetClass;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.AppInfoParser;
-
-/**
- * JMX not supported in native-image.
- */
-@TargetClass(AppInfoParser.class)
-@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
-final class AppInfoParserSubstitution {
-
- @Substitute
- public static void registerAppInfo(String p, String i, Metrics m, long n) {
- }
-
- @Substitute
- public static void unregisterAppInfo(String p, String i, Metrics m) {
- }
-}
diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java
deleted file mode 100644
index 89977c98157..00000000000
--- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.messaging.connectors.kafka;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-
-import io.helidon.common.LazyValue;
-
-import com.oracle.svm.core.annotate.Substitute;
-import com.oracle.svm.core.annotate.TargetClass;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-
-/**
- * Helper for creating ZSTD or SNAPPY compression stream wrappers without method handles.
- */
-@SuppressWarnings("checkstyle:OuterTypeFilename")
-final class CompressionTypeHelper {
-
- private CompressionTypeHelper() {
- }
-
- private static boolean zstdNativeLibLoaded = false;
-
- static final LazyValue> LAZY_INPUT_ZSTD =
- LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdInputStream", InputStream.class));
- static final LazyValue> LAZY_OUTPUT_ZSTD =
- LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdOutputStream", OutputStream.class));
- static final LazyValue> LAZY_INPUT_SNAPPY =
- LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyInputStream", InputStream.class));
- static final LazyValue> LAZY_OUTPUT_SNAPPY =
- LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyOutputStream", OutputStream.class));
-
- static OutputStream snappyOutputStream(OutputStream orig) {
- try {
- return (OutputStream) LAZY_OUTPUT_SNAPPY.get().newInstance(orig);
- } catch (KafkaException e) {
- throw e;
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- static InputStream snappyInputStream(ByteBuffer orig) {
- try {
- return (InputStream) LAZY_INPUT_SNAPPY.get().newInstance(new ByteBufferInputStream(orig));
- } catch (KafkaException e) {
- throw e;
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- static void zstdLoadNativeLibs() throws ReflectiveOperationException {
- // loading jni libs in static blocks is not supported
- // see https://github.com/oracle/graal/issues/439#issuecomment-394341725
- if (!zstdNativeLibLoaded) {
- Class> clazz = Class.forName("com.github.luben.zstd.util.Native");
- Field loadedField = clazz.getDeclaredField("loaded");
- loadedField.setAccessible(true);
- loadedField.setBoolean(null, false);
- Method loadMethod = clazz.getDeclaredMethod("load");
- loadMethod.invoke(null);
- zstdNativeLibLoaded = true;
- }
- }
-
- static OutputStream zstdOutputStream(OutputStream orig) {
- try {
- zstdLoadNativeLibs();
- return (OutputStream) LAZY_OUTPUT_ZSTD.get().newInstance(orig);
- } catch (KafkaException e) {
- throw e;
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- static InputStream zstdInputStream(ByteBuffer orig) {
- try {
- zstdLoadNativeLibs();
- return (InputStream) LAZY_INPUT_ZSTD.get().newInstance(new ByteBufferInputStream(orig));
- } catch (KafkaException e) {
- throw e;
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- static Constructor> findConstructor(String className, Class>... paramTypes) {
- try {
- return Class.forName(className)
- .getDeclaredConstructor(paramTypes);
- } catch (NoSuchMethodException | ClassNotFoundException e) {
- throw new KafkaException(e);
- }
- }
-}
-
-/**
- * Substitution for {@link org.apache.kafka.common.record.CompressionType#SNAPPY CompressionType.SNAPPY}.
- */
-@TargetClass(className = "org.apache.kafka.common.record.CompressionType$3")
-@SuppressWarnings("checkstyle:OneTopLevelClass")
-final class SnappySubstitution {
-
- @Substitute
- public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
- return CompressionTypeHelper.snappyOutputStream(buffer);
- }
-
- @Substitute
- public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
- return CompressionTypeHelper.snappyInputStream(buffer);
- }
-}
-
-/**
- * Substitution for {@link org.apache.kafka.common.record.CompressionType#ZSTD CompressionType.ZSTD}.
- */
-@TargetClass(className = "org.apache.kafka.common.record.CompressionType$5")
-@SuppressWarnings("checkstyle:OneTopLevelClass")
-final class ZstdSubstitution {
-
- @Substitute
- public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
- return CompressionTypeHelper.zstdOutputStream(buffer);
- }
-
- @Substitute
- public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
- return CompressionTypeHelper.zstdInputStream(buffer);
- }
-}
diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java
deleted file mode 100644
index ce5214c0990..00000000000
--- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2020, 2023 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.messaging.connectors.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
-
-import com.oracle.svm.core.annotate.Substitute;
-import com.oracle.svm.core.annotate.TargetClass;
-import org.apache.kafka.common.utils.Checksums;
-
-
-/**
- * Method handles are not supported by native-image,
- * invoke {@link java.util.zip.CRC32C CRC32C} directly.
- *
- * Helidon (since version 2) runs only on Java 11 and newer, {@link java.util.zip.CRC32C CRC32C}
- * doesn't have to be instantiated by method handles.
- */
-@TargetClass(org.apache.kafka.common.utils.Crc32C.class)
-@Substitute
-@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
-final class Crc32CSubstitution {
-
- @Substitute
- public static long compute(byte[] bytes, int offset, int size) {
- Checksum crc = create();
- crc.update(bytes, offset, size);
- return crc.getValue();
- }
-
- @Substitute
- public static long compute(ByteBuffer buffer, int offset, int size) {
- Checksum crc = create();
- Checksums.update(crc, buffer, offset, size);
- return crc.getValue();
- }
-
- @Substitute
- public static Checksum create() {
- return new java.util.zip.CRC32C();
- }
-}
diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java
deleted file mode 100644
index bb483d8084c..00000000000
--- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2020, 2022 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.messaging.connectors.kafka;
-
-import com.oracle.svm.core.annotate.Substitute;
-import com.oracle.svm.core.annotate.TargetClass;
-import org.apache.kafka.common.metrics.KafkaMetric;
-
-/**
- * JMX not supported in native-image.
- */
-@TargetClass(org.apache.kafka.common.metrics.JmxReporter.class)
-final class JmxReporterSubstitution {
-
- @Substitute
- private Object addAttribute(KafkaMetric metric) {
- return null;
- }
-
- @Substitute
- public void metricChange(KafkaMetric metric) {
- }
-
-}
-
diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java
deleted file mode 100644
index a38e494e05c..00000000000
--- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.messaging.connectors.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.spi.LoginModule;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-
-import com.oracle.svm.core.annotate.Alias;
-import com.oracle.svm.core.annotate.Inject;
-import com.oracle.svm.core.annotate.Substitute;
-import com.oracle.svm.core.annotate.TargetClass;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
-import org.apache.kafka.common.security.auth.SaslExtensions;
-import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
-import org.apache.kafka.common.security.authenticator.LoginManager;
-import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
-import org.apache.kafka.common.security.scram.internals.ScramMechanism;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@TargetClass(org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.class)
-@SuppressWarnings("checkstyle:RedundantModifier")
-final class SaslClientCallbackHandlerSubstitution implements AuthenticateCallbackHandler {
-
- @Alias
- private String mechanism;
-
- @Inject
- private Logger logger;
-
- @Inject
- private Subject subject;
-
- @Substitute
- public SaslClientCallbackHandlerSubstitution() {
- logger = LoggerFactory.getLogger(LoginManager.class);
- }
-
- @Override
- @Substitute
- public void configure(Map configs, String saslMechanism, List jaasConfigEntries) {
- this.mechanism = saslMechanism;
- this.subject = null;
-
- int entrySize = jaasConfigEntries.size();
- if (entrySize == 0) {
- logger.warn("Missing JAAS config entry, missing or malformed sasl.jaas.config property.");
- return;
- } else if (entrySize > 1) {
- logger.warn("Multiple JAAS config entries, Kafka client's sasl.jaas.config can have only one JAAS config entry.");
- return;
- }
-
- AppConfigurationEntry jaasConfigEntry = jaasConfigEntries.get(0);
- String jaasLoginModuleName = jaasConfigEntry.getLoginModuleName();
- subject = new Subject();
-
- try {
- Class.forName(jaasLoginModuleName)
- .asSubclass(LoginModule.class)
- .getDeclaredConstructor()
- .newInstance()
- .initialize(subject, this, new HashMap<>(), jaasConfigEntry.getOptions());
- } catch (ReflectiveOperationException e) {
- throw new KafkaException("Can't instantiate JAAS login module" + jaasLoginModuleName, e);
- }
- }
-
- @Override
- @Substitute
- public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
- // Subject.getSubject doesn't return proper subject in native image
- // Remove substitution when https://github.com/oracle/graal/issues/2745 is fixed
- // Subject subject = Subject.getSubject(AccessController.getContext());
-
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- NameCallback nc = (NameCallback) callback;
- if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
- nc.setName(subject.getPublicCredentials(String.class).iterator().next());
- } else {
- nc.setName(nc.getDefaultName());
- }
- } else if (callback instanceof PasswordCallback) {
- if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
- char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
- ((PasswordCallback) callback).setPassword(password);
- } else {
- String errorMessage = "Could not login: the client is being asked for a password, but the Kafka"
- + " client code does not currently support obtaining a password from the user.";
- throw new UnsupportedCallbackException(callback, errorMessage);
- }
- } else if (callback instanceof RealmCallback) {
- RealmCallback rc = (RealmCallback) callback;
- rc.setText(rc.getDefaultText());
- } else if (callback instanceof AuthorizeCallback) {
- AuthorizeCallback ac = (AuthorizeCallback) callback;
- String authId = ac.getAuthenticationID();
- String authzId = ac.getAuthorizationID();
- ac.setAuthorized(authId.equals(authzId));
- if (ac.isAuthorized()) {
- ac.setAuthorizedID(authzId);
- }
- } else if (callback instanceof ScramExtensionsCallback) {
- if (ScramMechanism.isScram(mechanism) && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) {
- @SuppressWarnings("unchecked")
- Map extensions =
- (Map) subject.getPublicCredentials(Map.class).iterator().next();
- ((ScramExtensionsCallback) callback).extensions(extensions);
- }
- } else if (callback instanceof SaslExtensionsCallback) {
- if (!SaslConfigs.GSSAPI_MECHANISM.equals(mechanism)
- && subject != null && !subject.getPublicCredentials(SaslExtensions.class).isEmpty()) {
- SaslExtensions extensions = subject.getPublicCredentials(SaslExtensions.class).iterator().next();
- ((SaslExtensionsCallback) callback).extensions(extensions);
- }
- } else {
- throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
- }
- }
- }
-
- @Override
- @Substitute
- public void close() {
- }
-}
diff --git a/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json b/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json
index 9d8a492e06d..a96c8b57def 100644
--- a/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json
+++ b/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json
@@ -8,13 +8,21 @@
"org.apache.kafka.clients.producer.Partitioner",
"org.apache.kafka.common.security.auth.AuthenticateCallbackHandler",
"org.apache.kafka.common.security.auth.Login",
- "javax.security.auth.spi.LoginModule"
+ "javax.security.auth.spi.LoginModule",
+ "net.jpountz.xxhash.StreamingXXHash32",
+ "net.jpountz.xxhash.StreamingXXHash32$Factory",
+ "net.jpountz.xxhash.StreamingXXHash64JavaSafe$Factory",
+ "net.jpountz.xxhash.XXHash32",
+ "net.jpountz.lz4.LZ4UnknownSizeDecompressor",
+ "net.jpountz.lz4.LZ4Compressor",
+ "net.jpountz.lz4.LZ4Decompressor"
],
"classes": [
"org.xerial.snappy.SnappyInputStream",
"org.xerial.snappy.SnappyOutputStream",
"com.github.luben.zstd.ZstdInputStream",
"com.github.luben.zstd.ZstdOutputStream",
- "com.github.luben.zstd.util.Native"
+ "com.github.luben.zstd.util.Native",
+ "net.jpountz.xxhash.XXHash64JavaSafe"
]
}
diff --git a/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties b/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties
deleted file mode 100644
index c56586661ca..00000000000
--- a/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Manual registration of Sun Sasl as workaround for https://github.com/oracle/graal/issues/3664
-Args=-H:AdditionalSecurityProviders=com.sun.security.sasl.Provider \
- -H:+JNI
diff --git a/messaging/messaging/src/main/java/module-info.java b/messaging/messaging/src/main/java/module-info.java
index 3c88d7ff326..855b6180d11 100644
--- a/messaging/messaging/src/main/java/module-info.java
+++ b/messaging/messaging/src/main/java/module-info.java
@@ -26,7 +26,7 @@
in = HelidonFlavor.SE,
path = "Messaging"
)
-@Aot(description = "Experimental support in native image")
+@Aot
@SuppressWarnings({ "requires-automatic", "requires-transitive-automatic" })
module io.helidon.messaging {