Skip to content

Commit

Permalink
Implement more Prepend overloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
bartdesmet committed Sep 26, 2017
1 parent 60fb0ab commit 9eaa572
Showing 1 changed file with 130 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading.Tasks;

namespace System.Reactive.Linq
Expand All @@ -30,7 +31,7 @@ public static IAsyncObservable<TSource> Prepend<TSource>(this IAsyncObservable<T
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return Create<TSource>(async observer => await source.SubscribeSafeAsync(await AsyncObserver.Prepend(observer, value, scheduler).ConfigureAwait(false)).ConfigureAwait(false));
return Create<TSource>(observer => AsyncObserver.Prepend(observer, source, value, scheduler));
}

public static IAsyncObservable<TSource> Prepend<TSource>(this IAsyncObservable<TSource> source, params TSource[] values)
Expand All @@ -52,7 +53,7 @@ public static IAsyncObservable<TSource> Prepend<TSource>(this IAsyncObservable<T
if (values == null)
throw new ArgumentNullException(nameof(values));

return Create<TSource>(async observer => await source.SubscribeSafeAsync(await AsyncObserver.Prepend(observer, scheduler, values).ConfigureAwait(false)).ConfigureAwait(false));
return Create<TSource>(observer => AsyncObserver.Prepend(observer, source, scheduler, values));
}

public static IAsyncObservable<TSource> Prepend<TSource>(this IAsyncObservable<TSource> source, IEnumerable<TSource> values)
Expand All @@ -74,12 +75,14 @@ public static IAsyncObservable<TSource> Prepend<TSource>(this IAsyncObservable<T
if (values == null)
throw new ArgumentNullException(nameof(values));

return Create<TSource>(async observer => await source.SubscribeSafeAsync(await AsyncObserver.Prepend(observer, scheduler, values).ConfigureAwait(false)).ConfigureAwait(false));
return Create<TSource>(observer => AsyncObserver.Prepend(observer, source, scheduler, values));
}
}

partial class AsyncObserver
{
// REVIEW: There's some asymmetry on these overloads. Should standardize to Concat style.

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, TSource value)
{
if (observer == null)
Expand All @@ -89,20 +92,43 @@ public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSou

async Task<IAsyncObserver<TSource>> Core()
{
await observer.OnNextAsync(value);
await observer.OnNextAsync(value).ConfigureAwait(false);

return observer;
}
}

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
public static Task<IAsyncDisposable> Prepend<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, TSource value, IAsyncScheduler scheduler)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

throw new NotImplementedException();
return Core();

async Task<IAsyncDisposable> Core()
{
var subscription = new SingleAssignmentAsyncDisposable();

var task = await scheduler.ScheduleAsync(async ct =>
{
if (ct.IsCancellationRequested)
return;

await observer.OnNextAsync(value).RendezVous(scheduler, ct);

if (ct.IsCancellationRequested)
return;

var inner = await source.SubscribeSafeAsync(observer).ConfigureAwait(false);
await subscription.AssignAsync(inner).RendezVous(scheduler, ct);
}).ConfigureAwait(false);

return StableCompositeAsyncDisposable.Create(task, subscription);
}
}

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, params TSource[] values)
Expand All @@ -118,23 +144,49 @@ async Task<IAsyncObserver<TSource>> Core()
{
foreach (var value in values)
{
await observer.OnNextAsync(value);
await observer.OnNextAsync(value).ConfigureAwait(false);
}

return observer;
}
}

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, params TSource[] values)
public static Task<IAsyncDisposable> Prepend<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, IAsyncScheduler scheduler, params TSource[] values)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
if (values == null)
throw new ArgumentNullException(nameof(values));

throw new NotImplementedException();
return Core();

async Task<IAsyncDisposable> Core()
{
var subscription = new SingleAssignmentAsyncDisposable();

var task = await scheduler.ScheduleAsync(async ct =>
{
if (ct.IsCancellationRequested)
return;

for (var i = 0; i < values.Length && !ct.IsCancellationRequested; i++)
{
await observer.OnNextAsync(values[i]).RendezVous(scheduler, ct);
}

if (ct.IsCancellationRequested)
return;

var inner = await source.SubscribeSafeAsync(observer).ConfigureAwait(false);
await subscription.AssignAsync(inner).RendezVous(scheduler, ct);
}).ConfigureAwait(false);

return StableCompositeAsyncDisposable.Create(task, subscription);
}
}

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, IEnumerable<TSource> values)
Expand All @@ -150,23 +202,89 @@ async Task<IAsyncObserver<TSource>> Core()
{
foreach (var value in values)
{
await observer.OnNextAsync(value);
await observer.OnNextAsync(value).ConfigureAwait(false);
}

return observer;
}
}

public static Task<IAsyncObserver<TSource>> Prepend<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, IEnumerable<TSource> values)
public static Task<IAsyncDisposable> Prepend<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, IAsyncScheduler scheduler, IEnumerable<TSource> values)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
if (values == null)
throw new ArgumentNullException(nameof(values));

throw new NotImplementedException();
return Core();

async Task<IAsyncDisposable> Core()
{
var subscription = new SingleAssignmentAsyncDisposable();

var task = await scheduler.ScheduleAsync(async ct =>
{
if (ct.IsCancellationRequested)
return;

var e = default(IEnumerator<TSource>);

try
{
e = values.GetEnumerator();
}
catch (Exception ex)
{
await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
return;
}

using (e)
{
while (!ct.IsCancellationRequested)
{
var b = default(bool);
var value = default(TSource);

try
{
b = e.MoveNext();

if (b)
{
value = e.Current;
}
}
catch (Exception ex)
{
await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
return;
}

if (b)
{
await observer.OnNextAsync(value).RendezVous(scheduler, ct);
}
else
{
break;
}
}
}

if (ct.IsCancellationRequested)
return;

var inner = await source.SubscribeSafeAsync(observer).ConfigureAwait(false);
await subscription.AssignAsync(inner).RendezVous(scheduler, ct);
}).ConfigureAwait(false);

return StableCompositeAsyncDisposable.Create(task, subscription);
}
}
}
}

0 comments on commit 9eaa572

Please sign in to comment.