diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 0e63e4d086c..00321407574 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -94,7 +94,7 @@ 3.1.3 6.7.0.202309050840-r 5.9.3 - 3.4.0 + 3.5.1 1.8.0 2.18.0 1.4.0 @@ -944,15 +944,6 @@ org.apache.kafka kafka-clients ${version.lib.kafka} - - - - - - org.xerial.snappy - snappy-java - - diff --git a/examples/messaging/docker/kafka/Dockerfile.kafka b/examples/messaging/docker/kafka/Dockerfile.kafka index f78d62addc0..cae5ad7368d 100644 --- a/examples/messaging/docker/kafka/Dockerfile.kafka +++ b/examples/messaging/docker/kafka/Dockerfile.kafka @@ -1,5 +1,5 @@ # -# Copyright (c) 2019, 2021 Oracle and/or its affiliates. +# Copyright (c) 2019, 2023 Oracle and/or its affiliates. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,12 +14,12 @@ # limitations under the License. # -FROM openjdk:17-jdk-slim-buster +FROM container-registry.oracle.com/java/openjdk:21 ENV VERSION=2.7.0 ENV SCALA_VERSION=2.13 -RUN apt-get -qq update && apt-get -qq -y install bash curl wget netcat jq +RUN dnf update && dnf -y install wget jq nc RUN REL_PATH=kafka/${VERSION}/kafka_${SCALA_VERSION}-${VERSION}.tgz \ && BACKUP_ARCHIVE=https://archive.apache.org/dist/ \ diff --git a/examples/messaging/docker/kafka/init_topics.sh b/examples/messaging/docker/kafka/init_topics.sh index bee6716faad..4957126f588 100644 --- a/examples/messaging/docker/kafka/init_topics.sh +++ b/examples/messaging/docker/kafka/init_topics.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2020, 2021 Oracle and/or its affiliates. +# Copyright (c) 2020, 2023 Oracle and/or its affiliates. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,9 +40,39 @@ while sleep 2; do --replication-factor 1 \ --partitions 10 \ --topic messaging-test-topic-2 + bash $KAFKA_TOPICS \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --config compression.type=snappy \ + --topic messaging-test-topic-snappy-compressed + bash $KAFKA_TOPICS \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --config compression.type=lz4 \ + --topic messaging-test-topic-lz4-compressed + bash $KAFKA_TOPICS \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --config compression.type=zstd \ + --topic messaging-test-topic-zstd-compressed + bash $KAFKA_TOPICS \ + --create \ + --replication-factor 1 \ + --partitions 10 \ + --config compression.type=gzip \ + --topic messaging-test-topic-gzip-compressed echo - echo "Example topics messaging-test-topic-1 and messaging-test-topic-2 created" + echo "Example topics created:" + echo " messaging-test-topic-1" + echo " messaging-test-topic-2" + echo " messaging-test-topic-snappy-compressed" + echo " messaging-test-topic-lz4-compressed" + echo " messaging-test-topic-zstd-compressed" + echo " messaging-test-topic-gzip-compressed" echo echo "================== Kafka is ready, stop it with Ctrl+C ==================" exit 0 diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java index 8d8ef73a7a9..355c2ac33bb 100644 --- a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/SendingService.java @@ -35,6 +35,7 @@ class SendingService implements HttpService { String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); String topic = config.get("app.kafka.topic").asString().get(); + String compression = config.get("app.kafka.compression").asString().orElse("none"); // Prepare channel for connecting processor -> kafka connector with specific subscriber configuration, // channel -> connector mapping is automatic when using KafkaConnector.configBuilder() @@ -42,6 +43,7 @@ class SendingService implements HttpService { .subscriberConfig(KafkaConnector.configBuilder() .bootstrapServers(kafkaServer) .topic(topic) + .compressionType(compression) .keySerializer(StringSerializer.class) .valueSerializer(StringSerializer.class) .build()) diff --git a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java index 3c62028ef9c..edf15419c88 100644 --- a/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java +++ b/examples/messaging/kafka-websocket-se/src/main/java/io/helidon/examples/messaging/se/WebSocketEndpoint.java @@ -47,6 +47,7 @@ public void onOpen(WsSession session) { String kafkaServer = config.get("app.kafka.bootstrap.servers").asString().get(); String topic = config.get("app.kafka.topic").asString().get(); + String compression = config.get("app.kafka.compression").asString().orElse("none"); // Prepare channel for connecting kafka connector with specific publisher configuration -> listener, // channel -> connector mapping is automatic when using KafkaConnector.configBuilder() @@ -60,6 +61,7 @@ public void onOpen(WsSession session) { .enableAutoCommit(true) .keyDeserializer(StringDeserializer.class) .valueDeserializer(StringDeserializer.class) + .compressionType(compression) .build() ) .build(); @@ -72,7 +74,7 @@ public void onOpen(WsSession session) { .listener(fromKafka, payload -> { System.out.println("Kafka says: " + payload); // Send message received from Kafka over websocket - session.send(payload, false); + session.send(payload, true); }) .build() .start(); diff --git a/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml index e5e6f96c0f7..949b803df73 100644 --- a/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml +++ b/examples/messaging/kafka-websocket-se/src/main/resources/application.yaml @@ -1,5 +1,5 @@ # -# Copyright (c) 2020 Oracle and/or its affiliates. +# Copyright (c) 2020, 2023 Oracle and/or its affiliates. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,7 +17,11 @@ app: kafka: bootstrap.servers: localhost:9092 - topic: messaging-test-topic-1 + compression: snappy +# compression: lz4 +# compression: zstd +# compression: gzip + topic: messaging-test-topic-${app.kafka.compression}-compressed server: port: 7001 diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java deleted file mode 100644 index d6bb2c570ea..00000000000 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/AppInfoParserSubstitution.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2020, 2022 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.messaging.connectors.kafka; - -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.AppInfoParser; - -/** - * JMX not supported in native-image. - */ -@TargetClass(AppInfoParser.class) -@SuppressWarnings("checkstyle:HideUtilityClassConstructor") -final class AppInfoParserSubstitution { - - @Substitute - public static void registerAppInfo(String p, String i, Metrics m, long n) { - } - - @Substitute - public static void unregisterAppInfo(String p, String i, Metrics m) { - } -} diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java deleted file mode 100644 index 89977c98157..00000000000 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/CompressionTypeSubstitution.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2020, 2022 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.messaging.connectors.kafka; - -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; - -import io.helidon.common.LazyValue; - -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.common.utils.ByteBufferInputStream; -import org.apache.kafka.common.utils.ByteBufferOutputStream; - -/** - * Helper for creating ZSTD or SNAPPY compression stream wrappers without method handles. - */ -@SuppressWarnings("checkstyle:OuterTypeFilename") -final class CompressionTypeHelper { - - private CompressionTypeHelper() { - } - - private static boolean zstdNativeLibLoaded = false; - - static final LazyValue> LAZY_INPUT_ZSTD = - LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdInputStream", InputStream.class)); - static final LazyValue> LAZY_OUTPUT_ZSTD = - LazyValue.create(() -> findConstructor("com.github.luben.zstd.ZstdOutputStream", OutputStream.class)); - static final LazyValue> LAZY_INPUT_SNAPPY = - LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyInputStream", InputStream.class)); - static final LazyValue> LAZY_OUTPUT_SNAPPY = - LazyValue.create(() -> findConstructor("org.xerial.snappy.SnappyOutputStream", OutputStream.class)); - - static OutputStream snappyOutputStream(OutputStream orig) { - try { - return (OutputStream) LAZY_OUTPUT_SNAPPY.get().newInstance(orig); - } catch (KafkaException e) { - throw e; - } catch (Exception e) { - throw new KafkaException(e); - } - } - - static InputStream snappyInputStream(ByteBuffer orig) { - try { - return (InputStream) LAZY_INPUT_SNAPPY.get().newInstance(new ByteBufferInputStream(orig)); - } catch (KafkaException e) { - throw e; - } catch (Exception e) { - throw new KafkaException(e); - } - } - - static void zstdLoadNativeLibs() throws ReflectiveOperationException { - // loading jni libs in static blocks is not supported - // see https://github.com/oracle/graal/issues/439#issuecomment-394341725 - if (!zstdNativeLibLoaded) { - Class clazz = Class.forName("com.github.luben.zstd.util.Native"); - Field loadedField = clazz.getDeclaredField("loaded"); - loadedField.setAccessible(true); - loadedField.setBoolean(null, false); - Method loadMethod = clazz.getDeclaredMethod("load"); - loadMethod.invoke(null); - zstdNativeLibLoaded = true; - } - } - - static OutputStream zstdOutputStream(OutputStream orig) { - try { - zstdLoadNativeLibs(); - return (OutputStream) LAZY_OUTPUT_ZSTD.get().newInstance(orig); - } catch (KafkaException e) { - throw e; - } catch (Exception e) { - throw new KafkaException(e); - } - } - - static InputStream zstdInputStream(ByteBuffer orig) { - try { - zstdLoadNativeLibs(); - return (InputStream) LAZY_INPUT_ZSTD.get().newInstance(new ByteBufferInputStream(orig)); - } catch (KafkaException e) { - throw e; - } catch (Exception e) { - throw new KafkaException(e); - } - } - - static Constructor findConstructor(String className, Class... paramTypes) { - try { - return Class.forName(className) - .getDeclaredConstructor(paramTypes); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new KafkaException(e); - } - } -} - -/** - * Substitution for {@link org.apache.kafka.common.record.CompressionType#SNAPPY CompressionType.SNAPPY}. - */ -@TargetClass(className = "org.apache.kafka.common.record.CompressionType$3") -@SuppressWarnings("checkstyle:OneTopLevelClass") -final class SnappySubstitution { - - @Substitute - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - return CompressionTypeHelper.snappyOutputStream(buffer); - } - - @Substitute - public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - return CompressionTypeHelper.snappyInputStream(buffer); - } -} - -/** - * Substitution for {@link org.apache.kafka.common.record.CompressionType#ZSTD CompressionType.ZSTD}. - */ -@TargetClass(className = "org.apache.kafka.common.record.CompressionType$5") -@SuppressWarnings("checkstyle:OneTopLevelClass") -final class ZstdSubstitution { - - @Substitute - public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - return CompressionTypeHelper.zstdOutputStream(buffer); - } - - @Substitute - public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - return CompressionTypeHelper.zstdInputStream(buffer); - } -} diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java deleted file mode 100644 index ce5214c0990..00000000000 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/Crc32CSubstitution.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2020, 2023 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.messaging.connectors.kafka; - -import java.nio.ByteBuffer; -import java.util.zip.Checksum; - -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; -import org.apache.kafka.common.utils.Checksums; - - -/** - * Method handles are not supported by native-image, - * invoke {@link java.util.zip.CRC32C CRC32C} directly. - *

