Skip to content

Commit

Permalink
Code review feedback for transcoding Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavkm committed Jul 15, 2019
1 parent 7a1b867 commit 8144c5a
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 107 deletions.
13 changes: 11 additions & 2 deletions src/Mvc/Mvc.Core/src/Formatters/SystemTextJsonOutputFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Formatters.Json;
Expand Down Expand Up @@ -65,13 +66,21 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
// the behavior you get when the user does not declare the return type and with Json.Net at least at the top level.
var objectType = context.Object?.GetType() ?? context.ObjectType;
await JsonSerializer.SerializeAsync(writeStream, context.Object, objectType, SerializerOptions);

// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}
Expand Down
117 changes: 49 additions & 68 deletions src/Mvc/Mvc.Core/src/Formatters/TranscodingReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
internal sealed class TranscodingReadStream : Stream
{
private const int OverflowBufferSize = 4; // The most number of bytes used to represent a single UTF char

internal const int MaxByteBufferSize = 4096;
internal const int MaxCharBufferSize = 3 * MaxByteBufferSize;
private static readonly int MaxByteCountForUTF8Char = Encoding.UTF8.GetMaxByteCount(charCount: 1);

private readonly Stream _stream;
private readonly Encoder _encoder;
private readonly Decoder _decoder;

private ArraySegment<byte> _byteBuffer;
Expand Down Expand Up @@ -48,19 +49,23 @@ public TranscodingReadStream(Stream input, Encoding sourceEncoding)
count: 0);

_overflowBuffer = new ArraySegment<byte>(
ArrayPool<byte>.Shared.Rent(MaxByteCountForUTF8Char),
ArrayPool<byte>.Shared.Rent(OverflowBufferSize),
0,
count: 0);

_encoder = Encoding.UTF8.GetEncoder();
_decoder = sourceEncoding.GetDecoder();
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get; set; }

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

internal int ByteBufferCount => _byteBuffer.Count;
internal int CharBufferCount => _charBuffer.Count;
Expand All @@ -76,6 +81,11 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
{
ThrowArgumentOutOfRangeException(buffer, offset, count);

if (count == 0)
{
return 0;
}

var readBuffer = new ArraySegment<byte>(buffer, offset, count);

if (_overflowBuffer.Count > 0)
Expand All @@ -90,76 +100,50 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
return bytesToCopy;
}

var totalBytes = 0;
bool encoderCompleted;
int bytesEncoded;
if (_charBuffer.Count == 0)
{
// Only read more content from the input stream if we have exhausted all the buffered chars.
await ReadInputChars(cancellationToken);
}

var operationStatus = Utf8.FromUtf16(_charBuffer, readBuffer, out var charsRead, out var bytesWritten, isFinalBlock: false);
_charBuffer = _charBuffer.Slice(charsRead);

do
switch (operationStatus)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
var eof = false;
if (_charBuffer.Count == 0)
{
// Only read more content from the input stream if we have exhausted all the buffered chars.
eof = await ReadInputChars(cancellationToken);
}

// We need to flush on the last write. This is true when we exhaust the input Stream and any buffered content.
var allContentRead = eof && _charBuffer.Count == 0 && _byteBuffer.Count == 0;

if (_charBuffer.Count > 0 && readBuffer.Count < MaxByteCountForUTF8Char && readBuffer.Count < Encoding.UTF8.GetByteCount(_charBuffer.AsSpan(0, 1)))
{
// It's possible that the passed in buffer is smaller than the size required to encode a single
// char. For instance, the JsonSerializer may pass in a buffer of size 1 or 2 which
// is insufficient if the character requires more than 2 bytes to represent. In this case, read
// content in to an overflow buffer and fill up the passed in buffer.
_encoder.Convert(
_charBuffer,
_overflowBuffer.Array,
flush: false,
out var charsUsed,
out var bytesUsed,
out _);
case OperationStatus.Done:
return bytesWritten;

case OperationStatus.DestinationTooSmall:
if (bytesWritten != 0)
{
return bytesWritten;
}

_charBuffer = _charBuffer.Slice(charsUsed);
// Overflow buffer is always empty when we get here and we can use it's full length to write contents to.
Utf8.FromUtf16(_charBuffer, _overflowBuffer.Array, out var overFlowChars, out var overflowBytes, isFinalBlock: false);
Debug.Assert(overflowBytes > 0 && overFlowChars > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accomodate at least one char.");
Debug.Assert(readBuffer.Count < overflowBytes);

Debug.Assert(readBuffer.Count < bytesUsed);
_charBuffer = _charBuffer.Slice(overFlowChars);
_overflowBuffer.Array.AsSpan(0, readBuffer.Count).CopyTo(readBuffer);

_overflowBuffer = new ArraySegment<byte>(
_overflowBuffer.Array,
readBuffer.Count,
bytesUsed - readBuffer.Count);

totalBytes += readBuffer.Count;
// At this point we're done writing.
break;
}
else
{
_encoder.Convert(
_charBuffer,
readBuffer,
flush: allContentRead,
out var charsUsed,
out bytesEncoded,
out encoderCompleted);

totalBytes += bytesEncoded;
_charBuffer = _charBuffer.Slice(charsUsed);
readBuffer = readBuffer.Slice(bytesEncoded);
}

// We need to exit in one of the 2 conditions:
// * encoderCompleted will return false if "buffer" was too small for all the chars to be encoded.
// * no bytes were converted in an iteration. This can occur if there wasn't any input.
} while (encoderCompleted && bytesEncoded > 0);

