Skip to content

Commit

Permalink
Merge pull request #2652 from ozangunalp/rabbit_amqp_context_per_channel
Browse files Browse the repository at this point in the history
Create per channel Vert.x context for Vert.x-client based connectors
  • Loading branch information
ozangunalp authored Jun 25, 2024
2 parents 618c1e0 + 61321c6 commit 3b6a92b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
Expand Down Expand Up @@ -224,10 +223,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {

AmqpClient client = AmqpClientHelper.createClient(this, ic, clientOptions, clientSslContexts);

Context root = null;
if (ConcurrencyConnectorConfig.getConcurrency(config).filter(i -> i > 1).isPresent()) {
root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext());
}
Context root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext());
ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx(), root);
holders.put(ic.getChannel(), holder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,21 @@ public void testWithDisconnection() {
.extracting(m -> m.getBody(String.class))
.containsExactly("1"));

// close client
close();

// send just before stopping
bean.send("2");
stopArtemis();

assertThat(received).hasSize(1);

startArtemis();

// send after restart
bean.send("3");

// init the client
init();
consumer = jms.createConsumer(q);
consumer.setMessageListener(received::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -54,9 +53,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,
} else {
pattern = null;
}
final Context root = ConcurrencyConnectorConfig.getConcurrency(config.config).filter(i -> i > 1)
.map(__ -> Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()))
.orElse(null);
final Context root = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
holder = Clients.getHolder(vertx, options);
holder.start().onSuccess(ignore -> started.set(true));
holder.getClient()
Expand All @@ -69,7 +66,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,

this.source = holder.stream()
.select().where(m -> MqttTopicHelper.matches(topic, pattern, m))
.plug(m -> (root != null) ? m.emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c)) : m)
.emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c))
.onItem().transform(m -> new ReceivingMqttMessage(m, onNack))
.stage(multi -> {
if (broadcast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.smallrye.reactive.messaging.rabbitmq.ClientHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
Expand Down Expand Up @@ -131,10 +130,7 @@ private Uni<Tuple2<ClientHolder, RabbitMQConsumer>> createConsumer(RabbitMQConne
.subscribe().with(ignored -> promise.complete(), promise::fail);
});

Context root = null;
if (ConcurrencyConnectorConfig.getConcurrency(ic.config()).filter(i -> i > 1).isPresent()) {
root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext());
}
Context root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext());
final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root);
return holder.getOrEstablishConnection()
.invoke(() -> log.connectionEstablished(ic.getChannel()))
Expand Down

0 comments on commit 3b6a92b

Please sign in to comment.