Skip to content

Commit

Permalink
Added Vert.x context dispatching to the JMS connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed May 7, 2024
1 parent 2c1568a commit 549f06e
Show file tree
Hide file tree
Showing 10 changed files with 824 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -14,8 +15,9 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
public class IncomingJmsMessage<T> implements ContextAwareMessage<T> {
private final Message delegate;
private final Executor executor;
private final Class<T> clazz;
Expand Down Expand Up @@ -45,7 +47,7 @@ public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.
}

this.jmsMetadata = new IncomingJmsMessageMetadata(message);
this.metadata = Metadata.of(this.jmsMetadata);
this.metadata = captureContextMetadata(this.jmsMetadata);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -122,6 +124,7 @@ public CompletionStage<Void> ack(Metadata metadata) {
}
})
.runSubscriptionOn(executor)
.emitOn(this::runOnMessageContext)
.subscribeAsCompletionStage();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

@ApplicationScoped
Expand Down Expand Up @@ -108,6 +109,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@Any
Instance<JmsFailureHandler.Factory> failureHandlerFactories;

@Inject
ExecutionHolder executionHolder;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -142,7 +146,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor, failureHandlerFactories);
JmsSource source = new JmsSource(executionHolder.vertx(), holder, ic, jsonMapping, executor, failureHandlerFactories);
sources.add(source);
return source.getSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

