Skip to content

Commit

Permalink
Merge pull request #2572 from dankristensen/feature/jms_reconnect
Browse files Browse the repository at this point in the history
Implemented reconnect and downstream propagation when reading messages
  • Loading branch information
ozangunalp authored Apr 11, 2024
2 parents 75bfe4f + f1466c3 commit cfce2e3
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSProducer;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand Down Expand Up @@ -59,6 +61,11 @@
@ConnectorAttribute(name = "reply-to", description = "The reply to destination if any", direction = Direction.OUTGOING, type = "string")
@ConnectorAttribute(name = "reply-to-destination-type", description = "The type of destination for the response. It can be either `queue` or `topic`", direction = Direction.OUTGOING, type = "string", defaultValue = "queue")
@ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "retry", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether to retry on terminal stream errors.", type = "boolean", defaultValue = "true")
@ConnectorAttribute(name = "retry.max-retries", direction = Direction.INCOMING_AND_OUTGOING, description = "Maximum number of retries for terminal stream errors.", type = "int", defaultValue = "3")
@ConnectorAttribute(name = "retry.initial-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S")
@ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S")
@ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5")
public class JmsConnector implements InboundConnector, OutboundConnector {

/**
Expand Down Expand Up @@ -96,7 +103,7 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
private final List<JMSContext> contexts = new CopyOnWriteArrayList<>();
private final List<JmsResourceHolder<?>> contexts = new CopyOnWriteArrayList<>();

@PostConstruct
public void init() {
Expand All @@ -118,16 +125,16 @@ public void init() {
@PreDestroy
public void cleanup() {
sources.forEach(JmsSource::close);
contexts.forEach(JMSContext::close);
contexts.forEach(JmsResourceHolder::close);
this.executor.shutdown();
}

@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JMSContext context = createJmsContext(ic);
contexts.add(context);
JmsSource source = new JmsSource(context, ic, jsonMapping, executor);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
sources.add(source);
return source.getSource();
}
Expand All @@ -146,9 +153,9 @@ private JMSContext createJmsContext(JmsConnectorCommonConfiguration config) {
@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
JMSContext context = createJmsContext(oc);
contexts.add(context);
return new JmsSink(context, oc, jsonMapping, executor).getSink();
JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc));
contexts.add(holder);
return new JmsSink(holder, oc, jsonMapping, executor).getSink();
}

private ConnectionFactory pickTheFactory(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.util.function.Function;
import java.util.function.Supplier;

import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;

public class JmsResourceHolder<T> implements ExceptionListener {

private final String channel;
private final Supplier<JMSContext> contextCreator;
private Function<JmsResourceHolder<T>, Destination> destinationCreator;
private Function<JmsResourceHolder<T>, T> clientCreator;
private volatile JMSContext context;
private volatile Destination destination;
private volatile T client;

private volatile boolean closed = false;

public JmsResourceHolder(String channel, Supplier<JMSContext> contextCreator) {
this.channel = channel;
this.contextCreator = contextCreator;
}

public JmsResourceHolder<T> configure(Function<JmsResourceHolder<T>, Destination> destinationCreator,
Function<JmsResourceHolder<T>, T> clientCreator) {
this.destinationCreator = destinationCreator;
this.clientCreator = clientCreator;
return this;
}

public Destination getDestination() {
if (destination == null) {
synchronized (this) {
if (destination == null) {
destination = destinationCreator.apply(this);
}
}
}
return destination;
}

public T getClient() {
if (client == null) {
synchronized (this) {
if (client == null) {
client = clientCreator.apply(this);
}
}
}
return client;
}

public JMSContext getContext() {
if (context == null) {
synchronized (this) {
if (context == null) {
context = contextCreator.get();
closed = false;
context.setExceptionListener(this);
}
}
}
return context;
}

@Override
public void onException(JMSException exception) {
synchronized (this) {
if (closed) {
return;
}
log.jmsException(channel, exception);
close();
}
}

void close() {
synchronized (this) {
if (context != null) {
context.close();
context = null;
}
destination = null;
if (client != null) {
if (client instanceof AutoCloseable) {
try {
((AutoCloseable) client).close();
} catch (Exception e) {
log.infof(e, "Error closing client for channel %s", channel);
}
}
client = null;
}
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

Expand All @@ -21,61 +22,76 @@

class JmsSink {

private final JMSProducer producer;
private final Destination destination;
private final Flow.Subscriber<Message<?>> sink;
private final JMSContext context;
private final JsonMapping jsonMapping;
private final Executor executor;

JmsSink(JMSContext context, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping, Executor executor) {
JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping,
Executor executor) {
String name = config.getDestination().orElseGet(config::getChannel);

this.destination = getDestination(context, name, config.getDestinationType());
this.context = context;
String type = config.getDestinationType();
boolean retry = config.getRetry();
int retryMaxRetries = config.getRetryMaxRetries();
Duration retryInitialDelay = Duration.parse(config.getRetryInitialDelay());
Duration retryMaxDelay = Duration.parse(config.getRetryMaxDelay());
double retryJitter = config.getRetryJitter();
resourceHolder.configure(r -> getDestination(r.getContext(), name, type),
r -> {
JMSContext context = r.getContext();
JMSProducer producer = context.createProducer();
config.getDeliveryDelay().ifPresent(producer::setDeliveryDelay);
config.getDeliveryMode().ifPresent(v -> {
if (v.equalsIgnoreCase("persistent")) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else if (v.equalsIgnoreCase("non_persistent")) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} else {
throw ex.illegalArgumentInvalidDeliveryMode(v);
}
});
config.getDisableMessageId().ifPresent(producer::setDisableMessageID);
config.getDisableMessageTimestamp().ifPresent(producer::setDisableMessageTimestamp);
config.getCorrelationId().ifPresent(producer::setJMSCorrelationID);
config.getTtl().ifPresent(producer::setTimeToLive);
config.getPriority().ifPresent(producer::setPriority);
config.getReplyTo().ifPresent(rt -> {
String replyToDestinationType = config.getReplyToDestinationType();
Destination replyToDestination;
if (replyToDestinationType.equalsIgnoreCase("topic")) {
replyToDestination = context.createTopic(rt);
} else if (replyToDestinationType.equalsIgnoreCase("queue")) {
replyToDestination = context.createQueue(rt);
} else {
throw ex.illegalArgumentInvalidDestinationType(replyToDestinationType);
}
producer.setJMSReplyTo(replyToDestination);
});
return producer;
});
resourceHolder.getDestination();
resourceHolder.getClient();
this.jsonMapping = jsonMapping;
this.executor = executor;

producer = context.createProducer();
config.getDeliveryDelay().ifPresent(producer::setDeliveryDelay);
config.getDeliveryMode().ifPresent(v -> {
if (v.equalsIgnoreCase("persistent")) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else if (v.equalsIgnoreCase("non_persistent")) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} else {
throw ex.illegalArgumentInvalidDeliveryMode(v);
}
});
config.getDisableMessageId().ifPresent(producer::setDisableMessageID);
config.getDisableMessageTimestamp().ifPresent(producer::setDisableMessageTimestamp);
config.getCorrelationId().ifPresent(producer::setJMSCorrelationID);
config.getTtl().ifPresent(producer::setTimeToLive);
config.getPriority().ifPresent(producer::setPriority);
config.getReplyTo().ifPresent(rt -> {
String replyToDestinationType = config.getReplyToDestinationType();
Destination replyToDestination;
if (replyToDestinationType.equalsIgnoreCase("topic")) {
replyToDestination = context.createTopic(rt);
} else if (replyToDestinationType.equalsIgnoreCase("queue")) {
replyToDestination = context.createQueue(rt);
} else {
throw ex.illegalArgumentInvalidDestinationType(replyToDestinationType);
}
producer.setJMSReplyTo(replyToDestination);
});

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(this::send)
sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message)
.onFailure(t -> retry)
.retry()
.withJitter(retryJitter)
.withBackOff(retryInitialDelay, retryMaxDelay)
.atMost(retryMaxRetries))
.onFailure().invoke(log::unableToSend));

}

private Uni<? extends Message<?>> send(Message<?> message) {
private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHolder, Message<?> message) {
Object payload = message.getPayload();

Destination destination = resourceHolder.getDestination();
JMSContext context = resourceHolder.getContext();
// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
return dispatch(message, () -> producer.send(destination, (jakarta.jms.Message) payload));
return dispatch(message,
() -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload));
}

try {
Expand Down Expand Up @@ -129,12 +145,12 @@ private Uni<? extends Message<?>> send(Message<?> message) {
JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties);
op.getProperties().forEach(p -> p.apply(outgoing));
}
actualDestination = dest != null ? dest : this.destination;
actualDestination = dest != null ? dest : destination;
} else {
actualDestination = this.destination;
actualDestination = destination;
}

return dispatch(message, () -> producer.send(actualDestination, outgoing));
return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
Expand Down
Loading

0 comments on commit cfce2e3

Please sign in to comment.