Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUIC] QuicStream reading/writing work #90253

Merged
merged 24 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
844c88e
Added asserts to send buffer helper
ManickaP Jul 15, 2023
ddb8ee7
Postpone confirming the last RECEIVE event until the data are read
ManickaP Jul 15, 2023
bd5a627
Removed lock
ManickaP Jul 24, 2023
08f1bae
Debug tests
ManickaP Jul 24, 2023
a3ae2b7
ReadsClosed test fixed, and some better logging
ManickaP Jul 28, 2023
aeb7495
Final task keep alive, abort order, timeout for graceful write-side s…
ManickaP Aug 8, 2023
9d9334f
Tests
ManickaP Aug 9, 2023
80185f0
Always wait for SEND_COMPLETE
ManickaP Aug 9, 2023
33e18ed
Exclude BigPayload on platforms where it can OOM
ManickaP Aug 10, 2023
f6da31e
Removed unintended code changes
ManickaP Aug 11, 2023
79023fe
Reverted postponing reading FIN, if data have chance to get buffered …
ManickaP Aug 11, 2023
3b24271
Clean ups
ManickaP Aug 11, 2023
e9655b7
Fixed waiting for SEND_COMPLETE
ManickaP Aug 11, 2023
2800015
Hold back setting FinalTaskSource and overwrite result if no waiter i…
ManickaP Aug 12, 2023
0c35a2b
Cancellation and completion
ManickaP Aug 12, 2023
d3ba302
Comments, fixed FinalTaskSource
ManickaP Aug 12, 2023
5c33e2e
Fix assert
ManickaP Aug 14, 2023
77b7636
Test reseting control stream made more resilient
ManickaP Aug 14, 2023
e4c2f17
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 4, 2023
d6f5061
Attempt to fix still running write while disposing the stream in case…
ManickaP Sep 5, 2023
dbd5aec
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 6, 2023
8d22715
Attempt to fix stress build
ManickaP Sep 6, 2023
f6261e9
Sync Dispose in H3Stream waits for read and write as well
ManickaP Sep 12, 2023
9f626a8
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed class Http3Connection : HttpConnectionBase

// Our control stream.
private QuicStream? _clientControl;
private Task _sendSettingsTask;

// Server-advertised SETTINGS_MAX_FIELD_SECTION_SIZE
// https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-2.2.1
Expand Down Expand Up @@ -88,7 +89,7 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicCon
}

// Errors are observed via Abort().
_ = SendSettingsAsync();
_sendSettingsTask = SendSettingsAsync();

// This process is cleaned up when _connection is disposed, and errors are observed via Abort().
_ = AcceptStreamsAsync();
Expand Down Expand Up @@ -150,6 +151,7 @@ private void CheckForShutdown()

if (_clientControl != null)
{
await _sendSettingsTask.ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible this task throws? E.g. connection closed before we were able to send settings? Or it is impossible at this point in time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, SendSettingsAsync catches all exceptions and calls Abort on them:

await _clientControl.DisposeAsync().ConfigureAwait(false);
_clientControl = null;
}
Expand Down Expand Up @@ -486,7 +488,7 @@ private async Task ProcessServerStreamAsync(QuicStream stream)

if (bytesRead == 0)
{
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-unidirectional-streams
// https://www.rfc-editor.org/rfc/rfc9114.html#name-unidirectional-streams
// A sender can close or reset a unidirectional stream unless otherwise specified. A receiver MUST
// tolerate unidirectional streams being closed or reset prior to the reception of the unidirectional
// stream header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisp
private TaskCompletionSource<bool>? _expect100ContinueCompletionSource; // True indicates we should send content (e.g. received 100 Continue).
private bool _disposed;
private readonly CancellationTokenSource _requestBodyCancellationSource;
private Task? _sendRequestTask; // Set with SendContentAsync, must be awaited before QuicStream.DisposeAsync();
private Task? _readResponseTask; // Set with ReadResponseAsync, must be awaited before QuicStream.DisposeAsync();

// Allocated when we receive a :status header.
private HttpResponseMessage? _response;
Expand Down Expand Up @@ -88,9 +90,25 @@ public void Dispose()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
WaitUnfinished(_sendRequestTask);
WaitUnfinished(_readResponseTask);
_stream.Dispose();
DisposeSyncHelper();
}

static void WaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
task.GetAwaiter().GetResult();
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void RemoveFromConnectionIfDone()
Expand All @@ -107,9 +125,25 @@ public async ValueTask DisposeAsync()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
await AwaitUnfinished(_sendRequestTask).ConfigureAwait(false);
await AwaitUnfinished(_readResponseTask).ConfigureAwait(false);
await _stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper();
}

