Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP2: Ensure we flush when a window limit is hit #52797

Merged
merged 1 commit into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, headerBytes, end
}
}

private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory<byte> buffer, bool finalFlush, CancellationToken cancellationToken)
{
ReadOnlyMemory<byte> remaining = buffer;

Expand All @@ -1464,17 +1464,30 @@ private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory<byte> buffer

ReadOnlyMemory<byte> current;
(current, remaining) = SplitBuffer(remaining, frameSize);

bool flush = false;
if (finalFlush && remaining.Length == 0)
{
flush = true;
}
Comment on lines +1468 to +1472
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
bool flush = false;
if (finalFlush && remaining.Length == 0)
{
flush = true;
}
bool flush = finalFlush && remaining.Length;


// Force a flush if we are out of credit, because we don't know that we will be sending more data any time soon
if (!_connectionWindow.IsCreditAvailable)
{
flush = true;
}

try
{
await PerformWriteAsync(FrameHeader.Size + current.Length, (thisRef: this, streamId, current), static (s, writeBuffer) =>
await PerformWriteAsync(FrameHeader.Size + current.Length, (thisRef: this, streamId, current, flush), static (s, writeBuffer) =>
{
// Invoked while holding the lock:
if (NetEventSource.Log.IsEnabled()) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}");

FrameHeader.WriteTo(writeBuffer.Span, s.current.Length, FrameType.Data, FrameFlags.None, s.streamId);
s.current.CopyTo(writeBuffer.Slice(FrameHeader.Size));

return false; // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame
return s.flush;
}, cancellationToken).ConfigureAwait(false);
}
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,12 +1223,19 @@ private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationT
while (buffer.Length > 0)
{
int sendSize = -1;
bool flush = false;
lock (_creditSyncObject)
{
if (_availableCredit > 0)
{
sendSize = Math.Min(buffer.Length, _availableCredit);
_availableCredit -= sendSize;

// Force a flush if we are out of credit, because we don't know that we will be sending more data any time soon
if (_availableCredit == 0)
{
flush = true;
}
}
else
{
Expand All @@ -1249,12 +1256,23 @@ private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationT
// Logically this is part of the else block above, but we can't await while holding the lock.
Debug.Assert(_creditWaiter != null);
sendSize = await _creditWaiter.AsValueTask().ConfigureAwait(false);

lock (_creditSyncObject)
{
// Force a flush if we are out of credit, because we don't know that we will be sending more data any time soon
if (_availableCredit == 0)
{
flush = true;
}
}
}

Debug.Assert(sendSize > 0);

ReadOnlyMemory<byte> current;
(current, buffer) = SplitBuffer(buffer, sendSize);

await _connection.SendStreamDataAsync(StreamId, current, _requestBodyCancellationSource.Token).ConfigureAwait(false);
await _connection.SendStreamDataAsync(StreamId, current, flush, _requestBodyCancellationSource.Token).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,23 @@ public async Task GoAwayFrame_AbortAllPendingStreams_StreamFailWithExpectedExcep
}
}

private static async Task ReadExactDataSizeFromStream(Http2LoopbackConnection connection, int size, int streamId)
{
int bytesReceived = 0;
while (bytesReceived < size)
{
Frame frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Assert.Equal(streamId, frame.StreamId);
Assert.Equal(FrameType.Data, frame.Type);
Assert.Equal(FrameFlags.None, frame.Flags);
Assert.True(frame.Length > 0);

bytesReceived += frame.Length;
}

Assert.Equal(size, bytesReceived);
}

private static async Task<int> ReadToEndOfStream(Http2LoopbackConnection connection, int streamId)
{
int bytesReceived = 0;
Expand All @@ -1420,11 +1437,12 @@ private static async Task<int> ReadToEndOfStream(Http2LoopbackConnection connect
return bytesReceived;
}

const int DefaultInitialWindowSize = 65535;

[OuterLoop("Uses Task.Delay")]
[ConditionalFact(nameof(SupportsAlpn))]
public async Task Http2_FlowControl_ClientDoesNotExceedWindows()
{
const int InitialWindowSize = 65535;
const int ContentSize = 100_000;

var content = new ByteAtATimeContent(ContentSize);
Expand All @@ -1442,19 +1460,8 @@ public async Task Http2_FlowControl_ClientDoesNotExceedWindows()
Assert.Equal(FrameFlags.EndHeaders, frame.Flags);

// Receive up to initial window size
int bytesReceived = 0;
while (bytesReceived < InitialWindowSize)
{
frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Assert.Equal(streamId, frame.StreamId);
Assert.Equal(FrameType.Data, frame.Type);
Assert.Equal(FrameFlags.None, frame.Flags);
Assert.True(frame.Length > 0);

bytesReceived += frame.Length;
}

Assert.Equal(InitialWindowSize, bytesReceived);
int bytesReceived = DefaultInitialWindowSize;
await ReadExactDataSizeFromStream(connection, bytesReceived, streamId);

// Issue another read. It shouldn't complete yet. Wait a brief period of time to ensure it doesn't complete.
Task<Frame> readFrameTask = connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Expand Down Expand Up @@ -1528,7 +1535,6 @@ public async Task Http2_FlowControl_ClientDoesNotExceedWindows()
[ConditionalFact(nameof(SupportsAlpn))]
public async Task Http2_InitialWindowSize_ClientDoesNotExceedWindows()
{
const int DefaultInitialWindowSize = 65535;
const int ContentSize = DefaultInitialWindowSize + 1000;

var content = new ByteAtATimeContent(ContentSize);
Expand All @@ -1548,20 +1554,8 @@ public async Task Http2_InitialWindowSize_ClientDoesNotExceedWindows()
Assert.Equal(FrameType.Headers, frame.Type);
Assert.Equal(FrameFlags.EndHeaders, frame.Flags);

// Receive up to initial window size
int bytesReceived = 0;
while (bytesReceived < DefaultInitialWindowSize)
{
frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Assert.Equal(streamId, frame.StreamId);
Assert.Equal(FrameType.Data, frame.Type);
Assert.Equal(FrameFlags.None, frame.Flags);
Assert.True(frame.Length > 0);

bytesReceived += frame.Length;
}

Assert.Equal(DefaultInitialWindowSize, bytesReceived);
int bytesReceived = DefaultInitialWindowSize;
await ReadExactDataSizeFromStream(connection, bytesReceived, streamId);

// Issue another read. It shouldn't complete yet. Wait a brief period of time to ensure it doesn't complete.
Task<Frame> readFrameTask = connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Expand Down Expand Up @@ -1648,6 +1642,115 @@ public async Task Http2_InitialWindowSize_ClientDoesNotExceedWindows()
}
}

// Flush behavior is heuristic-based and may change in the future.
// Try various content sizes here to ensure we are not simply getting lucky with the flush heuristic.
[ConditionalTheory(nameof(SupportsAlpn))]
[InlineData(DefaultInitialWindowSize + 1)]
[InlineData(DefaultInitialWindowSize + 4 * 1024)]
[InlineData(DefaultInitialWindowSize + 8 * 1024)]
[InlineData(DefaultInitialWindowSize + 16 * 1024)]
[InlineData(DefaultInitialWindowSize + 32 * 1024)]
[InlineData(DefaultInitialWindowSize + 64 * 1024)]
[InlineData(DefaultInitialWindowSize + 96 * 1024)]
public async Task Http2_SendOverStreamWindowSizeWithoutExplicitFlush_ClientSendsUpToFullWindowSize(int contentSize)
{
var content = new ByteArrayContent(new byte[contentSize]);

using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer())
using (HttpClient client = CreateHttpClient())
{
Task<HttpResponseMessage> clientTask = client.PostAsync(server.Address, content);

Http2LoopbackConnection connection = await server.EstablishConnectionAsync();

// Bump connection window so it won't block the client.
await connection.WriteFrameAsync(new WindowUpdateFrame(contentSize - DefaultInitialWindowSize, 0));

Frame frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
int streamId = frame.StreamId;
Assert.Equal(FrameType.Headers, frame.Type);
Assert.Equal(FrameFlags.EndHeaders, frame.Flags);

// Bump stream window so the client should send the entire body minus one byte.
if ((contentSize - 1) > DefaultInitialWindowSize)
{
await connection.WriteFrameAsync(new WindowUpdateFrame((contentSize - 1) - DefaultInitialWindowSize, streamId));
}

int bytesReceived = contentSize - 1;
await ReadExactDataSizeFromStream(connection, bytesReceived, streamId);

// Bump stream window so the client should send the final byte.
await connection.WriteFrameAsync(new WindowUpdateFrame(1, streamId));

// Read to end of stream
bytesReceived += await ReadToEndOfStream(connection, streamId);

Assert.Equal(contentSize, bytesReceived);

await connection.SendDefaultResponseAsync(streamId);

HttpResponseMessage response = await clientTask;
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}
}

// Flush behavior is heuristic-based and may change in the future.
// Try various content sizes here to ensure we are not simply getting lucky with the flush heuristic.
[ConditionalTheory(nameof(SupportsAlpn))]
[InlineData(DefaultInitialWindowSize + 1)]
[InlineData(DefaultInitialWindowSize + 4 * 1024)]
[InlineData(DefaultInitialWindowSize + 8 * 1024)]
[InlineData(DefaultInitialWindowSize + 16 * 1024)]
[InlineData(DefaultInitialWindowSize + 32 * 1024)]
[InlineData(DefaultInitialWindowSize + 64 * 1024)]
[InlineData(DefaultInitialWindowSize + 96 * 1024)]
public async Task Http2_SendOverConnectionWindowSizeWithoutExplicitFlush_ClientSendsUpToFullWindowSize(int contentSize)
{
var content = new ByteArrayContent(new byte[contentSize]);

using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer())
using (HttpClient client = CreateHttpClient())
{
Task<HttpResponseMessage> clientTask = client.PostAsync(server.Address, content);

Http2LoopbackConnection connection = await server.EstablishConnectionAsync();

// Bump connection window so the client should send the entire body minus one byte.
if ((contentSize - 1) > DefaultInitialWindowSize)
{
await connection.WriteFrameAsync(new WindowUpdateFrame((contentSize - 1) - DefaultInitialWindowSize, 0));
}

Frame frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
int streamId = frame.StreamId;
Assert.Equal(FrameType.Headers, frame.Type);
Assert.Equal(FrameFlags.EndHeaders, frame.Flags);

// Bump stream window so it won't block the client.
await connection.WriteFrameAsync(new WindowUpdateFrame(contentSize - DefaultInitialWindowSize, streamId));

// Receive up to window size (i.e. contentSize - 1)
int bytesReceived = contentSize - 1;
await ReadExactDataSizeFromStream(connection, bytesReceived, streamId);

Assert.Equal((contentSize - 1), bytesReceived);

// Bump connection window so the client should send the final byte.
await connection.WriteFrameAsync(new WindowUpdateFrame(1, 0));

// Read to end of stream
bytesReceived += await ReadToEndOfStream(connection, streamId);

Assert.Equal(contentSize, bytesReceived);

await connection.SendDefaultResponseAsync(streamId);

HttpResponseMessage response = await clientTask;
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}
}