return totalBytes;
overflowBytes - readBuffer.Count);

Debug.Assert(_overflowBuffer.Count != 0);

return readBuffer.Count;

default:
Debug.Fail("We should never see this");
throw new InvalidOperationException();
}
}

private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken)
private async Task ReadInputChars(CancellationToken cancellationToken)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
Expand All @@ -184,15 +168,12 @@ private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken
out _);

_byteBuffer = _byteBuffer.Slice(bytesUsed);

_charBuffer = new ArraySegment<char>(_charBuffer.Array, 0, charsUsed);

return readBytes == 0;
}

private static void ThrowArgumentOutOfRangeException(byte[] buffer, int offset, int count)
{
if (count <= 0)
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
Expand Down
72 changes: 44 additions & 28 deletions src/Mvc/Mvc.Core/src/Formatters/TranscodingWriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public override void Flush()

public override async Task FlushAsync(CancellationToken cancellationToken)
{
await WriteAsync(ArraySegment<byte>.Empty, flush: true, cancellationToken);
await _stream.FlushAsync(cancellationToken);
}

Expand All @@ -73,12 +72,11 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
{
ThrowArgumentException(buffer, offset, count);
var bufferSegment = new ArraySegment<byte>(buffer, offset, count);
return WriteAsync(bufferSegment, flush: false, cancellationToken);
return WriteAsync(bufferSegment, cancellationToken);
}

private async Task WriteAsync(
ArraySegment<byte> bufferSegment,
bool flush,
CancellationToken cancellationToken)
{
var decoderCompleted = false;
Expand All @@ -87,50 +85,43 @@ private async Task WriteAsync(
_decoder.Convert(
bufferSegment,
_charBuffer.AsSpan(_charsDecoded),
flush,
flush: false,
out var bytesDecoded,
out var charsDecoded,
out decoderCompleted);

_charsDecoded += charsDecoded;
bufferSegment = bufferSegment.Slice(bytesDecoded);

if (flush || !decoderCompleted)
if (!decoderCompleted)
{
// This is being invoked from FlushAsync or the char buffer is not large enough
// to accomodate all writes.
await WriteBufferAsync(flush, cancellationToken);
await WriteBufferAsync(cancellationToken);
}
}
}

private async Task WriteBufferAsync(bool flush, CancellationToken cancellationToken)
private async Task WriteBufferAsync(CancellationToken cancellationToken)
{
var encoderCompletd = false;
var encoderCompleted = false;
var charsWritten = 0;
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);

try
while (!encoderCompleted && charsWritten < _charsDecoded)
{
while (!encoderCompletd && charsWritten < _charsDecoded)
{
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush,
out var charsEncoded,
out var bytesUsed,
out encoderCompletd);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush: false,
out var charsEncoded,
out var bytesUsed,
out encoderCompleted);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}

ArrayPool<byte>.Shared.Return(byteBuffer);

// At this point, we've written all the buffered chars to the underlying Stream.
_charsDecoded = 0;
}
Expand Down Expand Up @@ -161,5 +152,30 @@ protected override void Dispose(bool disposing)
ArrayPool<char>.Shared.Return(_charBuffer);
}
}

public async Task FinalWriteAsync(CancellationToken cancellationToken)
{
// First write any buffered content
await WriteBufferAsync(cancellationToken);

// Now flush the encoder.
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
var encoderCompleted = false;

while (!encoderCompleted)
{
_encoder.Convert(
Array.Empty<char>(),
byteBuffer,
flush: true,
out _,
out var bytesUsed,
out encoderCompleted);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
}

ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Core;
Expand Down Expand Up @@ -83,13 +84,21 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)

var type = value?.GetType() ?? typeof(object);
await JsonSerializer.SerializeAsync(writeStream, value, type, jsonSerializerOptions);

// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}
Expand Down
Loading

0 comments on commit 8144c5a

Please sign in to comment.