Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Use System.Buffers in System.IO.FileSystem
Browse files Browse the repository at this point in the history
- Change FileStream to use ```ArrayPool<byte>.Shared``` for its internal buffer
- Add to Common a helper implementation of CopyToAsync that uses ```ArrayPool<byte>.Shared``` until such type as the base Stream (in mscorlib) can use a pooled buffer
- Utilize that CopyToAsync helper in FileStream

Having added that:
- Utilize that CopyToAsync helper in DeflateStream/GZipStream, as System.IO.Compression already depends on System.Buffers.
  • Loading branch information
stephentoub committed Feb 8, 2016
1 parent e5066d3 commit 0a4bfb5
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 30 deletions.
64 changes: 64 additions & 0 deletions src/Common/src/System/IO/StreamHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
{
/// <summary>Provides methods to help in the implementation of Stream-derived types.</summary>
internal static class StreamHelpers
{
/// <summary>
/// Provides an implementation usable as an override of Stream.CopyToAsync but that uses the shared
/// ArrayPool for the intermediate buffer rather than allocating a new buffer each time.
/// </summary>
/// <remarks>
/// If/when the base CopyToAsync implementation is changed to use a pooled buffer,
/// this will no longer be necessary.
/// </remarks>
public static Task ArrayPoolCopyToAsync(Stream source, Stream destination, int bufferSize, CancellationToken cancellationToken)
{
Debug.Assert(source != null);

if (destination == null)
{
throw new ArgumentNullException("destination");
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException("bufferSize", bufferSize, SR.ArgumentOutOfRange_NeedPosNum);
}
if (!source.CanRead)
{
throw new NotSupportedException(SR.NotSupported_UnreadableStream);
}
if (!destination.CanWrite)
{
throw new NotSupportedException(SR.NotSupported_UnwritableStream);
}

byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize); // rented here to reduce size of async state machine boxed object
return ArrayPoolCopyToAsyncInternal(source, destination, buffer, cancellationToken);
}

private static async Task ArrayPoolCopyToAsyncInternal(Stream source, Stream destination, byte[] buffer, CancellationToken cancellationToken)
{
try
{
int bytesRead;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
13 changes: 8 additions & 5 deletions src/System.IO.Compression/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
<data name="ArgumentOutOfRange_Enum" xml:space="preserve">
<value>Enum value was out of legal range.</value>
</data>
<data name="ArgumentOutOfRange_NeedPosNum" xml:space="preserve">
<value>Positive number required.</value>
</data>
<data name="CannotReadFromDeflateStream" xml:space="preserve">
<value>Reading from the compression stream is not supported.</value>
</data>
Expand All @@ -141,14 +144,14 @@
<data name="InvalidHuffmanData" xml:space="preserve">
<value>Failed to construct a huffman tree using the length array. The stream might be corrupted.</value>
</data>
<data name="NotReadableStream" xml:space="preserve">
<value>The base stream is not readable.</value>
</data>
<data name="NotSupported" xml:space="preserve">
<value>This operation is not supported.</value>
</data>
<data name="NotWriteableStream" xml:space="preserve">
<value>The base stream is not writeable.</value>
<data name="NotSupported_UnreadableStream" xml:space="preserve">
<value>Stream does not support reading.</value>
</data>
<data name="NotSupported_UnwritableStream" xml:space="preserve">
<value>Stream does not support writing.</value>
</data>
<data name="ObjectDisposed_StreamClosed" xml:space="preserve">
<value>Can not access a closed Stream.</value>
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.Compression/src/System.IO.Compression.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
<Compile Include="$(CommonPath)\System\IO\PathInternal.cs">
<Link>Common\System\IO\PathInternal.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StreamHelpers.cs">
<Link>Common\System\IO\StreamHelpers.cs</Link>
</Compile>
</ItemGroup>
<!-- Files exclusive to Core -->
<ItemGroup Condition="'$(TargetGroup)' != 'net46'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ internal void InitializeInflater(Stream stream, bool leaveOpen, int windowBits)
{
Debug.Assert(stream != null);
if (!stream.CanRead)
throw new ArgumentException(SR.NotReadableStream, "stream");
throw new ArgumentException(SR.NotSupported_UnreadableStream, "stream");

_inflater = new Inflater(windowBits);

Expand All @@ -107,7 +107,7 @@ internal void InitializeDeflater(Stream stream, bool leaveOpen, int windowBits,
{
Debug.Assert(stream != null);
if (!stream.CanWrite)
throw new ArgumentException(SR.NotWriteableStream, "stream");
throw new ArgumentException(SR.NotSupported_UnreadableStream, "stream");

_deflater = new Deflater(compressionLevel, windowBits);

Expand Down Expand Up @@ -333,6 +333,11 @@ private static void ThrowCannotWriteToDeflateStreamException()
throw new InvalidOperationException(SR.CannotWriteToDeflateStream);
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

public override Task<int> ReadAsync(Byte[] array, int offset, int count, CancellationToken cancellationToken)
{
EnsureDecompressionMode();
Expand Down Expand Up @@ -373,7 +378,7 @@ public override Task<int> ReadAsync(Byte[] array, int offset, int count, Cancell
readTask = _stream.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotReadableStream);
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}

return ReadAsyncCore(readTask, array, offset, count, cancellationToken);
Expand Down Expand Up @@ -422,7 +427,7 @@ private async Task<int> ReadAsyncCore(Task<int> readTask, byte[] array, int offs
readTask = _stream.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotReadableStream);
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public Stream BaseStream
}
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

public override Task<int> ReadAsync(Byte[] array, int offset, int count, CancellationToken cancellationToken)
{
CheckDeflateStream();
Expand Down
13 changes: 13 additions & 0 deletions src/System.IO.Compression/tests/DeflateStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,19 @@ public void ReadWriteArgumentValidation()
}
}

[Fact]
public void CopyToAsyncArgumentValidation()
{
using (DeflateStream ds = new DeflateStream(new MemoryStream(), CompressionMode.Decompress))
{
Assert.Throws<ArgumentNullException>("destination", () => { ds.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { ds.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { ds.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
ds.Dispose();
Assert.Throws<NotSupportedException>(() => { ds.CopyToAsync(new MemoryStream()); });
}
}

[Fact]
public void Precancellation()
{
Expand Down
26 changes: 12 additions & 14 deletions src/System.IO.Compression/tests/GZipStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,7 @@ private static async Task DecompressAsync(MemoryStream compareStream, MemoryStre
var zip = new GZipStream(gzStream, CompressionMode.Decompress);

var GZipStream = new MemoryStream();

int _bufferSize = 1024;
var bytes = new Byte[_bufferSize];
bool finished = false;
int retCount;
while (!finished)
{
retCount = await zip.ReadAsync(bytes, 0, _bufferSize);

if (retCount != 0)
await GZipStream.WriteAsync(bytes, 0, retCount);
else
finished = true;
}
await zip.CopyToAsync(GZipStream);

GZipStream.Position = 0;
compareStream.Position = 0;
Expand Down Expand Up @@ -225,6 +212,17 @@ public void WriteOnlyStreamThrowsOnDecompress()
});
}

[Fact]
public void CopyToAsyncArgumentValidation()
{
using (GZipStream gs = new GZipStream(new MemoryStream(), CompressionMode.Decompress))
{
Assert.Throws<ArgumentNullException>("destination", () => { gs.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { gs.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { gs.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
}
}

[Fact]
public void TestCtors()
{
Expand Down
3 changes: 3 additions & 0 deletions src/System.IO.FileSystem/src/System.IO.FileSystem.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
<Compile Include="$(CommonPath)\System\Collections\Generic\EnumerableHelpers.cs">
<Link>Common\System\Collections\Generic\EnumerableHelpers.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StreamHelpers.cs">
<Link>Common\System\IO\StreamHelpers.cs</Link>
</Compile>
<Compile Include="$(CommonPath)\System\IO\StringBuilderCache.cs">
<Link>Common\System\IO\StringBuilderCache.cs</Link>
</Compile>
Expand Down
27 changes: 25 additions & 2 deletions src/System.IO.FileSystem/src/System/IO/UnixFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using Microsoft.Win32.SafeHandles;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -198,8 +199,8 @@ internal UnixFileStream(SafeFileHandle handle, FileAccess access, int bufferSize
/// <returns>The buffer.</returns>
private byte[] GetBuffer()
{
Debug.Assert(_buffer == null || _buffer.Length == _bufferLength);
return _buffer ?? (_buffer = new byte[_bufferLength]);
Debug.Assert(_buffer == null || _buffer.Length >= _bufferLength);
return _buffer ?? (_buffer = ArrayPool<byte>.Shared.Rent(_bufferLength));
}

/// <summary>Translates the FileMode, FileAccess, and FileOptions values into flags to be passed when opening the file.</summary>
Expand Down Expand Up @@ -457,6 +458,15 @@ protected override void Dispose(bool disposing)
{
_fileHandle.Dispose();
}
if (_buffer != null && (_fileHandle == null || _fileHandle.IsClosed))
{
// Return the buffer only if the handle is closed. This helps to avoid race conditions where
// incorrectly Dispose'ing the stream while it's still in use could end up using a buffer concurrently
// with it being returned to the pool.
byte[] buffer = _buffer;
_buffer = null;
ArrayPool<byte>.Shared.Return(buffer);
}
base.Dispose(disposing);
}
}
Expand Down Expand Up @@ -763,6 +773,19 @@ private unsafe int ReadNative(byte[] array, int offset, int count)
return bytesRead;
}

/// <summary>
/// Asynchronously reads the bytes from the current stream and writes them to another
/// stream, using a specified buffer size.
/// </summary>
/// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
/// <param name="bufferSize">The size, in bytes, of the buffer. This value must be greater than zero.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous copy operation.</returns>
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

/// <summary>
/// Asynchronously reads a sequence of bytes from the current stream and advances
/// the position within the stream by the number of bytes read.
Expand Down
24 changes: 19 additions & 5 deletions src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -468,10 +469,18 @@ protected override void Dispose(bool disposing)
_canRead = false;
_canWrite = false;
_canSeek = false;
// Don't set the buffer to null, to avoid a NullReferenceException
// when users have a race condition in their code (ie, they call
// Close when calling another method on Stream like Read).
//_buffer = null;

if (_buffer != null && (_handle == null || _handle.IsClosed))
{
// Return the buffer only if the handle is closed and only after we've marked the stream
// as non-readable and non-writable. This helps to avoid race conditions where incorrectly Dispose'ing
// the stream while it's still in use could end up using a buffer concurrently with it being.
// returned to the pool.
byte[] buffer = _buffer;
_buffer = null;
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);
}

base.Dispose(disposing);
}
}
Expand Down Expand Up @@ -905,7 +914,7 @@ private void AllocateBuffer()
Debug.Assert(_buffer == null);
Debug.Assert(_preallocatedOverlapped == null);

_buffer = new byte[_bufferSize]; // TODO: Issue #5598: Use ArrayPool.
_buffer = ArrayPool<byte>.Shared.Rent(_bufferSize);
if (_isAsync)
{
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, _buffer);
Expand Down Expand Up @@ -1689,6 +1698,11 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle
return errorCode;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
return StreamHelpers.ArrayPoolCopyToAsync(this, destination, bufferSize, cancellationToken);
}

[System.Security.SecuritySafeCritical]
public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Expand Down
1 change: 1 addition & 0 deletions src/System.IO.FileSystem/src/project.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"System.Buffers": "4.0.0-rc3-23803",
"System.Collections": "4.0.10",
"System.Diagnostics.Contracts": "4.0.0",
"System.Diagnostics.Debug": "4.0.10",
Expand Down
13 changes: 13 additions & 0 deletions src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,19 @@ public Task CopyToAsyncBetweenFileStreams()
numWrites: 10);
}

[Fact]
public void CopyToAsync_InvalidArgs_Throws()
{
using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create))
{
Assert.Throws<ArgumentNullException>("destination", () => { fs.CopyToAsync(null); });
Assert.Throws<ArgumentOutOfRangeException>("bufferSize", () => { fs.CopyToAsync(new MemoryStream(), 0); });
Assert.Throws<NotSupportedException>(() => { fs.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
fs.Dispose();
Assert.Throws<NotSupportedException>(() => { fs.CopyToAsync(new MemoryStream()); });
}
}

[Theory]
[MemberData("MemberData_FileStreamAsyncWriting")]
[OuterLoop] // many combinations: we test just one in inner loop and the rest outer
Expand Down

0 comments on commit 0a4bfb5

Please sign in to comment.