diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java index 8bb6a5a575..0f252e74f1 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java @@ -44,6 +44,7 @@ public abstract class AbstractMediator { protected HealthCenter health; private Instance converters; private Instance extractors; + private int maxConcurrency; public AbstractMediator(MediatorConfiguration configuration) { this.configuration = configuration; @@ -99,6 +100,10 @@ public void setWorkerPoolRegistry(WorkerPoolRegistry workerPoolRegistry) { this.workerPoolRegistry = workerPoolRegistry; } + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + public void run() { // Do nothing by default. } @@ -259,8 +264,11 @@ public Instance extractors() { return extractors; } + public int maxConcurrency() { + return maxConcurrency; + } + public void terminate() { // Do nothing by default. } - } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index 822d76ccd8..cde6e8afe1 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -323,10 +323,11 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi - .onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message)) + .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure() .transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message) o, t)) - .onItem().transformToMulti(this::handleSkip)); + .onItem().transformToMulti(this::handleSkip)) + .merge(maxConcurrency()); }; } @@ -358,9 +359,10 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() .onItem().transformToMulti(this::handleSkip)); } else { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message)) + .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f)) - .onItem().transformToMulti(this::handleSkip)); + .onItem().transformToMulti(this::handleSkip)) + .merge(maxConcurrency()); } } else { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java index c64807b4a0..7b4d69beb7 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java @@ -143,7 +143,7 @@ private void produceIndividualMessages() { .onItem().transform(o -> (Message) o)); } else { this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking) - .onItem().transformToUniAndMerge(u -> u) + .onItem().transformToUni(u -> u).merge(maxConcurrency()) .onItem().transform(o -> (Message) o)); } } else { @@ -163,7 +163,7 @@ private void produceIndividualPayloads() { .onItem().transform(Message::of)); } else { this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking) - .onItem().transformToUniAndMerge(u -> u) + .onItem().transformToUni(u -> u).merge(maxConcurrency()) .onItem().transform(Message::of)); } } else { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java index 12f4fd6e08..090d24fad0 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java @@ -165,8 +165,9 @@ private void processMethodReturningVoid() { .invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure)); } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToUniAndMerge(msg -> invokeBlocking(msg, getArguments(msg)) + .onItem().transformToUni(msg -> invokeBlocking(msg, getArguments(msg)) .onItemOrFailure().transformToUni(handleInvocationResult(msg))) + .merge(maxConcurrency()) .onFailure() .invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure)); } @@ -210,7 +211,7 @@ private void processMethodReturningACompletionStage() { .onFailure().invoke(this::reportFailure); } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome) + .onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency()) .onFailure().invoke(this::reportFailure); } } else { @@ -243,7 +244,7 @@ private void processMethodReturningAUni() { .onFailure().invoke(this::reportFailure); } else { this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) - .onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome) + .onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency()) .onFailure().invoke(this::reportFailure); } } else { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java index 067db34031..13bdf45677 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java @@ -35,8 +35,8 @@ @ApplicationScoped public class WorkerPoolRegistry { - private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker"; - private static final String WORKER_CONCURRENCY = "max-concurrency"; + public static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker"; + public static final String WORKER_CONCURRENCY = "max-concurrency"; @Inject Instance executionHolder; diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java index 20f2042ca4..1d502baef8 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.providers.extension; +import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONCURRENCY; +import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONFIG_PREFIX; import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log; import java.lang.reflect.Constructor; @@ -11,10 +13,12 @@ import jakarta.enterprise.inject.spi.*; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; +import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.reactive.messaging.*; import io.smallrye.reactive.messaging.EmitterConfiguration; import io.smallrye.reactive.messaging.PublisherDecorator; @@ -79,6 +83,9 @@ public class MediatorManager { @ConfigProperty(name = STRICT_MODE_PROPERTY, defaultValue = "false") boolean strictMode; + @Inject + Instance configInstance; + public void analyze(AnnotatedType annotatedType, Bean bean) { if (strictMode) { @@ -97,6 +104,22 @@ public void analyze(AnnotatedType annotatedType, Bean bean) { }); } + private int getWorkerMaxConcurrency(MediatorConfiguration configuration) { + // max concurrency is not relevant if not blocking + if (!configuration.isBlocking()) { + return -1; + } + String poolName = configuration.getWorkerPoolName(); + // if the poll name is null we are on the default worker pool, set the default concurrent requests + if (poolName == null) { + return Queues.BUFFER_S; + } + String concurrencyConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY; + Optional concurrency = configInstance.get().getOptionalValue(concurrencyConfigKey, Integer.class); + // Fallback to the default concurrent requests if setting is not found + return concurrency.orElse(Queues.BUFFER_S); + } + /** * This method is used in the Quarkus extension. * @@ -177,6 +200,7 @@ public AbstractMediator createMediator(MediatorConfiguration configuration) { mediator.setExtractors(extractors); mediator.setHealth(health); mediator.setWorkerPoolRegistry(workerPoolRegistry); + mediator.setMaxConcurrency(getWorkerMaxConcurrency(configuration)); try { Object beanInstance = beanManager.getReference(configuration.getBean(), Object.class,