From d25138bdb4a5e6907876955f577db8d6f620ac85 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Mon, 28 Mar 2022 16:20:45 +0200 Subject: [PATCH 1/2] JMS configurable producer properties #3980 Signed-off-by: Daniel Kec --- docs/mp/reactivemessaging/05_jms.adoc | 3 +- .../connectors/jms/ConfigHelper.java | 37 +++++++++++++++ .../connectors/jms/JmsConnector.java | 38 ++++++++++++++- .../messaging/connectors/jms/ConfigTest.java | 46 ++++++++++++++++++- 4 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java diff --git a/docs/mp/reactivemessaging/05_jms.adoc b/docs/mp/reactivemessaging/05_jms.adoc index 881db6f7376..e330b45a2eb 100644 --- a/docs/mp/reactivemessaging/05_jms.adoc +++ b/docs/mp/reactivemessaging/05_jms.adoc @@ -1,6 +1,6 @@ /////////////////////////////////////////////////////////////////////////////// - Copyright (c) 2020, 2021 Oracle and/or its affiliates. + Copyright (c) 2020, 2022 Oracle and/or its affiliates. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -69,6 +69,7 @@ Expression can only access headers and properties, not the payload. they share same JMS session and same JDBC connection as well. |`jndi.jms-factory` | JNDI name of JMS factory. |`jndi.env-properties` | Environment properties used for creating initial context `java.naming.factory.initial`, `java.naming.provider.url` ... +|`producer.someproperty` | property with producer prefix is set to producer instance (for example WLS Unit-of-Order `WLMessageProducer.setUnitOfOrder("unit-1")` can be configured as `producer.unit-of-order=unit-1`) |=== === Configured JMS factory diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java new file mode 100644 index 00000000000..6c221d5deca --- /dev/null +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.messaging.connectors.jms; + +import java.util.regex.Pattern; + +class ConfigHelper { + + private ConfigHelper(){ + //noop + } + + static final Pattern KEBAB_DEL = Pattern.compile("\\-([a-z])"); + static final Pattern SETTER_PREFIX = Pattern.compile("set([A-Za-z])"); + + static String kebabCase2CameCase(String val) { + return KEBAB_DEL.matcher(val).replaceAll(res -> res.group(1).toUpperCase()); + } + + static String stripSet(String val) { + return SETTER_PREFIX.matcher(val).replaceFirst(res -> res.group(1).toLowerCase()); + } +} diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java index e6af2c8ba2c..0d90ecc6ee8 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package io.helidon.messaging.connectors.jms; +import java.lang.reflect.Method; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -28,8 +30,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import io.helidon.common.Builder; import io.helidon.common.configurable.ScheduledThreadPoolSupplier; @@ -381,6 +385,7 @@ public SubscriberBuilder, Void> getSubscriberBuilder(Config Session session = sessionEntry.session(); Destination destination = createDestination(session, ctx); MessageProducer producer = session.createProducer(destination); + configureProducer(producer, ctx); AtomicReference mapper = new AtomicReference<>(); return ReactiveStreams.>builder() .flatMapCompletionStage(m -> consume(m, session, mapper, producer, config)) @@ -392,6 +397,37 @@ public SubscriberBuilder, Void> getSubscriberBuilder(Config } } + private void configureProducer(MessageProducer producer, ConnectionContext ctx) { + io.helidon.config.Config config = ctx.config().get("producer"); + if (!config.exists()) return; + + Class clazz = producer.getClass(); + Map setterMethods = Arrays.stream(clazz.getDeclaredMethods()) + .filter(m -> m.getParameterCount() == 1) + .collect(Collectors.toMap(m -> ConfigHelper.stripSet(m.getName()), Function.identity())); + config.detach() + .traverse() + .forEach(c -> { + String key = c.key().name(); + String normalizedKey = ConfigHelper.kebabCase2CameCase(key); + Method m = setterMethods.get(normalizedKey); + if (m == null) { + LOGGER.log(Level.WARNING, + "JMS producer property " + key + " can't be set for producer " + clazz.getName()); + return; + } + try { + m.invoke(producer, c.as(m.getParameterTypes()[0]).get()); + } catch (Throwable e) { + LOGGER.log(Level.WARNING, + "Error when setting JMS producer property " + key + + " on " + clazz.getName() + + "." + m.getName(), + e); + } + }); + } + private void produce( BufferedEmittingPublisher> emitter, SessionMetadata sessionEntry, diff --git a/messaging/connectors/jms/src/test/java/io/helidon/messaging/connectors/jms/ConfigTest.java b/messaging/connectors/jms/src/test/java/io/helidon/messaging/connectors/jms/ConfigTest.java index 84d52a206e6..8b23c8a1722 100644 --- a/messaging/connectors/jms/src/test/java/io/helidon/messaging/connectors/jms/ConfigTest.java +++ b/messaging/connectors/jms/src/test/java/io/helidon/messaging/connectors/jms/ConfigTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package io.helidon.messaging.connectors.jms; @@ -42,9 +43,11 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.hamcrest.MatcherAssert.assertThat; @@ -56,6 +59,7 @@ public class ConfigTest { private static JmsConnector conn; private static final HashMap results = new HashMap<>(); + private ArgumentCaptor producerPropertyCapture; @BeforeEach void before() throws JMSException { @@ -67,11 +71,13 @@ void before() throws JMSException { Queue queue = Mockito.mock(Queue.class); Topic topic = Mockito.mock(Topic.class); MessageConsumer consumer = Mockito.mock(MessageConsumer.class); - MessageProducer producer = Mockito.mock(MessageProducer.class); + CustMessageProducer producer = Mockito.mock(CustMessageProducer.class); jakarta.jms.Message msg = Mockito.mock(jakarta.jms.Message.class); Mockito.when(connectionFactory.createConnection()).thenReturn(jmsConnection); Mockito.when(instance.select(NamedLiteral.of("test-factory"))).thenReturn(instance); Mockito.when(instance.stream()).thenReturn(Stream.of(connectionFactory)); + producerPropertyCapture = ArgumentCaptor.forClass(String.class); + Mockito.doNothing().when(producer).setCustomProperty(producerPropertyCapture.capture()); Mockito.when(connectionFactory.createConnection(Mockito.anyString(), Mockito.anyString())).thenAnswer(i -> { results.put(JmsConnector.USERNAME_ATTRIBUTE, i.getArgument(0)); results.put(JmsConnector.PASSWORD_ATTRIBUTE, i.getArgument(1)); @@ -152,6 +158,38 @@ void defaultsSub() { assertThat(results, hasEntry(JmsConnector.TRANSACTED_ATTRIBUTE, JmsConnector.TRANSACTED_DEFAULT)); } + @Test + @SuppressWarnings("unchecked") + void producerPropertyConfigCameCase() { + SubscriberBuilder, Void> subscriberBuilder = + (SubscriberBuilder, Void>) conn.getSubscriberBuilder(conf(Map.of( + JmsConnector.CHANNEL_NAME_ATTRIBUTE, "test-1", + JmsConnector.CONNECTOR_ATTRIBUTE, JmsConnector.CONNECTOR_NAME, + JmsConnector.NAMED_FACTORY_ATTRIBUTE, "test-factory", + JmsConnector.DESTINATION_ATTRIBUTE, "testQueue1", + JmsConnector.USERNAME_ATTRIBUTE, "Jack", + JmsConnector.PASSWORD_ATTRIBUTE, "O'Neil", + "producer.customProperty", "test prop value" + ))); + assertThat(producerPropertyCapture.getValue(), Matchers.is("test prop value")); + } + + @Test + @SuppressWarnings("unchecked") + void producerPropertyConfigKebabCase() { + SubscriberBuilder, Void> subscriberBuilder = + (SubscriberBuilder, Void>) conn.getSubscriberBuilder(conf(Map.of( + JmsConnector.CHANNEL_NAME_ATTRIBUTE, "test-1", + JmsConnector.CONNECTOR_ATTRIBUTE, JmsConnector.CONNECTOR_NAME, + JmsConnector.NAMED_FACTORY_ATTRIBUTE, "test-factory", + JmsConnector.DESTINATION_ATTRIBUTE, "testQueue1", + JmsConnector.USERNAME_ATTRIBUTE, "Jack", + JmsConnector.PASSWORD_ATTRIBUTE, "O'Neil", + "producer.custom-property", "test prop value" + ))); + assertThat(producerPropertyCapture.getValue(), Matchers.is("test prop value")); + } + @Test void sessionConfigPub() { await(conn.getPublisherBuilder(conf(Map.of( @@ -240,4 +278,8 @@ private T await(CompletionStage stage) { return null; } } + + public static interface CustMessageProducer extends MessageProducer{ + public void setCustomProperty(String prop); + } } From 6d88ab56d37609b6f68ac9539af7c9ed67141ada Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 16 Jun 2022 15:02:27 +0200 Subject: [PATCH 2/2] Review issues 1 Signed-off-by: Daniel Kec --- .../helidon/messaging/connectors/jms/ConfigHelper.java | 2 +- .../helidon/messaging/connectors/jms/JmsConnector.java | 4 ++-- .../META-INF/native-image/reflection-config.json | 9 +++++++++ 3 files changed, 12 insertions(+), 3 deletions(-) create mode 100644 messaging/connectors/jms/src/main/resources/META-INF/native-image/reflection-config.json diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java index 6c221d5deca..fc1c2c4e7c1 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConfigHelper.java @@ -27,7 +27,7 @@ private ConfigHelper(){ static final Pattern KEBAB_DEL = Pattern.compile("\\-([a-z])"); static final Pattern SETTER_PREFIX = Pattern.compile("set([A-Za-z])"); - static String kebabCase2CameCase(String val) { + static String kebabCase2CamelCase(String val) { return KEBAB_DEL.matcher(val).replaceAll(res -> res.group(1).toUpperCase()); } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java index 0d90ecc6ee8..d8e40459cd2 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java @@ -409,7 +409,7 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx) .traverse() .forEach(c -> { String key = c.key().name(); - String normalizedKey = ConfigHelper.kebabCase2CameCase(key); + String normalizedKey = ConfigHelper.kebabCase2CamelCase(key); Method m = setterMethods.get(normalizedKey); if (m == null) { LOGGER.log(Level.WARNING, @@ -452,7 +452,7 @@ private void produce( if (message == null) { return; } - LOGGER.fine(() -> "Received message: " + message.toString()); + LOGGER.fine(() -> "Received message: " + message); JmsMessage preparedMessage = createMessage(message, executor, sessionEntry); lastMessage.set(preparedMessage); emitter.emit(preparedMessage); diff --git a/messaging/connectors/jms/src/main/resources/META-INF/native-image/reflection-config.json b/messaging/connectors/jms/src/main/resources/META-INF/native-image/reflection-config.json new file mode 100644 index 00000000000..6578a6b724a --- /dev/null +++ b/messaging/connectors/jms/src/main/resources/META-INF/native-image/reflection-config.json @@ -0,0 +1,9 @@ +{ + "annotated": [ + ], + "class-hierarchy": [ + "jakarta.jms.MessageProducer" + ], + "classes": [ + ] +}