Skip to content

Commit

Permalink
Merge branch 'master' into feature/virtual-threads-support
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 11, 2024
2 parents 0af7d99 + bcf62a3 commit ec4e81d
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ interface QueueConfiguration {
interface ConsumerQueueConfiguration extends QueueConfiguration {

int DEFAULT_MAX_MESSAGES = 10;
Duration DEFAULT_WAITING_TIME = Duration.ofSeconds(20);
String DEFAULT_WAITING_TIME_STRING = "20s";

/**
* @return the number of messages which are fetched from the queue in a single poll, defaults to {@link #DEFAULT_MAX_MESSAGES} when set to <code>0</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ public void mergeWith(ConsumerQueueConfiguration overrides) {
this.maxMessages = overrides.getMaxMessages();
}

if (overrides.getWaitingTime() != null && !overrides.getWaitingTime().isZero() && overrides.getWaitingTime() != this.waitingTime) {
if (overrides.getWaitingTime() != null
&& !overrides.getWaitingTime().isZero()
&& !overrides.getWaitingTime().equals(this.waitingTime)
&& !overrides.getWaitingTime().equals(DEFAULT_WAITING_TIME)
) {
this.waitingTime = overrides.getWaitingTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,22 @@
@Documented
@Consumes
@Fork(JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES)
@FixedRate("20s")
@FixedRate(JobConfiguration.ConsumerQueueConfiguration.DEFAULT_WAITING_TIME_STRING)
@Retention(RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface QueueConsumer {

/**
* Allows to override the default name of the job which is <code>JobClassName</code> if there is only one executable
* method (e.g. job definition) in the class or <code>JobClassName-methodName</code> if there is more then one executable method in the class.
* <p>
* Either the job name specified here or the default name is converted using {@link io.micronaut.core.naming.NameUtils#hyphenate(String)}.
*
* @return the name of the job used for configuration
*/
@AliasFor(annotation = Named.class, member = "value")
String name() default "";

/**
* @return the name of the work queue to consume items from
*/
Expand All @@ -57,9 +68,9 @@
* The time to wait for the next message to be available and also the time to wait for the next run.
* @return the maximum waiting time as duration string
*/
@AliasFor(annotation = Consumes.class, member = "value")
@AliasFor(annotation = Consumes.class, member = "waitingTime")
@AliasFor(annotation = FixedRate.class, member = "value")
String waitingTime() default "";
String waitingTime() default JobConfiguration.ConsumerQueueConfiguration.DEFAULT_WAITING_TIME_STRING;

/**
* The number of messages to consume and also the number of threads to use to consume the messages.
Expand All @@ -70,7 +81,10 @@
int maxMessages() default JobConfiguration.ConsumerQueueConfiguration.DEFAULT_MAX_MESSAGES;

/**
* @return The name of a {@link Named} bean that is a
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
@AliasFor(annotation = Job.class, member = "scheduler")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.agorapulse.worker.annotation.Job;
import com.agorapulse.worker.annotation.Produces;
import io.micronaut.context.annotation.AliasFor;
import jakarta.inject.Named;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Expand Down Expand Up @@ -50,6 +51,12 @@
*
* @return the name of the job used for configuration
*/
@AliasFor(annotation = Named.class, member = "value")
String name() default "";

/**
* @return the name of the work queue to produce items to
*/
@AliasFor(annotation = Produces.class, member = "value")
String value() default "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -74,13 +75,14 @@ public void onApplicationEvent(RefreshEvent event) {

public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
ExecutableMethod<B, ?> method = job.getMethod();
boolean producer = !method.getReturnType().isVoid();
JobConfiguration configuration = job.getConfiguration();

if (method.getArguments().length == 0) {
context.message(null);
handleResult(configuration, context, executor(context, configuration).apply(() -> {
handleResult(producer, configuration, context, executor(context, configuration).apply(() -> {
if (configuration.getFork() > 1) {
return Flux.range(0, configuration.getFork())
ParallelFlux<Object> resultsOfParallelExecution = Flux.range(0, configuration.getFork())
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(i -> {
Expand All @@ -102,12 +104,18 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
return Mono.empty();
}
});

if (producer) {
return resultsOfParallelExecution.sequential(configuration.getFork());
}

return resultsOfParallelExecution.then();
}

return method.invoke(bean);
}));
} else if (method.getArguments().length == 1) {
handleResult(configuration, context, executor(context, configuration).apply(() -> {
handleResult(producer, configuration, context, executor(context, configuration).apply(() -> {
JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer();
Flux<? extends QueueMessage<?>> messages = Flux.from(
queues(queueConfiguration.getQueueType()).readMessages(
Expand Down Expand Up @@ -145,10 +153,16 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
};

if (configuration.getFork() > 1) {
return messages
ParallelFlux<Object> parallelFlux = messages
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(messageProcessor);

if (producer) {
return parallelFlux.sequential(configuration.getFork());
}

return parallelFlux.then();
}

return messages.flatMap(messageProcessor);
Expand All @@ -171,14 +185,25 @@ private <T> Function<Callable<T>, Publisher<T>> executor(JobRunContext context,
return s -> distributedJobExecutor.execute(context, s);
}

protected void handleResult(JobConfiguration configuration, JobRunContext callback, Publisher<Object> resultPublisher) {
protected void handleResult(boolean producer, JobConfiguration configuration, JobRunContext callback, Publisher<Object> resultPublisher) {
Object result = Flux.from(resultPublisher).blockFirst();

if (result == null) {
callback.finished();
return;
}

if (!producer && result instanceof Publisher<?> p) {
Mono.from(p)
.doOnNext(callback::result)
.doOnError(callback::error)
.doFinally(signalType -> callback.finished())
.block();

return;
}


String queueName = configuration.getProducer().getQueueName();

JobQueues sender = queues(configuration.getProducer().getQueueType());
Expand Down
Loading

0 comments on commit ec4e81d

Please sign in to comment.