Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking improvements max-concurrency #2219

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract class AbstractMediator {
protected HealthCenter health;
private Instance<MessageConverter> converters;
private Instance<KeyValueExtractor> extractors;
private int maxConcurrency;

public AbstractMediator(MediatorConfiguration configuration) {
this.configuration = configuration;
Expand Down Expand Up @@ -99,6 +100,10 @@ public void setWorkerPoolRegistry(WorkerPoolRegistry workerPoolRegistry) {
this.workerPoolRegistry = workerPoolRegistry;
}

public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}

public void run() {
// Do nothing by default.
}
Expand Down Expand Up @@ -259,8 +264,11 @@ public Instance<KeyValueExtractor> extractors() {
return extractors;
}

public int maxConcurrency() {
return maxConcurrency;
}

public void terminate() {
// Do nothing by default.
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,11 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem()
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
.onItemOrFailure()
.transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message<?>) o, t))
.onItem().transformToMulti(this::handleSkip));
.onItem().transformToMulti(this::handleSkip))
.merge(maxConcurrency());
};
}

Expand Down Expand Up @@ -358,9 +359,10 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem()
.onItem().transformToMulti(this::handleSkip));
} else {
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
.onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
.onItem().transformToMulti(this::handleSkip));
.onItem().transformToMulti(this::handleSkip))
.merge(maxConcurrency());
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void produceIndividualMessages() {
.onItem().transform(o -> (Message<?>) o));
} else {
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
.onItem().transformToUniAndMerge(u -> u)
.onItem().transformToUni(u -> u).merge(maxConcurrency())
.onItem().transform(o -> (Message<?>) o));
}
} else {
Expand All @@ -163,7 +163,7 @@ private void produceIndividualPayloads() {
.onItem().transform(Message::of));
} else {
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
.onItem().transformToUniAndMerge(u -> u)
.onItem().transformToUni(u -> u).merge(maxConcurrency())
.onItem().transform(Message::of));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ private void processMethodReturningVoid() {
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(msg -> invokeBlocking(msg, getArguments(msg))
.onItem().transformToUni(msg -> invokeBlocking(msg, getArguments(msg))
.onItemOrFailure().transformToUni(handleInvocationResult(msg)))
.merge(maxConcurrency())
.onFailure()
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
}
Expand Down Expand Up @@ -210,7 +211,7 @@ private void processMethodReturningACompletionStage() {
.onFailure().invoke(this::reportFailure);
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency())
.onFailure().invoke(this::reportFailure);
}
} else {
Expand Down Expand Up @@ -243,7 +244,7 @@ private void processMethodReturningAUni() {
.onFailure().invoke(this::reportFailure);
} else {
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency())
.onFailure().invoke(this::reportFailure);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

@ApplicationScoped
public class WorkerPoolRegistry {
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
private static final String WORKER_CONCURRENCY = "max-concurrency";
public static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
public static final String WORKER_CONCURRENCY = "max-concurrency";

@Inject
Instance<ExecutionHolder> executionHolder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.providers.extension;

import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONCURRENCY;
import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONFIG_PREFIX;
import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log;

import java.lang.reflect.Constructor;
Expand All @@ -11,10 +13,12 @@
import jakarta.enterprise.inject.spi.*;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.reactive.messaging.*;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.PublisherDecorator;
Expand Down Expand Up @@ -79,6 +83,9 @@ public class MediatorManager {
@ConfigProperty(name = STRICT_MODE_PROPERTY, defaultValue = "false")
boolean strictMode;

@Inject
Instance<Config> configInstance;

public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {

if (strictMode) {
Expand All @@ -97,6 +104,22 @@ public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
});
}

private int getWorkerMaxConcurrency(MediatorConfiguration configuration) {
// max concurrency is not relevant if not blocking
if (!configuration.isBlocking()) {
return -1;
}
String poolName = configuration.getWorkerPoolName();
// if the poll name is null we are on the default worker pool, set the default concurrent requests
if (poolName == null) {
return Queues.BUFFER_S;
}
String concurrencyConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
Optional<Integer> concurrency = configInstance.get().getOptionalValue(concurrencyConfigKey, Integer.class);
// Fallback to the default concurrent requests if setting is not found
return concurrency.orElse(Queues.BUFFER_S);
}

/**
* This method is used in the Quarkus extension.
*
Expand Down Expand Up @@ -177,6 +200,7 @@ public AbstractMediator createMediator(MediatorConfiguration configuration) {
mediator.setExtractors(extractors);
mediator.setHealth(health);
mediator.setWorkerPoolRegistry(workerPoolRegistry);
mediator.setMaxConcurrency(getWorkerMaxConcurrency(configuration));

try {
Object beanInstance = beanManager.getReference(configuration.getBean(), Object.class,
Expand Down