diff --git a/AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs b/AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs index 880969eb95..dc636309c8 100644 --- a/AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs +++ b/AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs @@ -193,7 +193,30 @@ public static (IAsyncObserver, IAsyncDisposable) Append(IAsync if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - throw new NotImplementedException(); + var d = new SingleAssignmentAsyncDisposable(); + + return + ( + Create( + observer.OnNextAsync, + observer.OnErrorAsync, + async () => + { + var task = await scheduler.ScheduleAsync(async ct => + { + for (var i = 0; i < values.Length && !ct.IsCancellationRequested; i++) + { + await observer.OnNextAsync(values[i]).RendezVous(scheduler, ct); + } + + await observer.OnCompletedAsync().RendezVous(scheduler, ct); + }).ConfigureAwait(false); + + await d.AssignAsync(task).ConfigureAwait(false); + } + ), + d + ); } public static IAsyncObserver Append(IAsyncObserver observer, IEnumerable values) @@ -227,7 +250,70 @@ public static (IAsyncObserver, IAsyncDisposable) Append(IAsync if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - throw new NotImplementedException(); + var d = new SingleAssignmentAsyncDisposable(); + + return + ( + Create( + observer.OnNextAsync, + observer.OnErrorAsync, + async () => + { + var task = await scheduler.ScheduleAsync(async ct => + { + var e = default(IEnumerator); + + try + { + e = values.GetEnumerator(); + } + catch (Exception ex) + { + await observer.OnErrorAsync(ex).RendezVous(scheduler, ct); + return; + } + + using (e) + { + while (!ct.IsCancellationRequested) + { + var b = default(bool); + var value = default(TSource); + + try + { + b = e.MoveNext(); + + if (b) + { + value = e.Current; + } + } + catch (Exception ex) + { + await observer.OnErrorAsync(ex).RendezVous(scheduler, ct); + return; + } + + if (b) + { + await observer.OnNextAsync(value).RendezVous(scheduler, ct); + } + else + { + break; + } + } + } + + await observer.OnCompletedAsync().RendezVous(scheduler, ct); + }).ConfigureAwait(false); + + await d.AssignAsync(task).ConfigureAwait(false); + } + ), + d + ); } } }