public static IEnumerable<object[]> KeepAliveTestDataSource()
{
yield return new object[] { Timeout.InfiniteTimeSpan, HttpKeepAlivePingPolicy.Always, false };
Expand Down Expand Up @@ -1901,8 +2004,7 @@ public async Task Http2_WaitingOnWindowCredit_Cancellation()
// The goal of this test is to get the client into the state where it has sent the headers,
// but is waiting on window credit before it will send the body. We then issue a cancellation
// to ensure the request is cancelled as expected.
const int InitialWindowSize = 65535;
const int ContentSize = InitialWindowSize + 1;
const int ContentSize = DefaultInitialWindowSize + 1;

HttpClientHandler handler = CreateHttpClientHandler();
handler.ServerCertificateCustomValidationCallback = TestHelper.AllowAllCertificates;
Expand All @@ -1923,17 +2025,8 @@ public async Task Http2_WaitingOnWindowCredit_Cancellation()
Assert.Equal(FrameFlags.EndHeaders, frame.Flags);

// Receive up to initial window size
int bytesReceived = 0;
while (bytesReceived < InitialWindowSize)
{
frame = await connection.ReadFrameAsync(TimeSpan.FromSeconds(30));
Assert.Equal(streamId, frame.StreamId);
Assert.Equal(FrameType.Data, frame.Type);
Assert.Equal(FrameFlags.None, frame.Flags);
Assert.True(frame.Length > 0);

bytesReceived += frame.Length;
}
int bytesReceived = DefaultInitialWindowSize;
await ReadExactDataSizeFromStream(connection, bytesReceived, streamId);

// The client is waiting for more credit in order to send the last byte of the
// request body. Test cancellation at this point.
Expand All @@ -1957,8 +2050,7 @@ public async Task Http2_PendingSend_Cancellation()
{
// The goal of this test is to get the client into the state where it is sending content,
// but the send pends because the TCP window is full.
const int InitialWindowSize = 65535;
const int ContentSize = InitialWindowSize * 2; // Double the default TCP window size.
const int ContentSize = DefaultInitialWindowSize * 2; // Double the default TCP window size.

var content = new ByteArrayContent(TestHelper.GenerateRandomContent(ContentSize));

Expand All @@ -1978,7 +2070,7 @@ public async Task Http2_PendingSend_Cancellation()

// Increase the size of the HTTP/2 Window, so that it is large enough to fill the
// TCP window when we do not perform any reads on the server side.
await connection.WriteFrameAsync(new WindowUpdateFrame(InitialWindowSize, streamId));
await connection.WriteFrameAsync(new WindowUpdateFrame(DefaultInitialWindowSize, streamId));

// Give the client time to read the window update frame, and for the write to pend.
await Task.Delay(1000);
Expand Down