From af7ff4d0a8f761d376089b3a4445a0807b80f64e Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 8 Feb 2016 00:55:13 +0000 Subject: [PATCH] Break read+write dependency in Stream copy op --- .../StreamCopyOperation.cs | 87 +++++++++++++------ 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/src/Microsoft.AspNetCore.Http.Extensions/StreamCopyOperation.cs b/src/Microsoft.AspNetCore.Http.Extensions/StreamCopyOperation.cs index c42661a1..65956411 100644 --- a/src/Microsoft.AspNetCore.Http.Extensions/StreamCopyOperation.cs +++ b/src/Microsoft.AspNetCore.Http.Extensions/StreamCopyOperation.cs @@ -14,55 +14,86 @@ namespace Microsoft.AspNetCore.Http.Extensions public static class StreamCopyOperation { private const int DefaultBufferSize = 4096; +#if NET451 + private readonly static Task _completedTask = Task.FromResult(0); +#else + private readonly static Task _completedTask = Task.CompletedTask; +#endif - public static async Task CopyToAsync(Stream source, Stream destination, long? length, CancellationToken cancel) + public static Task CopyToAsync(Stream source, Stream destination, long? length, CancellationToken cancellationToken) { - long? bytesRemaining = length; + Debug.Assert(source != null); + Debug.Assert(destination != null); + Debug.Assert(!length.HasValue || length.Value >= 0); - var buffer = ArrayPool.Shared.Rent(DefaultBufferSize); - try + if (length.HasValue && length.Value <= 0) { - Debug.Assert(source != null); - Debug.Assert(destination != null); - Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.Value >= 0); - Debug.Assert(buffer != null); + return _completedTask; + } + + var buffer0 = ArrayPool.Shared.Rent(DefaultBufferSize); + var buffer1 = ArrayPool.Shared.Rent(DefaultBufferSize); + + Debug.Assert(buffer0 != null); + Debug.Assert(buffer1 != null); + + return ArrayPoolCopyToAsyncInternal(source, destination, buffer0, buffer1, length, cancellationToken); + } - while (true) + private static async Task ArrayPoolCopyToAsyncInternal(Stream source, Stream destination, byte[] buffer0, byte[] buffer1, long? length, CancellationToken cancellationToken) + { + try + { + var count = 0; + var readLength = buffer0.Length; + if (length.HasValue) { - // The natural end of the range. - if (bytesRemaining.HasValue && bytesRemaining.Value <= 0) - { - return; - } + readLength = (int)Math.Min(length.Value, (long)readLength); + } - cancel.ThrowIfCancellationRequested(); + var lastReadTask = source.ReadAsync(buffer0, 0, readLength, cancellationToken); + var lastWriteTask = _completedTask; + var otherBuffer = true; - int readLength = buffer.Length; - if (bytesRemaining.HasValue) + while ((count = await lastReadTask) != 0) + { + if (count == 0) { - readLength = (int)Math.Min(bytesRemaining.Value, (long)readLength); + // End of the source stream. + break; } - int count = await source.ReadAsync(buffer, 0, readLength, cancel); - if (bytesRemaining.HasValue) + readLength = (otherBuffer ? buffer1 : buffer0).Length; + if (length.HasValue) { - bytesRemaining -= count; + readLength = (int)Math.Min(length.Value, (long)readLength); } - // End of the source stream. - if (count == 0) + lastReadTask = source.ReadAsync((otherBuffer ? buffer1 : buffer0), 0, readLength, cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); + + await lastWriteTask; + lastWriteTask = destination.WriteAsync((otherBuffer ? buffer0 : buffer1), 0, count, cancellationToken); + + if (length.HasValue) { - return; + length -= count; + if (length.Value <= 0) + { + // The natural end of the range. + break; + } } - cancel.ThrowIfCancellationRequested(); - - await destination.WriteAsync(buffer, 0, count, cancel); + cancellationToken.ThrowIfCancellationRequested(); + otherBuffer = !otherBuffer; } + await lastWriteTask; } finally { - ArrayPool.Shared.Return(buffer); + ArrayPool.Shared.Return(buffer0); + ArrayPool.Shared.Return(buffer1); } } }