- * Helidon (since version 2) runs only on Java 11 and newer, {@link java.util.zip.CRC32C CRC32C} - * doesn't have to be instantiated by method handles. - */ -@TargetClass(org.apache.kafka.common.utils.Crc32C.class) -@Substitute -@SuppressWarnings("checkstyle:HideUtilityClassConstructor") -final class Crc32CSubstitution { - - @Substitute - public static long compute(byte[] bytes, int offset, int size) { - Checksum crc = create(); - crc.update(bytes, offset, size); - return crc.getValue(); - } - - @Substitute - public static long compute(ByteBuffer buffer, int offset, int size) { - Checksum crc = create(); - Checksums.update(crc, buffer, offset, size); - return crc.getValue(); - } - - @Substitute - public static Checksum create() { - return new java.util.zip.CRC32C(); - } -} diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java deleted file mode 100644 index bb483d8084c..00000000000 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/JmxReporterSubstitution.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2020, 2022 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.messaging.connectors.kafka; - -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; -import org.apache.kafka.common.metrics.KafkaMetric; - -/** - * JMX not supported in native-image. - */ -@TargetClass(org.apache.kafka.common.metrics.JmxReporter.class) -final class JmxReporterSubstitution { - - @Substitute - private Object addAttribute(KafkaMetric metric) { - return null; - } - - @Substitute - public void metricChange(KafkaMetric metric) { - } - -} - diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java deleted file mode 100644 index a38e494e05c..00000000000 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/SaslClientCallbackHandlerSubstitution.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2021, 2022 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.messaging.connectors.kafka; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.security.auth.Subject; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.spi.LoginModule; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.RealmCallback; - -import com.oracle.svm.core.annotate.Alias; -import com.oracle.svm.core.annotate.Inject; -import com.oracle.svm.core.annotate.Substitute; -import com.oracle.svm.core.annotate.TargetClass; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; -import org.apache.kafka.common.security.auth.SaslExtensions; -import org.apache.kafka.common.security.auth.SaslExtensionsCallback; -import org.apache.kafka.common.security.authenticator.LoginManager; -import org.apache.kafka.common.security.scram.ScramExtensionsCallback; -import org.apache.kafka.common.security.scram.internals.ScramMechanism; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@TargetClass(org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler.class) -@SuppressWarnings("checkstyle:RedundantModifier") -final class SaslClientCallbackHandlerSubstitution implements AuthenticateCallbackHandler { - - @Alias - private String mechanism; - - @Inject - private Logger logger; - - @Inject - private Subject subject; - - @Substitute - public SaslClientCallbackHandlerSubstitution() { - logger = LoggerFactory.getLogger(LoginManager.class); - } - - @Override - @Substitute - public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { - this.mechanism = saslMechanism; - this.subject = null; - - int entrySize = jaasConfigEntries.size(); - if (entrySize == 0) { - logger.warn("Missing JAAS config entry, missing or malformed sasl.jaas.config property."); - return; - } else if (entrySize > 1) { - logger.warn("Multiple JAAS config entries, Kafka client's sasl.jaas.config can have only one JAAS config entry."); - return; - } - - AppConfigurationEntry jaasConfigEntry = jaasConfigEntries.get(0); - String jaasLoginModuleName = jaasConfigEntry.getLoginModuleName(); - subject = new Subject(); - - try { - Class.forName(jaasLoginModuleName) - .asSubclass(LoginModule.class) - .getDeclaredConstructor() - .newInstance() - .initialize(subject, this, new HashMap<>(), jaasConfigEntry.getOptions()); - } catch (ReflectiveOperationException e) { - throw new KafkaException("Can't instantiate JAAS login module" + jaasLoginModuleName, e); - } - } - - @Override - @Substitute - public void handle(Callback[] callbacks) throws UnsupportedCallbackException { - // Subject.getSubject doesn't return proper subject in native image - // Remove substitution when https://github.com/oracle/graal/issues/2745 is fixed - // Subject subject = Subject.getSubject(AccessController.getContext()); - - for (Callback callback : callbacks) { - if (callback instanceof NameCallback) { - NameCallback nc = (NameCallback) callback; - if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) { - nc.setName(subject.getPublicCredentials(String.class).iterator().next()); - } else { - nc.setName(nc.getDefaultName()); - } - } else if (callback instanceof PasswordCallback) { - if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) { - char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); - ((PasswordCallback) callback).setPassword(password); - } else { - String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" - + " client code does not currently support obtaining a password from the user."; - throw new UnsupportedCallbackException(callback, errorMessage); - } - } else if (callback instanceof RealmCallback) { - RealmCallback rc = (RealmCallback) callback; - rc.setText(rc.getDefaultText()); - } else if (callback instanceof AuthorizeCallback) { - AuthorizeCallback ac = (AuthorizeCallback) callback; - String authId = ac.getAuthenticationID(); - String authzId = ac.getAuthorizationID(); - ac.setAuthorized(authId.equals(authzId)); - if (ac.isAuthorized()) { - ac.setAuthorizedID(authzId); - } - } else if (callback instanceof ScramExtensionsCallback) { - if (ScramMechanism.isScram(mechanism) && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) { - @SuppressWarnings("unchecked") - Map extensions = - (Map) subject.getPublicCredentials(Map.class).iterator().next(); - ((ScramExtensionsCallback) callback).extensions(extensions); - } - } else if (callback instanceof SaslExtensionsCallback) { - if (!SaslConfigs.GSSAPI_MECHANISM.equals(mechanism) - && subject != null && !subject.getPublicCredentials(SaslExtensions.class).isEmpty()) { - SaslExtensions extensions = subject.getPublicCredentials(SaslExtensions.class).iterator().next(); - ((SaslExtensionsCallback) callback).extensions(extensions); - } - } else { - throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback"); - } - } - } - - @Override - @Substitute - public void close() { - } -} diff --git a/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json b/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json index 9d8a492e06d..a96c8b57def 100644 --- a/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json +++ b/messaging/connectors/kafka/src/main/resources/META-INF/helidon/native-image/reflection-config.json @@ -8,13 +8,21 @@ "org.apache.kafka.clients.producer.Partitioner", "org.apache.kafka.common.security.auth.AuthenticateCallbackHandler", "org.apache.kafka.common.security.auth.Login", - "javax.security.auth.spi.LoginModule" + "javax.security.auth.spi.LoginModule", + "net.jpountz.xxhash.StreamingXXHash32", + "net.jpountz.xxhash.StreamingXXHash32$Factory", + "net.jpountz.xxhash.StreamingXXHash64JavaSafe$Factory", + "net.jpountz.xxhash.XXHash32", + "net.jpountz.lz4.LZ4UnknownSizeDecompressor", + "net.jpountz.lz4.LZ4Compressor", + "net.jpountz.lz4.LZ4Decompressor" ], "classes": [ "org.xerial.snappy.SnappyInputStream", "org.xerial.snappy.SnappyOutputStream", "com.github.luben.zstd.ZstdInputStream", "com.github.luben.zstd.ZstdOutputStream", - "com.github.luben.zstd.util.Native" + "com.github.luben.zstd.util.Native", + "net.jpountz.xxhash.XXHash64JavaSafe" ] } diff --git a/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties b/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties index c56586661ca..3f598a985be 100644 --- a/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties +++ b/messaging/connectors/kafka/src/main/resources/META-INF/native-image/io.helidon.messaging.connectors.kafka/native-image.properties @@ -15,5 +15,7 @@ # # Manual registration of Sun Sasl as workaround for https://github.com/oracle/graal/issues/3664 -Args=-H:AdditionalSecurityProviders=com.sun.security.sasl.Provider \ +Args=\ + -H:+UnlockExperimentalVMOptions \ + -H:AdditionalSecurityProviders=com.sun.security.sasl.Provider \ -H:+JNI diff --git a/messaging/messaging/src/main/java/module-info.java b/messaging/messaging/src/main/java/module-info.java index 3c88d7ff326..855b6180d11 100644 --- a/messaging/messaging/src/main/java/module-info.java +++ b/messaging/messaging/src/main/java/module-info.java @@ -26,7 +26,7 @@ in = HelidonFlavor.SE, path = "Messaging" ) -@Aot(description = "Experimental support in native image") +@Aot @SuppressWarnings({ "requires-automatic", "requires-transitive-automatic" }) module io.helidon.messaging {