Skip to content

Commit

Permalink
Control Blocking max-concurrency on mutiny requests level
Browse files Browse the repository at this point in the history
Adds maxConcurrency parameter to Blocking annotation

Deprecates smallrye.messaging.worker.pool-name.max-concurrency config option and replaces it with
smallrye.messaging.worker.pool-name.pool-size
  • Loading branch information
ozangunalp committed Jul 7, 2023
1 parent 86ff386 commit a7d89bc
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 20 deletions.
10 changes: 8 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.method.addedToInterface",
"new": "method int io.smallrye.reactive.messaging.MediatorConfiguration::getMaxConcurrency()",
"justification": "Added getMaxConcurrency to MediatorConfiguration"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +52,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface MediatorConfiguration {

boolean isBlockingExecutionOrdered();

int getMaxConcurrency();

/**
* Implementation of the {@link Invoker} interface that can be used to invoke the method described by this configuration
* The invoker class can either have a no-arg constructor in which case it's expected to be look up the bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
@Target(METHOD)
public @interface Blocking {
String DEFAULT_WORKER_POOL = "<no-value>";
int DEFAULT_MAX_CONCURRENCY = -1;

/**
* Indicates the name of the worker pool to use for execution.
* By default all executions will be performed on the default worker pool.
*
* The maximum concurrency of a custom worker pool can be set with the following configuration key:
* <code>smallrye.messaging.worker.{pool-name}.max-concurrency</code>
* <code>smallrye.messaging.worker.{pool-name}.pool-size</code>
*
* @return custom worker pool name for blocking execution.
*/
Expand Down Expand Up @@ -53,4 +54,17 @@
* @return whether executions will be ordered.
*/
boolean ordered() default true;

/**
* Indicates the maximum level of concurrency when ordered is set to <code>false</code>.
*
* If maxConcurrency is not explicitly set and ordered is set to <code>false</code>,
* then maxConcurrency is considered <code>256</code>, to preserve backwards compatibility.
*
* If maxConcurrency is set to a positive value but ordered is set to <code>true</code> (the default value),
* then given maxConcurrency is applied and ordered is considered <code>false</code>.
*
* @return max concurrency level
*/
int maxConcurrency() default DEFAULT_MAX_CONCURRENCY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MethodParameterDescriptor;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class DefaultMediatorConfiguration implements MediatorConfiguration {

private String workerPoolName = null;

private int maxConcurrency;

private boolean isOrderedExecution;

private final MediatorConfigurationSupport mediatorConfigurationSupport;
Expand Down Expand Up @@ -182,10 +185,19 @@ public void compute(List<Incoming> incomings, Outgoing outgoing, Blocking blocki
}
if (blocking != null) {
this.isBlocking = true;
this.isOrderedExecution = blocking.ordered();
if (!blocking.value().equals(Blocking.DEFAULT_WORKER_POOL)) {
this.workerPoolName = blocking.value();
}
if (blocking.ordered()) {
this.isOrderedExecution = true;
if (blocking.maxConcurrency() > 0) {
this.isOrderedExecution = false;
this.maxConcurrency = blocking.maxConcurrency();
}
} else {
this.isOrderedExecution = false;
this.maxConcurrency = blocking.maxConcurrency() > 0 ? blocking.maxConcurrency() : Queues.BUFFER_S;
}
}

MediatorConfigurationSupport.ValidationOutput validationOutput = this.mediatorConfigurationSupport.validate(this.shape,
Expand Down Expand Up @@ -315,6 +327,11 @@ public String getWorkerPoolName() {
return workerPoolName;
}

@Override
public int getMaxConcurrency() {
return maxConcurrency;
}

@Override
public boolean isBlockingExecutionOrdered() {
return isOrderedExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,11 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem()
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
.onItemOrFailure()
.transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message<?>) o, t))
.onItem().transformToMulti(this::handleSkip));
.onItem().transformToMulti(this::handleSkip))
.merge(configuration.getMaxConcurrency());
};
}

