Skip to content

Commit

Permalink
Explicit handling of nested splits in SplitState
Browse files Browse the repository at this point in the history
Resolves #3857
  • Loading branch information
hpoettker authored and fmbenhassine committed Jul 18, 2023
1 parent 5e849c6 commit 58a4134
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ else if (input instanceof Flow) {
return result;
}

private SplitState createState(Collection<Flow> flows, TaskExecutor executor) {
private SplitState createState(Collection<Flow> flows, TaskExecutor executor, SplitState parentSplit) {
if (!states.containsKey(flows)) {
states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++)));
states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++), parentSplit));
}
SplitState result = (SplitState) states.get(flows);
if (executor != null) {
Expand Down Expand Up @@ -627,23 +627,23 @@ public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) {
public FlowBuilder<Q> add(Flow... flows) {
Collection<Flow> list = new ArrayList<>(Arrays.asList(flows));
String name = "split" + (parent.splitCounter++);
int counter = 0;
State one = parent.currentState;
Flow flow = null;

if (one instanceof SplitState) {
parent.currentState = parent.createState(list, executor, (SplitState) one);
return parent;
}

if (!(one == null || one instanceof FlowState)) {
FlowBuilder<Flow> stateBuilder = new FlowBuilder<>(name + "_" + (counter++));
FlowBuilder<Flow> stateBuilder = new FlowBuilder<>(name + "_0");
stateBuilder.currentState = one;
flow = stateBuilder.build();
list.add(stateBuilder.build());
}
else if (one instanceof FlowState && parent.states.size() == 1) {
list.add(((FlowState) one).getFlows().iterator().next());
}

if (flow != null) {
list.add(flow);
}
State next = parent.createState(list, executor);
parent.currentState = next;
parent.currentState = parent.createState(list, executor, null);
return parent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package org.springframework.batch.core.job.flow.support.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand All @@ -31,6 +33,7 @@
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;

/**
* A {@link State} implementation that splits a {@link Flow} into multiple parallel
Expand All @@ -44,6 +47,8 @@ public class SplitState extends AbstractState implements FlowHolder {

private final Collection<Flow> flows;

private final SplitState parentSplit;

private TaskExecutor taskExecutor = new SyncTaskExecutor();

private final FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();
Expand All @@ -53,8 +58,18 @@ public class SplitState extends AbstractState implements FlowHolder {
* @param name the name of the state.
*/
public SplitState(Collection<Flow> flows, String name) {
this(flows, name, null);
}

/**
* @param flows collection of {@link Flow} instances.
* @param name the name of the state.
* @param parentSplit the parent {@link SplitState}.
*/
public SplitState(Collection<Flow> flows, String name, @Nullable SplitState parentSplit) {
super(name);
this.flows = flows;
this.parentSplit = parentSplit;
}

/**
Expand Down Expand Up @@ -101,6 +116,8 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception

}

FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor);

Collection<FlowExecution> results = new ArrayList<>();

// Could use a CompletionService here?
Expand All @@ -120,7 +137,11 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception
}
}

return doAggregation(results, executor);
FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor);
if (parentSplitStatus != null) {
return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus));
}
return flowExecutionStatus;
}

protected FlowExecutionStatus doAggregation(Collection<FlowExecution> results, FlowExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
Expand Down Expand Up @@ -171,6 +172,22 @@ void testBuildSplit() {
assertEquals(2, execution.getStepExecutions().size());
}

@Test
void testNestedSplitsWithSingleThread() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);

FlowBuilder<SimpleFlow> flowBuilder = new FlowBuilder<>("flow");
FlowBuilder.SplitBuilder<SimpleFlow> splitBuilder = flowBuilder.split(taskExecutor);
splitBuilder.add(new FlowBuilder<Flow>("subflow1").from(step1).end());
splitBuilder.add(new FlowBuilder<Flow>("subflow2").from(step2).end());
Job job = new JobBuilder("job").repository(jobRepository).start(flowBuilder.build()).end().build();
job.execute(execution);

assertEquals(BatchStatus.COMPLETED, execution.getStatus());
assertEquals(2, execution.getStepExecutions().size());
}

@Test
void testBuildSplitUsingStartAndAdd_BATCH_2346() {
Flow subflow1 = new FlowBuilder<Flow>("subflow1").from(step2).end();
Expand Down

0 comments on commit 58a4134

Please sign in to comment.