Skip to content

Commit

Permalink
Merge pull request #57 from agorapulse/fix/handle-returned-publisher-…
Browse files Browse the repository at this point in the history
…from-producer

handle publisher returned from non-producer method
  • Loading branch information
musketyr authored Dec 11, 2024
2 parents 7cc66c5 + ce6fef5 commit d817540
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {

if (method.getArguments().length == 0) {
context.message(null);
handleResult(configuration, context, executor(context, configuration).apply(() -> {
handleResult(producer, configuration, context, executor(context, configuration).apply(() -> {
if (configuration.getFork() > 1) {
ParallelFlux<Object> resultsOfParallelExecution = Flux.range(0, configuration.getFork())
.parallel(configuration.getFork())
Expand Down Expand Up @@ -115,7 +115,7 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
return method.invoke(bean);
}));
} else if (method.getArguments().length == 1) {
handleResult(configuration, context, executor(context, configuration).apply(() -> {
handleResult(producer, configuration, context, executor(context, configuration).apply(() -> {
JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer();
Flux<? extends QueueMessage<?>> messages = Flux.from(
queues(queueConfiguration.getQueueType()).readMessages(
Expand Down Expand Up @@ -185,14 +185,25 @@ private <T> Function<Callable<T>, Publisher<T>> executor(JobRunContext context,
return s -> distributedJobExecutor.execute(context, s);
}

protected void handleResult(JobConfiguration configuration, JobRunContext callback, Publisher<Object> resultPublisher) {
protected void handleResult(boolean producer, JobConfiguration configuration, JobRunContext callback, Publisher<Object> resultPublisher) {
Object result = Flux.from(resultPublisher).blockFirst();

if (result == null) {
callback.finished();
return;
}

if (!producer && result instanceof Publisher<?> p) {
Mono.from(p)
.doOnNext(callback::result)
.doOnError(callback::error)
.doFinally(signalType -> callback.finished())
.block();

return;
}


String queueName = configuration.getProducer().getQueueName();

JobQueues sender = queues(configuration.getProducer().getQueueType());
Expand Down

0 comments on commit d817540

Please sign in to comment.