diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index a02bb0fba1a..c24c8ab02fb 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2529,13 +2529,15 @@ private class Holder { private readonly Action> _callback; - public Holder(Result element, Action> callback) + public Holder(object message, Result element, Action> callback) { _callback = callback; + Message = message; Element = element; } public Result Element { get; private set; } + public object Message { get; } public void SetElement(Result result) { @@ -2575,7 +2577,7 @@ public override void OnPush() try { var task = _stage._mapFunc(message); - var holder = new Holder(NotYetThere, _taskCallback); + var holder = new Holder(message, NotYetThere, _taskCallback); _buffer.Enqueue(holder); // We dispatch the task if it's ready to optimize away @@ -2642,9 +2644,29 @@ private void PushOne() } else { - var result = _buffer.Dequeue().Element; + var holder = _buffer.Dequeue(); + var result = holder.Element; if (!result.IsSuccess) + { + // this could happen if we are looping in PushOne and end up on a failed Task before the + // HolderCompleted callback has run + var strategy = _decider(result.Exception); + Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(result.Exception); + return; + + case Directive.Resume: + case Directive.Restart: + break; + + default: + throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception); + } continue; + } Push(_stage.Out, result.Value); if (Todo < _stage._parallelism && !HasBeenPulled(inlet))