Skip to content
This repository has been archived by the owner on Nov 20, 2018. It is now read-only.

Commit

Permalink
Break read+write dependency in Stream copy op
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Feb 8, 2016
1 parent 8aa7a09 commit af7ff4d
Showing 1 changed file with 59 additions and 28 deletions.
87 changes: 59 additions & 28 deletions src/Microsoft.AspNetCore.Http.Extensions/StreamCopyOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,86 @@ namespace Microsoft.AspNetCore.Http.Extensions
public static class StreamCopyOperation
{
private const int DefaultBufferSize = 4096;
#if NET451
private readonly static Task _completedTask = Task.FromResult(0);
#else
private readonly static Task _completedTask = Task.CompletedTask;
#endif

public static async Task CopyToAsync(Stream source, Stream destination, long? length, CancellationToken cancel)
public static Task CopyToAsync(Stream source, Stream destination, long? length, CancellationToken cancellationToken)
{
long? bytesRemaining = length;
Debug.Assert(source != null);
Debug.Assert(destination != null);
Debug.Assert(!length.HasValue || length.Value >= 0);

var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
try
if (length.HasValue && length.Value <= 0)
{
Debug.Assert(source != null);
Debug.Assert(destination != null);
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.Value >= 0);
Debug.Assert(buffer != null);
return _completedTask;
}

var buffer0 = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
var buffer1 = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);

Debug.Assert(buffer0 != null);
Debug.Assert(buffer1 != null);

return ArrayPoolCopyToAsyncInternal(source, destination, buffer0, buffer1, length, cancellationToken);
}

while (true)
private static async Task ArrayPoolCopyToAsyncInternal(Stream source, Stream destination, byte[] buffer0, byte[] buffer1, long? length, CancellationToken cancellationToken)
{
try
{
var count = 0;
var readLength = buffer0.Length;
if (length.HasValue)
{
// The natural end of the range.
if (bytesRemaining.HasValue && bytesRemaining.Value <= 0)
{
return;
}
readLength = (int)Math.Min(length.Value, (long)readLength);
}

cancel.ThrowIfCancellationRequested();
var lastReadTask = source.ReadAsync(buffer0, 0, readLength, cancellationToken);
var lastWriteTask = _completedTask;
var otherBuffer = true;

int readLength = buffer.Length;
if (bytesRemaining.HasValue)
while ((count = await lastReadTask) != 0)
{
if (count == 0)
{
readLength = (int)Math.Min(bytesRemaining.Value, (long)readLength);
// End of the source stream.
break;
}
int count = await source.ReadAsync(buffer, 0, readLength, cancel);

if (bytesRemaining.HasValue)
readLength = (otherBuffer ? buffer1 : buffer0).Length;
if (length.HasValue)
{
bytesRemaining -= count;
readLength = (int)Math.Min(length.Value, (long)readLength);
}

// End of the source stream.
if (count == 0)
lastReadTask = source.ReadAsync((otherBuffer ? buffer1 : buffer0), 0, readLength, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();

await lastWriteTask;
lastWriteTask = destination.WriteAsync((otherBuffer ? buffer0 : buffer1), 0, count, cancellationToken);

if (length.HasValue)
{
return;
length -= count;
if (length.Value <= 0)
{
// The natural end of the range.
break;
}
}

cancel.ThrowIfCancellationRequested();

await destination.WriteAsync(buffer, 0, count, cancel);
cancellationToken.ThrowIfCancellationRequested();
otherBuffer = !otherBuffer;
}
await lastWriteTask;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
ArrayPool<byte>.Shared.Return(buffer0);
ArrayPool<byte>.Shared.Return(buffer1);
}
}
}
Expand Down

0 comments on commit af7ff4d

Please sign in to comment.