Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Optimize overlapped I/O FileStream.CopyToAsync implementation on Windows #11569

Merged
merged 2 commits into from
Sep 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/System.IO.FileSystem/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,7 @@
<data name="UnknownError_Num" xml:space="preserve">
<value>Unknown error '{0}'.</value>
</data>
<data name="ObjectDisposed_StreamClosed" xml:space="preserve">
<value>Cannot access a closed Stream.</value>
</data>
</root>
320 changes: 320 additions & 0 deletions src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;
using System.Runtime.CompilerServices;

/*
* Win32FileStream supports different modes of accessing the disk - async mode
Expand Down Expand Up @@ -1684,6 +1685,325 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle
return errorCode;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Validate arguments as would the base implementation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines hurt my eyes. Is this really in line with our style guidelines?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really in line with our style guidelines?

We allow for single lines like this. I find this easier to follow than tripling the size of the arg validation by moving the bodies to their own lines and surrounding by braces on their own lines. But I don't feel strongly about it and can change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference is to match the pattern in the rest of the file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

if (destination == null)
{
throw new ArgumentNullException(nameof(destination));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), SR.ArgumentOutOfRange_NeedPosNum);
}
bool parentCanRead = _parent.CanRead;
if (!parentCanRead && !_parent.CanWrite)
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
}
bool destinationCanWrite = destination.CanWrite;
if (!destination.CanRead && !destinationCanWrite)
{
throw new ObjectDisposedException(nameof(destination), SR.ObjectDisposed_StreamClosed);
}
if (!parentCanRead)
{
throw new NotSupportedException(SR.NotSupported_UnreadableStream);
}
if (!destinationCanWrite)
{
throw new NotSupportedException(SR.NotSupported_UnwritableStream);
}

// Bail early for cancellation if cancellation has been requested
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
}

// Fail if the file was closed
if (_handle.IsClosed)
{
throw Error.GetFileNotOpen();
}

// Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync
Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
return _isAsync ?
AsyncModeCopyToAsync(destination, bufferSize, cancellationToken) :
base.CopyToAsync(destination, bufferSize, cancellationToken);
}

private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
Debug.Assert(_isAsync, "This implementation is for async mode only");
Debug.Assert(!_handle.IsClosed, "!_handle.IsClosed");
Debug.Assert(_parent.CanRead, "_parent.CanRead");

// Make sure any pending writes have been flushed before we do a read.
if (_writePos > 0)
{
await FlushWriteAsync(cancellationToken).ConfigureAwait(false);
}

// Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is
// done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer.
if (_buffer != null)
{
int bufferedBytes = _readLen - _readPos;
if (bufferedBytes > 0)
{
await destination.WriteAsync(_buffer, _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false);
_readPos = _readLen = 0;
}
}

// For efficiency, we avoid creating a new task and associated state for each asynchronous read.
// Instead, we create a single reusable awaitable object that will be triggered when an await completes
// and reset before going again.
var readAwaitable = new AsyncCopyToAwaitable(this);

// Make sure we are reading from the position that we think we are.
// Only set the position in the awaitable if we can seek (e.g. not for pipes).
bool canSeek = _parent.CanSeek;
if (canSeek)
{
if (_exposedHandle)
{
VerifyOSHandlePosition();
}
readAwaitable._position = _pos;
}

// Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use
// _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may
// actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically
// CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized.
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
// we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution.
byte[] copyBuffer = new byte[bufferSize];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using ArrayPool here?

Copy link
Member Author

@stephentoub stephentoub Sep 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to. I previously tried that in CopyToAsync, though, and we backed it out.
#5954 (comment)

I'm reading through all the comments again, though, and there wasn't really a strong conclusion about using it in CopyToAsync specifically (or if there was, I'm missing it). The danger in a case like this is that we pass the buffer to the target stream's WriteAsync, and its WriteAsync holds on to the buffer beyond when the task it returns completes. Then we return the buffer to the pool, and some other code elsewhere uses the same buffer, while that WriteAsync may still be erroneously using it.

But, this seems like a reasonable risk to me for the benefit pooling would provide. These are typically very large buffers, and it'd be great to avoid re-allocating it for each copy. Especially since with an optimization like this, we're now forcing users to choose between the benefits of the optimized CopyToAsync from FileStream, and using their own copying implementation that does pool a buffer and just reads/writes from source to target. The main concern with pooling in our streams is non-localized corruption, something over here causing a problem with something over there that's completely unrelated... a valid concern, but the liklihood of that in this situation is low and highlights a very buggy stream implementation that's likely to manifest problems in other ways as well (if the task is returned before it's actually done its work).

@jkotas, @KrzysztofCwalina, @ericstj, @weshaggard, could you guys weigh in again on this specific case, using ArrayPool<byte>.Default to get a buffer used in our stream's CopyToAsync overrides?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using ArrayPool<byte>.Default to get a buffer used in our stream's CopyToAsync overrides?

Sounds reasonable to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YESS!!!!!!!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, barring any other feedback to the contrary, I'll do this (in a separate PR).


// Allocate an Overlapped we can use repeatedly for all operations
var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer);
var cancellationReg = default(CancellationTokenRegistration);
try
{
// Register for cancellation. We do this once for the whole copy operation, and just try to cancel
// whatever read operation may currently be in progress, if there is one. It's possible the cancellation
// request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested
// in the read/write copy loop.
if (cancellationToken.CanBeCanceled)
{
cancellationReg = cancellationToken.Register(s =>
{
var innerAwaitable = (AsyncCopyToAwaitable)s;
unsafe
{
lock (innerAwaitable.CancellationLock) // synchronize with cleanup of the overlapped
{
if (innerAwaitable._nativeOverlapped != null)
{
// Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we
// don't want to fail the operation because we couldn't cancel it.
Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle, innerAwaitable._nativeOverlapped);
}
}
}
}, readAwaitable);
}

// Repeatedly read from this FileStream and write the results to the destination stream.
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
readAwaitable.ResetForNextOperation();

try
{
bool synchronousSuccess;
int errorCode;
unsafe
{
// Allocate a native overlapped for our reusable overlapped, and set position to read based on the next
// desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or
// if the stream isn't seekable.)
readAwaitable._nativeOverlapped = _handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped);
if (canSeek)
{
readAwaitable._nativeOverlapped->OffsetLow = unchecked((int)readAwaitable._position);
readAwaitable._nativeOverlapped->OffsetHigh = (int)(readAwaitable._position >> 32);
}

// Kick off the read.
synchronousSuccess = ReadFileNative(_handle, copyBuffer, 0, copyBuffer.Length, readAwaitable._nativeOverlapped, out errorCode) >= 0;
}

// If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation.
if (!synchronousSuccess)
{
switch (errorCode)
{
case ERROR_IO_PENDING:
// Async operation in progress.
break;
case ERROR_BROKEN_PIPE:
case ERROR_HANDLE_EOF:
// We're at or past the end of the file, and the overlapped callback
// won't be raised in these cases. Mark it as completed so that the await
// below will see it as such.
readAwaitable.MarkCompleted();
break;
default:
// Everything else is an error (and there won't be a callback).
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}

// Wait for the async operation (which may or may not have already completed), then throw if it failed.
await readAwaitable;
switch (readAwaitable._errorCode)
{
case 0: // success
Debug.Assert(readAwaitable._numBytes >= 0, $"Expected non-negative numBytes, got {readAwaitable._numBytes}");
break;
case ERROR_BROKEN_PIPE: // logically success with 0 bytes read (write end of pipe closed)
case ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
Debug.Assert(readAwaitable._numBytes == 0, $"Expected 0 bytes read, got {readAwaitable._numBytes}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this have previously caused a throw which failed the copy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation limits the number of bytes requested to what's left in the file, such that it tries to not hit this case. That forces multiple additional synchronous I/O calls on each read, such as to get the length of the file and the current position prior to each read. This implementation avoids that by simply going until it has no more data left to read, and in the process it does avoid throwing in this case. I don't see anything wrong with not throwing here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so long as we aren't introducing a behavior diff I'm ok with this. I wish more of this code could be have been shared with existing ReadAsync codepath.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naw, needs moar perf 😉

break;
case Interop.mincore.Errors.ERROR_OPERATION_ABORTED: // canceled
throw new OperationCanceledException(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true));
default: // error
throw Win32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode);
}

// Successful operation. If we got zero bytes, we're done: exit the read/write loop.
// Otherwise, update the read position for next time accordingly.
if (readAwaitable._numBytes == 0)
{
break;
}
else if (canSeek)
{
readAwaitable._position += (int)readAwaitable._numBytes;
}
}
finally
{
// Free the resources for this read operation
unsafe
{
NativeOverlapped* overlapped;
lock (readAwaitable.CancellationLock) // just an Exchange, but we need this to be synchronized with cancellation, so using the same lock
{
overlapped = readAwaitable._nativeOverlapped;
readAwaitable._nativeOverlapped = null;
}
if (overlapped != null)
{
_handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped);
}
}
}

// Write out the read data.
await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false);
}
}
finally
{
// Cleanup from the whole copy operation
cancellationReg.Dispose();
awaitableOverlapped.Dispose();

// Make sure the stream's current position reflects where we ended up
if (!_handle.IsClosed && _parent.CanSeek)
{
SeekCore(0, SeekOrigin.End);
}
}
}

/// <summary>Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead.</summary>
private sealed unsafe class AsyncCopyToAwaitable : ICriticalNotifyCompletion
{
/// <summary>Sentinel object used to indicate that the I/O operation has completed before being awaited.</summary>
private readonly static Action s_sentinel = () => { };
/// <summary>Cached delegate to IOCallback.</summary>
internal static readonly IOCompletionCallback s_callback = IOCallback;

/// <summary>The FileStream that owns this instance.</summary>
internal readonly Win32FileStream _fileStream;

/// <summary>Tracked position representing the next location from which to read.</summary>
internal long _position;
/// <summary>The current native overlapped pointer. This changes for each operation.</summary>
internal NativeOverlapped* _nativeOverlapped;
/// <summary>
/// null if the operation is still in progress,
/// s_sentinel if the I/O operation completed before the await,
/// s_callback if it completed after the await yielded.
/// </summary>
internal Action _continuation;
/// <summary>Last error code from completed operation.</summary>
internal uint _errorCode;
/// <summary>Last number of read bytes from completed operation.</summary>
internal uint _numBytes;

/// <summary>Lock object used to protect cancellation-related access to _nativeOverlapped.</summary>
internal object CancellationLock => this;

/// <summary>Initialize the awaitable.</summary>
internal unsafe AsyncCopyToAwaitable(Win32FileStream fileStream)
{
_fileStream = fileStream;
}

/// <summary>Reset state to prepare for the next read operation.</summary>
internal void ResetForNextOperation()
{
Debug.Assert(_position >= 0, $"Expected non-negative position, got {_position}");
_continuation = null;
_errorCode = 0;
_numBytes = 0;
}

/// <summary>Overlapped callback: store the results, then invoke the continuation delegate.</summary>
internal unsafe static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
{
var awaitable = (AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP);

Debug.Assert(awaitable._continuation != s_sentinel, "Sentinel must not have already been set as the continuation");
awaitable._errorCode = errorCode;
awaitable._numBytes = numBytes;

(awaitable._continuation ?? Interlocked.CompareExchange(ref awaitable._continuation, s_sentinel, null))?.Invoke();
}

/// <summary>
/// Called when it's known that the I/O callback for an operation will not be invoked but we'll
/// still be awaiting the awaitable.
/// </summary>
internal void MarkCompleted()
{
Debug.Assert(_continuation == null, "Expected null continuation");
_continuation = s_sentinel;
}

public AsyncCopyToAwaitable GetAwaiter() => this;
public bool IsCompleted => _continuation == s_sentinel;
public void GetResult() { }
public void OnCompleted(Action continuation) => UnsafeOnCompleted(continuation);
public void UnsafeOnCompleted(Action continuation)
{
if (_continuation == s_sentinel ||
Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
{
Debug.Assert(_continuation == s_sentinel, $"Expected continuation set to s_sentinel, got ${_continuation}");
Task.Run(continuation);
}
}
}

[System.Security.SecuritySafeCritical]
public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Expand Down