From fb8d2888f377163eb439280c5efa1e99cba251c0 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 23 Aug 2024 23:53:48 +0700 Subject: [PATCH] Fix missing AlsoTo public APIs --- ...APISpec.ApproveStreams.DotNet.verified.txt | 5 ++ ...oreAPISpec.ApproveStreams.Net.verified.txt | 5 ++ src/core/Akka.Streams/Dsl/FlowOperations.cs | 61 +++++++++++++++++- .../Dsl/Internal/InternalFlowOperations.cs | 38 ----------- src/core/Akka.Streams/Dsl/SourceOperations.cs | 31 ++++++++- .../Akka.Streams/Dsl/SubFlowOperations.cs | 63 +++++++++++++++++-- 6 files changed, 155 insertions(+), 48 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 7426f505ba9..3167a0d0445 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1361,7 +1361,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow Aggregate(this Akka.Streams.Dsl.Flow flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.Flow AggregateAsync(this Akka.Streams.Dsl.Flow flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Flow AlsoTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.Flow AlsoTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Flow AlsoToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.Flow AlsoToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.FlowWithContext AsFlowWithContext(this Akka.Streams.Dsl.Flow flow, System.Func collapseContext, System.Func extractContext) { } public static Akka.Streams.Dsl.Flow BackpressureTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow Batch(this Akka.Streams.Dsl.Flow flow, long max, System.Func seed, System.Func aggregate) { } @@ -2061,6 +2063,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that) { } public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } public static Akka.Streams.Dsl.Source BackpressureTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source Batch(this Akka.Streams.Dsl.Source flow, long max, System.Func seed, System.Func aggregate) { } @@ -2263,7 +2266,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow Aggregate(this Akka.Streams.Dsl.SubFlow flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.SubFlow AggregateAsync(this Akka.Streams.Dsl.SubFlow flow, TOut zero, System.Func> fold) { } public static Akka.Streams.Dsl.SubFlow AlsoTo(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.SubFlow AlsoTo(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.SubFlow AlsoToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.SubFlow AlsoToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.SubFlow BackpressureTimeout(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow Batch(this Akka.Streams.Dsl.SubFlow flow, long max, System.Func seed, System.Func aggregate) { } public static Akka.Streams.Dsl.SubFlow BatchWeighted(this Akka.Streams.Dsl.SubFlow flow, long max, System.Func costFunction, System.Func seed, System.Func aggregate) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index c3a9e8a63fe..ce092ba63d1 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1360,7 +1360,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow Aggregate(this Akka.Streams.Dsl.Flow flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.Flow AggregateAsync(this Akka.Streams.Dsl.Flow flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Flow AlsoTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.Flow AlsoTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Flow AlsoToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.Flow AlsoToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.FlowWithContext AsFlowWithContext(this Akka.Streams.Dsl.Flow flow, System.Func collapseContext, System.Func extractContext) { } public static Akka.Streams.Dsl.Flow BackpressureTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow Batch(this Akka.Streams.Dsl.Flow flow, long max, System.Func seed, System.Func aggregate) { } @@ -2060,6 +2062,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that) { } public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } public static Akka.Streams.Dsl.Source BackpressureTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source Batch(this Akka.Streams.Dsl.Source flow, long max, System.Func seed, System.Func aggregate) { } @@ -2262,7 +2265,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow Aggregate(this Akka.Streams.Dsl.SubFlow flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.SubFlow AggregateAsync(this Akka.Streams.Dsl.SubFlow flow, TOut zero, System.Func> fold) { } public static Akka.Streams.Dsl.SubFlow AlsoTo(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.SubFlow AlsoTo(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.SubFlow AlsoToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } + public static Akka.Streams.Dsl.SubFlow AlsoToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction, bool propagateFailure) { } public static Akka.Streams.Dsl.SubFlow BackpressureTimeout(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow Batch(this Akka.Streams.Dsl.SubFlow flow, long max, System.Func seed, System.Func aggregate) { } public static Akka.Streams.Dsl.SubFlow BatchWeighted(this Akka.Streams.Dsl.SubFlow flow, long max, System.Func costFunction, System.Func seed, System.Func aggregate) { } diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index eb14fe3455a..695d72cac98 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -1840,7 +1840,7 @@ public static Flow Throttle(this Flow to this , meaning that elements that passes /// through will also be sent to the . /// - /// @see + /// @see /// /// It is recommended to use the internally optimized and combiners /// where appropriate instead of manually writing functions that pass through one of the values. @@ -1858,7 +1858,35 @@ public static Flow AlsoToMaterialized flow, IGraph, TMat2> that, Func materializerFunction) { - return (Flow)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction); + return (Flow)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, false); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// @see + /// + /// It is recommended to use the internally optimized and combiners + /// where appropriate instead of manually writing functions that pass through one of the values. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static Flow AlsoToMaterialized( + this Flow flow, + IGraph, TMat2> that, + Func materializerFunction, + bool propagateFailure) + { + return (Flow)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, propagateFailure); } /// @@ -1881,7 +1909,34 @@ public static Flow AlsoToMaterializedTBD public static Flow AlsoTo(this Flow flow, IGraph, TMat> that) { - return (Flow)InternalFlowOperations.AlsoTo(flow, that); + return (Flow)InternalFlowOperations.AlsoTo(flow, that, false); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// Emits when element is available and demand exists both from the Sink and the downstream. + /// + /// Backpressures when downstream or Sink backpressures + /// + /// Completes when upstream completes + /// + /// Cancels when downstream cancels + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static Flow AlsoTo( + this Flow flow, + IGraph, TMat> that, + bool propagateFailure) + { + return (Flow)InternalFlowOperations.AlsoTo(flow, that, propagateFailure); } /// diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index b161d22a406..f9488f7dca6 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -2515,22 +2515,6 @@ private static IGraph, TMat> OrElseGraph(IGraph OrElseMaterialized(this IFlow flow, IGraph, TMat2> secondary, Func materializedFunction) => flow.ViaMaterialized(OrElseGraph(secondary), materializedFunction); - /// - /// Attaches the given to this , meaning that elements that passes - /// through will also be sent to the . - /// - /// @see - /// - /// It is recommended to use the internally optimized and combiners - /// where appropriate instead of manually writing functions that pass through one of the values. - /// - public static IFlow AlsoToMaterialized( - this IFlow flow, IGraph, TMat2> that, - Func materializerFunction) - { - return flow.ViaMaterialized(AlsoToGraph(that, false), materializerFunction); - } - /// /// Attaches the given to this , meaning that elements that passes /// through will also be sent to the . @@ -2547,28 +2531,6 @@ public static IFlow AlsoToMaterialized( return flow.ViaMaterialized(AlsoToGraph(that, propagateFailure), materializerFunction); } - /// - /// Attaches the given to this , meaning that elements that passes - /// through will also be sent to the . - /// - /// Emits when element is available and demand exists both from the Sink and the downstream. - /// - /// Backpressures when downstream or Sink backpressures - /// - /// Completes when upstream completes - /// - /// Cancels when downstream cancels - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - public static IFlow AlsoTo(this IFlow flow, IGraph, TMat> that) - { - return flow.Via(AlsoToGraph(that, false)); - } - /// /// Attaches the given to this , meaning that elements that passes /// through will also be sent to the . diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index c01f3f5acea..5543a94378e 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -1706,7 +1706,7 @@ public static Source Throttle(this Source fl /// Attaches the given to this , meaning that elements that passes /// through will also be sent to the . /// - /// @see + /// @see /// /// It is recommended to use the internally optimized and combiners /// where appropriate instead of manually writing functions that pass through one of the values. @@ -1723,7 +1723,32 @@ public static Source AlsoToMaterialized( this Source flow, IGraph, TMat2> that, Func materializerFunction) { - return (Source)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction); + return (Source)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, false); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// @see + /// + /// It is recommended to use the internally optimized and combiners + /// where appropriate instead of manually writing functions that pass through one of the values. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static Source AlsoToMaterialized( + this Source flow, IGraph, TMat2> that, + Func materializerFunction, bool propagateFailure) + { + return (Source)InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, propagateFailure); } /// @@ -1745,7 +1770,7 @@ public static Source AlsoToMaterialized( /// TBD public static Source AlsoTo(this Source flow, IGraph, TMat> that) { - return (Source)InternalFlowOperations.AlsoTo(flow, that); + return (Source)InternalFlowOperations.AlsoTo(flow, that, false); } /// diff --git a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs index 1e789f1bcc7..b495d8a3fde 100644 --- a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs @@ -1542,7 +1542,7 @@ public static SubFlow Throttle(this Su /// Attaches the given to this , meaning that elements that passes /// through will also be sent to the . /// - /// @see + /// @see /// /// It is recommended to use the internally optimized and combiners /// where appropriate instead of manually writing functions that pass through one of the values. @@ -1560,7 +1560,35 @@ public static SubFlow AlsoToMaterialized flow, IGraph, TMat2> that, Func materializerFunction) { - return (SubFlow) InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction); + return (SubFlow) InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, false); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// @see + /// + /// It is recommended to use the internally optimized and combiners + /// where appropriate instead of manually writing functions that pass through one of the values. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static SubFlow AlsoToMaterialized( + this SubFlow flow, + IGraph, TMat2> that, + Func materializerFunction, + bool propagateFailure) + { + return (SubFlow) InternalFlowOperations.AlsoToMaterialized(flow, that, materializerFunction, propagateFailure); } /// @@ -1583,7 +1611,34 @@ public static SubFlow AlsoToMaterializedTBD public static SubFlow AlsoTo(this SubFlow flow, IGraph, TMat> that) { - return (SubFlow) InternalFlowOperations.AlsoTo(flow, that); + return (SubFlow) InternalFlowOperations.AlsoTo(flow, that, false); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// Emits when element is available and demand exists both from the Sink and the downstream. + /// + /// Backpressures when downstream or Sink backpressures + /// + /// Completes when upstream completes + /// + /// Cancels when downstream cancels + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static SubFlow AlsoTo( + this SubFlow flow, + IGraph, TMat> that, + bool propagateFailure) + { + return (SubFlow) InternalFlowOperations.AlsoTo(flow, that, propagateFailure); } /// @@ -1592,7 +1647,7 @@ public static SubFlow AlsoTo(this SubF /// through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. If the wire-tap Sink backpressures, /// elements that would've been sent to it will be dropped instead. /// - /// It is similar to which does backpressure instead of dropping elements. + /// It is similar to which does backpressure instead of dropping elements. /// Emits when element is available and demand exists from the downstream; the element will also be sent to the wire-tap Sink if there is demand. /// Backpressures when downstream backpressures /// Completes when upstream completes