Skip to content

Commit

Permalink
implemented batching
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Oct 24, 2024
1 parent 232fe77 commit 5da4775
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/guide/src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ Producer jobs are always run only on the leader server.
include::{root-dir}/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/SendWordsJob.groovy[tag=simple-producer-method,indent=0]
----

TIP: If the producer job returns a `Publisher`, the messages can be batched if underlying implementation supports it. At the moment, only SQS AWS v2 implementation supports batching.

==== Consumer

Consumer jobs take a single parameter. They are usually a `@FixedRate` job waiting for a message from a queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api "com.agorapulse:micronaut-amazon-awssdk-sqs:$micronautAwsSdkVersion"

implementation 'io.micronaut:micronaut-jackson-databind'
implementation 'io.micronaut.reactor:micronaut-reactor'

testImplementation project(':micronaut-worker-tck')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.time.Duration;
Expand Down Expand Up @@ -51,9 +53,7 @@ public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration
@Override
public void sendMessage(String queueName, Object result) {
try {
simpleQueueService.sendMessage(queueName, objectMapper.writeValueAsString(result));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot marshal object " + result + " to JSON", e);
simpleQueueService.sendMessage(queueName, convertMessageToJson(result));
} catch (SqsException sqsException) {
if (sqsException.getMessage() != null && sqsException.getMessage().contains("Concurrent access: Queue already exists")) {
sendMessage(queueName, result);
Expand All @@ -63,6 +63,11 @@ public void sendMessage(String queueName, Object result) {
}
}

@Override
public void sendMessages(String queueName, Publisher<?> result) {
Flux.from(simpleQueueService.sendMessages(queueName, Flux.from(result).map(this::convertMessageToJson))).subscribe();
}

private <T> void readMessageInternal(String queueName, Argument<T> argument, Consumer<T> action, String body, String handle, boolean tryReformat) {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
Expand All @@ -80,4 +85,12 @@ private <T> void readMessageInternal(String queueName, Argument<T> argument, Con
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
}
}

private String convertMessageToJson(Object result) {
try {
return objectMapper.writeValueAsString(result);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Cannot marshal object " + result + " to JSON", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,7 @@ protected void handleResult(JobConfiguration configuration, Publisher<Object> re
JobQueues sender = queues(configuration.getProducer().getQueueType());

if (result instanceof Publisher) {
Flux<?> publisher = Flux.from((Publisher<?>) result);
publisher.subscribe(o -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending message {} to {} using {}", o, queueName, sender);

}
sender.sendMessage(queueName, o);
}, t -> LOGGER.error("Exception sending messages to queue " + queueName, t));
sender.sendMessages(queueName, (Publisher<?>) result);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.agorapulse.worker.queue;

import io.micronaut.core.type.Argument;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.function.Consumer;
Expand All @@ -27,4 +29,8 @@ public interface JobQueues {
<T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action);
void sendMessage(String queueName, Object result);

default void sendMessages(String queueName, Publisher<?> result) {
Flux.from(result).subscribe(message -> sendMessage(queueName, message));
}

}

0 comments on commit 5da4775

Please sign in to comment.