static async ValueTask AwaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
await task.ConfigureAwait(false);
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void DisposeSyncHelper()
Expand Down Expand Up @@ -158,52 +192,51 @@ public async Task<HttpResponseMessage> SendAsync(CancellationToken cancellationT
await FlushSendBufferAsync(endStream: _request.Content == null, _requestBodyCancellationSource.Token).ConfigureAwait(false);
}

Task sendContentTask;
if (_request.Content != null)
{
sendContentTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
_sendRequestTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
}
else
{
sendContentTask = Task.CompletedTask;
_sendRequestTask = Task.CompletedTask;
}

// In parallel, send content and read response.
// Depending on Expect 100 Continue usage, one will depend on the other making progress.
Task readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
_readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
bool sendContentObserved = false;

// If we're not doing duplex, wait for content to finish sending here.
// If we are doing duplex and have the unlikely event that it completes here, observe the result.
// See Http2Connection.SendAsync for a full comment on this logic -- it is identical behavior.
if (sendContentTask.IsCompleted ||
if (_sendRequestTask.IsCompleted ||
_request.Content?.AllowDuplex != true ||
await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == sendContentTask ||
sendContentTask.IsCompleted)
await Task.WhenAny(_sendRequestTask, _readResponseTask).ConfigureAwait(false) == _sendRequestTask ||
_sendRequestTask.IsCompleted)
{
try
{
await sendContentTask.ConfigureAwait(false);
await _sendRequestTask.ConfigureAwait(false);
sendContentObserved = true;
}
catch
{
// Exceptions will be bubbled up from sendContentTask here,
// which means the result of readResponseTask won't be observed directly:
// Exceptions will be bubbled up from _sendRequestTask here,
// which means the result of _readResponseTask won't be observed directly:
// Do a background await to log any exceptions.
_connection.LogExceptions(readResponseTask);
_connection.LogExceptions(_readResponseTask);
throw;
}
}
else
{
// Duplex is being used, so we can't wait for content to finish sending.
// Do a background await to log any exceptions.
_connection.LogExceptions(sendContentTask);
_connection.LogExceptions(_sendRequestTask);
}

// Wait for the response headers to be read.
await readResponseTask.ConfigureAwait(false);
await _readResponseTask.ConfigureAwait(false);

Debug.Assert(_response != null && _response.Content != null);
// Set our content stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,10 +1664,17 @@ public async Task ServerSendsTrailingHeaders_Success()

}

public enum CloseOutboundControlStream
{
BogusData,
Dispose,
Abort,
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool graceful)
[InlineData(CloseOutboundControlStream.BogusData)]
[InlineData(CloseOutboundControlStream.Dispose)]
[InlineData(CloseOutboundControlStream.Abort)]
public async Task ServerClosesOutboundControlStream_ClientClosesConnection(CloseOutboundControlStream closeType)
{
using Http3LoopbackServer server = CreateHttp3LoopbackServer();

Expand All @@ -1680,13 +1687,31 @@ public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool
await using Http3LoopbackStream requestStream = await connection.AcceptRequestStreamAsync();

// abort the control stream
if (graceful)
if (closeType == CloseOutboundControlStream.BogusData)
{
await connection.OutboundControlStream.SendResponseBodyAsync(Array.Empty<byte>(), isFinal: true);
}
else
else if (closeType == CloseOutboundControlStream.Dispose)
{
connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR);
await connection.OutboundControlStream.DisposeAsync();
}
else if (closeType == CloseOutboundControlStream.Abort)
{
int iterations = 5;
while (iterations-- > 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are iterations and delay needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on timing, the client can receive the control stream abort, i.e. RESET_STREAM, before it even read the HTTP/3 stream type from it, see

private async Task ProcessServerStreamAsync(QuicStream stream)
for details.

QUIC spec says, RESET can discard data, which can happen in this case. Add to it H/3 spec, which says that unrecognized or aborted stream should be ignored:

try
{
bytesRead = await stream.ReadAsync(buffer.AvailableMemory, CancellationToken.None).ConfigureAwait(false);
}
catch (QuicException ex) when (ex.QuicError == QuicError.StreamAborted)
{
// Treat identical to receiving 0. See below comment.
bytesRead = 0;
}

Leading to ignoring the incoming control stream, because client doesn't know it's control, instead of reacting to the abort and closing the connection.

So this fix attempts to do the again, but with slight delay between sending the data and abort, line 1710.

And why this worked before: we would return data for one QuicStream.ReadAsync before propagating the error, not the whole buffer, just one call to read succeed, which was enough to get that 1 byte saying it's control. Also, MsQuic could always discard the data, we just never observed it (timing, too fast on the same machine...)

{
connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR);
// This sends RESET_FRAME which might cause complete discard of any data including stream type, leading to client ignoring the stream.
// Attempt to establish the control stream again then.
if (await semaphore.WaitAsync(100))
{
// Client finished with the expected error.
return;
}
await connection.OutboundControlStream.DisposeAsync();
await connection.EstablishControlStreamAsync(Array.Empty<SettingsEntry>());
await Task.Delay(100);
}
}

