From d7f17d1327180f2e7a33e6fe1c60e3a6d61f1f73 Mon Sep 17 00:00:00 2001 From: Malcolm Daigle Date: Mon, 15 Jul 2024 15:50:32 -0700 Subject: [PATCH] Add scaffolding for pool throttling (#2667) * Add basic rate limiters * Add license headers. Clean up usings. * Add docs. * Instantiate pooling data source via builder. Set rate limiter based on connection string options. * Compilation fixes. Remove auth params for now. * Use delegate to better capture callback signature. * Use state type to remove closed params. Clean up docs. * Implement review suggestions. * Implement dispose. * Dispose next rate limiter. * Dispose rate limiter on pool shutdown. * Address review nits. --- .../src/Microsoft.Data.SqlClient.csproj | 7 +- .../Data/SqlClientX/PoolingDataSource.cs | 53 ++++++++++++-- .../SqlClientX/RateLimiters/AsyncFlagFunc.cs | 20 ++++++ .../RateLimiters/BlockingPeriodRateLimiter.cs | 34 +++++++++ .../RateLimiters/ConcurrencyRateLimiter.cs | 67 ++++++++++++++++++ .../RateLimiters/PassthroughRateLimiter.cs | 39 +++++++++++ .../RateLimiters/RateLimiterBase.cs | 40 +++++++++++ .../Data/SqlClientX/SqlDataSource.cs | 10 +-- .../Data/SqlClientX/SqlDataSourceBuilder.cs | 70 +++++++++++++++++-- .../Data/SqlClientX/UnpooledDataSource.cs | 14 +--- 10 files changed, 320 insertions(+), 34 deletions(-) create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/AsyncFlagFunc.cs create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/BlockingPeriodRateLimiter.cs create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/ConcurrencyRateLimiter.cs create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/PassthroughRateLimiter.cs create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/RateLimiterBase.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 171574379f..b9c6a78660 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -954,6 +954,11 @@ + + + + + @@ -982,7 +987,7 @@ - + diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs index 225b807a36..7ea395dc46 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs @@ -13,6 +13,7 @@ using System.Threading.Tasks; using Microsoft.Data.ProviderBase; using Microsoft.Data.SqlClient; +using Microsoft.Data.SqlClientX.RateLimiters; namespace Microsoft.Data.SqlClientX { @@ -21,22 +22,29 @@ namespace Microsoft.Data.SqlClientX /// internal sealed class PoolingDataSource : SqlDataSource { - private DbConnectionPoolGroupOptions _connectionPoolGroupOptions; + private readonly DbConnectionPoolGroupOptions _connectionPoolGroupOptions; + private RateLimiterBase _connectionRateLimiter; internal int MinPoolSize => _connectionPoolGroupOptions.MinPoolSize; internal int MaxPoolSize => _connectionPoolGroupOptions.MaxPoolSize; + internal int ObjectID => _objectID; + + private static int _objectTypeCount; // EventSource counter + private readonly int _objectID = Interlocked.Increment(ref _objectTypeCount); + /// /// Initializes a new PoolingDataSource. /// //TODO: support auth contexts and provider info - PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder, + internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential credential, - RemoteCertificateValidationCallback userCertificateValidationCallback, - Action clientCertificatesCallback, - DbConnectionPoolGroupOptions options) : base(connectionStringBuilder, credential, userCertificateValidationCallback, clientCertificatesCallback) + DbConnectionPoolGroupOptions options, + RateLimiterBase connectionRateLimiter) + : base(connectionStringBuilder, credential) { _connectionPoolGroupOptions = options; + _connectionRateLimiter = connectionRateLimiter; //TODO: other construction } @@ -46,10 +54,32 @@ internal override ValueTask GetInternalConnection(SqlConnectionX o throw new NotImplementedException(); } + internal readonly struct OpenInternalConnectionState + { + readonly SqlConnectionX _owningConnection; + readonly TimeSpan _timeout; + + internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan timeout) + { + _owningConnection = owningConnection; + _timeout = timeout; + } + } + /// internal override ValueTask OpenNewInternalConnection(SqlConnectionX owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken) { - throw new NotImplementedException(); + return _connectionRateLimiter.Execute( + RateLimitedOpen, + new OpenInternalConnectionState(owningConnection, timeout), + async, + cancellationToken + ); + + ValueTask RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } } /// @@ -75,7 +105,16 @@ internal void WarmUp() { throw new NotImplementedException(); } + + internal void Shutdown() + { + SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}", ObjectID); + if (_connectionRateLimiter != null) { + _connectionRateLimiter.Dispose(); + _connectionRateLimiter = null; + } + } } } -#endif \ No newline at end of file +#endif diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/AsyncFlagFunc.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/AsyncFlagFunc.cs new file mode 100644 index 0000000000..ff589338fd --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/AsyncFlagFunc.cs @@ -0,0 +1,20 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; + +namespace Microsoft.Data.SqlClientX.RateLimiters +{ + /// + /// A function that operates asynchronously based on a flag. If isAsync is true, the function operates asynchronously. + /// If isAsync is false, the function operates synchronously. + /// + /// The type accepted by the callback as input. + /// The type returned by the callback. + /// An instance of State to be passed to the callback. + /// Indicates whether the function should operate asynchronously. + /// Allows cancellation of the operation. + /// Returns the result of the callback. + internal delegate TResult AsyncFlagFunc(TState state, bool isAsync, CancellationToken cancellationToken); +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/BlockingPeriodRateLimiter.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/BlockingPeriodRateLimiter.cs new file mode 100644 index 0000000000..d43013a883 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/BlockingPeriodRateLimiter.cs @@ -0,0 +1,34 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +#nullable enable + +namespace Microsoft.Data.SqlClientX.RateLimiters +{ + /// + /// A rate limiter that enforces a backoff (blocking) period upon error. + /// Each subsequent error increases the blocking duration, up to a maximum, until a success occurs. + /// + internal sealed class BlockingPeriodRateLimiter : RateLimiterBase + { + /// + internal override ValueTask Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override void Dispose() + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/ConcurrencyRateLimiter.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/ConcurrencyRateLimiter.cs new file mode 100644 index 0000000000..39815999db --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/ConcurrencyRateLimiter.cs @@ -0,0 +1,67 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClientX.RateLimiters +{ + /// + /// A rate limiter that enforces a concurrency limit. + /// When the limit is reached, new requests must wait until a spot is freed upon completion of an existing request. + /// + internal class ConcurrencyRateLimiter : RateLimiterBase + { + private readonly SemaphoreSlim _concurrencyLimitSemaphore; + + /// + /// Initializes a new ConcurrencyRateLimiter with the specified concurrency limit. + /// + /// The maximum number of concurrent requests. + internal ConcurrencyRateLimiter(int concurrencyLimit) + { + _concurrencyLimitSemaphore = new SemaphoreSlim(concurrencyLimit); + } + + /// + internal sealed override async ValueTask Execute(AsyncFlagFunc> callback, State state, bool isAsync, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + //TODO: in the future, we can enforce order + if (isAsync) + { + await _concurrencyLimitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _concurrencyLimitSemaphore.Wait(cancellationToken); + } + + try + { + cancellationToken.ThrowIfCancellationRequested(); + if (Next != null) + { + return await Next.Execute(callback, state, isAsync, cancellationToken).ConfigureAwait(false); + } + else + { + return await callback(state, isAsync, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _concurrencyLimitSemaphore.Release(); + } + } + + public override void Dispose() + { + _concurrencyLimitSemaphore.Dispose(); + Next.Dispose(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/PassthroughRateLimiter.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/PassthroughRateLimiter.cs new file mode 100644 index 0000000000..881b84151f --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/PassthroughRateLimiter.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClientX.RateLimiters +{ + /// + /// A no-op rate limiter that simply executes the callback or passes through to the next rate limiter. + /// + internal sealed class PassthroughRateLimiter : RateLimiterBase + { + //TODO: no state, add static instance + + /// + internal override ValueTask Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default) + { + if (Next != null) + { + return Next.Execute(callback, state, isAsync, cancellationToken); + } + else + { + return callback(state, isAsync, cancellationToken); + } + } + + public override void Dispose() + { + Next.Dispose(); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/RateLimiterBase.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/RateLimiterBase.cs new file mode 100644 index 0000000000..c83118c3a7 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/RateLimiters/RateLimiterBase.cs @@ -0,0 +1,40 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClientX.RateLimiters +{ + /// + /// An interface for rate limiters that execute arbitraty code. Intended to be small and self contained and chained together to achieve more complex behavior. + /// + internal abstract class RateLimiterBase : IDisposable + { + + /// + /// The next rate limiter that should be executed within the context of this rate limiter. + /// + protected readonly RateLimiterBase Next; + + /// + /// Execute the provided callback within the context of the rate limit, or pass the responsibility along to the next rate limiter. + /// + /// The type accepted by the callback as input. + /// The type of the result returned by the callback. + /// The callback function to execute. + /// An instance of State to be passed to the callback. + /// Whether this method should run asynchronously. + /// Cancels outstanding requests. + /// Returns the result of the callback or the next rate limiter. + internal abstract ValueTask Execute( + AsyncFlagFunc> callback, + State state, + bool isAsync, + CancellationToken cancellationToken = default); + + public abstract void Dispose(); + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs index 17acff6897..add7ce318a 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs @@ -22,10 +22,6 @@ internal abstract class SqlDataSource : DbDataSource { private readonly SqlConnectionStringBuilder _connectionStringBuilder; - private readonly RemoteCertificateValidationCallback _userCertificateValidationCallback; - - private readonly Action _clientCertificatesCallback; - internal SqlCredential Credential { get; } //TODO: return SqlConnection after it is updated to wrap SqlConnectionX @@ -40,14 +36,10 @@ protected override SqlConnectionX CreateDbConnection() internal SqlDataSource( SqlConnectionStringBuilder connectionStringBuilder, - SqlCredential credential, - RemoteCertificateValidationCallback userCertificateValidationCallback, - Action clientCertificatesCallback) + SqlCredential credential) { _connectionStringBuilder = connectionStringBuilder; Credential = credential; - _userCertificateValidationCallback = userCertificateValidationCallback; - _clientCertificatesCallback = clientCertificatesCallback; } /// diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs index 2ba4928842..3aa15b9946 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs @@ -5,9 +5,13 @@ #if NET8_0_OR_GREATER using System; +using System.Diagnostics; using System.Net.Security; using System.Security.Cryptography.X509Certificates; +using Microsoft.Data.Common; +using Microsoft.Data.ProviderBase; using Microsoft.Data.SqlClient; +using Microsoft.Data.SqlClientX.RateLimiters; namespace Microsoft.Data.SqlClientX { @@ -42,11 +46,67 @@ public SqlDataSourceBuilder(string connectionString = null, SqlCredential creden /// public SqlDataSource Build() { - return new UnpooledDataSource( - ConnectionStringBuilder, - Credential, - UserCertificateValidationCallback, - ClientCertificatesCallback); + if (ConnectionStringBuilder.Pooling) + { + //TODO: pool group layer + + DbConnectionPoolGroupOptions poolGroupOptions = new DbConnectionPoolGroupOptions( + ConnectionStringBuilder.IntegratedSecurity, + ConnectionStringBuilder.MinPoolSize, + ConnectionStringBuilder.MaxPoolSize, + //TODO: carry over connect timeout conversion logic from SqlConnectionFactory? if not, don't need an extra allocation for this object, just use connection string builder + ConnectionStringBuilder.ConnectTimeout, + ConnectionStringBuilder.LoadBalanceTimeout, + ConnectionStringBuilder.Enlist); + + //TODO: evaluate app context switch for concurrency limit + RateLimiterBase rateLimiter = IsBlockingPeriodEnabled() ? new BlockingPeriodRateLimiter() : new PassthroughRateLimiter(); + + return new PoolingDataSource(ConnectionStringBuilder, + Credential, + poolGroupOptions, + rateLimiter); + } + else + { + return new UnpooledDataSource( + ConnectionStringBuilder, + Credential); + } + } + + private bool IsBlockingPeriodEnabled() + { + var policy = ConnectionStringBuilder.PoolBlockingPeriod; + + switch (policy) + { + case PoolBlockingPeriod.Auto: + { + if (ADP.IsAzureSqlServerEndpoint(ConnectionStringBuilder.DataSource)) + { + return false; // in Azure it will be Disabled + } + else + { + return true; // in Non Azure, it will be Enabled + } + } + case PoolBlockingPeriod.AlwaysBlock: + { + return true; //Enabled + } + case PoolBlockingPeriod.NeverBlock: + { + return false; //Disabled + } + default: + { + //we should never get into this path. + Debug.Fail("Unknown PoolBlockingPeriod. Please specify explicit results in above switch case statement."); + return true; + } + } } } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs index 993045e33c..7cde57e390 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs @@ -23,18 +23,8 @@ internal sealed class UnpooledDataSource : SqlDataSource /// /// /// - /// - /// - internal UnpooledDataSource( - SqlConnectionStringBuilder connectionStringBuilder, - SqlCredential credential, - RemoteCertificateValidationCallback userCertificateValidationCallback, - Action clientCertificatesCallback) : - base( - connectionStringBuilder, - credential, - userCertificateValidationCallback, - clientCertificatesCallback) + internal UnpooledDataSource(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential credential) : + base(connectionStringBuilder, credential) { }