Expand Down Expand Up @@ -358,9 +359,10 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem()
.onItem().transformToMulti(this::handleSkip));
} else {
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
.onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
.onItem().transformToMulti(this::handleSkip));
.onItem().transformToMulti(this::handleSkip))
.merge(configuration.getMaxConcurrency());
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void produceIndividualMessages() {
.onItem().transform(o -> (Message<?>) o));
} else {
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
.onItem().transformToUniAndMerge(u -> u)
.onItem().transformToUni(u -> u).merge(configuration.getMaxConcurrency())
.onItem().transform(o -> (Message<?>) o));
}
} else {
Expand All @@ -163,7 +163,7 @@ private void produceIndividualPayloads() {
.onItem().transform(Message::of));
} else {
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
.onItem().transformToUniAndMerge(u -> u)
.onItem().transformToUni(u -> u).merge(configuration.getMaxConcurrency())
.onItem().transform(Message::of));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ private void processMethodReturningVoid() {
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(msg -> invokeBlocking(msg, getArguments(msg))
.onItem().transformToUni(msg -> invokeBlocking(msg, getArguments(msg))
.onItemOrFailure().transformToUni(handleInvocationResult(msg)))
.merge(configuration.getMaxConcurrency())
.onFailure()
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
}
Expand Down Expand Up @@ -210,7 +211,7 @@ private void processMethodReturningACompletionStage() {
.onFailure().invoke(this::reportFailure);
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(configuration.getMaxConcurrency())
.onFailure().invoke(this::reportFailure);
}
} else {
Expand Down Expand Up @@ -243,7 +244,7 @@ private void processMethodReturningAUni() {
.onFailure().invoke(this::reportFailure);
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(configuration.getMaxConcurrency())
.onFailure().invoke(this::reportFailure);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
@ApplicationScoped
public class WorkerPoolRegistry {
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
@Deprecated(since = "4.8.0")
private static final String WORKER_CONCURRENCY = "max-concurrency";
private static final String WORKER_POOL_SIZE = "pool-size";
private static final String WORKER_EXECUTE_TIME = "max-execute-time";

@Inject
Expand Down Expand Up @@ -156,16 +158,24 @@ public void defineWorker(String className, String method, String poolName) {
}

// Validate @Blocking worker pool has configuration to define concurrency
String poolSizeConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_POOL_SIZE;
String concurrencyConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
String executeTimeConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_EXECUTE_TIME;
Optional<Integer> poolSize = configInstance.get().getOptionalValue(poolSizeConfigKey, Integer.class);
Optional<Integer> concurrency = configInstance.get().getOptionalValue(concurrencyConfigKey, Integer.class);
Optional<Duration> executeTime = configInstance.get().getOptionalValue(executeTimeConfigKey, Duration.class);
if (concurrency.isEmpty()) {
throw ex.illegalArgumentForWorkerConfigKey("@Blocking", className + "#" + method,
executeTimeConfigKey);
if (poolSize.isEmpty()) {
if (concurrency.isEmpty()) {
throw ex.illegalArgumentForWorkerConfigKey("@Blocking", className + "#" + method,
poolSizeConfigKey);
} else {
log.maxConcurrencyDeprecated();
workerConcurrency.put(poolName, concurrency.get());
}
} else {
workerConcurrency.put(poolName, poolSize.get());
}

workerConcurrency.put(poolName, concurrency.get());
executeTime.ifPresent(duration -> workerExecuteTime.put(poolName, duration));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface ProviderLogging extends BasicLogger {
void vertXInstanceCreated();

@LogMessage(level = Logger.Level.INFO)
@Message(id = 203, value = "Created worker pool named %s with concurrency of %d")
@Message(id = 203, value = "Created worker pool named %s with pool size of %d")
void workerPoolCreated(String workerName, Integer count);

@LogMessage(level = Logger.Level.WARN)
Expand Down Expand Up @@ -135,4 +135,8 @@ public interface ProviderLogging extends BasicLogger {

@Message(id = 240, value = "Could not find an SSLContext bean with the @Identifier=%s")
IllegalStateException couldFindSslContextWithIdentifier(String sslContextIdentifier);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 241, value = "Use of `smallrye.messaging.worker.{pool-name}.max-concurrency` is deprecated, instead use `@Blocking(maxConcurrency)` to adjust the maximum concurrency level and `smallrye.messaging.worker.{pool-name}.pool-size` for worker pool size")
void maxConcurrencyDeprecated();
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ void testIncomingBlockingUnordered() {
}
}

@Test
void testIncomingBlockingMaxConcurrency() {
addBeanClass(ProduceIn.class);
addBeanClass(IncomingMaxConcurrencyBlockingBean.class);
initialize();

IncomingMaxConcurrencyBlockingBean bean = container.getBeanManager().createInstance()
.select(IncomingMaxConcurrencyBlockingBean.class).get();

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
for (String name : threadNames) {
assertThat(name.startsWith("vert.x-worker-thread-")).isTrue();
}
}

@Test
void testIncomingBlockingCustomPool() {
addBeanClass(ProduceIn.class);
Expand Down Expand Up @@ -246,7 +265,7 @@ public static class BlockingSubscriberOfMessage {
Map<String, String> payloads = new LinkedHashMap<>();

@Incoming("in")
@Blocking
@Blocking(ordered = false, maxConcurrency = 100)
public CompletionStage<Void> process(Message<String> event) {
payloads.put(event.getPayload(), Thread.currentThread().getName());
return event.ack();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.smallrye.reactive.messaging.blocking.beans;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class IncomingMaxConcurrencyBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking(maxConcurrency = 100)
public void consume(String s) {
if (s.equals("b") || s.equals("d") || s.equals("f")) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> list() {
return list;
}

public List<String> threads() {
return threads;
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
smallrye.messaging.worker.my-pool.max-concurrency=2
smallrye.messaging.worker.my-pool.pool-size=2
smallrye.messaging.worker.another-pool.max-concurrency=5

0 comments on commit a7d89bc

Please sign in to comment.