From 3343c795a476c850a2e2e5724344eee499ba4fb6 Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Wed, 6 Nov 2024 22:32:40 +0000
Subject: [PATCH 1/6] Fixed batch exception handling

Fixes #575
---
.../ExecutionPlan/BaseDmlNode.cs | 141 ++++++++++--------
1 file changed, 77 insertions(+), 64 deletions(-)
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index 5e6b7e7f..ec69a71e 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -60,6 +60,15 @@ public void Dispose()
}
}
+ class ParallelThreadState
+ {
+ public IOrganizationService Service { get; set; }
+
+ public ExecuteMultipleRequest EMR { get; set; }
+
+ public bool Error { get; set; }
+ }
+
/// <summary>
/// The SQL string that the query was converted from
/// </summary>
@@ -439,79 +448,78 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
#endif
Interlocked.Increment(ref threadCount);
- return new { Service = service, EMR = default(ExecuteMultipleRequest) };
+ return new ParallelThreadState { Service = service, EMR = default(ExecuteMultipleRequest), Error = false };
},
(entity, loopState, index, threadLocalState) =>
{
- if (options.CancellationToken.IsCancellationRequested)
+ try
{
- loopState.Stop();
- return threadLocalState;
- }
+ if (options.CancellationToken.IsCancellationRequested)
+ {
+ loopState.Stop();
+ return threadLocalState;
+ }
- var request = requestGenerator(entity);
+ var request = requestGenerator(entity);
- if (BypassCustomPluginExecution)
- request.Parameters["BypassCustomPluginExecution"] = true;
+ if (BypassCustomPluginExecution)
+ request.Parameters["BypassCustomPluginExecution"] = true;
- if (BatchSize == 1)
- {
- var newCount = Interlocked.Increment(ref inProgressCount);
- var progress = (double)newCount / entities.Count;
+ if (BatchSize == 1)
+ {
+ var newCount = Interlocked.Increment(ref inProgressCount);
+ var progress = (double)newCount / entities.Count;
- if (threadCount < 2)
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
- else
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");
+ if (threadCount < 2)
+ options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
+ else
+ options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");
- while (true)
- {
- try
+ while (true)
{
- var response = dataSource.Execute(threadLocalState.Service, request);
- Interlocked.Increment(ref count);
+ try
+ {
+ var response = dataSource.Execute(threadLocalState.Service, request);
+ Interlocked.Increment(ref count);
- responseHandler?.Invoke(response);
- break;
- }
- catch (FaultException<OrganizationServiceFault> ex)
- {
- if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
- ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
- ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
- ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
+ responseHandler?.Invoke(response);
+ break;
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
{
- // In case throttling isn't handled by normal retry logic in the service client
- var retryAfterSeconds = 2;
+ if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
+ ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
+ ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
+ ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
+ {
+ // In case throttling isn't handled by normal retry logic in the service client
+ var retryAfterSeconds = 2;
- if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
- retryAfterSeconds = Convert.ToInt32(retryAfter);
+ if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
+ retryAfterSeconds = Convert.ToInt32(retryAfter);
- Thread.Sleep(retryAfterSeconds * 1000);
- continue;
- }
+ Thread.Sleep(retryAfterSeconds * 1000);
+ continue;
+ }
- if (FilterErrors(context, request, ex.Detail))
- {
- if (ContinueOnError)
- fault = fault ?? ex.Detail;
- else
- throw;
- }
+ if (FilterErrors(context, request, ex.Detail))
+ {
+ if (ContinueOnError)
+ fault = fault ?? ex.Detail;
+ else
+ throw;
+ }
- Interlocked.Increment(ref errorCount);
- break;
+ Interlocked.Increment(ref errorCount);
+ break;
+ }
}
}
- }
- else
- {
- if (threadLocalState.EMR == null)
+ else
{
- threadLocalState = new
+ if (threadLocalState.EMR == null)
{
- threadLocalState.Service,
- EMR = new ExecuteMultipleRequest
+ threadLocalState.EMR = new ExecuteMultipleRequest
{
Requests = new OrganizationRequestCollection(),
Settings = new ExecuteMultipleSettings
@@ -519,25 +527,30 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
ContinueOnError = IgnoresSomeErrors,
ReturnResponses = responseHandler != null
}
- }
- };
- }
+ };
+ }
- threadLocalState.EMR.Requests.Add(request);
+ threadLocalState.EMR.Requests.Add(request);
- if (threadLocalState.EMR.Requests.Count == BatchSize)
- {
- ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
+ if (threadLocalState.EMR.Requests.Count == BatchSize)
+ {
+ ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
- threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) };
+ threadLocalState.EMR = null;
+ }
}
- }
- return threadLocalState;
+ return threadLocalState;
+ }
+ catch
+ {
+ threadLocalState.Error = true;
+ throw;
+ }
},
(threadLocalState) =>
{
- if (threadLocalState.EMR != null)
+ if (threadLocalState.EMR != null && !threadLocalState.Error)
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
Interlocked.Decrement(ref threadCount);
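Note: the core of this fix is that the anonymous thread-local state used by Parallel.ForEach is replaced with a mutable ParallelThreadState class carrying an Error flag, so the localFinally callback can skip flushing a partial batch once the loop body has already thrown. A minimal, self-contained sketch of that pattern (illustrative types and batch logic only, not the node's real requests):

    using System.Collections.Generic;
    using System.Threading.Tasks;

    class ThreadState
    {
        public List<int> Batch { get; } = new List<int>();
        public bool Error { get; set; }                         // set when the loop body throws
    }

    static class BatchingSketch
    {
        static void Flush(List<int> batch) { /* submit the batch here */ }

        public static void Run(IEnumerable<int> items)
        {
            Parallel.ForEach(
                items,
                () => new ThreadState(),                        // localInit: one state object per thread
                (item, loop, index, state) =>
                {
                    try
                    {
                        state.Batch.Add(item);
                        if (state.Batch.Count == 10)
                        {
                            Flush(state.Batch);
                            state.Batch.Clear();                // mutate the state instead of replacing it
                        }
                        return state;
                    }
                    catch
                    {
                        state.Error = true;                     // remembered by localFinally below
                        throw;
                    }
                },
                state =>
                {
                    // localFinally: only flush the trailing partial batch if this thread didn't fail
                    if (state.Batch.Count > 0 && !state.Error)
                        Flush(state.Batch);
                });
        }
    }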
From 98290c286f3aac0e6de7f78b0ce4f5e3177a2f33 Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Sat, 9 Nov 2024 20:34:53 +0000
Subject: [PATCH 2/6] Service protection limit reporting
---
.../ExecutionPlan/BaseDmlNode.cs | 426 ++++++++++++++----
.../ExecutionPlan/DeleteNode.cs | 7 +-
.../ExecutionPlan/DynamicParallel.cs | 151 +++++++
.../ExecutionPlan/InsertNode.cs | 7 +-
.../ExecutionPlan/UpdateNode.cs | 7 +-
5 files changed, 499 insertions(+), 99 deletions(-)
create mode 100644 MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index ec69a71e..b21fb602 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -11,6 +11,7 @@
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using Microsoft.Xrm.Sdk.Metadata;
+using System.Collections.Concurrent;
#if NETCOREAPP
using Microsoft.PowerPlatform.Dataverse.Client;
#else
@@ -66,9 +67,21 @@ class ParallelThreadState
public ExecuteMultipleRequest EMR { get; set; }
+ public int NextBatchSize { get; set; }
+
public bool Error { get; set; }
}
+ private int _entityCount;
+ private int _inProgressCount;
+ private int _successCount;
+ private int _errorCount;
+ private int _threadCount;
+ private ParallelOptions _parallelOptions;
+ private ConcurrentQueue<OrganizationRequest> _retryQueue;
+ private ConcurrentDictionary<ParallelThreadState, DateTime> _delayedUntil;
+ private OrganizationServiceFault _fault;
+
/// <summary>
/// The SQL string that the query was converted from
///
@@ -389,6 +402,11 @@ protected class OperationNames
/// The completed name of the operation to include in the middle of a log message, e.g. "updated"
/// </summary>
public string CompletedLowercase { get; set; }
+
+ /// <summary>
+ /// The completed name of the operation to include at the start of a log message, e.g. "Updated"
+ /// </summary>
+ public string CompletedUppercase { get; set; }
}
/// <summary>
@@ -406,10 +424,11 @@ protected class OperationNames
/// <param name="responseHandler">An optional parameter to handle the response messages from the server</param>
protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions options, List<Entity> entities, EntityMetadata meta, Func<Entity, OrganizationRequest> requestGenerator, OperationNames operationNames, NodeExecutionContext context, out int recordsAffected, out string message, Action<OrganizationResponse> responseHandler = null)
{
- var inProgressCount = 0;
- var count = 0;
- var errorCount = 0;
- var threadCount = 0;
+ _entityCount = entities.Count;
+ _inProgressCount = 0;
+ _successCount = 0;
+ _errorCount = 0;
+ _threadCount = 0;
#if NETCOREAPP
var svc = dataSource.Connection as ServiceClient;
@@ -425,7 +444,7 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
if (maxDop == 1)
svc = null;
- var useAffinityCookie = maxDop == 1 || entities.Count < 100;
+ var useAffinityCookie = maxDop == 1 || _entityCount < 100;
try
{
@@ -433,87 +452,68 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
using (UseParallelConnections())
{
- Parallel.ForEach(entities,
- new ParallelOptions { MaxDegreeOfParallelism = maxDop },
+ _parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop };
+ _retryQueue = new ConcurrentQueue<OrganizationRequest>();
+ _delayedUntil = new ConcurrentDictionary<ParallelThreadState, DateTime>();
+
+ new DynamicParallel<Entity, ParallelThreadState>(
+ entities,
+ _parallelOptions,
() =>
{
var service = svc?.Clone() ?? dataSource.Connection;
#if NETCOREAPP
- if (!useAffinityCookie && service is ServiceClient crmService)
- crmService.EnableAffinityCookie = false;
+ if (service is ServiceClient crmService)
+ {
+ crmService.MaxRetryCount = 0;
+ crmService.EnableAffinityCookie = useAffinityCookie;
+ }
#else
- if (!useAffinityCookie && service is CrmServiceClient crmService)
- crmService.EnableAffinityCookie = false;
+ if (service is CrmServiceClient crmService)
+ {
+ crmService.MaxRetryCount = 0;
+ crmService.EnableAffinityCookie = useAffinityCookie;
+ }
#endif
- Interlocked.Increment(ref threadCount);
+ Interlocked.Increment(ref _threadCount);
- return new ParallelThreadState { Service = service, EMR = default(ExecuteMultipleRequest), Error = false };
+ return new ParallelThreadState
+ {
+ Service = service,
+ EMR = null,
+ Error = false,
+ NextBatchSize = 1
+ };
},
- (entity, loopState, index, threadLocalState) =>
+ async (entity, loopState, threadLocalState) =>
{
try
{
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
- return threadLocalState;
+ return DynamicParallelVote.DecreaseThreads;
}
+ // TODO: Take any requests from the retry queue and execute them first
+
var request = requestGenerator(entity);
if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;
- if (BatchSize == 1)
+ if (threadLocalState.NextBatchSize == 1)
{
- var newCount = Interlocked.Increment(ref inProgressCount);
- var progress = (double)newCount / entities.Count;
+ DynamicParallelVote? vote = null;
- if (threadCount < 2)
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
- else
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");
-
- while (true)
+ await UpdateNextBatchSize(threadLocalState, async () =>
{
- try
- {
- var response = dataSource.Execute(threadLocalState.Service, request);
- Interlocked.Increment(ref count);
+ vote = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
+ });
- responseHandler?.Invoke(response);
- break;
- }
- catch (FaultException<OrganizationServiceFault> ex)
- {
- if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
- ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
- ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
- ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
- {
- // In case throttling isn't handled by normal retry logic in the service client
- var retryAfterSeconds = 2;
-
- if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
- retryAfterSeconds = Convert.ToInt32(retryAfter);
-
- Thread.Sleep(retryAfterSeconds * 1000);
- continue;
- }
-
- if (FilterErrors(context, request, ex.Detail))
- {
- if (ContinueOnError)
- fault = fault ?? ex.Detail;
- else
- throw;
- }
-
- Interlocked.Increment(ref errorCount);
- break;
- }
- }
+ if (vote != null)
+ return vote;
}
else
{
@@ -532,15 +532,23 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
threadLocalState.EMR.Requests.Add(request);
- if (threadLocalState.EMR.Requests.Count == BatchSize)
+ if (threadLocalState.EMR.Requests.Count < threadLocalState.NextBatchSize)
+ return null;
+
+ DynamicParallelVote? vote = null;
+
+ await UpdateNextBatchSize(threadLocalState, async () =>
{
- ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
+ vote = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
+ });
- threadLocalState.EMR = null;
- }
+ threadLocalState.EMR = null;
+
+ if (vote != null)
+ return vote;
}
- return threadLocalState;
+ return DynamicParallelVote.IncreaseThreads;
}
catch
{
@@ -548,16 +556,17 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
throw;
}
},
- (threadLocalState) =>
+ async (threadLocalState) =>
{
if (threadLocalState.EMR != null && !threadLocalState.Error)
- ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
+ await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
- Interlocked.Decrement(ref threadCount);
+ Interlocked.Decrement(ref _threadCount);
if (threadLocalState.Service != dataSource.Connection && threadLocalState.Service is IDisposable disposableClient)
disposableClient.Dispose();
- });
+ })
+ .ForEach(options.CancellationToken);
}
if (fault != null)
@@ -567,11 +576,16 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
{
var originalEx = ex;
- if (ex is AggregateException agg && agg.InnerExceptions.Count == 1)
- ex = agg.InnerException;
+ if (ex is AggregateException agg)
+ {
+ if (agg.InnerExceptions.Count == 1)
+ ex = agg.InnerException;
+ else if (agg.InnerExceptions.All(inner => inner is OperationCanceledException))
+ ex = agg.InnerExceptions[0];
+ }
- if (count > 0)
- context.Log(new Sql4CdsError(1, 0, $"{count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase}"));
+ if (_successCount > 0)
+ context.Log(new Sql4CdsError(1, 0, $"{_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase}"));
if (ex == originalEx)
throw;
@@ -579,9 +593,52 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
throw ex;
}
- recordsAffected = count;
- message = $"({count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase})";
- context.ParameterValues["@@ROWCOUNT"] = (SqlInt32)count;
+ recordsAffected = _successCount;
+ message = $"({_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase})";
+ context.ParameterValues["@@ROWCOUNT"] = (SqlInt32)_successCount;
+ }
+
+ private async Task UpdateNextBatchSize(ParallelThreadState threadLocalState, Func<Task> action)
+ {
+ // Time how long the action takes
+ var timer = new Timer();
+ using (timer.Run())
+ {
+ await action();
+ }
+
+ // Adjust the batch size based on the time taken to try and keep the total time around 10sec
+ var multiplier = TimeSpan.FromSeconds(10).TotalMilliseconds / timer.Duration.TotalMilliseconds;
+ threadLocalState.NextBatchSize = Math.Max(1, Math.Min(BatchSize, (int)(threadLocalState.NextBatchSize * multiplier)));
+ }
+
+ private bool IsThrottlingException(FaultException<OrganizationServiceFault> ex)
+ {
+ if (ex == null)
+ return false;
+
+ if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
+ ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
+ ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
+ ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ private bool IsRetryableFault(OrganizationServiceFault fault)
+ {
+ if (fault == null)
+ return false;
+
+ if (fault.ErrorCode == -2147188475) // More than one concurrent {0} results detected for an Entity {1} and ObjectTypeCode {2}
+ {
+ return true;
+ }
+
+ return false;
}
protected class BulkApiErrorDetail
@@ -591,39 +648,216 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}
- private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int count, ref int inProgressCount, ref int errorCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
+ private async Task<DynamicParallelVote?> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
{
- var newCount = Interlocked.Add(ref inProgressCount, req.Requests.Count);
- var progress = (double)newCount / entities.Count;
- var threadCountMessage = threadCount < 2 ? "" : $" ({threadCount:N0} threads)";
- options.Progress(progress, $"{operationNames.InProgressUppercase} {GetDisplayName(0, meta)} {count + errorCount + 1:N0} - {newCount:N0} of {entities.Count:N0}{threadCountMessage}...");
- var resp = ExecuteMultiple(dataSource, org, meta, req);
+ var newCount = Interlocked.Increment(ref _inProgressCount);
+ ShowProgress(options, newCount, operationNames, meta);
- if (responseHandler != null)
+ for (var retry = 0; ; retry++)
{
- foreach (var item in resp.Responses)
+ try
{
- if (item.Response != null)
- responseHandler(item.Response);
+ var response = dataSource.Execute(org, req);
+ Interlocked.Increment(ref _successCount);
+
+ responseHandler?.Invoke(response);
+ break;
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (IsThrottlingException(ex))
+ {
+ // Handle service protection limit retries ourselves to manage multi-threading
+ // Wait for the recommended retry time, then add the request to the queue for retrying
+ // Terminate this thread so we don't continue to overload the server
+ var retryDelay = TimeSpan.FromSeconds(2);
+
+ if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
+ newCount = Interlocked.Decrement(ref _inProgressCount);
+ _delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ ShowProgress(options, newCount, operationNames, meta);
+
+ await Task.Delay(retryDelay, options.CancellationToken);
+ _delayedUntil.TryRemove(threadState, out _);
+ _retryQueue.Enqueue(req);
+ return DynamicParallelVote.DecreaseThreads;
+ }
+
+ if (IsRetryableFault(ex?.Detail))
+ {
+ // Retry the request after a short delay
+ Thread.Sleep(TimeSpan.FromSeconds(2));
+ continue;
+ }
+
+ if (FilterErrors(context, req, ex.Detail))
+ {
+ if (ContinueOnError)
+ _fault = _fault ?? ex.Detail;
+ else
+ throw;
+ }
+
+ Interlocked.Increment(ref _errorCount);
+ break;
}
}
- var errorResponses = resp.Responses
- .Where(r => r.Fault != null)
- .ToList();
+ return null;
+ }
- Interlocked.Add(ref count, req.Requests.Count - errorResponses.Count);
- Interlocked.Add(ref errorCount, errorResponses.Count);
+ private void ShowProgress(IQueryExecutionOptions options, int newCount, OperationNames operationNames, EntityMetadata meta)
+ {
+ var progress = (double)newCount / _entityCount;
+ var threadCountMessage = _threadCount < 2 ? "" : $" ({_threadCount:N0} threads)";
+ var operationName = operationNames.InProgressUppercase;
- var error = errorResponses.FirstOrDefault(item => FilterErrors(context, req.Requests[item.RequestIndex], item.Fault));
+ var delayedUntilValues = _delayedUntil.Values.ToArray();
+ if (delayedUntilValues.Length > 0)
+ {
+ operationName = operationNames.CompletedUppercase;
+ threadCountMessage += $" ({delayedUntilValues.Length:N0} threads paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
+ }
- if (error != null)
+ if (_successCount + _errorCount + 1 >= newCount)
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ else
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ }
+
+ private async Task<DynamicParallelVote?> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
+ {
+ var newCount = Interlocked.Add(ref _inProgressCount, req.Requests.Count);
+ ShowProgress(options, newCount, operationNames, meta);
+
+ for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
- fault = fault ?? error.Fault;
+ try
+ {
+ var resp = ExecuteMultiple(dataSource, org, meta, req);
- if (!ContinueOnError)
- throw new FaultException<OrganizationServiceFault>(fault, new FaultReason(fault.Message));
+ if (responseHandler != null)
+ {
+ foreach (var item in resp.Responses)
+ {
+ if (item.Response != null)
+ responseHandler(item.Response);
+ }
+ }
+
+ var errorResponses = resp.Responses
+ .Where(r => r.Fault != null)
+ .ToList();
+
+ var nonRetryableErrorResponses = errorResponses
+ .Where(r => !IsRetryableFault(r.Fault))
+ .ToList();
+
+ Interlocked.Add(ref _successCount, req.Requests.Count - errorResponses.Count);
+ Interlocked.Add(ref _errorCount, nonRetryableErrorResponses.Count);
+
+ var error = errorResponses.FirstOrDefault(item => FilterErrors(context, req.Requests[item.RequestIndex], item.Fault) && !IsRetryableFault(item.Fault));
+
+ if (error != null)
+ {
+ _fault = _fault ?? error.Fault;
+
+ if (!ContinueOnError)
+ throw new FaultException<OrganizationServiceFault>(_fault, new FaultReason(_fault.Message));
+ }
+
+ if (ContinueOnError)
+ {
+ var retryableErrors = errorResponses.Where(item => IsRetryableFault(item.Fault)).ToList();
+
+ if (retryableErrors.Count > 0)
+ {
+ // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
+ var retryReq = new ExecuteMultipleRequest
+ {
+ Requests = new OrganizationRequestCollection(),
+ Settings = new ExecuteMultipleSettings
+ {
+ ContinueOnError = IgnoresSomeErrors,
+ ReturnResponses = responseHandler != null
+ }
+ };
+
+ foreach (var errorItem in retryableErrors)
+ retryReq.Requests.Add(req.Requests[errorItem.RequestIndex]);
+
+ // Wait and retry
+ var retryDelay = TimeSpan.FromSeconds(2);
+
+ if (retryableErrors[0].Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
+ await Task.Delay(retryDelay, options.CancellationToken);
+ req = retryReq;
+ continue;
+ }
+ }
+ else
+ {
+ var firstRetryableError = errorResponses.FirstOrDefault(item => IsRetryableFault(item.Fault));
+
+ if (firstRetryableError != null)
+ {
+ // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
+ var retryReq = new ExecuteMultipleRequest
+ {
+ Requests = new OrganizationRequestCollection(),
+ Settings = new ExecuteMultipleSettings
+ {
+ ContinueOnError = IgnoresSomeErrors,
+ ReturnResponses = responseHandler != null
+ }
+ };
+
+ for (var i = firstRetryableError.RequestIndex; i < req.Requests.Count; i++)
+ retryReq.Requests.Add(req.Requests[i]);
+
+ // Wait and retry
+ var retryDelay = TimeSpan.FromSeconds(2);
+
+ if (firstRetryableError.Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
+ await Task.Delay(retryDelay, options.CancellationToken);
+ req = retryReq;
+ continue;
+ }
+ }
+
+ break;
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (!IsThrottlingException(ex))
+ throw;
+
+ // Handle service protection limit retries ourselves to manage multi-threading
+ // Wait for the recommended retry time, then add the request to the queue for retrying
+ // Terminate this thread so we don't continue to overload the server
+ var retryDelay = TimeSpan.FromSeconds(2);
+
+ if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
+ newCount = Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
+ _delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ ShowProgress(options, newCount, operationNames, meta);
+
+ await Task.Delay(retryDelay, options.CancellationToken);
+ _delayedUntil.TryRemove(threadState, out _);
+ _retryQueue.Enqueue(req);
+ return DynamicParallelVote.DecreaseThreads;
+ }
}
+
+ return null;
}
protected virtual bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
index 8a783c52..98d82e6a 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
@@ -134,7 +134,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Deleting",
InProgressLowercase = "deleting",
- CompletedLowercase = "deleted"
+ CompletedLowercase = "deleted",
+ CompletedUppercase = "Deleted"
},
context,
out recordsAffected,
@@ -148,6 +149,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
new file mode 100644
index 00000000..fa6c14a5
--- /dev/null
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
@@ -0,0 +1,151 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
+{
+ /// <summary>
+ /// Provides similar methods to the <see cref="Parallel"/> class, but allows the loop to indicate that the number of threads
+ /// should change during execution.
+ /// </summary>
+ class DynamicParallel<TSource, TLocal>
+ {
+ class DynamicParallelThreadState
+ {
+ public Task Task { get; set; }
+
+ public DynamicParallelVote? Vote { get; set; }
+
+ public bool IsStopped { get; set; }
+
+ public CancellationToken CancellationToken { get; set; }
+ }
+
+ private readonly ConcurrentQueue<TSource> _queue;
+ private readonly List<DynamicParallelThreadState> _threads;
+ private readonly DynamicParallelLoopState _loopState;
+ private readonly ConcurrentBag<Exception> _exceptions;
+ private readonly ParallelOptions _parallelOptions;
+ private readonly Func<TLocal> _localInit;
+ private readonly Func<TSource, DynamicParallelLoopState, TLocal, Task<DynamicParallelVote?>> _body;
+ private readonly Func<TLocal, Task> _localFinally;
+
+ public DynamicParallel(
+ IEnumerable<TSource> source,
+ ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<TSource, DynamicParallelLoopState, TLocal, Task<DynamicParallelVote?>> body,
+ Func<TLocal, Task> localFinally)
+ {
+ _queue = new ConcurrentQueue<TSource>(source);
+ _threads = new List<DynamicParallelThreadState>();
+ _loopState = new DynamicParallelLoopState();
+ _exceptions = new ConcurrentBag<Exception>();
+ _parallelOptions = parallelOptions;
+ _localInit = localInit;
+ _body = body;
+ _localFinally = localFinally;
+ }
+
+ public void ForEach(CancellationToken cancellationToken)
+ {
+ StartThread(cancellationToken);
+
+ var monitor = Task.Factory.StartNew(async () => await MonitorAsync(cancellationToken)).Unwrap();
+ monitor.Wait();
+
+ if (!_exceptions.IsEmpty)
+ throw new AggregateException(_exceptions);
+ }
+
+ private void StartThread(CancellationToken cancellationToken)
+ {
+ var threadState = new DynamicParallelThreadState
+ {
+ CancellationToken = cancellationToken
+ };
+ threadState.Task = Task.Factory.StartNew((state) => RunThread((DynamicParallelThreadState)state), threadState).Unwrap();
+ _threads.Add(threadState);
+ }
+
+ private async Task RunThread(DynamicParallelThreadState threadState)
+ {
+ var local = _localInit();
+
+ try
+ {
+ while (!_loopState.IsStopped &&
+ !threadState.IsStopped &&
+ _queue.TryDequeue(out var item))
+ {
+ threadState.Vote = await _body(item, _loopState, local) ?? threadState.Vote;
+ }
+ }
+ catch (Exception ex)
+ {
+ _exceptions.Add(ex);
+ _loopState.Stop();
+ }
+ finally
+ {
+ await _localFinally(local);
+ threadState.Vote = null;
+ }
+ }
+
+ private async Task MonitorAsync(CancellationToken cancellationToken)
+ {
+ var hasDecreased = false;
+
+ // Periodically check the votes and adjust the number of threads
+ while (!_loopState.IsStopped)
+ {
+ var allTasks = Task.WhenAll(_threads.Select(t => t.Task).ToArray());
+ var timeout = Task.Delay(1000, cancellationToken);
+ var task = await Task.WhenAny(allTasks, timeout);
+
+ if (task == allTasks)
+ break;
+
+ var votes = _threads.Where(t => t.Vote != null && !t.IsStopped).Select(t => t.Vote.Value).ToList();
+ var increase = votes.Count(v => v == DynamicParallelVote.IncreaseThreads);
+ var decrease = votes.Count(v => v == DynamicParallelVote.DecreaseThreads);
+
+ if (decrease > 0 && _threads.Count(t => !t.IsStopped) > 1)
+ {
+ var thread = _threads.First(t => !t.IsStopped && t.Vote == DynamicParallelVote.DecreaseThreads);
+ thread.IsStopped = true;
+ hasDecreased = true;
+ }
+ else if (increase > 0 && !hasDecreased && _threads.Count(t => !t.IsStopped) < _parallelOptions.MaxDegreeOfParallelism)
+ {
+ StartThread(cancellationToken);
+ }
+
+ foreach (var thread in _threads)
+ thread.Vote = null;
+
+ }
+ }
+ }
+
+ class DynamicParallelLoopState
+ {
+ public bool IsStopped { get; private set; }
+
+ public void Stop()
+ {
+ IsStopped = true;
+ }
+ }
+
+ enum DynamicParallelVote
+ {
+ IncreaseThreads,
+ DecreaseThreads,
+ }
+}
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
index 64dc7579..9c39f1ea 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
@@ -148,7 +148,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Inserting",
InProgressLowercase = "inserting",
- CompletedLowercase = "inserted"
+ CompletedLowercase = "inserted",
+ CompletedUppercase = "Inserted",
},
context,
out recordsAffected,
@@ -164,6 +165,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
index b67441db..880adbdc 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
@@ -459,7 +459,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Updating",
InProgressLowercase = "updating",
- CompletedLowercase = "updated"
+ CompletedLowercase = "updated",
+ CompletedUppercase = "Updated"
},
context,
out recordsAffected,
@@ -473,6 +474,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
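Note: DynamicParallel (added above) mirrors the Parallel.ForEach localInit/body/localFinally shape, but the body returns a DynamicParallelVote so the monitor task can start extra workers or park existing ones while the loop runs. A rough usage sketch, assuming the two-type-parameter constructor shape reconstructed in DynamicParallel.cs above; the item type, thread-local type and SendAsync are placeholders (the real caller is BaseDmlNode.ExecuteDmlOperation with Dataverse requests as the work items):

    using System.Threading;
    using System.Threading.Tasks;
    using MarkMpn.Sql4Cds.Engine.ExecutionPlan;

    static class DynamicParallelUsageSketch
    {
        // Stand-in unit of work: returns true when the server asked us to back off
        static Task<bool> SendAsync(string item) => Task.FromResult(false);

        public static void Run(string[] items, CancellationToken token)
        {
            var loop = new DynamicParallel<string, object>(
                items,
                new ParallelOptions { MaxDegreeOfParallelism = 8 },
                () => new object(),                                  // localInit: per-thread state, e.g. a cloned connection
                async (item, loopState, local) =>
                {
                    var throttled = await SendAsync(item);

                    // Vote to shrink the pool when throttled, grow it while work is succeeding
                    return throttled ? DynamicParallelVote.DecreaseThreads : DynamicParallelVote.IncreaseThreads;
                },
                async local => { await Task.CompletedTask; });       // localFinally: flush/dispose per-thread state

            loop.ForEach(token);
        }
    }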
From eca9902c2562b9d46fef9c4ffba710453cde9e3a Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Sun, 10 Nov 2024 16:42:28 +0000
Subject: [PATCH 3/6] Handle service protection limit reporting & cancellation
for FetchXML and message execution
---
MarkMpn.Sql4Cds.Engine/DataSource.cs | 65 +++++++++++++++++--
.../ExecutionPlan/BaseDmlNode.cs | 52 +++++++--------
.../ExecutionPlan/ExceptionExtensions.cs | 41 ++++++++++++
.../ExecutionPlan/ExecuteAsNode.cs | 2 +-
.../ExecutionPlan/ExecuteMessageNode.cs | 7 +-
.../ExecutionPlan/FetchXmlScan.cs | 51 +++++----------
.../ExecutionPlan/PartitionedAggregateNode.cs | 7 +-
MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs | 2 +-
MarkMpn.Sql4Cds.Engine/SessionContext.cs | 2 +-
9 files changed, 155 insertions(+), 74 deletions(-)
create mode 100644 MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs
diff --git a/MarkMpn.Sql4Cds.Engine/DataSource.cs b/MarkMpn.Sql4Cds.Engine/DataSource.cs
index 3792809a..479c85cc 100644
--- a/MarkMpn.Sql4Cds.Engine/DataSource.cs
+++ b/MarkMpn.Sql4Cds.Engine/DataSource.cs
@@ -6,6 +6,10 @@
using static Microsoft.ApplicationInsights.MetricDimensionNames.TelemetryContext;
using Microsoft.Crm.Sdk.Messages;
using System.Linq;
+using System.ServiceModel;
+using System.Threading.Tasks;
+
+
#if NETCOREAPP
using Microsoft.PowerPlatform.Dataverse.Client;
@@ -174,16 +178,65 @@ internal Collation DefaultCollation
}
/// <summary>
- /// Executes a request against the data source
+ /// Executes a request against the data source, with additional feedback when the response is delayed due to service protection limits
/// </summary>
/// <param name="request">The request to execute</param>
+ /// <param name="options">The <see cref="IQueryExecutionOptions"/> which provides logging capabilities</param>
+ /// <param name="logMessage">The templated log message to display when the request is being throttled</param>
/// <returns>The response to the request</returns>
- /// <remarks>
- /// This method automatically preserves the session token for Elastic table consistency
- /// </remarks>
- internal OrganizationResponse Execute(OrganizationRequest request)
+ internal OrganizationResponse ExecuteWithServiceProtectionLimitLogging(OrganizationRequest request, IQueryExecutionOptions options, string logMessage)
{
- return Execute(Connection, request);
+ var oldRetryCount = 0;
+
+#if NETCOREAPP
+ var crmService = Connection as ServiceClient;
+#else
+ var crmService = Connection as CrmServiceClient;
+#endif
+
+ if (crmService != null)
+ {
+ oldRetryCount = crmService.MaxRetryCount;
+ crmService.MaxRetryCount = 0;
+ }
+
+ try
+ {
+ var task = Task.Run(() =>
+ {
+ for (var retry = 0; ; retry++)
+ {
+ try
+ {
+ return Execute(Connection, request);
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (!ex.IsThrottlingException(out var retryDelay))
+ throw;
+
+ options.Progress(0, logMessage + $" (paused until {DateTime.Now.Add(retryDelay).ToShortTimeString()} due to service protection limits)");
+ Task.Delay(retryDelay, options.CancellationToken).Wait();
+ }
+ }
+ });
+
+ try
+ {
+ task.Wait(options.CancellationToken);
+ }
+ catch (AggregateException ex)
+ {
+ throw ex.InnerException;
+ }
+
+ return task.Result;
+ }
+ finally
+ {
+ if (crmService != null)
+ crmService.MaxRetryCount = oldRetryCount;
+ }
}
/// <summary>
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index b21fb602..d36a5df0 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -612,22 +612,6 @@ private async Task UpdateNextBatchSize(ParallelThreadState threadLocalState, Fun
threadLocalState.NextBatchSize = Math.Max(1, Math.Min(BatchSize, (int)(threadLocalState.NextBatchSize * multiplier)));
}
- private bool IsThrottlingException(FaultException<OrganizationServiceFault> ex)
- {
- if (ex == null)
- return false;
-
- if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
- ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
- ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
- ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
- {
- return true;
- }
-
- return false;
- }
-
private bool IsRetryableFault(OrganizationServiceFault fault)
{
if (fault == null)
@@ -665,17 +649,22 @@ protected class BulkApiErrorDetail
}
catch (FaultException<OrganizationServiceFault> ex)
{
- if (IsThrottlingException(ex))
+ if (ex.IsThrottlingException(out var retryDelay))
{
// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
- var retryDelay = TimeSpan.FromSeconds(2);
+ newCount = Interlocked.Decrement(ref _inProgressCount);
- if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
- retryDelay = ts;
+ // The server can report too-long delays. Wait the full 5 minutes to start with,
+ // then reduce to 2 minutes then 1 minute
+ if (retry < 1 && retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+ else if (retry >= 1 && retry < 3 && retryDelay > TimeSpan.FromMinutes(2))
+ retryDelay = TimeSpan.FromMinutes(2);
+ else if (retry >= 3 && retryDelay > TimeSpan.FromMinutes(1))
+ retryDelay = TimeSpan.FromMinutes(1);
- newCount = Interlocked.Decrement(ref _inProgressCount);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
ShowProgress(options, newCount, operationNames, meta);
@@ -715,9 +704,13 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
var operationName = operationNames.InProgressUppercase;
var delayedUntilValues = _delayedUntil.Values.ToArray();
- if (delayedUntilValues.Length > 0)
+ if (delayedUntilValues.Length == _threadCount)
{
operationName = operationNames.CompletedUppercase;
+ threadCountMessage += $" (paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
+ }
+ else if (delayedUntilValues.Length > 0)
+ {
threadCountMessage += $" ({delayedUntilValues.Length:N0} threads paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
}
@@ -835,18 +828,23 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
}
catch (FaultException<OrganizationServiceFault> ex)
{
- if (!IsThrottlingException(ex))
+ if (!ex.IsThrottlingException(out var retryDelay))
throw;
// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
- var retryDelay = TimeSpan.FromSeconds(2);
+ newCount = Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
- if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
- retryDelay = ts;
+ // The server can report too-long delays. Wait the full 5 minutes to start with,
+ // then reduce to 2 minutes then 1 minute
+ if (retry < 1 && retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+ else if (retry >= 1 && retry < 3 && retryDelay > TimeSpan.FromMinutes(2))
+ retryDelay = TimeSpan.FromMinutes(2);
+ else if (retry >= 3 && retryDelay > TimeSpan.FromMinutes(1))
+ retryDelay = TimeSpan.FromMinutes(1);
- newCount = Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
ShowProgress(options, newCount, operationNames, meta);
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs
new file mode 100644
index 00000000..e5e884f1
--- /dev/null
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.ServiceModel;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Xrm.Sdk;
+
+namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
+{
+ static class ExceptionExtensions
+ {
+ public static bool IsThrottlingException(this FaultException<OrganizationServiceFault> ex, out TimeSpan retryDelay)
+ {
+ if (ex == null)
+ {
+ retryDelay = TimeSpan.Zero;
+ return false;
+ }
+
+ if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
+ ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
+ ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
+ ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
+ {
+ retryDelay = TimeSpan.FromSeconds(2);
+
+ if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
+ if (retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+
+ return true;
+ }
+
+ retryDelay = TimeSpan.Zero;
+ return false;
+ }
+ }
+}
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
index ce738ac2..095f2a34 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
@@ -134,7 +134,7 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
var qry = new Microsoft.Xrm.Sdk.Query.QueryExpression("systemuser");
qry.Criteria.AddCondition("systemuserid", ConditionOperator.EqualUserId);
- var actualUserId = ((RetrieveMultipleResponse)dataSource.Execute(new RetrieveMultipleRequest { Query = qry })).EntityCollection.Entities.Single().Id;
+ var actualUserId = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveMultipleRequest { Query = qry }, context.Options, "Impersonating user...")).EntityCollection.Entities.Single().Id;
if (actualUserId != userId)
throw new QueryExecutionException(Sql4CdsError.ImpersonationError(username), new ApplicationException("User was found but the server could not impersonate it"));
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
index 159ddbf7..bb50e5fc 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
@@ -286,7 +286,8 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
if (!context.Session.DataSources.TryGetValue(DataSource, out var dataSource))
throw new NotSupportedQueryFragmentException("Missing datasource " + DataSource);
- context.Options.Progress(0, $"Executing {MessageName}...");
+ var progressMessage = $"Executing {MessageName}...";
+ context.Options.Progress(0, progressMessage);
// Get the first page of results
if (!context.Options.ContinueRetrieve(0))
@@ -302,7 +303,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;
- var response = dataSource.Connection.Execute(request);
+ var response = dataSource.ExecuteWithServiceProtectionLimitLogging(request, context.Options, progressMessage);
var entities = GetEntityCollection(response);
PagesRetrieved++;
@@ -325,7 +326,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
};
- response = dataSource.Connection.Execute(request);
+ response = dataSource.ExecuteWithServiceProtectionLimitLogging(request, context.Options, progressMessage);
entities = GetEntityCollection(response);
PagesRetrieved++;
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
index 48bba560..8e566438 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
@@ -297,8 +297,17 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
var meta = dataSource.Metadata[name];
_isVirtualEntity = meta.DataProviderId != null && meta.DataProviderId != DataProviders.ElasticDataProvider;
- if (!(Parent is PartitionedAggregateNode))
- context.Options.Progress(0, $"Retrieving {GetDisplayName(0, meta)}...");
+ string progressMessage;
+
+ if (Parent is PartitionedAggregateNode partitioned)
+ {
+ progressMessage = partitioned.ProgressMessage;
+ }
+ else
+ {
+ progressMessage = $"Retrieving {GetDisplayName(0, meta)}...";
+ context.Options.Progress(0, progressMessage);
+ }
// Get the first page of results
if (!context.Options.ContinueRetrieve(0))
@@ -349,21 +358,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
try
{
- var task = Task.Run(() =>
- {
- return ((RetrieveMultipleResponse)dataSource.Execute(req)).EntityCollection;
- });
-
- try
- {
- task.Wait(context.Options.CancellationToken);
- }
- catch (AggregateException ex)
- {
- throw ex.InnerException;
- }
-
- res = task.Result;
+ res = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(req, context.Options, progressMessage)).EntityCollection;
}
catch (FaultException<OrganizationServiceFault> ex)
{
@@ -400,7 +395,10 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
while (AllPages && res.MoreRecords && context.Options.ContinueRetrieve(count))
{
if (!(Parent is PartitionedAggregateNode))
- context.Options.Progress(0, $"Retrieved {count:N0} {GetDisplayName(count, meta)}...");
+ {
+ progressMessage = $"Retrieved {count:N0} {GetDisplayName(count, meta)}...";
+ context.Options.Progress(0, progressMessage);
+ }
filter pagingFilter = null;
@@ -422,22 +420,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
}
((FetchExpression)req.Query).Query = Serialize(FetchXml);
-
- var task = Task.Run(() =>
- {
- return ((RetrieveMultipleResponse)dataSource.Execute(req)).EntityCollection;
- });
-
- try
- {
- task.Wait(context.Options.CancellationToken);
- }
- catch (AggregateException ex)
- {
- throw ex.InnerException;
- }
-
- var nextPage = task.Result;
+ var nextPage = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(req, context.Options, progressMessage)).EntityCollection;
PagesRetrieved++;
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
index 393c0303..d8830b8e 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
@@ -53,6 +53,9 @@ class Partition
[Description("The maximum number of partitions that will be executed in parallel")]
public int MaxDOP { get; set; }
+ [Browsable(false)]
+ internal string ProgressMessage { get; set; }
+
public override IDataExecutionPlanNodeInternal FoldQuery(NodeCompilationContext context, IList<OptimizerHint> hints)
{
Source = Source.FoldQuery(context, hints);
@@ -213,7 +216,9 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
lock (_lock)
{
_progress += partition.Percentage;
- context.Options.Progress(0, $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...");
+ ProgressMessage = $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...";
+
+ context.Options.Progress(0, ProgressMessage);
}
if (Interlocked.Decrement(ref _pendingPartitions) == 0)
diff --git a/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs b/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
index 8a531e17..d5d7ec50 100644
--- a/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
@@ -1435,7 +1435,7 @@ public static SqlVariant ServerProperty(SqlString propertyName, ExpressionExecut
#endif
if (orgVersion == null)
- orgVersion = ((RetrieveVersionResponse)dataSource.Execute(new RetrieveVersionRequest())).Version;
+ orgVersion = ((RetrieveVersionResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveVersionRequest(), context.Options, "Retrieving server version...")).Version;
return new SqlVariant(DataTypeHelpers.NVarChar(128, dataSource.DefaultCollation, CollationLabel.CoercibleDefault), dataSource.DefaultCollation.ToSqlString(orgVersion), context);
}
diff --git a/MarkMpn.Sql4Cds.Engine/SessionContext.cs b/MarkMpn.Sql4Cds.Engine/SessionContext.cs
index d8dda573..14a76f10 100644
--- a/MarkMpn.Sql4Cds.Engine/SessionContext.cs
+++ b/MarkMpn.Sql4Cds.Engine/SessionContext.cs
@@ -56,7 +56,7 @@ private SqlString GetVersion()
#endif
if (orgVersion == null)
- orgVersion = ((RetrieveVersionResponse)dataSource.Execute(new RetrieveVersionRequest())).Version;
+ orgVersion = ((RetrieveVersionResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveVersionRequest(), _context._options, "Retrieving server version...")).Version;
var assembly = typeof(Sql4CdsConnection).Assembly;
var assemblyVersion = assembly.GetName().Version;
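Note: ExecuteWithServiceProtectionLimitLogging (added to DataSource.cs above) turns off the SDK's built-in retries, runs the request on a background task so the caller can still cancel, and loops on throttling faults while reporting when the next attempt will happen. A generic sketch of that wrapper pattern, with the underlying call, the throttle test and the progress callback reduced to stand-ins (not the project's actual API):

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    static class ThrottleAwareExecutor
    {
        public static TResponse ExecuteWithRetry<TRequest, TResponse>(
            TRequest request,
            Func<TRequest, TResponse> execute,                // the underlying synchronous call
            Func<Exception, TimeSpan?> throttleDelay,         // returns the Retry-After delay when throttled, else null
            Action<string> progress,
            CancellationToken cancellationToken)
        {
            var task = Task.Run(() =>
            {
                while (true)
                {
                    try
                    {
                        return execute(request);
                    }
                    catch (Exception ex) when (throttleDelay(ex) is TimeSpan delay)
                    {
                        // Tell the user why nothing appears to be happening, then wait and retry
                        progress($"Paused until {DateTime.Now.Add(delay):t} due to service protection limits");
                        Task.Delay(delay, cancellationToken).Wait(cancellationToken);
                    }
                }
            });

            try
            {
                // Waiting on the wrapper task (not the request itself) keeps cancellation responsive
                task.Wait(cancellationToken);
            }
            catch (AggregateException ex)
            {
                throw ex.InnerException;
            }

            return task.Result;
        }
    }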
From 2531aa17a651017cfa11926586bae73c73780994 Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Thu, 14 Nov 2024 18:59:32 +0000
Subject: [PATCH 4/6] More intuitive thread scaling & retries
---
.../ExecutionPlan/BaseDmlNode.cs | 178 ++++++++++++------
.../ExecutionPlan/DynamicParallel.cs | 70 ++++---
2 files changed, 164 insertions(+), 84 deletions(-)
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index d36a5df0..ddd71121 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -61,6 +61,35 @@ public void Dispose()
}
}
+ class ParallelLoopState : DynamicParallelLoopState
+ {
+ private int _successfulExecution;
+
+ public override void RequestReduceThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _successfulExecution, 0, 0) == 0)
+ return;
+
+ base.RequestReduceThreadCount();
+ }
+
+ public override bool ResetReduceThreadCount()
+ {
+ if (base.ResetReduceThreadCount())
+ {
+ Interlocked.Exchange(ref _successfulExecution, 0);
+ return true;
+ }
+
+ return false;
+ }
+
+ public void IncrementSuccessfulExecution()
+ {
+ Interlocked.Increment(ref _successfulExecution);
+ }
+ }
+
class ParallelThreadState
{
public IOrganizationService Service { get; set; }
@@ -456,7 +485,7 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
_retryQueue = new ConcurrentQueue<OrganizationRequest>();
_delayedUntil = new ConcurrentDictionary<ParallelThreadState, DateTime>();
- new DynamicParallel<Entity, ParallelThreadState>(
+ new DynamicParallel<Entity, ParallelThreadState, ParallelLoopState>(
entities,
_parallelOptions,
() =>
@@ -488,16 +517,17 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
},
async (entity, loopState, threadLocalState) =>
{
+ var executed = false;
+
try
{
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
- return DynamicParallelVote.DecreaseThreads;
+ return;
}
- // TODO: Take any requests from the retry queue and execute them first
-
+ // Generate the request to insert/update/delete this record
var request = requestGenerator(entity);
if (BypassCustomPluginExecution)
@@ -505,15 +535,10 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
if (threadLocalState.NextBatchSize == 1)
{
- DynamicParallelVote? vote = null;
-
await UpdateNextBatchSize(threadLocalState, async () =>
{
- vote = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
+ executed = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
});
-
- if (vote != null)
- return vote;
}
else
{
@@ -533,35 +558,75 @@ await UpdateNextBatchSize(threadLocalState, async () =>
threadLocalState.EMR.Requests.Add(request);
if (threadLocalState.EMR.Requests.Count < threadLocalState.NextBatchSize)
- return null;
-
- DynamicParallelVote? vote = null;
+ return;
await UpdateNextBatchSize(threadLocalState, async () =>
{
- vote = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
+ executed = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
});
threadLocalState.EMR = null;
-
- if (vote != null)
- return vote;
}
-
- return DynamicParallelVote.IncreaseThreads;
}
catch
{
threadLocalState.Error = true;
throw;
}
+
+ // Take any requests from the retry queue and execute them
+ while (executed && _retryQueue.TryDequeue(out var retryReq))
+ {
+ if (options.CancellationToken.IsCancellationRequested)
+ {
+ loopState.Stop();
+ return;
+ }
+
+ if (loopState.IsStopped)
+ return;
+
+ if (retryReq is ExecuteMultipleRequest emr)
+ executed = await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ else
+ executed = await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ }
+
+ if (executed)
+ loopState.RequestIncreaseThreadCount();
},
- async (threadLocalState) =>
+ async (loopState, threadLocalState) =>
{
+ // If we've got a partial batch, execute it now
if (threadLocalState.EMR != null && !threadLocalState.Error)
- await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState);
+ await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+
+ // If this is the final thread, execute any remaining requests in the retry queue
+ if (!loopState.IsStopped &&
+ !options.CancellationToken.IsCancellationRequested &&
+ Interlocked.CompareExchange(ref _threadCount, 1, 1) == 1)
+ {
+ // Take any requests from the retry queue and execute them
+ while (_retryQueue.TryDequeue(out var retryReq))
+ {
+ if (options.CancellationToken.IsCancellationRequested)
+ {
+ loopState.Stop();
+ return;
+ }
+
+ if (loopState.IsStopped)
+ return;
+
+ if (retryReq is ExecuteMultipleRequest emr)
+ await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ else
+ await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ }
+ }
Interlocked.Decrement(ref _threadCount);
+ ShowProgress(options, operationNames, meta);
if (threadLocalState.Service != dataSource.Connection && threadLocalState.Service is IDisposable disposableClient)
disposableClient.Dispose();
@@ -632,20 +697,22 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}
-        private async Task<DynamicParallelVote?> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
+        private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
{
- var newCount = Interlocked.Increment(ref _inProgressCount);
- ShowProgress(options, newCount, operationNames, meta);
+ Interlocked.Increment(ref _inProgressCount);
for (var retry = 0; ; retry++)
{
try
{
+ ShowProgress(options, operationNames, meta);
var response = dataSource.Execute(org, req);
Interlocked.Increment(ref _successCount);
+ loopState.IncrementSuccessfulExecution();
+
responseHandler?.Invoke(response);
- break;
+ return true;
}
                 catch (FaultException<OrganizationServiceFault> ex)
{
@@ -654,7 +721,7 @@ protected class BulkApiErrorDetail
// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
- newCount = Interlocked.Decrement(ref _inProgressCount);
+ Interlocked.Decrement(ref _inProgressCount);
// The server can report too-long delays. Wait the full 5 minutes to start with,
// then reduce to 2 minutes then 1 minute
@@ -666,14 +733,18 @@ protected class BulkApiErrorDetail
retryDelay = TimeSpan.FromMinutes(1);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
- ShowProgress(options, newCount, operationNames, meta);
+ ShowProgress(options, operationNames, meta);
+ loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
_delayedUntil.TryRemove(threadState, out _);
+ ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
- return DynamicParallelVote.DecreaseThreads;
+ return false;
}
+ loopState.IncrementSuccessfulExecution();
+
if (IsRetryableFault(ex?.Detail))
{
// Retry the request after a short delay
@@ -690,21 +761,19 @@ protected class BulkApiErrorDetail
}
Interlocked.Increment(ref _errorCount);
- break;
+ return true;
}
}
-
- return null;
}
- private void ShowProgress(IQueryExecutionOptions options, int newCount, OperationNames operationNames, EntityMetadata meta)
+ private void ShowProgress(IQueryExecutionOptions options, OperationNames operationNames, EntityMetadata meta)
{
- var progress = (double)newCount / _entityCount;
+ var progress = (double)_inProgressCount / _entityCount;
var threadCountMessage = _threadCount < 2 ? "" : $" ({_threadCount:N0} threads)";
var operationName = operationNames.InProgressUppercase;
var delayedUntilValues = _delayedUntil.Values.ToArray();
- if (delayedUntilValues.Length == _threadCount)
+ if (delayedUntilValues.Length > 0 && delayedUntilValues.Length == _threadCount)
{
operationName = operationNames.CompletedUppercase;
threadCountMessage += $" (paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
@@ -714,23 +783,25 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
threadCountMessage += $" ({delayedUntilValues.Length:N0} threads paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
}
- if (_successCount + _errorCount + 1 >= newCount)
- options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ if (_successCount + _errorCount + 1 >= _inProgressCount)
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
else
- options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {newCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
}
-        private async Task<DynamicParallelVote?> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState)
+        private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
{
- var newCount = Interlocked.Add(ref _inProgressCount, req.Requests.Count);
- ShowProgress(options, newCount, operationNames, meta);
+ Interlocked.Add(ref _inProgressCount, req.Requests.Count);
for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
try
{
+ ShowProgress(options, operationNames, meta);
var resp = ExecuteMultiple(dataSource, org, meta, req);
+ loopState.IncrementSuccessfulExecution();
+
if (responseHandler != null)
{
foreach (var item in resp.Responses)
@@ -782,12 +853,7 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryReq.Requests.Add(req.Requests[errorItem.RequestIndex]);
// Wait and retry
- var retryDelay = TimeSpan.FromSeconds(2);
-
- if (retryableErrors[0].Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryDelay is TimeSpan ts)
- retryDelay = ts;
-
- await Task.Delay(retryDelay, options.CancellationToken);
+ await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
req = retryReq;
continue;
}
@@ -813,12 +879,7 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryReq.Requests.Add(req.Requests[i]);
// Wait and retry
- var retryDelay = TimeSpan.FromSeconds(2);
-
- if (firstRetryableError.Fault.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
- retryDelay = ts;
-
- await Task.Delay(retryDelay, options.CancellationToken);
+ await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
req = retryReq;
continue;
}
@@ -829,12 +890,15 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
             catch (FaultException<OrganizationServiceFault> ex)
{
if (!ex.IsThrottlingException(out var retryDelay))
+ {
+ loopState.IncrementSuccessfulExecution();
throw;
+ }
// Handle service protection limit retries ourselves to manage multi-threading
// Wait for the recommended retry time, then add the request to the queue for retrying
// Terminate this thread so we don't continue to overload the server
- newCount = Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
+ Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
// The server can report too-long delays. Wait the full 5 minutes to start with,
// then reduce to 2 minutes then 1 minute
@@ -846,16 +910,18 @@ private void ShowProgress(IQueryExecutionOptions options, int newCount, Operatio
retryDelay = TimeSpan.FromMinutes(1);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
- ShowProgress(options, newCount, operationNames, meta);
+ ShowProgress(options, operationNames, meta);
+ loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
_delayedUntil.TryRemove(threadState, out _);
+ ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
- return DynamicParallelVote.DecreaseThreads;
+ return false;
}
}
- return null;
+ return true;
}
protected virtual bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
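
A compressed, stand-alone sketch of the retry-queue handling introduced above, using hypothetical worker names and string payloads in place of OrganizationRequests: a worker that has just executed successfully drains the shared retry queue, and the final worker to finish performs one last sweep before decrementing the thread count.

using System;
using System.Collections.Concurrent;
using System.Threading;

static class RetryQueueSketch
{
    static readonly ConcurrentQueue<string> RetryQueue = new ConcurrentQueue<string>();
    static int _threadCount = 2;

    static void Worker(string name, bool lastItemSucceeded)
    {
        // After a successful request, a worker also drains anything queued for retry.
        while (lastItemSucceeded && RetryQueue.TryDequeue(out var retry))
            Console.WriteLine($"{name} retried {retry}");

        // The final worker to finish sweeps up whatever is still queued,
        // mirroring the Interlocked.CompareExchange(ref _threadCount, 1, 1) == 1 check above.
        if (Interlocked.CompareExchange(ref _threadCount, 1, 1) == 1)
        {
            while (RetryQueue.TryDequeue(out var retry))
                Console.WriteLine($"{name} swept up {retry}");
        }

        Interlocked.Decrement(ref _threadCount);
    }

    static void Main()
    {
        RetryQueue.Enqueue("request A");
        RetryQueue.Enqueue("request B");
        Worker("thread 1", lastItemSucceeded: false); // throttled: leaves the queue alone
        Worker("thread 2", lastItemSucceeded: true);  // drains A and B
    }
}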
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
index fa6c14a5..253651d3 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
@@ -12,14 +12,13 @@ namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
    /// Provides similar methods to the <see cref="Parallel"/> class, but allows the loop to indicate that the number of threads
    /// should change during execution.
    /// </summary>
-    class DynamicParallel<TSource, TLocal>
+    class DynamicParallel<TSource, TLocal, TLoop>
+ where TLoop : DynamicParallelLoopState, new()
{
class DynamicParallelThreadState
{
public Task Task { get; set; }
- public DynamicParallelVote? Vote { get; set; }
-
public bool IsStopped { get; set; }
public CancellationToken CancellationToken { get; set; }
@@ -27,23 +26,23 @@ class DynamicParallelThreadState
         private readonly ConcurrentQueue<TSource> _queue;
         private readonly List<DynamicParallelThreadState> _threads;
- private readonly DynamicParallelLoopState _loopState;
+ private readonly TLoop _loopState;
         private readonly ConcurrentBag<Exception> _exceptions;
         private readonly ParallelOptions _parallelOptions;
         private readonly Func<TLocal> _localInit;
-        private readonly Func<TSource, DynamicParallelLoopState, TLocal, Task<DynamicParallelVote?>> _body;
-        private readonly Func<TLocal, Task> _localFinally;
+        private readonly Func<TSource, TLoop, TLocal, Task> _body;
+        private readonly Func<TLoop, TLocal, Task> _localFinally;
public DynamicParallel(
             IEnumerable<TSource> source,
             ParallelOptions parallelOptions,
             Func<TLocal> localInit,
-            Func<TSource, DynamicParallelLoopState, TLocal, Task<DynamicParallelVote?>> body,
-            Func<TLocal, Task> localFinally)
+            Func<TSource, TLoop, TLocal, Task> body,
+            Func<TLoop, TLocal, Task> localFinally)
         {
             _queue = new ConcurrentQueue<TSource>(source);
             _threads = new List<DynamicParallelThreadState>();
-            _loopState = new DynamicParallelLoopState();
+            _loopState = new TLoop();
             _exceptions = new ConcurrentBag<Exception>();
_parallelOptions = parallelOptions;
_localInit = localInit;
@@ -82,7 +81,7 @@ private async Task RunThread(DynamicParallelThreadState threadState)
!threadState.IsStopped &&
_queue.TryDequeue(out var item))
{
- threadState.Vote = await _body(item, _loopState, local) ?? threadState.Vote;
+ await _body(item, _loopState, local);
}
}
catch (Exception ex)
@@ -92,8 +91,8 @@ private async Task RunThread(DynamicParallelThreadState threadState)
}
finally
{
- await _localFinally(local);
- threadState.Vote = null;
+ await _localFinally(_loopState, local);
+ threadState.IsStopped = true;
}
}
@@ -111,41 +110,56 @@ private async Task MonitorAsync(CancellationToken cancellationToken)
if (task == allTasks)
break;
- var votes = _threads.Where(t => t.Vote != null && !t.IsStopped).Select(t => t.Vote.Value).ToList();
- var increase = votes.Count(v => v == DynamicParallelVote.IncreaseThreads);
- var decrease = votes.Count(v => v == DynamicParallelVote.DecreaseThreads);
-
- if (decrease > 0 && _threads.Count(t => !t.IsStopped) > 1)
+ if (_loopState.ResetReduceThreadCount() && _threads.Count(t => !t.IsStopped) > 1)
{
- var thread = _threads.First(t => !t.IsStopped && t.Vote == DynamicParallelVote.DecreaseThreads);
+ var thread = _threads.First(t => !t.IsStopped);
thread.IsStopped = true;
hasDecreased = true;
}
- else if (increase > 0 && !hasDecreased && _threads.Count(t => !t.IsStopped) < _parallelOptions.MaxDegreeOfParallelism)
+ else if (!hasDecreased && _threads.Count(t => !t.IsStopped) < _parallelOptions.MaxDegreeOfParallelism && _loopState.ResetIncreaseThreadCount())
{
StartThread(cancellationToken);
}
-
- foreach (var thread in _threads)
- thread.Vote = null;
-
}
}
}
class DynamicParallelLoopState
{
+ private int _reduceThreadCount;
+ private int _increaseThreadCount;
+
public bool IsStopped { get; private set; }
public void Stop()
{
IsStopped = true;
}
- }
- enum DynamicParallelVote
- {
- IncreaseThreads,
- DecreaseThreads,
+ public virtual void RequestReduceThreadCount()
+ {
+ Interlocked.Exchange(ref _reduceThreadCount, 1);
+ }
+
+ public virtual bool ResetReduceThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _reduceThreadCount, 0, 1) == 1)
+ return true;
+
+ return false;
+ }
+
+ public virtual void RequestIncreaseThreadCount()
+ {
+ Interlocked.Exchange(ref _increaseThreadCount, 1);
+ }
+
+ public virtual bool ResetIncreaseThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _increaseThreadCount, 0, 1) == 1)
+ return true;
+
+ return false;
+ }
}
}
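
A minimal, stand-alone sketch of the flag pattern the new loop state uses: workers record a request with Interlocked.Exchange and the monitor consumes it with Interlocked.CompareExchange, so any number of requests raised between monitor passes collapses into a single thread-count adjustment. Class and variable names here are illustrative only, not part of the patch.

using System;
using System.Threading;
using System.Threading.Tasks;

class LoopStateSketch
{
    private int _reduce;
    private int _increase;

    // Any worker can call these; setting the flag twice has the same effect as once.
    public void RequestReduceThreadCount() => Interlocked.Exchange(ref _reduce, 1);
    public void RequestIncreaseThreadCount() => Interlocked.Exchange(ref _increase, 1);

    // Only the monitor calls these; the flag is cleared as it is read.
    public bool ResetReduceThreadCount() => Interlocked.CompareExchange(ref _reduce, 0, 1) == 1;
    public bool ResetIncreaseThreadCount() => Interlocked.CompareExchange(ref _increase, 0, 1) == 1;
}

static class Demo
{
    static void Main()
    {
        var state = new LoopStateSketch();
        var threads = 4;

        // Several workers hit a throttle and ask to slow down.
        Parallel.For(0, 3, _ => state.RequestReduceThreadCount());

        // The monitor applies at most one decrease per pass, as MonitorAsync does above.
        if (state.ResetReduceThreadCount() && threads > 1)
            threads--;

        Console.WriteLine($"Threads after monitor pass: {threads}");   // 3, not 1
        Console.WriteLine(state.ResetReduceThreadCount());             // False - the flag was consumed
    }
}

Unlike the per-thread Vote field it replaces, the flag outlives the thread that raised it and needs no reset loop in the monitor.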
From 1be3e81b50c429de9d5431a31aaffa3a7cfd18ce Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Sun, 17 Nov 2024 11:13:41 +0000
Subject: [PATCH 5/6] Improved stats feedback for large DML operations
---
.../ExecutionPlan/BaseDmlNode.cs | 112 ++++++++++++++++++
.../ExecutionPlanNodeTypeDescriptor.cs | 48 ++++++++
2 files changed, 160 insertions(+)
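The per-minute statistics added below are computed from counter deltas: once a minute the monitor records the change in the success counter as records-per-minute, and divides it by the number of batches completed in that minute to get the average batch size. A small stand-alone sketch of that arithmetic, with invented counter values:

using System;

static class StatsSampleSketch
{
    static void Main()
    {
        // Counter values captured at two successive one-minute ticks.
        int successAtLastTick = 1200, successNow = 1750;
        int batchesAtLastTick = 30, batchesNow = 41;

        var recordsThisMinute = successNow - successAtLastTick;   // 550
        var batchesThisMinute = batchesNow - batchesAtLastTick;   // 11
        var avgBatchSize = batchesThisMinute == 0
            ? 0f
            : (float)recordsThisMinute / batchesThisMinute;       // 50

        Console.WriteLine($"{recordsThisMinute} records/min, avg batch {avgBatchSize}");
    }
}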
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index ddd71121..5d1a5562 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -106,10 +106,17 @@ class ParallelThreadState
private int _successCount;
private int _errorCount;
private int _threadCount;
+ private int _maxThreadCount;
+ private int _pausedThreadCount;
+ private int _batchExecutionCount;
private ParallelOptions _parallelOptions;
         private ConcurrentQueue<OrganizationRequest> _retryQueue;
         private ConcurrentDictionary<ParallelThreadState, DateTime> _delayedUntil;
private OrganizationServiceFault _fault;
+ private int _serviceProtectionLimitHits;
+ private int[] _threadCountHistory;
+ private int[] _rpmHistory;
+ private float[] _batchSizeHistory;
         /// <summary>
/// The SQL string that the query was converted from
@@ -148,12 +155,50 @@ class ParallelThreadState
         /// <summary>
         /// The maximum degree of parallelism to apply to this operation
         /// </summary>
+ [DisplayName("Max Degree of Parallelism")]
[Description("The maximum number of operations that will be performed in parallel")]
public abstract int MaxDOP { get; set; }
+ [Category("Statistics")]
+ [DisplayName("Actual Degree of Parallelism")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of threads that were running each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] ActualDOP => _threadCountHistory.Select(i => (float)i).ToArray();
+
+ [Category("Statistics")]
+ [DisplayName("Records Per Minute")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of records that were processed each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] RPM => _rpmHistory.Select(i => (float)i).ToArray();
+
+ [Category("Statistics")]
+ [DisplayName("Actual Batch Size")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The average number of records that were processed per batch each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] ActualBatchSize => _batchSizeHistory;
+
+ [Category("Statistics")]
+ [DisplayName("Service Protection Limit Hits")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of times execution was paused due to service protection limits")]
+ public int ServiceProtectionLimitHits => _serviceProtectionLimitHits;
+
         /// <summary>
         /// The number of requests that will be submitted in a single batch
         /// </summary>
+ [DisplayName("Max Batch Size")]
[Description("The number of requests that will be submitted in a single batch")]
public abstract int BatchSize { get; set; }
@@ -458,6 +503,7 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
_successCount = 0;
_errorCount = 0;
_threadCount = 0;
+ _batchExecutionCount = 0;
#if NETCOREAPP
var svc = dataSource.Connection as ServiceClient;
@@ -465,6 +511,10 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
var svc = dataSource.Connection as CrmServiceClient;
#endif
+            var threadCountHistory = new List<int>();
+            var rpmHistory = new List<int>();
+            var batchSizeHistory = new List<float>();
+
var maxDop = MaxDOP;
if (!ParallelismHelper.CanParallelise(dataSource.Connection))
@@ -474,6 +524,48 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
svc = null;
var useAffinityCookie = maxDop == 1 || _entityCount < 100;
+ var completed = new CancellationTokenSource();
+
+ // Set up one background thread to monitor the performance for debugging
+ var performanceMonitor = Task.Factory.StartNew(async () =>
+ {
+ var lastSuccess = 0;
+ var lastBatchCount = 0;
+
+ while (!completed.Token.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(TimeSpan.FromMinutes(1), completed.Token);
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+
+ threadCountHistory.Add(_maxThreadCount);
+ var prevSuccess = Interlocked.Exchange(ref lastSuccess, _successCount);
+ var prevBatchCount = Interlocked.Exchange(ref lastBatchCount, _batchExecutionCount);
+ var recordCount = lastSuccess - prevSuccess;
+ var batchCount = lastBatchCount - prevBatchCount;
+ rpmHistory.Add(recordCount);
+ if (batchCount == 0)
+ batchSizeHistory.Add(0);
+ else
+ batchSizeHistory.Add((float)recordCount / batchCount);
+ _maxThreadCount = _threadCount;
+ lastSuccess = _successCount;
+ }
+
+ threadCountHistory.Add(_maxThreadCount);
+ var finalRecordCount = _successCount - lastSuccess;
+ var finalBatchCount = _batchExecutionCount - lastBatchCount;
+ rpmHistory.Add(finalRecordCount);
+ if (finalBatchCount == 0)
+ batchSizeHistory.Add(0);
+ else
+ batchSizeHistory.Add((float)finalRecordCount / finalBatchCount);
+ });
try
{
@@ -507,6 +599,8 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
#endif
Interlocked.Increment(ref _threadCount);
+ _maxThreadCount = Math.Max(_maxThreadCount, _threadCount);
+
return new ParallelThreadState
{
Service = service,
@@ -657,6 +751,15 @@ await UpdateNextBatchSize(threadLocalState, async () =>
else
throw ex;
}
+ finally
+ {
+ completed.Cancel();
+ performanceMonitor.ConfigureAwait(false).GetAwaiter().GetResult();
+
+ _threadCountHistory = threadCountHistory.ToArray();
+ _rpmHistory = rpmHistory.ToArray();
+ _batchSizeHistory = batchSizeHistory.ToArray();
+ }
recordsAffected = _successCount;
message = $"({_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase})";
@@ -675,6 +778,9 @@ private async Task UpdateNextBatchSize(ParallelThreadState threadLocalState, Fun
// Adjust the batch size based on the time taken to try and keep the total time around 10sec
var multiplier = TimeSpan.FromSeconds(10).TotalMilliseconds / timer.Duration.TotalMilliseconds;
threadLocalState.NextBatchSize = Math.Max(1, Math.Min(BatchSize, (int)(threadLocalState.NextBatchSize * multiplier)));
+
+ // Update the statistics of the number of batches we have executed
+ Interlocked.Increment(ref _batchExecutionCount);
}
private bool IsRetryableFault(OrganizationServiceFault fault)
@@ -733,10 +839,13 @@ private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, Operation
retryDelay = TimeSpan.FromMinutes(1);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ if (Interlocked.Increment(ref _pausedThreadCount) == 1)
+ Interlocked.Increment(ref _serviceProtectionLimitHits);
ShowProgress(options, operationNames, meta);
loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
+ Interlocked.Decrement(ref _pausedThreadCount);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
@@ -910,10 +1019,13 @@ private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames
retryDelay = TimeSpan.FromMinutes(1);
_delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ if (Interlocked.Increment(ref _pausedThreadCount) == 1)
+ Interlocked.Increment(ref _serviceProtectionLimitHits);
ShowProgress(options, operationNames, meta);
loopState.RequestReduceThreadCount();
await Task.Delay(retryDelay, options.CancellationToken);
+ Interlocked.Decrement(ref _pausedThreadCount);
_delayedUntil.TryRemove(threadState, out _);
ShowProgress(options, operationNames, meta);
_retryQueue.Enqueue(req);
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
index 06f80d16..8e740f95 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
+using System.Drawing;
+using System.Drawing.Design;
using System.Globalization;
using System.Linq;
using System.Reflection;
@@ -161,6 +163,10 @@ public override TypeConverter Converter
{
get
{
+                var propTypeConverterAttr = _prop?.GetCustomAttribute<TypeConverterAttribute>();
+ if (propTypeConverterAttr != null)
+ return (TypeConverter)Activator.CreateInstance(Type.GetType(propTypeConverterAttr.ConverterTypeName));
+
var type = _value?.GetType() ?? _prop.PropertyType;
if ((type.IsClass || type.IsInterface) && type != typeof(string))
@@ -425,4 +431,46 @@ class DictionaryKeyAttribute : Attribute
class DictionaryValueAttribute : Attribute
{
}
+
+#if !NETCOREAPP
+ class MiniChartEditor : UITypeEditor
+ {
+ public override bool GetPaintValueSupported(ITypeDescriptorContext context)
+ {
+ return true;
+ }
+
+ public override void PaintValue(PaintValueEventArgs e)
+ {
+ var values = (float[])((ExecutionPlanNodeTypeDescriptor)e.Value).GetPropertyOwner(null);
+ var brush = Brushes.DarkBlue;
+ var barWidth = (float)e.Bounds.Width / values.Length;
+ var maxValue = values.Max();
+
+ for (var i = 0; i < values.Length && maxValue > 0; i++)
+ {
+ var height = e.Bounds.Height * values[i] / maxValue;
+ e.Graphics.FillRectangle(brush, e.Bounds.X + i * barWidth, e.Bounds.Bottom - height, barWidth, height);
+ }
+ }
+ }
+#endif
+
+ class MiniChartConverter : DataCollectionConverter
+ {
+ public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType)
+ {
+ if (value is ICustomTypeDescriptor desc)
+ value = desc.GetPropertyOwner(null);
+
+ var values = (float[])value;
+ var maxValue = values.Max();
+ var minValue = values.Min();
+
+ if (minValue == maxValue)
+ return minValue.ToString();
+ else
+ return $"{minValue} - {maxValue}";
+ }
+ }
}
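
For orientation, a stand-alone sketch of the two presentations added above: the converter summarises a series as a "min - max" label, while the editor scales each value against the series maximum to size its bar. The sample values here are invented, not taken from the patch.

using System;
using System.Linq;

static class MiniChartSketch
{
    static void Main()
    {
        // Hypothetical per-minute thread counts, as ActualDOP would expose them.
        var values = new float[] { 1, 4, 8, 8, 6 };

        var min = values.Min();
        var max = values.Max();
        var label = min == max ? min.ToString() : $"{min} - {max}";
        Console.WriteLine(label);                                   // "1 - 8"

        // Same scaling as the paint logic: value / max, here against a 20px-high cell.
        var heights = values.Select(v => 20f * v / max).ToArray();
        Console.WriteLine(string.Join(", ", heights));              // 2.5, 10, 20, 20, 15
    }
}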
From 0efc56e9d36db3b9926843c95fc01276ae0ee4f8 Mon Sep 17 00:00:00 2001
From: Mark Carrington <31017244+MarkMpn@users.noreply.github.com>
Date: Mon, 18 Nov 2024 20:38:46 +0000
Subject: [PATCH 6/6] Refactored request retrying
---
.../ExecutionPlan/BaseDmlNode.cs | 72 +++++++++----------
1 file changed, 32 insertions(+), 40 deletions(-)
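The refactor below collapses the two retry branches into one list-building step. As a compact illustration of the selection rule (types and names here are illustrative, not the patch's): with ContinueOnError the server attempted every request, so only the individually retryable failures are re-queued; without it the server stopped at the first fault, so that request and everything after it is re-queued when the fault is retryable.

using System;
using System.Collections.Generic;
using System.Linq;

class ErrorItem { public int RequestIndex; public bool Retryable; }

static class RetrySelectionSketch
{
    static List<int> SelectRetryIndexes(int requestCount, List<ErrorItem> errors, bool continueOnError)
    {
        if (continueOnError)
        {
            // Server attempted every request: retry just the retryable failures.
            return errors.Where(e => e.Retryable).Select(e => e.RequestIndex).ToList();
        }

        // Server stopped at the first failure: retry it and everything after it,
        // but only if that first failure is itself retryable.
        var first = errors.FirstOrDefault(e => e.Retryable);
        return first == null
            ? new List<int>()
            : Enumerable.Range(first.RequestIndex, requestCount - first.RequestIndex).ToList();
    }

    static void Main()
    {
        var errors = new List<ErrorItem> { new ErrorItem { RequestIndex = 2, Retryable = true } };
        Console.WriteLine(string.Join(",", SelectRetryIndexes(5, errors, true)));  // 2
        Console.WriteLine(string.Join(",", SelectRetryIndexes(5, errors, false))); // 2,3,4
    }
}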
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index 5d1a5562..64effc68 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -807,7 +807,7 @@ private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, Operation
{
Interlocked.Increment(ref _inProgressCount);
- for (var retry = 0; ; retry++)
+ for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
try
{
@@ -873,6 +873,8 @@ private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, Operation
return true;
}
}
+
+ return true;
}
private void ShowProgress(IQueryExecutionOptions options, OperationNames operationNames, EntityMetadata meta)
@@ -941,57 +943,47 @@ private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames
                         throw new FaultException<OrganizationServiceFault>(_fault, new FaultReason(_fault.Message));
}
+                    List<OrganizationRequest> retryRequests;
+
if (ContinueOnError)
{
+ // The server will already have tried to execute every request, so we can just pull the ones
+ // that have a fault that we know we can retry
var retryableErrors = errorResponses.Where(item => IsRetryableFault(item.Fault)).ToList();
-
- if (retryableErrors.Count > 0)
- {
- // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
- var retryReq = new ExecuteMultipleRequest
- {
- Requests = new OrganizationRequestCollection(),
- Settings = new ExecuteMultipleSettings
- {
- ContinueOnError = IgnoresSomeErrors,
- ReturnResponses = responseHandler != null
- }
- };
-
- foreach (var errorItem in retryableErrors)
- retryReq.Requests.Add(req.Requests[errorItem.RequestIndex]);
-
- // Wait and retry
- await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
- req = retryReq;
- continue;
- }
+ retryRequests = retryableErrors.Select(item => req.Requests[item.RequestIndex]).ToList();
}
else
{
+ // The server will have stopped at the first fault. If that was a retryable fault, add that
+ // and all the subsequent requests to try again
var firstRetryableError = errorResponses.FirstOrDefault(item => IsRetryableFault(item.Fault));
- if (firstRetryableError != null)
+ if (firstRetryableError == null)
+                            retryRequests = new List<OrganizationRequest>();
+ else
+ retryRequests = req.Requests.Skip(firstRetryableError.RequestIndex).ToList();
+ }
+
+ if (retryRequests.Count > 0)
+ {
+ // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
+ var retryReq = new ExecuteMultipleRequest
{
- // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
- var retryReq = new ExecuteMultipleRequest
+ Requests = new OrganizationRequestCollection(),
+ Settings = new ExecuteMultipleSettings
{
- Requests = new OrganizationRequestCollection(),
- Settings = new ExecuteMultipleSettings
- {
- ContinueOnError = IgnoresSomeErrors,
- ReturnResponses = responseHandler != null
- }
- };
+ ContinueOnError = IgnoresSomeErrors,
+ ReturnResponses = responseHandler != null
+ }
+ };
- for (var i = firstRetryableError.RequestIndex; i < req.Requests.Count; i++)
- retryReq.Requests.Add(req.Requests[i]);
+ foreach (var retryRequest in retryRequests)
+ retryReq.Requests.Add(retryRequest);
- // Wait and retry
- await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
- req = retryReq;
- continue;
- }
+ // Wait and retry
+ await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
+ req = retryReq;
+ continue;
}
break;