Skip to content

Commit

Permalink
Merge pull request #28 from mvSapphire/FixedParallel
Browse files Browse the repository at this point in the history
Fixed parallel condition steps execution; Added check is cancellation…
  • Loading branch information
mvSapphire authored Sep 21, 2023
2 parents 25f3763 + a300485 commit e923082
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
6 changes: 2 additions & 4 deletions samples/PowerPipe.Sample/SamplePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ public SamplePipeline(IPipelineStepFactory pipelineStepFactory)
public IPipeline<SamplePipelineResult> SetupPipeline()
{
var context = new SamplePipelineContext();


var builder = new PipelineBuilder<SamplePipelineContext, SamplePipelineResult>(_pipelineStepFactory, context)
.Parallel(b => b
.Add<SampleParallelStep1>()
.AddIf<SampleParallelStep1>(_ => false)
.Add<SampleParallelStep2>()
.Add<SampleParallelStep3>()
.Add<SampleParallelStep4>()
.OnError(PipelineStepErrorHandling.Suppress)
.Add<SampleParallelStep5>()
.OnError(PipelineStepErrorHandling.Retry)
.CompensateWith<SampleParallelStep5Compensation>()
.Add<SampleParallelStep6>()
.Add<SampleParallelStep7>())
.AddIfElse<SampleParallelStep6, SampleParallelStep7>(_ => false))
.Add<SampleStep1>()
.CompensateWith<SampleStep1Compensation>()
.AddIf<SampleStep2>(Step2ExecutionAllowed)
Expand Down
3 changes: 2 additions & 1 deletion src/PowerPipe/Builder/Steps/AddIfStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ protected override async Task ExecuteInternalAsync(TContext context, Cancellatio
}
else
{
await NextStep.ExecuteAsync(context, cancellationToken);
if (NextStep is not null)
await NextStep.ExecuteAsync(context, cancellationToken);
}
}
}
3 changes: 2 additions & 1 deletion src/PowerPipe/Builder/Steps/IfPipelineStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected override async Task ExecuteInternalAsync(TContext context, Cancellatio
await _pipelineBuilder.Build().RunAsync(cancellationToken, returnResult: false);
}

await NextStep.ExecuteAsync(context, cancellationToken);
if (NextStep is not null)
await NextStep.ExecuteAsync(context, cancellationToken);
}
}
4 changes: 3 additions & 1 deletion src/PowerPipe/Builder/Steps/InternalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public async Task ExecuteAsync(TContext context, CancellationToken cancellationT
{
try
{
cancellationToken.ThrowIfCancellationRequested();

await ExecuteInternalAsync(context, cancellationToken);
}
catch (Exception e)
{
if (e is PipelineExecutionException)
if (e is PipelineExecutionException or OperationCanceledException)
{
ErrorHandledSucceed = false;
throw;
Expand Down
3 changes: 2 additions & 1 deletion src/PowerPipe/Builder/Steps/ParallelStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ protected override async Task ExecuteInternalAsync(TContext context, Cancellatio

await Parallel.ForEachAsync(_pipelineBuilder.Steps, parallelOptions, async (step, token) => await step.ExecuteAsync(context, token));

await NextStep.ExecuteAsync(context, cancellationToken);
if (NextStep is not null)
await NextStep.ExecuteAsync(context, cancellationToken);
}
}

0 comments on commit e923082

Please sign in to comment.