Skip to content

Commit

Permalink
Implementing more Append overloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
bartdesmet committed Sep 26, 2017
1 parent cad34bd commit 60fb0ab
Showing 1 changed file with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,30 @@ public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsync
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

throw new NotImplementedException();
var d = new SingleAssignmentAsyncDisposable();

return
(
Create<TSource>(
observer.OnNextAsync,
observer.OnErrorAsync,
async () =>
{
var task = await scheduler.ScheduleAsync(async ct =>
{
for (var i = 0; i < values.Length && !ct.IsCancellationRequested; i++)
{
await observer.OnNextAsync(values[i]).RendezVous(scheduler, ct);
}

await observer.OnCompletedAsync().RendezVous(scheduler, ct);
}).ConfigureAwait(false);

await d.AssignAsync(task).ConfigureAwait(false);
}
),
d
);
}

public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IEnumerable<TSource> values)
Expand Down Expand Up @@ -227,7 +250,70 @@ public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsync
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

throw new NotImplementedException();
var d = new SingleAssignmentAsyncDisposable();

return
(
Create<TSource>(
observer.OnNextAsync,
observer.OnErrorAsync,
async () =>
{
var task = await scheduler.ScheduleAsync(async ct =>
{
var e = default(IEnumerator<TSource>);

try
{
e = values.GetEnumerator();
}
catch (Exception ex)
{
await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
return;
}

using (e)
{
while (!ct.IsCancellationRequested)
{
var b = default(bool);
var value = default(TSource);

try
{
b = e.MoveNext();

if (b)
{
value = e.Current;
}
}
catch (Exception ex)
{
await observer.OnErrorAsync(ex).RendezVous(scheduler, ct);
return;
}

if (b)
{
await observer.OnNextAsync(value).RendezVous(scheduler, ct);
}
else
{
break;
}
}
}

await observer.OnCompletedAsync().RendezVous(scheduler, ct);
}).ConfigureAwait(false);

await d.AssignAsync(task).ConfigureAwait(false);
}
),
d
);
}
}
}

0 comments on commit 60fb0ab

Please sign in to comment.