class JmsSource {
Expand All @@ -33,8 +35,10 @@ class JmsSource {
private final Instance<JmsFailureHandler.Factory> failureHandlerFactories;
private final JmsConnectorIncomingConfiguration config;
private final JmsFailureHandler failureHandler;
private final Context context;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config,
JsonMapping jsonMapping,
Executor executor, Instance<JmsFailureHandler.Factory> failureHandlerFactories) {
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
Expand All @@ -59,8 +63,10 @@ class JmsSource {
resourceHolder.getClient();
this.publisher = new JmsPublisher(resourceHolder);
this.failureHandlerFactories = failureHandlerFactories;
this.failureHandler = createFailureHandler(Vertx.vertx()); //FIXME Find correct vertx instance
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.failureHandler = createFailureHandler();
source = Multi.createFrom().publisher(publisher)
.emitOn(r -> context.runOnContext(r))
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping, failureHandler))
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
Expand Down Expand Up @@ -105,12 +111,12 @@ Multi<IncomingJmsMessage<?>> getSource() {
return source;
}

private JmsFailureHandler createFailureHandler(Vertx vertx) {
private JmsFailureHandler createFailureHandler() {
String strategy = config.getFailureStrategy();
Instance<JmsFailureHandler.Factory> failureHandlerFactory = failureHandlerFactories
.select(Identifier.Literal.of(strategy));
if (failureHandlerFactory.isResolvable()) {
return failureHandlerFactory.get().create(config, vertx, resourceHolder.getClient(), this::reportFailure);
return failureHandlerFactory.get().create(config, this::reportFailure);
} else {
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.jms.JMSConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Vertx;

public class JmsFailStop implements JmsFailureHandler {

Expand All @@ -25,7 +23,7 @@ public class JmsFailStop implements JmsFailureHandler {
public static class Factory implements JmsFailureHandler.Factory {

@Override
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx, JMSConsumer consumer,
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new JmsFailStop(config.getChannel(), reportFailure);
}
Expand All @@ -37,13 +35,12 @@ public <K, V> JmsFailStop(String channel, BiConsumer<Throwable, Boolean> reportF
}

@Override
public <T> Uni<Void> handle(
IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
public <T> Uni<Void> handle(IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
// We don't commit, we just fail and stop the client.
log.messageNackedFailStop(channel);
// report failure to the connector for health check
reportFailure.accept(reason, true);
return Uni.createFrom().<Void> failure(reason);
//.emitOn(message::runOnMessageContext);
return Uni.createFrom().<Void> failure(reason)
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import java.util.function.BiConsumer;

import jakarta.jms.JMSConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Vertx;

/**
* Jms Failure handling strategy
Expand All @@ -34,8 +31,6 @@ interface Strategy {
interface Factory {
JmsFailureHandler create(
JmsConnectorIncomingConfiguration config,
Vertx vertx,
JMSConsumer consumer,
BiConsumer<Throwable, Boolean> reportFailure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.jms.JMSConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Vertx;

public class JmsIgnoreFailure implements JmsFailureHandler {

Expand All @@ -24,8 +22,8 @@ public class JmsIgnoreFailure implements JmsFailureHandler {
public static class Factory implements JmsFailureHandler.Factory {

@Override
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config, Vertx vertx,
JMSConsumer consumer, BiConsumer<Throwable, Boolean> reportFailure) {
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new JmsIgnoreFailure(config.getChannel());
}
}
Expand All @@ -35,11 +33,11 @@ public JmsIgnoreFailure(String channel) {
}

@Override
public <T> Uni<Void> handle(
IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
public <T> Uni<Void> handle(IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
// We commit the message, log and continue
log.messageNackedIgnore(channel, reason.getMessage());
log.messageNackedFullIgnored(reason);
return Uni.createFrom().completionStage(message.ack());
return Uni.createFrom().completionStage(message.ack())
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.support.JmsTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
public class JmsSourceTest extends JmsTestBase {
Expand Down Expand Up @@ -185,7 +186,7 @@ public void testReceptionOfMultipleMessages() {

@Test
public void testMultipleRequests() {
JmsSource source = new JmsSource(getResourceHolder("queue"),
JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"),
new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")),
null, null, failureHandlerFactories);
Publisher<IncomingJmsMessage<?>> publisher = source.getSource();
Expand Down Expand Up @@ -238,7 +239,7 @@ public void onComplete() {

@Test
public void testBroadcast() {
JmsSource source = new JmsSource(getResourceHolder("queue"),
JmsSource source = new JmsSource(Vertx.vertx(), getResourceHolder("queue"),
new JmsConnectorIncomingConfiguration(new MapBasedConfig()
.with("channel-name", "queue").with("broadcast", true)),
null, null, failureHandlerFactories);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.smallrye.reactive.messaging.jms;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.lang.reflect.Method;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.smallrye.reactive.messaging.support.JmsTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

public class LocalPropagationAckTest extends JmsTestBase {

private WeldContainer container;

private String destination;

@BeforeEach
public void initTopic(TestInfo testInfo) {
String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString());
String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString());
destination = cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits();
}

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
.with("mp.messaging.incoming.data.connector", JmsConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.destination", destination)
.with("mp.messaging.incoming.data.durable", false)
.with("mp.messaging.incoming.data.tracing.enabled", false);
}

private void produceIntegers() {
ConnectionFactory cf = container.getBeanManager().createInstance().select(ConnectionFactory.class).get();
AtomicInteger counter = new AtomicInteger(1);
produceIntegers(cf, destination, 5, counter::getAndIncrement);
}

private <T> T runApplication(MapBasedConfig config, Class<T> beanClass) {
config.write();
container = deploy(beanClass);

return container.getBeanManager().createInstance().select(beanClass).get();
}

@Test
public void testChannelWithAckOnMessageContext() {
IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig(),
IncomingChannelWithAckOnMessageContext.class);
bean.process(i -> i + 1);

produceIntegers();

await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 5);
assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6);
}

@Test
public void testChannelWithNackOnMessageContextFail() {
IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig()
.with("mp.messaging.incoming.data.failure-strategy", "fail"),
IncomingChannelWithAckOnMessageContext.class);
bean.process(i -> {
throw new RuntimeException("boom");
});

produceIntegers();

await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 1);
assertThat(bean.getResults()).contains(1);
}

@Test
public void testChannelWithNackOnMessageContextIgnore() {
IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig()
.with("mp.messaging.incoming.data.failure-strategy", "ignore"),
IncomingChannelWithAckOnMessageContext.class);
bean.process(i -> {
throw new RuntimeException("boom");
});

produceIntegers();

await().atMost(20, TimeUnit.SECONDS).until(() -> bean.getResults().size() >= 5);
assertThat(bean.getResults()).containsExactly(1, 2, 3, 4, 5);
}

@ApplicationScoped
public static class IncomingChannelWithAckOnMessageContext {

private final List<Integer> list = new CopyOnWriteArrayList<>();

@Inject
@Channel("data")
Multi<Message<Integer>> incoming;

void process(Function<Integer, Integer> mapper) {
incoming.onItem()
.transformToUniAndConcatenate(msg -> Uni.createFrom()
.item(() -> msg.withPayload(mapper.apply(msg.getPayload())))
.chain(m -> Uni.createFrom().completionStage(m.ack()).replaceWith(m))
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(msg.nack(t))
.onItemOrFailure().transform((unused, throwable) -> msg)))
.subscribe().with(m -> {
m.getMetadata(LocalContextMetadata.class).map(LocalContextMetadata::context).ifPresent(context -> {
if (Vertx.currentContext().getDelegate() == context) {
list.add(m.getPayload());
}
});
});
}

public List<Integer> getResults() {
return list;
}
}

}
Loading

0 comments on commit 549f06e

Please sign in to comment.