Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented reconnect and downstream propagation when reading messages #2572

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSProducer;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand Down Expand Up @@ -59,6 +61,11 @@
@ConnectorAttribute(name = "reply-to", description = "The reply to destination if any", direction = Direction.OUTGOING, type = "string")
@ConnectorAttribute(name = "reply-to-destination-type", description = "The type of destination for the response. It can be either `queue` or `topic`", direction = Direction.OUTGOING, type = "string", defaultValue = "queue")
@ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "retry", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether to retry on terminal stream errors.", type = "boolean", defaultValue = "true")
@ConnectorAttribute(name = "retry.max-retries", direction = Direction.INCOMING_AND_OUTGOING, description = "Maximum number of retries for terminal stream errors.", type = "int", defaultValue = "3")
@ConnectorAttribute(name = "retry.initial-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S")
@ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S")
@ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5")
public class JmsConnector implements InboundConnector, OutboundConnector {

/**
Expand Down Expand Up @@ -96,7 +103,7 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
private final List<JMSContext> contexts = new CopyOnWriteArrayList<>();
private final List<JmsResourceHolder<?>> contexts = new CopyOnWriteArrayList<>();

@PostConstruct
public void init() {
Expand All @@ -118,16 +125,16 @@ public void init() {
@PreDestroy
public void cleanup() {
sources.forEach(JmsSource::close);
contexts.forEach(JMSContext::close);
contexts.forEach(JmsResourceHolder::close);
this.executor.shutdown();
}

@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JMSContext context = createJmsContext(ic);
contexts.add(context);
JmsSource source = new JmsSource(context, ic, jsonMapping, executor);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
sources.add(source);
return source.getSource();
}
Expand All @@ -146,9 +153,9 @@ private JMSContext createJmsContext(JmsConnectorCommonConfiguration config) {
@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
JMSContext context = createJmsContext(oc);
contexts.add(context);
return new JmsSink(context, oc, jsonMapping, executor).getSink();
JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc));
contexts.add(holder);
return new JmsSink(holder, oc, jsonMapping, executor).getSink();
}

private ConnectionFactory pickTheFactory(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.util.function.Function;
import java.util.function.Supplier;

import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;

public class JmsResourceHolder<T> implements ExceptionListener {

private final String channel;
private final Supplier<JMSContext> contextCreator;
private Function<JmsResourceHolder<T>, Destination> destinationCreator;
private Function<JmsResourceHolder<T>, T> clientCreator;
private volatile JMSContext context;
private volatile Destination destination;
private volatile T client;

private volatile boolean closed = false;

public JmsResourceHolder(String channel, Supplier<JMSContext> contextCreator) {
this.channel = channel;
this.contextCreator = contextCreator;
}

public JmsResourceHolder<T> configure(Function<JmsResourceHolder<T>, Destination> destinationCreator,
Function<JmsResourceHolder<T>, T> clientCreator) {
this.destinationCreator = destinationCreator;
this.clientCreator = clientCreator;
return this;
}

public Destination getDestination() {
if (destination == null) {
synchronized (this) {
if (destination == null) {
destination = destinationCreator.apply(this);
}
}
}
return destination;
}

public T getClient() {
if (client == null) {
synchronized (this) {
if (client == null) {
client = clientCreator.apply(this);
}
}
}
return client;
}

public JMSContext getContext() {
if (context == null) {
synchronized (this) {
if (context == null) {
context = contextCreator.get();
closed = false;
context.setExceptionListener(this);
}
}
}
return context;
}

@Override
public void onException(JMSException exception) {
synchronized (this) {
if (closed) {
return;
}
log.jmsException(channel, exception);
close();
}
}

void close() {
synchronized (this) {
if (context != null) {
context.close();
context = null;
}
destination = null;
if (client != null) {
if (client instanceof AutoCloseable) {
try {
((AutoCloseable) client).close();
} catch (Exception e) {
log.infof(e, "Error closing client for channel %s", channel);
}
}
client = null;
}
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

Expand All @@ -21,61 +22,76 @@

class JmsSink {

private final JMSProducer producer;
private final Destination destination;
private final Flow.Subscriber<Message<?>> sink;
private final JMSContext context;
private final JsonMapping jsonMapping;
private final Executor executor;

JmsSink(JMSContext context, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping, Executor executor) {
JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping,
Executor executor) {
String name = config.getDestination().orElseGet(config::getChannel);

this.destination = getDestination(context, name, config.getDestinationType());
this.context = context;
String type = config.getDestinationType();
boolean retry = config.getRetry();
int retryMaxRetries = config.getRetryMaxRetries();
Duration retryInitialDelay = Duration.parse(config.getRetryInitialDelay());
Duration retryMaxDelay = Duration.parse(config.getRetryMaxDelay());
double retryJitter = config.getRetryJitter();
resourceHolder.configure(r -> getDestination(r.getContext(), name, type),
r -> {
JMSContext context = r.getContext();
JMSProducer producer = context.createProducer();
config.getDeliveryDelay().ifPresent(producer::setDeliveryDelay);
config.getDeliveryMode().ifPresent(v -> {
if (v.equalsIgnoreCase("persistent")) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else if (v.equalsIgnoreCase("non_persistent")) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} else {
throw ex.illegalArgumentInvalidDeliveryMode(v);
}
});
config.getDisableMessageId().ifPresent(producer::setDisableMessageID);
config.getDisableMessageTimestamp().ifPresent(producer::setDisableMessageTimestamp);
config.getCorrelationId().ifPresent(producer::setJMSCorrelationID);
config.getTtl().ifPresent(producer::setTimeToLive);
config.getPriority().ifPresent(producer::setPriority);
config.getReplyTo().ifPresent(rt -> {
String replyToDestinationType = config.getReplyToDestinationType();
Destination replyToDestination;
if (replyToDestinationType.equalsIgnoreCase("topic")) {
replyToDestination = context.createTopic(rt);
} else if (replyToDestinationType.equalsIgnoreCase("queue")) {
replyToDestination = context.createQueue(rt);
} else {
throw ex.illegalArgumentInvalidDestinationType(replyToDestinationType);
}
producer.setJMSReplyTo(replyToDestination);
});
return producer;
});
resourceHolder.getDestination();
resourceHolder.getClient();
this.jsonMapping = jsonMapping;
this.executor = executor;

producer = context.createProducer();
config.getDeliveryDelay().ifPresent(producer::setDeliveryDelay);
config.getDeliveryMode().ifPresent(v -> {
if (v.equalsIgnoreCase("persistent")) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else if (v.equalsIgnoreCase("non_persistent")) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} else {
throw ex.illegalArgumentInvalidDeliveryMode(v);
}
});
config.getDisableMessageId().ifPresent(producer::setDisableMessageID);
config.getDisableMessageTimestamp().ifPresent(producer::setDisableMessageTimestamp);
config.getCorrelationId().ifPresent(producer::setJMSCorrelationID);
config.getTtl().ifPresent(producer::setTimeToLive);
config.getPriority().ifPresent(producer::setPriority);
config.getReplyTo().ifPresent(rt -> {
String replyToDestinationType = config.getReplyToDestinationType();
Destination replyToDestination;
if (replyToDestinationType.equalsIgnoreCase("topic")) {
replyToDestination = context.createTopic(rt);
} else if (replyToDestinationType.equalsIgnoreCase("queue")) {
replyToDestination = context.createQueue(rt);
} else {
throw ex.illegalArgumentInvalidDestinationType(replyToDestinationType);
}
producer.setJMSReplyTo(replyToDestination);
});

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(this::send)
sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message)
.onFailure(t -> retry)
.retry()
.withJitter(retryJitter)
.withBackOff(retryInitialDelay, retryMaxDelay)
.atMost(retryMaxRetries))
.onFailure().invoke(log::unableToSend));

}

private Uni<? extends Message<?>> send(Message<?> message) {
private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHolder, Message<?> message) {
Object payload = message.getPayload();

Destination destination = resourceHolder.getDestination();
JMSContext context = resourceHolder.getContext();
// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
return dispatch(message, () -> producer.send(destination, (jakarta.jms.Message) payload));
return dispatch(message,
() -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload));
}

try {
Expand Down Expand Up @@ -129,12 +145,12 @@ private Uni<? extends Message<?>> send(Message<?> message) {
JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties);
op.getProperties().forEach(p -> p.apply(outgoing));
}
actualDestination = dest != null ? dest : this.destination;
actualDestination = dest != null ? dest : destination;
} else {
actualDestination = this.destination;
actualDestination = destination;
}

return dispatch(message, () -> producer.send(actualDestination, outgoing));
return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
Expand Down
Loading
Loading