// wait for client task before tearing down the requestStream and connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
Define this here because the SDK resets it
unconditionally in Microsoft.NETCoreSdk.BundledVersions.props.
-->
<NETCoreAppMaximumVersion>8.0</NETCoreAppMaximumVersion>
<NETCoreAppMaximumVersion>9.0</NETCoreAppMaximumVersion>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Usage:
## ./build-local.ps1 [StressConfiguration] [LibrariesConfiguration]

$Version="8.0"
$Version="9.0"
$RepoRoot="$(git rev-parse --show-toplevel)"
$DailyDotnetRoot= "./.dotnet-daily"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
## Usage:
## ./build-local.sh [StressConfiguration] [LibrariesConfiguration]

version=8.0
version=9.0
repo_root=$(git rev-parse --show-toplevel)
daily_dotnet_root=./.dotnet-daily

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Microsoft.Quic;

Expand Down Expand Up @@ -32,8 +33,8 @@ private void FreeNativeMemory()
{
QUIC_BUFFER* buffers = _buffers;
_buffers = null;
NativeMemory.Free(buffers);
_count = 0;
NativeMemory.Free(buffers);
}

private void Reserve(int count)
Expand All @@ -48,6 +49,10 @@ private void Reserve(int count)

private void SetBuffer(int index, ReadOnlyMemory<byte> buffer)
{
Debug.Assert(index < _count);
Debug.Assert(_buffers[index].Buffer is null);
Debug.Assert(_buffers[index].Length == 0);

_buffers[index].Buffer = (byte*)NativeMemory.Alloc((nuint)buffer.Length, (nuint)sizeof(byte));
_buffers[index].Length = (uint)buffer.Length;
buffer.Span.CopyTo(_buffers[index].Span);
Expand Down Expand Up @@ -93,8 +98,8 @@ public void Reset()
}
byte* buffer = _buffers[i].Buffer;
_buffers[i].Buffer = null;
NativeMemory.Free(buffer);
_buffers[i].Length = 0;
NativeMemory.Free(buffer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using Microsoft.Quic;
using static Microsoft.Quic.MsQuic;

namespace System.Net.Quic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public override string ToString()
=> $"{{ {nameof(SEND_SHUTDOWN_COMPLETE.Graceful)} = {SEND_SHUTDOWN_COMPLETE.Graceful} }}",
QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE
=> $"{{ {nameof(SHUTDOWN_COMPLETE.ConnectionShutdown)} = {SHUTDOWN_COMPLETE.ConnectionShutdown}, {nameof(SHUTDOWN_COMPLETE.ConnectionShutdownByApp)} = {SHUTDOWN_COMPLETE.ConnectionShutdownByApp}, {nameof(SHUTDOWN_COMPLETE.ConnectionClosedRemotely)} = {SHUTDOWN_COMPLETE.ConnectionClosedRemotely}, {nameof(SHUTDOWN_COMPLETE.ConnectionErrorCode)} = {SHUTDOWN_COMPLETE.ConnectionErrorCode}, {nameof(SHUTDOWN_COMPLETE.ConnectionCloseStatus)} = {SHUTDOWN_COMPLETE.ConnectionCloseStatus} }}",
QUIC_STREAM_EVENT_TYPE.IDEAL_SEND_BUFFER_SIZE
=> $"{{ {nameof(IDEAL_SEND_BUFFER_SIZE.ByteCount)} = {IDEAL_SEND_BUFFER_SIZE.ByteCount} }}",
_ => string.Empty
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
// The .NET Foundation licenses this file to you under the MIT license.

#if DEBUG
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using Microsoft.Quic;
using static Microsoft.Quic.MsQuic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public int CopyFrom(ReadOnlySpan<QUIC_BUFFER> quicBuffers, int totalLength, bool
}
}

public int CopyTo(Memory<byte> buffer, out bool isCompleted, out bool isEmpty)
public int CopyTo(Memory<byte> buffer, out bool completed, out bool empty)
{
lock (_syncRoot)
{
Expand All @@ -79,8 +79,8 @@ public int CopyTo(Memory<byte> buffer, out bool isCompleted, out bool isEmpty)
_buffer.Discard(copied);
}

isCompleted = _buffer.IsEmpty && _final;
isEmpty = _buffer.IsEmpty;
completed = _buffer.IsEmpty && _final;
empty = _buffer.IsEmpty;

return copied;
}
Expand Down
Loading
Loading