diff --git a/src/libraries/Common/src/System/Net/ArrayBuffer.cs b/src/libraries/Common/src/System/Net/ArrayBuffer.cs index fc0bbeb099bfd..6c536d03267a0 100644 --- a/src/libraries/Common/src/System/Net/ArrayBuffer.cs +++ b/src/libraries/Common/src/System/Net/ArrayBuffer.cs @@ -58,10 +58,12 @@ public void Dispose() public int ActiveLength => _availableStart - _activeStart; public Span ActiveSpan => new Span(_bytes, _activeStart, _availableStart - _activeStart); public ReadOnlySpan ActiveReadOnlySpan => new ReadOnlySpan(_bytes, _activeStart, _availableStart - _activeStart); - public int AvailableLength => _bytes.Length - _availableStart; - public Span AvailableSpan => new Span(_bytes, _availableStart, AvailableLength); public Memory ActiveMemory => new Memory(_bytes, _activeStart, _availableStart - _activeStart); - public Memory AvailableMemory => new Memory(_bytes, _availableStart, _bytes.Length - _availableStart); + + public int AvailableLength => _bytes.Length - _availableStart; + public Span AvailableSpan => _bytes.AsSpan(_availableStart); + public Memory AvailableMemory => _bytes.AsMemory(_availableStart); + public Memory AvailableMemorySliced(int length) => new Memory(_bytes, _availableStart, length); public int Capacity => _bytes.Length; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index 9af644504a03e..846ed88bc39e6 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -30,8 +30,6 @@ internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable [ThreadStatic] private static string[]? t_headerValues; - private int _currentWriteSize; // as passed to StartWriteAsync - private readonly HPackDecoder _hpackDecoder; private readonly Dictionary _httpStreams; @@ -145,7 +143,7 @@ public async ValueTask SetupAsync() // Send initial connection-level WINDOW_UPDATE FrameHeader.WriteTo(_outgoingBuffer.AvailableSpan, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId: 0); _outgoingBuffer.Commit(FrameHeader.Size); - BinaryPrimitives.WriteUInt32BigEndian(_outgoingBuffer.AvailableSpan, (ConnectionWindowSize - DefaultInitialWindowSize)); + BinaryPrimitives.WriteUInt32BigEndian(_outgoingBuffer.AvailableSpan, ConnectionWindowSize - DefaultInitialWindowSize); _outgoingBuffer.Commit(4); await _stream.WriteAsync(_outgoingBuffer.ActiveMemory).ConfigureAwait(false); @@ -584,7 +582,7 @@ private void ChangeMaxConcurrentStreams(uint newValue) // The value is provided as a uint. // Limit this to int.MaxValue since the CreditManager implementation only supports singed values. // In practice, we should never reach this value. - int effectiveValue = (newValue > (uint)int.MaxValue ? int.MaxValue : (int)newValue); + int effectiveValue = newValue > (uint)int.MaxValue ? int.MaxValue : (int)newValue; int delta = effectiveValue - _maxConcurrentStreams; _maxConcurrentStreams = effectiveValue; @@ -754,13 +752,15 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader) _incomingBuffer.Discard(frameHeader.PayloadLength); } - internal async Task FlushAsync(CancellationToken cancellationToken = default) - { - await StartWriteAsync(0, cancellationToken).ConfigureAwait(false); - FinishWrite(FlushTiming.Now); - } + internal Task FlushAsync(CancellationToken cancellationToken) => + PerformWriteAsync(0, 0, (_, __) => FlushTiming.Now, cancellationToken); - private async ValueTask> StartWriteAsync(int writeBytes, CancellationToken cancellationToken = default) + /// Performs a write operation serialized via the . + /// The number of bytes to be written. + /// The state to pass through to the callbacks. + /// The action to be invoked while the writer lock is held and that actually writes the data to the provided buffer. + /// The cancellation token to use while waiting. + private async Task PerformWriteAsync(int writeBytes, T state, Func, FlushTiming> lockedAction, CancellationToken cancellationToken = default) { if (NetEventSource.IsEnabled) Trace($"{nameof(writeBytes)}={writeBytes}"); @@ -805,7 +805,7 @@ private async ValueTask> StartWriteAsync(int writeBytes, Cancellati ThrowRequestAborted(_abortException); } - // Flush anything necessary, and return back the write buffer to use. + // Flush waiting state, then invoke the supplied action. try { // If there is a pending write that was canceled while in progress, wait for it to complete. @@ -815,117 +815,87 @@ private async ValueTask> StartWriteAsync(int writeBytes, Cancellati _inProgressWrite = null; } - int totalBufferLength = _outgoingBuffer.Capacity; - int activeBufferLength = _outgoingBuffer.ActiveLength; - // If the buffer has already grown to 32k, does not have room for the next request, // and is non-empty, flush the current contents to the wire. - if (totalBufferLength >= UnflushedOutgoingBufferSize && - writeBytes >= totalBufferLength - activeBufferLength && - activeBufferLength > 0) + int totalBufferLength = _outgoingBuffer.Capacity; + if (totalBufferLength >= UnflushedOutgoingBufferSize) { - // We explicitly do not pass cancellationToken here, as this flush impacts more than just this operation. - await new ValueTask(FlushOutgoingBytesAsync()).ConfigureAwait(false); // await ValueTask to minimize number of awaiter fields + int activeBufferLength = _outgoingBuffer.ActiveLength; + if (writeBytes >= totalBufferLength - activeBufferLength && activeBufferLength > 0) + { + // We explicitly do not pass cancellationToken here, as this flush impacts more than just this operation. + await new ValueTask(FlushOutgoingBytesAsync()).ConfigureAwait(false); // await ValueTask to minimize number of awaiter fields + } } + // Invoke the callback with the supplied state and the target write buffer. _outgoingBuffer.EnsureAvailableSpace(writeBytes); - Memory writeBuffer = _outgoingBuffer.AvailableMemory.Slice(0, writeBytes); - _currentWriteSize = writeBytes; + FlushTiming flush = lockedAction(state, _outgoingBuffer.AvailableMemorySliced(writeBytes)); - return writeBuffer; + // Finish the write + _outgoingBuffer.Commit(writeBytes); + _lastPendingWriterShouldFlush |= flush == FlushTiming.AfterPendingWrites; + EndWrite(forceFlush: flush == FlushTiming.Now); } - catch + finally { _writerLock.Exit(); - throw; } } - /// Flushes buffered bytes to the wire. - /// When a flush should be performed for this write. - /// - /// Writes here need to be atomic, so as to avoid killing the whole connection. - /// Callers must hold the write lock, which this will release. - /// - private void FinishWrite(FlushTiming flush) - { - if (NetEventSource.IsEnabled) Trace($"{nameof(flush)}={flush}"); - - // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. - Debug.Assert(_writerLock.IsHeld); - - _outgoingBuffer.Commit(_currentWriteSize); - _lastPendingWriterShouldFlush |= (flush == FlushTiming.AfterPendingWrites); - EndWrite(forceFlush: (flush == FlushTiming.Now)); - } - - private void CancelWrite() - { - if (NetEventSource.IsEnabled) Trace(""); - - // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. - Debug.Assert(_writerLock.IsHeld); - - EndWrite(forceFlush: false); - } - private void EndWrite(bool forceFlush) { // We can't validate that we hold the mutex, but we can at least validate that someone is holding it. Debug.Assert(_writerLock.IsHeld); - try + // We must flush if the caller requires it or if this or a recent frame wanted to be flushed + // once there were no more pending writers that themselves could have forced the flush. + if (forceFlush || (_pendingWriters == 0 && _lastPendingWriterShouldFlush)) { - // We must flush if the caller requires it or if this or a recent frame wanted to be flushed - // once there were no more pending writers that themselves could have forced the flush. - if (forceFlush || (_pendingWriters == 0 && _lastPendingWriterShouldFlush)) + Debug.Assert(_inProgressWrite == null); + if (_outgoingBuffer.ActiveLength > 0) { - Debug.Assert(_inProgressWrite == null); - if (_outgoingBuffer.ActiveLength > 0) - { - _inProgressWrite = FlushOutgoingBytesAsync(); - } + _inProgressWrite = FlushOutgoingBytesAsync(); } } - finally - { - _writerLock.Exit(); - } } - private async Task SendSettingsAckAsync() - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace("Started writing."); + private Task SendSettingsAckAsync() => + PerformWriteAsync(FrameHeader.Size, this, (thisRef, writeBuffer) => + { + if (NetEventSource.IsEnabled) thisRef.Trace("Started writing."); - FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); + FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0); - FinishWrite(FlushTiming.AfterPendingWrites); - } + return FlushTiming.AfterPendingWrites; + }); /// The 8-byte ping content to send, read as a big-endian integer. - private async Task SendPingAckAsync(long pingContent) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.PingLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace("Started writing."); + private Task SendPingAckAsync(long pingContent) => + PerformWriteAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) => + { + if (NetEventSource.IsEnabled) state.thisRef.Trace("Started writing."); - Debug.Assert(sizeof(long) == FrameHeader.PingLength); - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0); - BinaryPrimitives.WriteInt64BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), pingContent); + Debug.Assert(sizeof(long) == FrameHeader.PingLength); - FinishWrite(FlushTiming.AfterPendingWrites); - } + Span span = writeBuffer.Span; + FrameHeader.WriteTo(span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0); + BinaryPrimitives.WriteInt64BigEndian(span.Slice(FrameHeader.Size), state.pingContent); - private async Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.RstStreamLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(errorCode)}={errorCode}"); + return FlushTiming.AfterPendingWrites; + }); - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, streamId); - BinaryPrimitives.WriteInt32BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), (int)errorCode); + private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) => + PerformWriteAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.errorCode)}={s.errorCode}"); - FinishWrite(FlushTiming.Now); // ensure cancellation is seen as soon as possible - } + Span span = writeBuffer.Span; + FrameHeader.WriteTo(span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, s.streamId); + BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), (int)s.errorCode); + + return FlushTiming.Now; // ensure cancellation is seen as soon as possible + }); private static (ReadOnlyMemory first, ReadOnlyMemory rest) SplitBuffer(ReadOnlyMemory buffer, int maxSize) => buffer.Length > maxSize ? @@ -1245,63 +1215,67 @@ private async ValueTask SendHeadersAsync(HttpRequestMessage request // Start the write. This serializes access to write to the connection, and ensures that HEADERS // and CONTINUATION frames stay together, as they must do. We use the lock as well to ensure new // streams are created and started in order. - Memory writeBuffer = await StartWriteAsync(totalSize, cancellationToken).ConfigureAwait(false); - try + await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) => { - // Allocate the next available stream ID. Note that if we fail before sending the headers, - // we'll just skip this stream ID, which is fine. - lock (SyncObject) + try { - if (_nextStream == MaxStreamId || _disposed || _lastStreamId != -1) + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Started writing. {nameof(s.totalSize)}={s.totalSize}"); + + // Allocate the next available stream ID. Note that if we fail before sending the headers, + // we'll just skip this stream ID, which is fine. + lock (s.thisRef.SyncObject) { - // We ran out of stream IDs or we raced between acquiring the connection from the pool and shutting down. - // Throw a retryable request exception. This will cause retry logic to kick in - // and perform another connection attempt. The user should never see this exception. - ThrowShutdownException(); + if (s.thisRef._nextStream == MaxStreamId || s.thisRef._disposed || s.thisRef._lastStreamId != -1) + { + // We ran out of stream IDs or we raced between acquiring the connection from the pool and shutting down. + // Throw a retryable request exception. This will cause retry logic to kick in + // and perform another connection attempt. The user should never see this exception. + s.thisRef.ThrowShutdownException(); + } + + // Client-initiated streams are always odd-numbered, so increase by 2. + s.http2Stream.StreamId = s.thisRef._nextStream; + s.thisRef._nextStream += 2; + + // We're about to flush the HEADERS frame, so add the stream to the dictionary now. + // The lifetime of the stream is now controlled by the stream itself and the connection. + // This can fail if the connection is shutting down, in which case we will cancel sending this frame. + s.thisRef._httpStreams.Add(s.http2Stream.StreamId, s.http2Stream); } - // Client-initiated streams are always odd-numbered, so increase by 2. - http2Stream.StreamId = _nextStream; - _nextStream += 2; + Span span = writeBuffer.Span; - // We're about to flush the HEADERS frame, so add the stream to the dictionary now. - // The lifetime of the stream is now controlled by the stream itself and the connection. - // This can fail if the connection is shutting down, in which case we will cancel sending this frame. - _httpStreams.Add(http2Stream.StreamId, http2Stream); - } + // Copy the HEADERS frame. + FrameHeader.WriteTo(span, s.current.Length, FrameType.Headers, s.flags, s.http2Stream.StreamId); + span = span.Slice(FrameHeader.Size); + s.current.Span.CopyTo(span); + span = span.Slice(s.current.Length); + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Wrote HEADERS frame. Length={s.current.Length}, flags={s.flags}"); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Started writing. {nameof(totalSize)}={totalSize}"); + // Copy CONTINUATION frames, if any. + while (s.remaining.Length > 0) + { + (s.current, s.remaining) = SplitBuffer(s.remaining, FrameHeader.MaxPayloadLength); + s.flags = s.remaining.Length == 0 ? FrameFlags.EndHeaders : FrameFlags.None; + + FrameHeader.WriteTo(span, s.current.Length, FrameType.Continuation, s.flags, s.http2Stream.StreamId); + span = span.Slice(FrameHeader.Size); + s.current.Span.CopyTo(span); + span = span.Slice(s.current.Length); + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.http2Stream.StreamId, $"Wrote CONTINUATION frame. Length={s.current.Length}, flags={s.flags}"); + } - // Copy the HEADERS frame. - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Headers, flags, http2Stream.StreamId); - writeBuffer = writeBuffer.Slice(FrameHeader.Size); - current.CopyTo(writeBuffer); - writeBuffer = writeBuffer.Slice(current.Length); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Wrote HEADERS frame. Length={current.Length}, flags={flags}"); + Debug.Assert(span.Length == 0); - // Copy CONTINUATION frames, if any. - while (remaining.Length > 0) + return s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually; + } + catch { - (current, remaining) = SplitBuffer(remaining, FrameHeader.MaxPayloadLength); - flags = remaining.Length == 0 ? FrameFlags.EndHeaders : FrameFlags.None; - - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Continuation, flags, http2Stream.StreamId); - writeBuffer = writeBuffer.Slice(FrameHeader.Size); - current.CopyTo(writeBuffer); - writeBuffer = writeBuffer.Slice(current.Length); - if (NetEventSource.IsEnabled) Trace(http2Stream.StreamId, $"Wrote CONTINUATION frame. Length={current.Length}, flags={flags}"); + s.thisRef.EndWrite(forceFlush: false); + throw; } - - Debug.Assert(writeBuffer.Length == 0); - - FinishWrite(mustFlush || (flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually); - return http2Stream; - } - catch - { - CancelWrite(); - throw; - } + }, cancellationToken).ConfigureAwait(false); + return http2Stream; } catch { @@ -1320,57 +1294,58 @@ private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory buffer while (remaining.Length > 0) { - int frameSize = Math.Min(remaining.Length, FrameHeader.MaxPayloadLength); - // Once credit had been granted, we want to actually consume those bytes. + int frameSize = Math.Min(remaining.Length, FrameHeader.MaxPayloadLength); frameSize = await _connectionWindow.RequestCreditAsync(frameSize, cancellationToken).ConfigureAwait(false); ReadOnlyMemory current; (current, remaining) = SplitBuffer(remaining, frameSize); - - // It's possible that a cancellation will occur while we wait for the write lock. In that case, we need to - // return the credit that we have acquired and don't plan to use. - Memory writeBuffer; try { - writeBuffer = await StartWriteAsync(FrameHeader.Size + current.Length, cancellationToken).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}"); + await PerformWriteAsync(FrameHeader.Size + current.Length, (thisRef: this, streamId, current), (s, writeBuffer) => + { + // Invoked while holding the lock: + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}"); + + FrameHeader.WriteTo(writeBuffer.Span, s.current.Length, FrameType.Data, FrameFlags.None, s.streamId); + s.current.CopyTo(writeBuffer.Slice(FrameHeader.Size)); + + return FlushTiming.Eventually; // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame + }, cancellationToken).ConfigureAwait(false); } catch { + // Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use): _connectionWindow.AdjustCredit(frameSize); throw; } - - FrameHeader.WriteTo(writeBuffer.Span, current.Length, FrameType.Data, FrameFlags.None, streamId); - current.CopyTo(writeBuffer.Slice(FrameHeader.Size)); - - FinishWrite(FlushTiming.Eventually); // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame } } - private async Task SendEndStreamAsync(int streamId) - { - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, "Started writing."); + private Task SendEndStreamAsync(int streamId) => + PerformWriteAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, "Started writing."); - FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Data, FrameFlags.EndStream, streamId); + FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId); - FinishWrite(FlushTiming.AfterPendingWrites); // finished sending request body, so flush soon (but ok to wait for pending packets) - } + return FlushTiming.AfterPendingWrites; // finished sending request body, so flush soon (but ok to wait for pending packets) + }); - private async Task SendWindowUpdateAsync(int streamId, int amount) + private Task SendWindowUpdateAsync(int streamId, int amount) { - Debug.Assert(amount > 0); - // We update both the connection-level and stream-level windows at the same time - Memory writeBuffer = await StartWriteAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength).ConfigureAwait(false); - if (NetEventSource.IsEnabled) Trace(streamId, $"Started writing. {nameof(amount)}={amount}"); + Debug.Assert(amount > 0); + return PerformWriteAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) => + { + if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.amount)}={s.amount}"); - FrameHeader.WriteTo(writeBuffer.Span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, streamId); - BinaryPrimitives.WriteInt32BigEndian(writeBuffer.Span.Slice(FrameHeader.Size), amount); + Span span = writeBuffer.Span; + FrameHeader.WriteTo(span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, s.streamId); + BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), s.amount); - FinishWrite(FlushTiming.Now); // make sure window updates are seen as soon as possible + return FlushTiming.Now; // make sure window updates are seen as soon as possible + }); } private void ExtendWindow(int amount) @@ -1424,11 +1399,9 @@ private void Abort(Exception abortException) /// terminate it, which would be considered a failure, so this race condition is largely benign and inherent to /// the nature of connection pooling. /// - public bool IsExpired(long nowTicks, TimeSpan connectionLifetime, TimeSpan connectionIdleTimeout) - { if (_disposed) { @@ -1440,7 +1413,7 @@ public bool IsExpired(long nowTicks, (_httpStreams.Count == 0) && ((nowTicks - _idleSinceTickCount) > connectionIdleTimeout.TotalMilliseconds)) { - if (NetEventSource.IsEnabled) Trace($"Connection no longer usable. Idle {TimeSpan.FromMilliseconds((nowTicks - _idleSinceTickCount))} > {connectionIdleTimeout}."); + if (NetEventSource.IsEnabled) Trace($"Connection no longer usable. Idle {TimeSpan.FromMilliseconds(nowTicks - _idleSinceTickCount)} > {connectionIdleTimeout}."); return true; }