diff --git a/src/libraries/System.Net.Http/ref/System.Net.Http.cs b/src/libraries/System.Net.Http/ref/System.Net.Http.cs index c529bdcda2d49..1261932527cca 100644 --- a/src/libraries/System.Net.Http/ref/System.Net.Http.cs +++ b/src/libraries/System.Net.Http/ref/System.Net.Http.cs @@ -424,6 +424,7 @@ public SocketsHttpHandler() { } public System.Net.ICredentials? Credentials { get { throw null; } set { } } public System.Net.ICredentials? DefaultProxyCredentials { get { throw null; } set { } } public bool EnableMultipleHttp2Connections { get { throw null; } set { } } + public bool EnableMultipleHttp3Connections { get { throw null; } set { } } public System.TimeSpan Expect100ContinueTimeout { get { throw null; } set { } } public int InitialHttp2StreamWindowSize { get { throw null; } set { } } [System.Runtime.Versioning.UnsupportedOSPlatformGuardAttribute("browser")] diff --git a/src/libraries/System.Net.Http/src/Resources/Strings.resx b/src/libraries/System.Net.Http/src/Resources/Strings.resx index f234af52c717c..ea0161ab02b02 100644 --- a/src/libraries/System.Net.Http/src/Resources/Strings.resx +++ b/src/libraries/System.Net.Http/src/Resources/Strings.resx @@ -471,6 +471,9 @@ Received an invalid header name: '{0}'. + + An HTTP/3 connection could not be established because the server did not complete the HTTP/3 handshake. + The HTTP/3 server sent invalid data on the connection. HTTP/3 error code '{0}' (0x{1}). diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs index 1e82a7b46ea08..955375e331793 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs @@ -196,6 +196,12 @@ public bool EnableMultipleHttp2Connections set => throw new PlatformNotSupportedException(); } + public bool EnableMultipleHttp3Connections + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } + public Func>? ConnectCallback { get => throw new PlatformNotSupportedException(); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs index d8d43a4d250ea..ef6c5db24f33f 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs @@ -109,10 +109,10 @@ public static async ValueTask EstablishSslConnectionAsync(SslClientAu return sslStream; } - [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] - public static async ValueTask ConnectQuicAsync(HttpRequestMessage request, DnsEndPoint endPoint, TimeSpan idleTimeout, SslClientAuthenticationOptions clientAuthenticationOptions, CancellationToken cancellationToken) + [SupportedOSPlatform("windows")] + public static async ValueTask ConnectQuicAsync(HttpRequestMessage request, DnsEndPoint endPoint, TimeSpan idleTimeout, SslClientAuthenticationOptions clientAuthenticationOptions, Action streamCapacityCallback, CancellationToken cancellationToken) { clientAuthenticationOptions = SetUpRemoteCertificateValidationCallback(clientAuthenticationOptions, request); @@ -126,7 +126,8 @@ public static async ValueTask ConnectQuicAsync(HttpRequestMessag DefaultStreamErrorCode = (long)Http3ErrorCode.RequestCancelled, DefaultCloseErrorCode = (long)Http3ErrorCode.NoError, RemoteEndPoint = endPoint, - ClientAuthenticationOptions = clientAuthenticationOptions + ClientAuthenticationOptions = clientAuthenticationOptions, + StreamCapacityCallback = streamCapacityCallback, }, cancellationToken).ConfigureAwait(false); } catch (Exception ex) when (ex is not OperationCanceledException) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.Http3.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.Http3.cs index 227aa11ef8163..b2d17af903c61 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.Http3.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.Http3.cs @@ -3,9 +3,11 @@ using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Net.Http.Headers; using System.Net.Quic; using System.Net.Security; +using System.Runtime.ExceptionServices; using System.Runtime.Versioning; using System.Threading; using System.Threading.Tasks; @@ -25,12 +27,19 @@ internal sealed partial class HttpConnectionPool [SupportedOSPlatformGuard("linux")] [SupportedOSPlatformGuard("macOS")] - [SupportedOSPlatformGuard("Windows")] + [SupportedOSPlatformGuard("windows")] internal static bool IsHttp3Supported() => (OperatingSystem.IsLinux() && !OperatingSystem.IsAndroid()) || OperatingSystem.IsWindows() || OperatingSystem.IsMacOS(); + /// List of available HTTP/3 connections stored in the pool. + private List? _availableHttp3Connections; + /// The number of HTTP/3 connections associated with the pool, including in use, available, and pending. + private int _associatedHttp3ConnectionCount; + /// Indicates whether an HTTP/3 connection is in the process of being established. + private bool _pendingHttp3Connection; + /// Queue of requests waiting for an HTTP/3 connection. + private RequestQueue _http3RequestQueue; + private bool _http3Enabled; - private Http3Connection? _http3Connection; - private SemaphoreSlim? _http3ConnectionCreateLock; internal readonly byte[]? _http3EncodedAuthorityHostHeader; /// Initially set to null, this can be set to enable HTTP/3 based on Alt-Svc. @@ -50,40 +59,45 @@ internal sealed partial class HttpConnectionPool private CancellationTokenSource? _altSvcBlocklistTimerCancellation; private volatile bool _altSvcEnabled = true; + private bool EnableMultipleHttp3Connections => _poolManager.Settings.EnableMultipleHttp3Connections; + // Returns null if HTTP3 cannot be used. [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] private async ValueTask TrySendUsingHttp3Async(HttpRequestMessage request, CancellationToken cancellationToken) { + Debug.Assert(IsHttp3Supported()); + + Debug.Assert(_kind == HttpConnectionKind.Https); + Debug.Assert(_http3Enabled); + // Loop in case we get a 421 and need to send the request to a different authority. while (true) { - HttpAuthority? authority = _http3Authority; - - // If H3 is explicitly requested, assume prenegotiated H3. - if (request.Version.Major >= 3 && request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower) + if (!TryGetHttp3Authority(request, out _, out Exception? reasonException)) { - authority ??= _originAuthority; + if (reasonException is null) + { + return null; + } + ThrowGetVersionException(request, 3, reasonException); } - if (authority == null) + long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0; + + if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out HttpConnectionWaiter? http3ConnectionWaiter)) { - return null; + connection = await http3ConnectionWaiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false); } - Exception? reasonException; - if (IsAltSvcBlocked(authority, out reasonException)) + // Request cannot be sent over H/3 connection, try downgrade or report failure. + // Note that if there's an H/3 suitable origin authority but is unavailable or blocked via Alt-Svc, exception is thrown instead. + if (connection is null) { - ThrowGetVersionException(request, 3, reasonException); + return null; } - long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0; - - ValueTask connectionTask = GetHttp3ConnectionAsync(request, authority, cancellationToken); - - Http3Connection connection = await connectionTask.ConfigureAwait(false); - HttpResponseMessage response = await connection.SendAsync(request, queueStartingTimestamp, cancellationToken).ConfigureAwait(false); // If an Alt-Svc authority returns 421, it means it can't actually handle the request. @@ -103,100 +117,503 @@ internal sealed partial class HttpConnectionPool [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] - private async ValueTask GetHttp3ConnectionAsync(HttpRequestMessage request, HttpAuthority authority, CancellationToken cancellationToken) + private bool TryGetPooledHttp3Connection(HttpRequestMessage request, [NotNullWhen(true)] out Http3Connection? connection, [NotNullWhen(false)] out HttpConnectionWaiter? waiter) { - Debug.Assert(_kind == HttpConnectionKind.Https); - Debug.Assert(_http3Enabled); + Debug.Assert(IsHttp3Supported()); - Http3Connection? http3Connection = Volatile.Read(ref _http3Connection); - - if (http3Connection != null) + // Look for a usable connection. + while (true) { - if (CheckExpirationOnGet(http3Connection) || http3Connection.Authority != authority) + lock (SyncObj) + { + int availableConnectionCount = _availableHttp3Connections?.Count ?? 0; + if (availableConnectionCount > 0) + { + // We have a connection that we can attempt to use. + // Validate it below outside the lock, to avoid doing expensive operations while holding the lock. + connection = _availableHttp3Connections![availableConnectionCount - 1]; + } + else + { + // No available connections. Add to the request queue. + waiter = _http3RequestQueue.EnqueueRequest(request); + + CheckForHttp3ConnectionInjection(); + + // There were no available connections. This request has been added to the request queue. + if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/3 connections; request queued."); + connection = null; + return false; + } + } + + if (CheckExpirationOnGet(connection)) { - // Connection expired. - if (NetEventSource.Log.IsEnabled()) http3Connection.Trace("Found expired HTTP3 connection."); - http3Connection.Dispose(); - InvalidateHttp3Connection(http3Connection); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired HTTP/3 connection in pool."); + + InvalidateHttp3Connection(connection); + continue; } - else + + // Disable and remove the connection from the pool only if we can open another. + // If we have only single connection, use the underlying QuicConnection mechanism to wait for available streams. + if (!connection.TryReserveStream() && EnableMultipleHttp3Connections) { - // Connection exists and it is still good to use. - if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP3 connection."); - return http3Connection; + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found HTTP/3 connection in pool without available streams."); + + bool found = false; + lock (SyncObj) + { + int index = _availableHttp3Connections.IndexOf(connection); + if (index != -1) + { + found = true; + _availableHttp3Connections.RemoveAt(index); + } + } + + // If we didn't find the connection, then someone beat us to removing it (or it shut down) + if (found) + { + DisableHttp3Connection(connection); + } + continue; } + + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/3 connection in pool."); + waiter = null; + return true; } + } - // Ensure that the connection creation semaphore is created - if (_http3ConnectionCreateLock == null) + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private void CheckForHttp3ConnectionInjection() + { + Debug.Assert(IsHttp3Supported()); + + Debug.Assert(HasSyncObjLock); + + _http3RequestQueue.PruneCompletedRequestsFromHeadOfQueue(this); + + // Determine if we can and should add a new connection to the pool. + int availableHttp3ConnectionCount = _availableHttp3Connections?.Count ?? 0; + bool willInject = availableHttp3ConnectionCount == 0 && // No available connections + !_pendingHttp3Connection && // Only allow one pending HTTP3 connection at a time + _http3RequestQueue.Count > 0 && // There are requests left on the queue + (_associatedHttp3ConnectionCount == 0 || EnableMultipleHttp3Connections) && // We allow multiple connections, or don't have a connection currently + _http3RequestQueue.RequestsWithoutAConnectionAttempt > 0; // There are requests we haven't issued a connection attempt for + + if (NetEventSource.Log.IsEnabled()) { - lock (SyncObj) - { - _http3ConnectionCreateLock ??= new SemaphoreSlim(1); - } + Trace($"Available HTTP/3.0 connections: {availableHttp3ConnectionCount}, " + + $"Pending HTTP/3.0 connection: {_pendingHttp3Connection}, " + + $"Requests in the queue: {_http3RequestQueue.Count}, " + + $"Requests without a connection attempt: {_http3RequestQueue.RequestsWithoutAConnectionAttempt}, " + + $"Total associated HTTP/3.0 connections: {_associatedHttp3ConnectionCount}, " + + $"Will inject connection: {willInject}."); } - await _http3ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false); + if (willInject) + { + _associatedHttp3ConnectionCount++; + _pendingHttp3Connection = true; + + RequestQueue.QueueItem queueItem = _http3RequestQueue.PeekNextRequestForConnectionAttempt(); + _ = InjectNewHttp3ConnectionAsync(queueItem); // ignore returned task + } + } + + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private async Task InjectNewHttp3ConnectionAsync(RequestQueue.QueueItem queueItem) + { + Debug.Assert(IsHttp3Supported()); + + if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/3 connection for pool."); + + // Queue the remainder of the work so that this method completes quickly + // and escapes locks held by the caller. + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + + Http3Connection? connection = null; + Exception? connectionException = null; + HttpAuthority? authority = null; + HttpConnectionWaiter waiter = queueItem.Waiter; + + CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); + waiter.ConnectionCancellationTokenSource = cts; try { - if (_http3Connection != null) + if (TryGetHttp3Authority(queueItem.Request, out authority, out Exception? reasonException)) { - // Someone beat us to creating the connection. + // If the authority was sent as an option through alt-svc then include alt-used header. + connection = new Http3Connection(this, authority, includeAltUsedHeader: _http3Authority == authority); - if (NetEventSource.Log.IsEnabled()) + QuicConnection quicConnection = await ConnectHelper.ConnectQuicAsync(queueItem.Request, new DnsEndPoint(authority.IdnHost, authority.Port), _poolManager.Settings._pooledConnectionIdleTimeout, _sslOptionsHttp3!, connection.StreamCapacityCallback, cts.Token).ConfigureAwait(false); + if (quicConnection.NegotiatedApplicationProtocol != SslApplicationProtocol.Http3) { - Trace("Using existing HTTP3 connection."); + await quicConnection.DisposeAsync().ConfigureAwait(false); + throw new HttpRequestException(HttpRequestError.ConnectionError, "QUIC connected but no HTTP/3 indicated via ALPN.", null, RequestRetryType.RetryOnConnectionFailure); } + connection.InitQuicConnection(quicConnection); + } + else if (reasonException is not null) + { + ThrowGetVersionException(queueItem.Request, 3, reasonException); + } + } + catch (Exception e) + { + connectionException = e is OperationCanceledException oce && oce.CancellationToken == cts.Token && !waiter.CancelledByOriginatingRequestCompletion ? + CreateConnectTimeoutException(oce) : + e; - return _http3Connection; + // If the connection hasn't been initialized with QuicConnection, get rid of it. + connection?.Dispose(); + connection = null; + } + finally + { + lock (waiter) + { + waiter.ConnectionCancellationTokenSource = null; + cts.Dispose(); } + } - if (NetEventSource.Log.IsEnabled()) + if (connection is not null) + { + // Add the new connection to the pool. + ReturnHttp3Connection(connection, isNewConnection: true, waiter); + } + else + { + // Block list authority only if the connection attempt was not cancelled. + if (connectionException is not null && connectionException is not OperationCanceledException && authority is not null) { - Trace("Attempting new HTTP3 connection."); + // Disables HTTP/3 until server announces it can handle it via Alt-Svc. + BlocklistAuthority(authority, connectionException); } - QuicConnection quicConnection; - try + HandleHttp3ConnectionFailure(waiter, connectionException); + } + } + + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private void HandleHttp3ConnectionFailure(HttpConnectionWaiter requestWaiter, Exception? e) + { + Debug.Assert(IsHttp3Supported()); + + if (NetEventSource.Log.IsEnabled()) Trace($"HTTP3 connection failed: {e}"); + + // We don't care if this fails; that means the request was previously canceled or handled by a different connection. + if (e is null) + { + requestWaiter.TrySetResult(null); + } + else + { + requestWaiter.TrySetException(e); + } + + lock (SyncObj) + { + Debug.Assert(_associatedHttp3ConnectionCount > 0); + Debug.Assert(_pendingHttp3Connection); + + _associatedHttp3ConnectionCount--; + _pendingHttp3Connection = false; + + CheckForHttp3ConnectionInjection(); + } + } + + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private void ReturnHttp3Connection(Http3Connection connection, bool isNewConnection, HttpConnectionWaiter? initialRequestWaiter = null) + { + Debug.Assert(IsHttp3Supported()); + + if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); + + Debug.Assert(!HasSyncObjLock); + Debug.Assert(isNewConnection || initialRequestWaiter is null, "Shouldn't have a request unless the connection is new"); + + if (!isNewConnection && CheckExpirationOnReturn(connection)) + { + lock (SyncObj) { - quicConnection = await ConnectHelper.ConnectQuicAsync(request, new DnsEndPoint(authority.IdnHost, authority.Port), _poolManager.Settings._pooledConnectionIdleTimeout, _sslOptionsHttp3!, cancellationToken).ConfigureAwait(false); + Debug.Assert(_availableHttp3Connections is null || !_availableHttp3Connections.Contains(connection)); + Debug.Assert(_associatedHttp3ConnectionCount > (_availableHttp3Connections?.Count ?? 0)); + _associatedHttp3ConnectionCount--; } - catch (Exception ex) + + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP3 connection return to pool. Connection lifetime expired."); + connection.Dispose(); + return; + } + + bool reserved; + while ((reserved = connection.TryReserveStream()) || !EnableMultipleHttp3Connections) + { + // Loop in case we get a request that has already been canceled or handled by a different connection. + while (true) { - if (NetEventSource.Log.IsEnabled()) Trace($"QUIC connection failed: {ex}"); + HttpConnectionWaiter? waiter = null; + bool added = false; + lock (SyncObj) + { + Debug.Assert(_availableHttp3Connections is null || !_availableHttp3Connections.Contains(connection), $"HTTP3 connection already in available list"); + Debug.Assert(_associatedHttp3ConnectionCount > (_availableHttp3Connections?.Count ?? 0), + $"Expected _associatedHttp3ConnectionCount={_associatedHttp3ConnectionCount} > _availableHttp3Connections.Count={(_availableHttp3Connections?.Count ?? 0)}"); + + if (isNewConnection) + { + Debug.Assert(_pendingHttp3Connection); + _pendingHttp3Connection = false; + isNewConnection = false; + } + + if (initialRequestWaiter is not null) + { + // Try to handle the request that we initiated the connection for first + waiter = initialRequestWaiter; + initialRequestWaiter = null; + + // If this method found a request to service, that request must be removed from the queue if it was at the head to avoid rooting it forever. + // Normally, TryDequeueWaiter would handle the removal. TryDequeueSpecificWaiter matches this behavior for the initial request case. + // We don't care if this fails; that means the request was previously canceled, handled by a different connection, or not at the head of the queue. + _http3RequestQueue.TryDequeueSpecificWaiter(waiter); + } + else if (_http3RequestQueue.TryDequeueWaiter(this, out waiter)) + { + Debug.Assert((_availableHttp3Connections?.Count ?? 0) == 0, $"With {(_availableHttp3Connections?.Count ?? 0)} available HTTP3 connections, we shouldn't have a waiter."); + } + else if (_disposed) + { + // The pool has been disposed. We will dispose this connection below outside the lock. + // We do this check after processing the request queue so that any queued requests will be handled by existing connections if possible. + _associatedHttp3ConnectionCount--; + } + else + { + // Add connection to the pool. + added = true; + _availableHttp3Connections ??= new List(); + _availableHttp3Connections.Add(connection); + } + } - // Block list authority only if the connection attempt was not cancelled. - if (ex is not OperationCanceledException oce || !cancellationToken.IsCancellationRequested || oce.CancellationToken != cancellationToken) + if (waiter is not null) { - // Disables HTTP/3 until server announces it can handle it via Alt-Svc. - BlocklistAuthority(authority, ex); + Debug.Assert(!added); + + if (waiter.TrySignal(connection)) + { + break; + } + + // Loop and process the queue again + } + else + { + if (reserved) + { + connection.ReleaseStream(); + } + if (added) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace("Put HTTP3 connection in pool."); + return; + } + else + { + Debug.Assert(_disposed); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP3 connection returned to pool. Pool was disposed."); + connection.Dispose(); + return; + } } - throw; } + } - if (quicConnection.NegotiatedApplicationProtocol != SslApplicationProtocol.Http3) + // Since we only inject one connection at a time, we may want to inject another now. + lock (SyncObj) + { + CheckForHttp3ConnectionInjection(); + } + + // We need to wait until the connection is usable again. + DisableHttp3Connection(connection); + } + + /// + /// Disable usage of the specified connection because it cannot handle any more streams at the moment. + /// We will register to be notified when it can handle more streams (or becomes permanently unusable). + /// + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private void DisableHttp3Connection(Http3Connection connection) + { + Debug.Assert(IsHttp3Supported()); + + if (NetEventSource.Log.IsEnabled()) connection.Trace(""); + + _ = DisableHttp3ConnectionAsync(connection); // ignore returned task + + async Task DisableHttp3ConnectionAsync(Http3Connection connection) + { + bool usable = await connection.WaitForAvailableStreamsAsync().ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + + if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(connection.WaitForAvailableStreamsAsync)} completed, {nameof(usable)}={usable}"); + + if (usable) { - BlocklistAuthority(authority); - throw new HttpRequestException(HttpRequestError.ConnectionError, "QUIC connected but no HTTP/3 indicated via ALPN.", null, RequestRetryType.RetryOnConnectionFailure); + ReturnHttp3Connection(connection, isNewConnection: false); } + else + { + // Connection has shut down. + lock (SyncObj) + { + Debug.Assert(_availableHttp3Connections is null || !_availableHttp3Connections.Contains(connection)); + Debug.Assert(_associatedHttp3ConnectionCount > 0); - // if the authority was sent as an option through alt-svc then include alt-used header - http3Connection = new Http3Connection(this, authority, quicConnection, includeAltUsedHeader: _http3Authority == authority); - _http3Connection = http3Connection; + _associatedHttp3ConnectionCount--; + + CheckForHttp3ConnectionInjection(); + } - if (NetEventSource.Log.IsEnabled()) + if (NetEventSource.Log.IsEnabled()) connection.Trace("HTTP3 connection no longer usable"); + connection.Dispose(); + } + }; + } + + /// + /// Called when an Http3Connection from this pool is no longer usable. + /// + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + public void InvalidateHttp3Connection(Http3Connection connection) + { + Debug.Assert(IsHttp3Supported()); + + if (NetEventSource.Log.IsEnabled()) connection.Trace(""); + + bool found = false; + lock (SyncObj) + { + if (_availableHttp3Connections is not null) { - Trace("New HTTP3 connection established."); + Debug.Assert(_associatedHttp3ConnectionCount >= _availableHttp3Connections.Count); + + int index = _availableHttp3Connections.IndexOf(connection); + if (index != -1) + { + found = true; + _availableHttp3Connections.RemoveAt(index); + _associatedHttp3ConnectionCount--; + } } - return http3Connection; + CheckForHttp3ConnectionInjection(); } - finally + + // If we found the connection in the available list, then dispose it now. + // Otherwise, when we try to put it back in the pool, we will see it is shut down and dispose it (and adjust connection counts). + if (found) { - _http3ConnectionCreateLock.Release(); + connection.Dispose(); } } + [SupportedOSPlatform("windows")] + [SupportedOSPlatform("linux")] + [SupportedOSPlatform("macos")] + private static int ScavengeHttp3ConnectionList(List list, ref List? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout) + { + Debug.Assert(IsHttp3Supported()); + + int freeIndex = 0; + while (freeIndex < list.Count && list[freeIndex].IsUsable(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + { + freeIndex++; + } + + // If freeIndex == list.Count, nothing needs to be removed. + // But if it's < list.Count, at least one connection needs to be purged. + int removed = 0; + if (freeIndex < list.Count) + { + // We know the connection at freeIndex is unusable, so dispose of it. + toDispose ??= new List(); + toDispose.Add(list[freeIndex]); + + // Find the first item after the one to be removed that should be kept. + int current = freeIndex + 1; + while (current < list.Count) + { + // Look for the first item to be kept. Along the way, any + // that shouldn't be kept are disposed of. + while (current < list.Count && !list[current].IsUsable(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + { + toDispose.Add(list[current]); + current++; + } + + // If we found something to keep, copy it down to the known free slot. + if (current < list.Count) + { + // copy item to the free slot + list[freeIndex++] = list[current++]; + } + + // Keep going until there are no more good items. + } + + // At this point, good connections have been moved below freeIndex, and garbage connections have + // been added to the dispose list, so clear the end of the list past freeIndex. + removed = list.Count - freeIndex; + list.RemoveRange(freeIndex, removed); + } + + return removed; + } + + private bool TryGetHttp3Authority(HttpRequestMessage request, [NotNullWhen(true)] out HttpAuthority? authority, out Exception? reasonException) + { + authority = _http3Authority; + + // If H3 is explicitly requested, assume pre-negotiated H3. + if (request.Version.Major >= 3 && request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower) + { + authority ??= _originAuthority; + } + + if (authority is null) + { + reasonException = null; + return false; + } + + if (IsAltSvcBlocked(authority, out reasonException)) + { + return false; + } + + return true; + } + + /// Check for the Alt-Svc header, to upgrade to HTTP/3. private void ProcessAltSvc(HttpResponseMessage response) { @@ -291,7 +708,10 @@ internal void HandleAltSvc(IEnumerable altSvcHeaderValues, TimeSpan? res var wr = (WeakReference)o!; if (wr.TryGetTarget(out HttpConnectionPool? @this)) { - @this.ExpireAltSvcAuthority(); + lock (@this.SyncObj) + { + @this.ExpireAltSvcAuthority(); + } } }, thisRef, nextAuthorityMaxAge, Timeout.InfiniteTimeSpan); } @@ -319,6 +739,8 @@ internal void HandleAltSvc(IEnumerable altSvcHeaderValues, TimeSpan? res /// private void ExpireAltSvcAuthority() { + Debug.Assert(HasSyncObjLock); + // If we ever support prenegotiated HTTP/3, this should be set to origin, not nulled out. _http3Authority = null; } @@ -436,17 +858,6 @@ internal void BlocklistAuthority(HttpAuthority badAuthority, Exception? exceptio } } - public void InvalidateHttp3Connection(Http3Connection connection) - { - lock (SyncObj) - { - if (_http3Connection == connection) - { - _http3Connection = null; - } - } - } - public void OnNetworkChanged() { lock (SyncObj) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.cs index 3fc2f43b01a8c..d4f271acfa681 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectionPool/HttpConnectionPool.cs @@ -245,6 +245,10 @@ public HttpConnectionPool(HttpConnectionPoolManager poolManager, HttpConnectionK { _http2RequestQueue = new RequestQueue(); } + if (IsHttp3Supported() && _http3Enabled) + { + _http3RequestQueue = new RequestQueue(); + } if (_proxyUri != null && HttpUtilities.IsSupportedSecureScheme(_proxyUri.Scheme)) { @@ -881,11 +885,12 @@ public void Dispose() _availableHttp2Connections.Clear(); } - if (_http3Connection is not null) + if (IsHttp3Supported() && _availableHttp3Connections is not null) { toDispose ??= new(); - toDispose.Add(_http3Connection); - _http3Connection = null; + toDispose.AddRange(_availableHttp3Connections); + _associatedHttp3ConnectionCount -= _availableHttp3Connections.Count; + _availableHttp3Connections.Clear(); } if (_authorityExpireTimer != null) @@ -956,6 +961,14 @@ public bool CleanCacheAndDisposeIfUnused() // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. // Http2 connections will not, hence the difference in handing _associatedHttp2ConnectionCount. } + if (IsHttp3Supported() && _availableHttp3Connections is not null) + { + int removed = ScavengeHttp3ConnectionList(_availableHttp3Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); + _associatedHttp3ConnectionCount -= removed; + + // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. + // Http3 connections will not, hence the difference in handing _associatedHttp3ConnectionCount. + } } // Dispose the stale connections outside the pool lock, to avoid holding the lock too long. diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index aec467cf2c740..2575c76201ec3 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -25,7 +25,6 @@ internal sealed partial class Http2Connection : HttpConnectionBase private TaskCompletionSourceWithCancellation? _initialSettingsReceived; - private readonly HttpConnectionPool _pool; private readonly Stream _stream; // NOTE: These are mutable structs; do not make these readonly. @@ -132,7 +131,6 @@ internal enum KeepAliveState public Http2Connection(HttpConnectionPool pool, Stream stream, IPEndPoint? remoteEndPoint) : base(pool, remoteEndPoint) { - _pool = pool; _stream = stream; _incomingBuffer = new ArrayBuffer(initialSize: 0, usePool: true); @@ -1794,18 +1792,6 @@ private bool ForceSendConnectionWindowUpdate() return true; } - public override long GetIdleTicks(long nowTicks) - { - // The pool is holding the lock as part of its scavenging logic. - // We must not lock on Http2Connection.SyncObj here as that could lead to lock ordering problems. - Debug.Assert(_pool.HasSyncObjLock); - - // There is a race condition here where the connection pool may see this connection as idle right before - // we start processing a new request and start its disposal. This is okay as we will either - // return false from TryReserveStream, or process pending requests before tearing down the transport. - return _streamsInUse == 0 ? base.GetIdleTicks(nowTicks) : 0; - } - /// Abort all streams and cause further processing to fail. /// Exception causing Abort to be called. private void Abort(Exception abortException) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 75b9d9b1f8a96..c00071ac3897c 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -14,12 +14,11 @@ namespace System.Net.Http { - [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] + [SupportedOSPlatform("windows")] internal sealed class Http3Connection : HttpConnectionBase { - private readonly HttpConnectionPool _pool; private readonly HttpAuthority _authority; private readonly byte[]? _altUsedEncodedHeader; private QuicConnection? _connection; @@ -33,7 +32,7 @@ internal sealed class Http3Connection : HttpConnectionBase // Our control stream. private QuicStream? _clientControl; - private Task _sendSettingsTask; + private Task? _sendSettingsTask; // Server-advertised SETTINGS_MAX_FIELD_SECTION_SIZE // https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-2.2.1 @@ -54,6 +53,9 @@ internal sealed class Http3Connection : HttpConnectionBase public Exception? AbortException => Volatile.Read(ref _abortException); private object SyncObj => _activeRequests; + private int _availableRequestStreamsCount; + private TaskCompletionSource? _availableStreamsWaiter; + /// /// If true, we've received GOAWAY, are aborting due to a connection-level error, or are disposing due to pool limits. /// @@ -66,12 +68,10 @@ private bool ShuttingDown } } - public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicConnection connection, bool includeAltUsedHeader) - : base(pool, connection.RemoteEndPoint) + public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, bool includeAltUsedHeader) + : base(pool) { - _pool = pool; _authority = authority; - _connection = connection; if (includeAltUsedHeader) { @@ -87,6 +87,13 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicCon // Use this as an initial value before we receive the SETTINGS frame. _maxHeaderListSize = maxHeaderListSize; } + } + + public void InitQuicConnection(QuicConnection connection) + { + MarkConnectionAsEstablished(connection.RemoteEndPoint); + + _connection = connection; // Errors are observed via Abort(). _sendSettingsTask = SendSettingsAsync(); @@ -128,6 +135,9 @@ private void CheckForShutdown() { // Close the QuicConnection in the background. + _availableStreamsWaiter?.SetResult(false); + _availableStreamsWaiter = null; + _connectionClosedTask ??= _connection.CloseAsync((long)Http3ErrorCode.NoError).AsTask(); QuicConnection connection = _connection; @@ -151,7 +161,7 @@ private void CheckForShutdown() if (_clientControl != null) { - await _sendSettingsTask.ConfigureAwait(false); + await _sendSettingsTask!.ConfigureAwait(false); await _clientControl.DisposeAsync().ConfigureAwait(false); _clientControl = null; } @@ -162,6 +172,75 @@ private void CheckForShutdown() } } + public bool TryReserveStream() + { + lock (SyncObj) + { + Debug.Assert(_availableRequestStreamsCount >= 0); + + if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount}"); + + if (_availableRequestStreamsCount == 0) + { + return false; + } + + --_availableRequestStreamsCount; + return true; + } + } + + public void ReleaseStream() + { + lock (SyncObj) + { + Debug.Assert(_availableRequestStreamsCount >= 0); + + if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount}"); + ++_availableRequestStreamsCount; + + _availableStreamsWaiter?.SetResult(!ShuttingDown); + _availableStreamsWaiter = null; + } + } + + public void StreamCapacityCallback(QuicConnection connection, QuicStreamCapacityChangedArgs args) + { + Debug.Assert(_connection is null || connection == _connection); + + lock (SyncObj) + { + Debug.Assert(_availableRequestStreamsCount >= 0); + + if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount} + bidirectionalStreamsCountIncrement = {args.BidirectionalIncrement}"); + + _availableRequestStreamsCount += args.BidirectionalIncrement; + _availableStreamsWaiter?.SetResult(!ShuttingDown); + _availableStreamsWaiter = null; + } + } + + public Task WaitForAvailableStreamsAsync() + { + lock (SyncObj) + { + Debug.Assert(_availableRequestStreamsCount >= 0); + + if (ShuttingDown) + { + return Task.FromResult(false); + } + if (_availableRequestStreamsCount > 0) + { + return Task.FromResult(true); + } + + Debug.Assert(_availableStreamsWaiter is null); + _availableStreamsWaiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + return _availableStreamsWaiter.Task; + } + } + public async Task SendAsync(HttpRequestMessage request, long queueStartingTimestamp, CancellationToken cancellationToken) { // Allocate an active request @@ -184,7 +263,6 @@ public async Task SendAsync(HttpRequestMessage request, lon { MarkConnectionAsNotIdle(); } - _activeRequests.Add(quicStream, requestStream); } } @@ -363,10 +441,8 @@ public void RemoveStream(QuicStream stream) } } - public override long GetIdleTicks(long nowTicks) => throw new NotImplementedException("We aren't scavenging HTTP3 connections yet"); - public override void Trace(string message, [CallerMemberName] string? memberName = null) => - Trace(0, message, memberName); + Trace(0, _connection is not null ? $"{_connection} {message}" : message, memberName); internal void Trace(long streamId, string message, [CallerMemberName] string? memberName = null) => NetEventSource.Log.HandlerMessage( diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs index ef2532b2b22d0..9e4c967c58de0 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs @@ -17,9 +17,9 @@ namespace System.Net.Http { - [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] + [SupportedOSPlatform("windows")] internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisposable, IDisposable { private readonly HttpRequestMessage _request; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs index 2208a6265c97d..0d5fca33126b8 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @@ -42,7 +42,6 @@ internal sealed partial class HttpConnection : HttpConnectionBase private static readonly ulong s_http10Bytes = BitConverter.ToUInt64("HTTP/1.0"u8); private static readonly ulong s_http11Bytes = BitConverter.ToUInt64("HTTP/1.1"u8); - private readonly HttpConnectionPool _pool; internal readonly Stream _stream; private readonly TransportContext? _transportContext; @@ -77,10 +76,8 @@ public HttpConnection( IPEndPoint? remoteEndPoint) : base(pool, remoteEndPoint) { - Debug.Assert(pool != null); Debug.Assert(stream != null); - _pool = pool; _stream = stream; _transportContext = transportContext; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs index d1a48491674ed..fa9d5bc4e2b3c 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs @@ -16,17 +16,19 @@ namespace System.Net.Http { internal abstract class HttpConnectionBase : IDisposable, IHttpTrace { + protected readonly HttpConnectionPool _pool; + private static long s_connectionCounter = -1; // May be null if none of the counters were enabled when the connection was established. - private readonly ConnectionMetrics? _connectionMetrics; + private ConnectionMetrics? _connectionMetrics; // Indicates whether we've counted this connection as established, so that we can // avoid decrementing the counter once it's closed in case telemetry was enabled in between. - private readonly bool _httpTelemetryMarkedConnectionAsOpened; + private bool _httpTelemetryMarkedConnectionAsOpened; private readonly long _creationTickCount = Environment.TickCount64; - private long _idleSinceTickCount; + private long? _idleSinceTickCount; /// Cached string for the last Date header received on this connection. private string? _lastDateHeaderValue; @@ -35,13 +37,23 @@ internal abstract class HttpConnectionBase : IDisposable, IHttpTrace public long Id { get; } = Interlocked.Increment(ref s_connectionCounter); - public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint) + public HttpConnectionBase(HttpConnectionPool pool) { Debug.Assert(this is HttpConnection or Http2Connection or Http3Connection); - Debug.Assert(pool.Settings._metrics is not null); + Debug.Assert(pool != null); + _pool = pool; + } + public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint) + : this(pool) + { + MarkConnectionAsEstablished(remoteEndPoint); + } - SocketsHttpHandlerMetrics metrics = pool.Settings._metrics; + protected void MarkConnectionAsEstablished(IPEndPoint? remoteEndPoint) + { + Debug.Assert(_pool.Settings._metrics is not null); + SocketsHttpHandlerMetrics metrics = _pool.Settings._metrics; if (metrics.OpenConnections.Enabled || metrics.ConnectionDuration.Enabled) { // While requests may report HTTP/1.0 as the protocol, we treat all HTTP/1.X connections as HTTP/1.1. @@ -53,9 +65,9 @@ public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint) _connectionMetrics = new ConnectionMetrics( metrics, protocol, - pool.IsSecure ? "https" : "http", - pool.OriginAuthority.HostValue, - pool.IsDefaultPort ? null : pool.OriginAuthority.Port, + _pool.IsSecure ? "https" : "http", + _pool.OriginAuthority.HostValue, + _pool.IsDefaultPort ? null : _pool.OriginAuthority.Port, remoteEndPoint?.Address?.ToString()); _connectionMetrics.ConnectionEstablished(); @@ -67,9 +79,9 @@ public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint) { _httpTelemetryMarkedConnectionAsOpened = true; - string scheme = pool.IsSecure ? "https" : "http"; - string host = pool.OriginAuthority.HostValue; - int port = pool.OriginAuthority.Port; + string scheme = _pool.IsSecure ? "https" : "http"; + string host = _pool.OriginAuthority.HostValue; + int port = _pool.OriginAuthority.Port; if (this is HttpConnection) HttpTelemetry.Log.Http11ConnectionEstablished(Id, scheme, host, port, remoteEndPoint); else if (this is Http2Connection) HttpTelemetry.Log.Http20ConnectionEstablished(Id, scheme, host, port, remoteEndPoint); @@ -101,6 +113,7 @@ public void MarkConnectionAsIdle() public void MarkConnectionAsNotIdle() { + _idleSinceTickCount = null; _connectionMetrics?.IdleStateChanged(idle: false); } @@ -146,7 +159,7 @@ protected void TraceConnection(Stream stream) public long GetLifetimeTicks(long nowTicks) => nowTicks - _creationTickCount; - public virtual long GetIdleTicks(long nowTicks) => nowTicks - _idleSinceTickCount; + public long GetIdleTicks(long nowTicks) => _idleSinceTickCount is long idleSinceTickCount ? nowTicks - idleSinceTickCount : 0; /// Check whether a connection is still usable, or should be scavenged. /// True if connection can be used. diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs index 5cd1ee218c324..95026565c6ae4 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs @@ -61,6 +61,8 @@ internal sealed class HttpConnectionSettings internal bool _enableMultipleHttp2Connections; + internal bool _enableMultipleHttp3Connections; + internal Func>? _connectCallback; internal Func>? _plaintextStreamFilter; @@ -123,6 +125,7 @@ public HttpConnectionSettings CloneAndNormalize() _requestHeaderEncodingSelector = _requestHeaderEncodingSelector, _responseHeaderEncodingSelector = _responseHeaderEncodingSelector, _enableMultipleHttp2Connections = _enableMultipleHttp2Connections, + _enableMultipleHttp3Connections = _enableMultipleHttp3Connections, _connectCallback = _connectCallback, _plaintextStreamFilter = _plaintextStreamFilter, _initialHttp2StreamWindowSize = _initialHttp2StreamWindowSize, @@ -140,6 +143,8 @@ public HttpConnectionSettings CloneAndNormalize() public bool EnableMultipleHttp2Connections => _enableMultipleHttp2Connections; + public bool EnableMultipleHttp3Connections => _enableMultipleHttp3Connections; + private byte[]? _http3SettingsFrame; [SupportedOSPlatform("windows")] diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs index 47d210a6ba838..a20b9c5648541 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs @@ -358,9 +358,11 @@ public HttpKeepAlivePingPolicy KeepAlivePingPolicy } /// - /// Gets or sets a value that indicates whether additional HTTP/2 connections can be established to the same server - /// when the maximum of concurrent streams is reached on all existing connections. + /// Gets or sets a value that indicates whether additional HTTP/2 connections can be established to the same server. /// + /// + /// Enabling multiple connections to the same server explicitly goes against RFC 9113 - HTTP/2. + /// public bool EnableMultipleHttp2Connections { get => _settings._enableMultipleHttp2Connections; @@ -372,6 +374,23 @@ public bool EnableMultipleHttp2Connections } } + /// + /// Gets or sets a value that indicates whether additional HTTP/3 connections can be established to the same server. + /// + /// + /// Enabling multiple connections to the same server explicitly goes against RFC 9114 - HTTP/3. + /// + public bool EnableMultipleHttp3Connections + { + get => _settings._enableMultipleHttp3Connections; + set + { + CheckDisposedOrStarted(); + + _settings._enableMultipleHttp3Connections = value; + } + } + internal const bool SupportsAutomaticDecompression = true; internal const bool SupportsProxy = true; internal const bool SupportsRedirectConfiguration = true; diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs index 62594756490ec..8c8b02536522b 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs @@ -2841,6 +2841,7 @@ private async Task VerifySendTasks(IReadOnlyList> send private static SocketsHttpHandler CreateHandler() => new SocketsHttpHandler { EnableMultipleHttp2Connections = true, + EnableMultipleHttp3Connections = true, PooledConnectionIdleTimeout = TimeSpan.FromHours(1), PooledConnectionLifetime = TimeSpan.FromHours(1), SslOptions = { RemoteCertificateValidationCallback = delegate { return true; } } diff --git a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/StressClient.cs b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/StressClient.cs index 4abd52e025edf..7520b49fae7f1 100644 --- a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/StressClient.cs +++ b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/StressClient.cs @@ -57,6 +57,8 @@ HttpMessageHandler CreateHttpHandler() return new SocketsHttpHandler() { PooledConnectionLifetime = _config.ConnectionLifetime.GetValueOrDefault(Timeout.InfiniteTimeSpan), + EnableMultipleHttp2Connections = true, + EnableMultipleHttp3Connections = true, SslOptions = new SslClientAuthenticationOptions { RemoteCertificateValidationCallback = delegate { return true; } diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index 96a53e2bceb15..21ffbdf461b88 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -47,6 +47,7 @@ internal QuicConnectionOptions() { } public System.TimeSpan KeepAliveInterval { get { throw null; } set { } } public int MaxInboundBidirectionalStreams { get { throw null; } set { } } public int MaxInboundUnidirectionalStreams { get { throw null; } set { } } + public System.Action? StreamCapacityCallback { get { throw null; } set { } } } public enum QuicError { @@ -141,6 +142,13 @@ public override void Write(System.ReadOnlySpan buffer) { } public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public override void WriteByte(byte value) { } } + public readonly partial struct QuicStreamCapacityChangedArgs + { + private readonly object _dummy; + private readonly int _dummyPrimitive; + public int BidirectionalIncrement { get { throw null; } init { } } + public int UnidirectionalIncrement { get { throw null; } init { } } + } public enum QuicStreamType { Unidirectional = 0, diff --git a/src/libraries/System.Net.Quic/src/Resources/Strings.resx b/src/libraries/System.Net.Quic/src/Resources/Strings.resx index 86e319ecaf782..65585ab0e43d5 100644 --- a/src/libraries/System.Net.Quic/src/Resources/Strings.resx +++ b/src/libraries/System.Net.Quic/src/Resources/Strings.resx @@ -202,7 +202,10 @@ The server refused the connection. - A QUIC protocol error was encountered + A QUIC protocol error was encountered. + + + A specified application protocol is already in use. A version negotiation error was encountered. diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs index 85a06e12a0f91..456370e3d15d4 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs @@ -89,7 +89,7 @@ static Exception GetExceptionInternal(int status, long? errorCode, string? messa if (status == QUIC_STATUS_VER_NEG_ERROR) return new QuicException(QuicError.VersionNegotiationError, null, errorCode, SR.net_quic_ver_neg_error); if (status == QUIC_STATUS_CONNECTION_IDLE) return new QuicException(QuicError.ConnectionIdle, null, errorCode, SR.net_quic_connection_idle); if (status == QUIC_STATUS_PROTOCOL_ERROR) return new QuicException(QuicError.TransportError, null, errorCode, SR.net_quic_protocol_error); - if (status == QUIC_STATUS_ALPN_IN_USE) return new QuicException(QuicError.AlpnInUse, null, errorCode, SR.net_quic_protocol_error); + if (status == QUIC_STATUS_ALPN_IN_USE) return new QuicException(QuicError.AlpnInUse, null, errorCode, SR.net_quic_alpn_in_use); // // Transport errors will throw SocketException diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 3316fc8050d33..f1cd5e7c3fe28 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -18,6 +18,7 @@ using PEER_ADDRESS_CHANGED_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._PEER_ADDRESS_CHANGED_e__Struct; using PEER_CERTIFICATE_RECEIVED_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._PEER_CERTIFICATE_RECEIVED_e__Struct; using PEER_STREAM_STARTED_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._PEER_STREAM_STARTED_e__Struct; +using STREAMS_AVAILABLE_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._STREAMS_AVAILABLE_e__Struct; using SHUTDOWN_COMPLETE_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._SHUTDOWN_COMPLETE_e__Struct; using SHUTDOWN_INITIATED_BY_PEER_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._SHUTDOWN_INITIATED_BY_PEER_e__Struct; using SHUTDOWN_INITIATED_BY_TRANSPORT_DATA = Microsoft.Quic.QUIC_CONNECTION_EVENT._Anonymous_e__Union._SHUTDOWN_INITIATED_BY_TRANSPORT_e__Struct; @@ -179,6 +180,23 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt /// private IPEndPoint _localEndPoint = null!; /// + /// Occurres when an additional stream capacity has been released by the peer. Corresponds to receiving a MAX_STREAMS frame. + /// + private Action? _streamCapacityCallback; + /// + /// Optimization to avoid `Action` instantiation with every . + /// Holds method. + /// + private Action _decrementStreamCapacity; + /// + /// Represents how many bidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread. + /// + private int _bidirectionalStreamCapacity; + /// + /// Represents how many unidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread. + /// + private int _unidirectionalStreamCapacity; + /// /// Keeps track whether has been accessed so that we know whether to dispose the certificate or not. /// private bool _remoteCertificateExposed; @@ -207,6 +225,40 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt /// public IPEndPoint LocalEndPoint => _localEndPoint; + private async void OnStreamCapacityIncreased(int bidirectionalIncrement, int unidirectionalIncrement) + { + // Bail out early to avoid queueing work on the thread pool as well as event args instantiation. + if (_streamCapacityCallback is null) + { + return; + } + // No increment, nothing to report. + if (bidirectionalIncrement == 0 && unidirectionalIncrement == 0) + { + return; + } + + // Do not invoke user-defined event handler code on MsQuic thread. + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + + try + { + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(this, $"{this} Signaling StreamCapacityIncreased with {bidirectionalIncrement} bidirectional increment (absolute value {_bidirectionalStreamCapacity}) and {unidirectionalIncrement} unidirectional increment (absolute value {_unidirectionalStreamCapacity})."); + } + _streamCapacityCallback(this, new QuicStreamCapacityChangedArgs { BidirectionalIncrement = bidirectionalIncrement, UnidirectionalIncrement = unidirectionalIncrement }); + } + catch (Exception ex) + { + // Just log the exception, we're on a thread-pool thread and there's no way to report this to anyone. + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(this, $"{this} {nameof(QuicConnectionOptions.StreamCapacityCallback)} failed with {ex}."); + } + } + } + /// /// Gets the name of the server the client is trying to connect to. That name is used for server certificate validation. It can be a DNS name or an IP address. /// @@ -262,6 +314,7 @@ private unsafe QuicConnection() NetEventSource.Info(this, $"{this} New outbound connection."); } + _decrementStreamCapacity = DecrementStreamCapacity; _tlsSecret = MsQuicTlsSecret.Create(_handle); } @@ -290,6 +343,7 @@ internal unsafe QuicConnection(QUIC_HANDLE* handle, QUIC_NEW_CONNECTION_INFO* in _remoteEndPoint = MsQuicHelpers.QuicAddrToIPEndPoint(info->RemoteAddress); _localEndPoint = MsQuicHelpers.QuicAddrToIPEndPoint(info->LocalAddress); + _decrementStreamCapacity = DecrementStreamCapacity; _tlsSecret = MsQuicTlsSecret.Create(_handle); } @@ -300,6 +354,7 @@ private async ValueTask FinishConnectAsync(QuicClientConnectionOptions options, _canAccept = options.MaxInboundBidirectionalStreams > 0 || options.MaxInboundUnidirectionalStreams > 0; _defaultStreamErrorCode = options.DefaultStreamErrorCode; _defaultCloseErrorCode = options.DefaultCloseErrorCode; + _streamCapacityCallback = options.StreamCapacityCallback; if (!options.RemoteEndPoint.TryParse(out string? host, out IPAddress? address, out int port)) { @@ -376,6 +431,7 @@ internal ValueTask FinishHandshakeAsync(QuicServerConnectionOptions options, str _canAccept = options.MaxInboundBidirectionalStreams > 0 || options.MaxInboundUnidirectionalStreams > 0; _defaultStreamErrorCode = options.DefaultStreamErrorCode; _defaultCloseErrorCode = options.DefaultCloseErrorCode; + _streamCapacityCallback = options.StreamCapacityCallback; // RFC 6066 forbids IP literals, avoid setting IP address here for consistency with SslStream if (TargetHostNameHelper.IsValidAddress(targetHost)) @@ -405,6 +461,33 @@ internal ValueTask FinishHandshakeAsync(QuicServerConnectionOptions options, str return valueTask; } + /// + /// In order to provide meaningful increments in , available streams count can be only manipulated from MsQuic thread. + /// For that purpose we pass this function to so that it can call it from START_COMPLETE event handler. + /// + /// Note that MsQuic itself manipulates stream counts right before indicating START_COMPLETE event. + /// + /// Type of the stream to decrement appropriate field. + private void DecrementStreamCapacity(QuicStreamType streamType) + { + if (streamType == QuicStreamType.Unidirectional) + { + --_unidirectionalStreamCapacity; + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(this, $"{this} decremented stream count for {streamType} to {_unidirectionalStreamCapacity}."); + } + } + if (streamType == QuicStreamType.Bidirectional) + { + --_bidirectionalStreamCapacity; + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(this, $"{this} decremented stream count for {streamType} to {_bidirectionalStreamCapacity}."); + } + } + } + /// /// Create an outbound uni/bidirectional . /// In case the connection doesn't have any available stream capacity, i.e.: the peer limits the concurrent stream count, @@ -427,7 +510,7 @@ public async ValueTask OpenOutboundStreamAsync(QuicStreamType type, NetEventSource.Info(this, $"{this} New outbound {type} stream {stream}."); } - await stream.StartAsync(cancellationToken).ConfigureAwait(false); + await stream.StartAsync(_decrementStreamCapacity, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -604,11 +687,28 @@ private unsafe int HandleEventPeerStreamStarted(ref PEER_STREAM_STARTED_DATA dat data.Flags |= QUIC_STREAM_OPEN_FLAGS.DELAY_ID_FC_UPDATES; return QUIC_STATUS_SUCCESS; } + private unsafe int HandleEventStreamsAvailable(ref STREAMS_AVAILABLE_DATA data) + { + int bidirectionalIncrement = 0; + int unidirectionalIncrement = 0; + if (data.BidirectionalCount > 0) + { + bidirectionalIncrement = data.BidirectionalCount - _bidirectionalStreamCapacity; + _bidirectionalStreamCapacity = data.BidirectionalCount; + } + if (data.UnidirectionalCount > 0) + { + unidirectionalIncrement = data.UnidirectionalCount - _unidirectionalStreamCapacity; + _unidirectionalStreamCapacity = data.UnidirectionalCount; + } + OnStreamCapacityIncreased(bidirectionalIncrement, unidirectionalIncrement); + return QUIC_STATUS_SUCCESS; + } private unsafe int HandleEventPeerCertificateReceived(ref PEER_CERTIFICATE_RECEIVED_DATA data) { // // The certificate validation is an expensive operation and we don't want to delay MsQuic - // worker thread. So we offload the validation to the .NET threadpool. Incidentally, this + // worker thread. So we offload the validation to the .NET thread pool. Incidentally, this // also prevents potential user RemoteCertificateValidationCallback from blocking MsQuic // worker threads. // @@ -635,6 +735,7 @@ private unsafe int HandleConnectionEvent(ref QUIC_CONNECTION_EVENT connectionEve QUIC_CONNECTION_EVENT_TYPE.LOCAL_ADDRESS_CHANGED => HandleEventLocalAddressChanged(ref connectionEvent.LOCAL_ADDRESS_CHANGED), QUIC_CONNECTION_EVENT_TYPE.PEER_ADDRESS_CHANGED => HandleEventPeerAddressChanged(ref connectionEvent.PEER_ADDRESS_CHANGED), QUIC_CONNECTION_EVENT_TYPE.PEER_STREAM_STARTED => HandleEventPeerStreamStarted(ref connectionEvent.PEER_STREAM_STARTED), + QUIC_CONNECTION_EVENT_TYPE.STREAMS_AVAILABLE => HandleEventStreamsAvailable(ref connectionEvent.STREAMS_AVAILABLE), QUIC_CONNECTION_EVENT_TYPE.PEER_CERTIFICATE_RECEIVED => HandleEventPeerCertificateReceived(ref connectionEvent.PEER_CERTIFICATE_RECEIVED), _ => QUIC_STATUS_SUCCESS, }; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnectionOptions.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnectionOptions.cs index b2d3b0d811f8f..a007f64fd5b6f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnectionOptions.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnectionOptions.cs @@ -50,6 +50,21 @@ static void ValidatePowerOf2(string argumentName, int value, [CallerArgumentExpr } } +/// +/// Arguments for . +/// +public readonly struct QuicStreamCapacityChangedArgs +{ + /// + /// The increment saying how many additional bidirectional streams can be opened on the connection, increased via the latest MAX_STREAMS frame. + /// + public int BidirectionalIncrement { get; init; } + /// + /// The increment saying how many additional unidirectional streams can be opened on the connection, increased via the latest MAX_STREAMS frame. + /// + public int UnidirectionalIncrement { get; init; } +} + /// /// Shared options for both client (outbound) and server (inbound) . /// @@ -124,6 +139,14 @@ public QuicReceiveWindowSizes InitialReceiveWindowSizes /// public TimeSpan HandshakeTimeout { get; set; } = QuicDefaults.HandshakeTimeout; + /// + /// Optional callback that is invoked when new stream limit is released by the peer. Corresponds to receiving a MAX_STREAMS frame. + /// The callback values represent increments of stream limits, e.g.: current limit is 10 bidirectional streams, callback arguments notify 5 more additional bidirectional streams => 15 bidirectional streams can be opened in total at the moment. + /// The initial capacity is reported with the first invocation of the callback that might happen before the instance is handed out via either + /// or . + /// + public Action? StreamCapacityCallback { get; set; } + /// /// Validates the options and potentially sets platform specific defaults. /// diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 55058958a2141..c2469482c130c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -120,6 +120,12 @@ public sealed partial class QuicStream private long _id = -1; private readonly QuicStreamType _type; + /// + /// Provided via from so that can decrement its available stream count field. + /// When START_COMPLETE arrives it gets invoked and unset back to null to not to hold any unintended reference to . + /// + private Action? _decrementStreamCapacity; + /// /// Stream id, see . /// @@ -239,22 +245,26 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE /// If no more concurrent streams can be opened at the moment, the operation will wait until it can, /// either by closing some existing streams or receiving more available stream ids from the peer. /// + /// /// A cancellation token that can be used to cancel the asynchronous operation. /// An asynchronous task that completes with the opened . - internal ValueTask StartAsync(CancellationToken cancellationToken = default) + internal ValueTask StartAsync(Action decrementStreamCapacity, CancellationToken cancellationToken = default) { + Debug.Assert(!_startedTcs.IsCompleted); + + // Always call StreamStart to get consistent behavior (events, stream count, frames send to peer) regardless of cancellation. _startedTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken); + _decrementStreamCapacity = decrementStreamCapacity; + unsafe { - unsafe - { - int status = MsQuicApi.Api.StreamStart( - _handle, - QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); + int status = MsQuicApi.Api.StreamStart( + _handle, + QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception, streamWasSuccessfullyStarted: false)) - { - _startedTcs.TrySetException(exception); - } + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception, streamWasSuccessfullyStarted: false)) + { + _decrementStreamCapacity = null; + _startedTcs.TrySetException(exception); } } @@ -525,9 +535,13 @@ public void CompleteWrites() private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data) { + Debug.Assert(_decrementStreamCapacity is not null); + _id = unchecked((long)data.ID); if (StatusSucceeded(data.Status)) { + _decrementStreamCapacity(Type); + if (data.PeerAccepted != 0) { _startedTcs.TrySetResult(); @@ -542,6 +556,7 @@ private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data) } } + _decrementStreamCapacity = null; return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventReceive(ref RECEIVE_DATA data) @@ -628,7 +643,7 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE_DATA data) _receiveTcs.TrySetException(exception, final: true); _sendTcs.TrySetException(exception, final: true); } - _startedTcs.TrySetResult(); + _startedTcs.TrySetException(ThrowHelper.GetOperationAbortedException()); _shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index 3526ab513cd14..668711c68a2e8 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -2,11 +2,13 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Linq; using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.XUnitExtensions; +using TestUtilities; using Xunit; using Xunit.Abstractions; @@ -188,6 +190,252 @@ await RunClientServer( }); } + [Fact] + public async Task GetStreamCapacity_OpenCloseStream_CountsCorrectly() + { + SemaphoreSlim streamsAvailableFired = new SemaphoreSlim(0); + int bidiIncrement = -1, unidiIncrement = -1; + + var clientOptions = CreateQuicClientOptions(new IPEndPoint(0, 0)); + clientOptions.StreamCapacityCallback = (connection, args) => + { + bidiIncrement = args.BidirectionalIncrement; + unidiIncrement = args.UnidirectionalIncrement; + streamsAvailableFired.Release(); + }; + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(clientOptions); + await streamsAvailableFired.WaitAsync(); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundBidirectionalStreams, bidiIncrement); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiIncrement); + + var clientStreamBidi = await clientConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await clientStreamBidi.DisposeAsync(); + var serverStreamBidi = await serverConnection.AcceptInboundStreamAsync(); + await serverStreamBidi.DisposeAsync(); + + // STREAMS_AVAILABLE event comes asynchronously, give it a chance to propagate + await streamsAvailableFired.WaitAsync(); + Assert.Equal(1, bidiIncrement); + Assert.Equal(0, unidiIncrement); + + var clientStreamUnidi = await clientConnection.OpenOutboundStreamAsync(QuicStreamType.Unidirectional); + await clientStreamUnidi.DisposeAsync(); + var serverStreamUnidi = await serverConnection.AcceptInboundStreamAsync(); + await serverStreamUnidi.DisposeAsync(); + + // STREAMS_AVAILABLE event comes asynchronously, give it a chance to propagate + await streamsAvailableFired.WaitAsync(); + Assert.Equal(0, bidiIncrement); + Assert.Equal(1, unidiIncrement); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetStreamCapacity_OpenCloseStreamIntoNegative_CountsCorrectly(bool unidirectional) + { + SemaphoreSlim streamsAvailableFired = new SemaphoreSlim(0); + int bidiIncrement = -1, unidiIncrement = -1; + int bidiTotal = 0; + int unidiTotal = 0; + + var clientOptions = CreateQuicClientOptions(new IPEndPoint(0, 0)); + clientOptions.StreamCapacityCallback = (connection, args) => + { + Interlocked.Exchange(ref bidiIncrement, args.BidirectionalIncrement); + Interlocked.Exchange(ref unidiIncrement, args.UnidirectionalIncrement); + Interlocked.Add(ref bidiTotal, args.BidirectionalIncrement); + Interlocked.Add(ref unidiTotal, args.UnidirectionalIncrement); + streamsAvailableFired.Release(); + }; + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(clientOptions); + await streamsAvailableFired.WaitAsync(); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundBidirectionalStreams, bidiIncrement); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiIncrement); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundBidirectionalStreams, bidiTotal); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal); + + // Open # of streams up to the capacity. + List clientStreams = (await Task.WhenAll(Enumerable.Range(0, unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams) + .Select(i => clientConnection.OpenOutboundStreamAsync(unidirectional ? QuicStreamType.Unidirectional : QuicStreamType.Bidirectional).AsTask()))) + .ToList(); + // Open another # of streams up to 2x capacity all together. + CancellationTokenSource cts = new CancellationTokenSource(); + List> pendingClientStreams = Enumerable.Range(0, unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams) + .Select(i => clientConnection.OpenOutboundStreamAsync(unidirectional ? QuicStreamType.Unidirectional : QuicStreamType.Bidirectional, cts.Token).AsTask()) + .ToList(); + foreach (var task in pendingClientStreams) + { + Assert.False(task.IsCompleted); + } + Assert.False(streamsAvailableFired.CurrentCount > 0); + + // Dispose streams to release capacity up to 0 (nothing gets reported yet). + foreach (var clientStream in clientStreams) + { + await clientStream.DisposeAsync(); + await (await serverConnection.AcceptInboundStreamAsync()).DisposeAsync(); + } + clientStreams.Clear(); + Assert.False(streamsAvailableFired.CurrentCount > 0); + + // All the pending streams should get accepted now. + clientStreams.AddRange(await Task.WhenAll(pendingClientStreams)); + + // Disposing the pending streams now should lead to stream capacity increments. + bool first = true; // The stream capacity is cumulatively reported only after the STREAMS_AVAILABLE reached over 0. + foreach (var clientStream in clientStreams) + { + await clientStream.DisposeAsync(); + await (await serverConnection.AcceptInboundStreamAsync()).DisposeAsync(); + await streamsAvailableFired.WaitAsync(); + Assert.Equal(unidirectional ? 0 : (first ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams + 1 : 1), bidiIncrement); + Assert.Equal(unidirectional ? (first ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams + 1 : 1) : 0, unidiIncrement); + first = false; + } + Assert.False(streamsAvailableFired.CurrentCount > 0); + Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams * 3, bidiTotal); + Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams * 3 : QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetStreamCapacity_OpenCloseStreamCanceledIntoNegative_CountsCorrectly(bool unidirectional) + { + SemaphoreSlim streamsAvailableFired = new SemaphoreSlim(0); + int bidiIncrement = -1, unidiIncrement = -1; + int bidiTotal = 0; + int unidiTotal = 0; + + var clientOptions = CreateQuicClientOptions(new IPEndPoint(0, 0)); + clientOptions.StreamCapacityCallback = (connection, args) => + { + Interlocked.Exchange(ref bidiIncrement, args.BidirectionalIncrement); + Interlocked.Exchange(ref unidiIncrement, args.UnidirectionalIncrement); + Interlocked.Add(ref bidiTotal, args.BidirectionalIncrement); + Interlocked.Add(ref unidiTotal, args.UnidirectionalIncrement); + streamsAvailableFired.Release(); + }; + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(clientOptions); + await streamsAvailableFired.WaitAsync(); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundBidirectionalStreams, bidiIncrement); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiIncrement); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundBidirectionalStreams, bidiTotal); + Assert.Equal(QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal); + + // Open # of streams up to the capacity. + List clientStreams = (await Task.WhenAll(Enumerable.Range(0, unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams) + .Select(i => clientConnection.OpenOutboundStreamAsync(unidirectional ? QuicStreamType.Unidirectional : QuicStreamType.Bidirectional).AsTask()))) + .ToList(); + // Open another # of streams up to 2x capacity all together. + CancellationTokenSource cts = new CancellationTokenSource(); + List> pendingClientStreams = Enumerable.Range(0, unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams) + .Select(i => clientConnection.OpenOutboundStreamAsync(unidirectional ? QuicStreamType.Unidirectional : QuicStreamType.Bidirectional, cts.Token).AsTask()) + .ToList(); + foreach (var task in pendingClientStreams) + { + Assert.False(task.IsCompleted); + } + Assert.False(streamsAvailableFired.CurrentCount > 0); + + // Cancel pending streams if requested. + cts.Cancel(); + + // Dispose streams to release capacity up to 0 (nothing gets reported yet). + foreach (var clientStream in clientStreams) + { + await clientStream.DisposeAsync(); + await (await serverConnection.AcceptInboundStreamAsync()).DisposeAsync(); + } + clientStreams.Clear(); + Assert.False(streamsAvailableFired.CurrentCount > 0); + + // Pending streams should get cancelled and disposing the streams now should lead to stream capacity increments. + bool first = true; // The stream capacity is cumulatively reported only after the STREAMS_AVAILABLE reached over 0. + foreach (var cancelledStream in pendingClientStreams) + { + Assert.True(cancelledStream.IsCanceled); + await (await serverConnection.AcceptInboundStreamAsync()).DisposeAsync(); + await streamsAvailableFired.WaitAsync(); + Assert.Equal(unidirectional ? 0 : (first ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams + 1 : 1), bidiIncrement); + Assert.Equal(unidirectional ? (first ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams + 1 : 1) : 0, unidiIncrement); + first = false; + } + Assert.False(streamsAvailableFired.CurrentCount > 0); + Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundBidirectionalStreams : QuicDefaults.DefaultServerMaxInboundBidirectionalStreams * 3, bidiTotal); + Assert.Equal(unidirectional ? QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams * 3 : QuicDefaults.DefaultServerMaxInboundUnidirectionalStreams, unidiTotal); + } + + [Fact] + public async Task GetStreamCapacity_SumInvariant() + { + int maxStreamIndex = 0; + const int Limit = 5; + + var clientOptions = CreateQuicClientOptions(new IPEndPoint(0, 0)); + clientOptions.StreamCapacityCallback = (connection, args) => + { + Interlocked.Add(ref maxStreamIndex, args.BidirectionalIncrement); + }; + + var listenerOptions = CreateQuicListenerOptions(); + listenerOptions.ConnectionOptionsCallback = (_, _, _) => + { + var options = CreateQuicServerOptions(); + options.MaxInboundBidirectionalStreams = Limit; + return ValueTask.FromResult(options); + }; + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(clientOptions, listenerOptions); + + Assert.Equal(Limit, maxStreamIndex); + + Queue<(QuicStream client, QuicStream server)> streams = new(); + + for (int i = 0; i < Limit; i++) + { + QuicStream clientStream = await clientConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await clientStream.WriteAsync(new byte[1]); + QuicStream serverStream = await serverConnection.AcceptInboundStreamAsync(); + streams.Enqueue((clientStream, serverStream)); + } + + Queue> tasks = new(); + // enqueue more stream creations + for (int i = 0; i < Limit; i++) + { + var newClientStreamTask = clientConnection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + Assert.False(newClientStreamTask.IsCompleted, "Stream creation should not be completed synchronously"); + tasks.Enqueue(newClientStreamTask.AsTask()); + } + + // dispose streams + while (streams.Count > 0) + { + var (clientStream, serverStream) = streams.Dequeue(); + await clientStream.DisposeAsync(); + await serverStream.DisposeAsync(); + + if (tasks.TryDequeue(out var task)) + { + clientStream = await task; + await clientStream.WriteAsync(new byte[1]); + serverStream = await serverConnection.AcceptInboundStreamAsync(); + streams.Enqueue((clientStream, serverStream)); + } + } + + // give time to update the count + await Task.Delay(1000); + + // by now, we opened and closed 2 * Limit, and expect a budget of 'Limit' more + Assert.Equal(3 * Limit, maxStreamIndex); + } + [Fact] public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException() {