diff --git a/MarkMpn.Sql4Cds.Engine/DataSource.cs b/MarkMpn.Sql4Cds.Engine/DataSource.cs
index 3792809a..479c85cc 100644
--- a/MarkMpn.Sql4Cds.Engine/DataSource.cs
+++ b/MarkMpn.Sql4Cds.Engine/DataSource.cs
@@ -6,6 +6,10 @@
using static Microsoft.ApplicationInsights.MetricDimensionNames.TelemetryContext;
using Microsoft.Crm.Sdk.Messages;
using System.Linq;
+using System.ServiceModel;
+using System.Threading.Tasks;
+
+
#if NETCOREAPP
using Microsoft.PowerPlatform.Dataverse.Client;
@@ -174,16 +178,65 @@ internal Collation DefaultCollation
}
/// <summary>
- /// Executes a request against the data source
+ /// Executes a request against the data source, with additional feedback when the response is delayed due to service protection limits
/// </summary>
/// <param name="request">The request to execute</param>
+ /// <param name="options">The <see cref="IQueryExecutionOptions"/> which provides logging capabilities</param>
+ /// <param name="logMessage">The templated log message to display when the request is being throttled</param>
/// <returns>The response to the request</returns>
- /// <remarks>
- /// This method automatically preserves the session token for Elastic table consistency
- /// </remarks>
- internal OrganizationResponse Execute(OrganizationRequest request)
+ internal OrganizationResponse ExecuteWithServiceProtectionLimitLogging(OrganizationRequest request, IQueryExecutionOptions options, string logMessage)
{
- return Execute(Connection, request);
+ var oldRetryCount = 0;
+
+#if NETCOREAPP
+ var crmService = Connection as ServiceClient;
+#else
+ var crmService = Connection as CrmServiceClient;
+#endif
+
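+ // Temporarily take over retry handling from the SDK so that waits caused by service
+ // protection limits can be reported to the user and cancelled via the options token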
+ if (crmService != null)
+ {
+ oldRetryCount = crmService.MaxRetryCount;
+ crmService.MaxRetryCount = 0;
+ }
+
+ try
+ {
+ var task = Task.Run(() =>
+ {
+ for (var retry = 0; ; retry++)
+ {
+ try
+ {
+ return Execute(Connection, request);
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (!ex.IsThrottlingException(out var retryDelay))
+ throw;
+
+ options.Progress(0, logMessage + $" (paused until {DateTime.Now.Add(retryDelay).ToShortTimeString()} due to service protection limits)");
+ Task.Delay(retryDelay, options.CancellationToken).Wait();
+ }
+ }
+ });
+
+ try
+ {
+ task.Wait(options.CancellationToken);
+ }
+ catch (AggregateException ex)
+ {
+ throw ex.InnerException;
+ }
+
+ return task.Result;
+ }
+ finally
+ {
+ if (crmService != null)
+ crmService.MaxRetryCount = oldRetryCount;
+ }
}
/// <summary>
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
index 5e6b7e7f..64effc68 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
@@ -11,6 +11,7 @@
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using Microsoft.Xrm.Sdk.Metadata;
+using System.Collections.Concurrent;
#if NETCOREAPP
using Microsoft.PowerPlatform.Dataverse.Client;
#else
@@ -60,6 +61,63 @@ public void Dispose()
}
}
+ class ParallelLoopState : DynamicParallelLoopState
+ {
+ private int _successfulExecution;
+
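+ // Only act on a request to reduce the thread count if at least one request has
+ // completed successfully since the last reduction, so a single throttled burst
+ // doesn't keep removing threads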
+ public override void RequestReduceThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _successfulExecution, 0, 0) == 0)
+ return;
+
+ base.RequestReduceThreadCount();
+ }
+
+ public override bool ResetReduceThreadCount()
+ {
+ if (base.ResetReduceThreadCount())
+ {
+ Interlocked.Exchange(ref _successfulExecution, 0);
+ return true;
+ }
+
+ return false;
+ }
+
+ public void IncrementSuccessfulExecution()
+ {
+ Interlocked.Increment(ref _successfulExecution);
+ }
+ }
+
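+ // Per-thread state for the DML loop - the connection to use, the batch being
+ // built up and the adaptive size for the next batch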
+ class ParallelThreadState
+ {
+ public IOrganizationService Service { get; set; }
+
+ public ExecuteMultipleRequest EMR { get; set; }
+
+ public int NextBatchSize { get; set; }
+
+ public bool Error { get; set; }
+ }
+
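+ // Counters and history buffers tracked during execution to drive progress messages
+ // and the "Statistics" properties exposed below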
+ private int _entityCount;
+ private int _inProgressCount;
+ private int _successCount;
+ private int _errorCount;
+ private int _threadCount;
+ private int _maxThreadCount;
+ private int _pausedThreadCount;
+ private int _batchExecutionCount;
+ private ParallelOptions _parallelOptions;
+ private ConcurrentQueue<OrganizationRequest> _retryQueue;
+ private ConcurrentDictionary<ParallelThreadState, DateTime> _delayedUntil;
+ private OrganizationServiceFault _fault;
+ private int _serviceProtectionLimitHits;
+ private int[] _threadCountHistory;
+ private int[] _rpmHistory;
+ private float[] _batchSizeHistory;
+
/// <summary>
/// The SQL string that the query was converted from
/// </summary>
@@ -97,12 +155,50 @@ public void Dispose()
/// <summary>
/// The maximum degree of parallelism to apply to this operation
/// </summary>
+ [DisplayName("Max Degree of Parallelism")]
[Description("The maximum number of operations that will be performed in parallel")]
public abstract int MaxDOP { get; set; }
+ [Category("Statistics")]
+ [DisplayName("Actual Degree of Parallelism")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of threads that were running each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] ActualDOP => _threadCountHistory.Select(i => (float)i).ToArray();
+
+ [Category("Statistics")]
+ [DisplayName("Records Per Minute")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of records that were processed each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] RPM => _rpmHistory.Select(i => (float)i).ToArray();
+
+ [Category("Statistics")]
+ [DisplayName("Actual Batch Size")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The average number of records that were processed per batch each minute during the operation")]
+ [TypeConverter(typeof(MiniChartConverter))]
+#if !NETCOREAPP
+ [Editor(typeof(MiniChartEditor), typeof(System.Drawing.Design.UITypeEditor))]
+#endif
+ public float[] ActualBatchSize => _batchSizeHistory;
+
+ [Category("Statistics")]
+ [DisplayName("Service Protection Limit Hits")]
+ [BrowsableInEstimatedPlan(false)]
+ [Description("The number of times execution was paused due to service protection limits")]
+ public int ServiceProtectionLimitHits => _serviceProtectionLimitHits;
+
/// <summary>
/// The number of requests that will be submitted in a single batch
/// </summary>
+ [DisplayName("Max Batch Size")]
[Description("The number of requests that will be submitted in a single batch")]
public abstract int BatchSize { get; set; }
@@ -380,6 +476,11 @@ protected class OperationNames
/// The completed name of the operation to include in the middle of a log message, e.g. "updated"
/// </summary>
public string CompletedLowercase { get; set; }
+
+ /// <summary>
+ /// The completed name of the operation to include at the start of a log message, e.g. "Updated"
+ /// </summary>
+ public string CompletedUppercase { get; set; }
}
/// <summary>
@@ -397,10 +498,12 @@ protected class OperationNames
/// <param name="responseHandler">An optional parameter to handle the response messages from the server</param>
protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions options, List<Entity> entities, EntityMetadata meta, Func<Entity, OrganizationRequest> requestGenerator, OperationNames operationNames, NodeExecutionContext context, out int recordsAffected, out string message, Action<OrganizationResponse> responseHandler = null)
{
- var inProgressCount = 0;
- var count = 0;
- var errorCount = 0;
- var threadCount = 0;
+ _entityCount = entities.Count;
+ _inProgressCount = 0;
+ _successCount = 0;
+ _errorCount = 0;
+ _threadCount = 0;
+ _batchExecutionCount = 0;
#if NETCOREAPP
var svc = dataSource.Connection as ServiceClient;
@@ -408,6 +511,10 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
var svc = dataSource.Connection as CrmServiceClient;
#endif
+ var threadCountHistory = new List<int>();
+ var rpmHistory = new List<int>();
+ var batchSizeHistory = new List<float>();
+
var maxDop = MaxDOP;
if (!ParallelismHelper.CanParallelise(dataSource.Connection))
@@ -416,7 +523,49 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
if (maxDop == 1)
svc = null;
- var useAffinityCookie = maxDop == 1 || entities.Count < 100;
+ var useAffinityCookie = maxDop == 1 || _entityCount < 100;
+ var completed = new CancellationTokenSource();
+
+ // Set up one background thread to monitor the performance for debugging
+ var performanceMonitor = Task.Factory.StartNew(async () =>
+ {
+ var lastSuccess = 0;
+ var lastBatchCount = 0;
+
+ while (!completed.Token.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(TimeSpan.FromMinutes(1), completed.Token);
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+
+ threadCountHistory.Add(_maxThreadCount);
+ var prevSuccess = Interlocked.Exchange(ref lastSuccess, _successCount);
+ var prevBatchCount = Interlocked.Exchange(ref lastBatchCount, _batchExecutionCount);
+ var recordCount = lastSuccess - prevSuccess;
+ var batchCount = lastBatchCount - prevBatchCount;
+ rpmHistory.Add(recordCount);
+ if (batchCount == 0)
+ batchSizeHistory.Add(0);
+ else
+ batchSizeHistory.Add((float)recordCount / batchCount);
+ _maxThreadCount = _threadCount;
+ lastSuccess = _successCount;
+ }
+
+ threadCountHistory.Add(_maxThreadCount);
+ var finalRecordCount = _successCount - lastSuccess;
+ var finalBatchCount = _batchExecutionCount - lastBatchCount;
+ rpmHistory.Add(finalRecordCount);
+ if (finalBatchCount == 0)
+ batchSizeHistory.Add(0);
+ else
+ batchSizeHistory.Add((float)finalRecordCount / finalBatchCount);
+ }).Unwrap();
try
{
@@ -424,94 +573,72 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
using (UseParallelConnections())
{
- Parallel.ForEach(entities,
- new ParallelOptions { MaxDegreeOfParallelism = maxDop },
+ _parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop };
+ _retryQueue = new ConcurrentQueue<OrganizationRequest>();
+ _delayedUntil = new ConcurrentDictionary<ParallelThreadState, DateTime>();
+
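+ // Use a custom parallel loop instead of Parallel.ForEach so the number of worker
+ // threads can be reduced when service protection limits are hit and increased
+ // again once requests start succeeding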
+ new DynamicParallel<Entity, ParallelThreadState, ParallelLoopState>(
+ entities,
+ _parallelOptions,
() =>
{
var service = svc?.Clone() ?? dataSource.Connection;
#if NETCOREAPP
- if (!useAffinityCookie && service is ServiceClient crmService)
- crmService.EnableAffinityCookie = false;
+ if (service is ServiceClient crmService)
+ {
+ crmService.MaxRetryCount = 0;
+ crmService.EnableAffinityCookie = useAffinityCookie;
+ }
#else
- if (!useAffinityCookie && service is CrmServiceClient crmService)
- crmService.EnableAffinityCookie = false;
-#endif
- Interlocked.Increment(ref threadCount);
-
- return new { Service = service, EMR = default(ExecuteMultipleRequest) };
- },
- (entity, loopState, index, threadLocalState) =>
- {
- if (options.CancellationToken.IsCancellationRequested)
+ if (service is CrmServiceClient crmService)
{
- loopState.Stop();
- return threadLocalState;
+ crmService.MaxRetryCount = 0;
+ crmService.EnableAffinityCookie = useAffinityCookie;
}
+#endif
+ Interlocked.Increment(ref _threadCount);
- var request = requestGenerator(entity);
-
- if (BypassCustomPluginExecution)
- request.Parameters["BypassCustomPluginExecution"] = true;
+ _maxThreadCount = Math.Max(_maxThreadCount, _threadCount);
- if (BatchSize == 1)
+ return new ParallelThreadState
{
- var newCount = Interlocked.Increment(ref inProgressCount);
- var progress = (double)newCount / entities.Count;
-
- if (threadCount < 2)
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
- else
- options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");
+ Service = service,
+ EMR = null,
+ Error = false,
+ NextBatchSize = 1
+ };
+ },
+ async (entity, loopState, threadLocalState) =>
+ {
+ var executed = false;
- while (true)
+ try
+ {
+ if (options.CancellationToken.IsCancellationRequested)
{
- try
- {
- var response = dataSource.Execute(threadLocalState.Service, request);
- Interlocked.Increment(ref count);
-
- responseHandler?.Invoke(response);
- break;
- }
- catch (FaultException<OrganizationServiceFault> ex)
- {
- if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
- ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
- ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
- ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
- {
- // In case throttling isn't handled by normal retry logic in the service client
- var retryAfterSeconds = 2;
+ loopState.Stop();
+ return;
+ }
- if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
- retryAfterSeconds = Convert.ToInt32(retryAfter);
+ // Generate the request to insert/update/delete this record
+ var request = requestGenerator(entity);
- Thread.Sleep(retryAfterSeconds * 1000);
- continue;
- }
+ if (BypassCustomPluginExecution)
+ request.Parameters["BypassCustomPluginExecution"] = true;
- if (FilterErrors(context, request, ex.Detail))
- {
- if (ContinueOnError)
- fault = fault ?? ex.Detail;
- else
- throw;
- }
-
- Interlocked.Increment(ref errorCount);
- break;
- }
+ if (threadLocalState.NextBatchSize == 1)
+ {
+ await UpdateNextBatchSize(threadLocalState, async () =>
+ {
+ executed = await ExecuteSingleRequest(request, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ });
}
- }
- else
- {
- if (threadLocalState.EMR == null)
+ else
{
- threadLocalState = new
+ if (threadLocalState.EMR == null)
{
- threadLocalState.Service,
- EMR = new ExecuteMultipleRequest
+ threadLocalState.EMR = new ExecuteMultipleRequest
{
Requests = new OrganizationRequestCollection(),
Settings = new ExecuteMultipleSettings
@@ -519,32 +646,86 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
ContinueOnError = IgnoresSomeErrors,
ReturnResponses = responseHandler != null
}
- }
- };
- }
+ };
+ }
- threadLocalState.EMR.Requests.Add(request);
+ threadLocalState.EMR.Requests.Add(request);
- if (threadLocalState.EMR.Requests.Count == BatchSize)
- {
- ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
+ if (threadLocalState.EMR.Requests.Count < threadLocalState.NextBatchSize)
+ return;
+
+ await UpdateNextBatchSize(threadLocalState, async () =>
+ {
+ executed = await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ });
+
+ threadLocalState.EMR = null;
+ }
+ }
+ catch
+ {
+ threadLocalState.Error = true;
+ throw;
+ }
- threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) };
+ // Take any requests from the retry queue and execute them
+ while (executed && _retryQueue.TryDequeue(out var retryReq))
+ {
+ if (options.CancellationToken.IsCancellationRequested)
+ {
+ loopState.Stop();
+ return;
}
+
+ if (loopState.IsStopped)
+ return;
+
+ if (retryReq is ExecuteMultipleRequest emr)
+ executed = await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ else
+ executed = await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
}
- return threadLocalState;
+ if (executed)
+ loopState.RequestIncreaseThreadCount();
},
- (threadLocalState) =>
+ async (loopState, threadLocalState) =>
{
- if (threadLocalState.EMR != null)
- ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
+ // If we've got a partial batch, execute it now
+ if (threadLocalState.EMR != null && !threadLocalState.Error)
+ await ProcessBatch(threadLocalState.EMR, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+
+ // If this is the final thread, execute any remaining requests in the retry queue
+ if (!loopState.IsStopped &&
+ !options.CancellationToken.IsCancellationRequested &&
+ Interlocked.CompareExchange(ref _threadCount, 1, 1) == 1)
+ {
+ // Take any requests from the retry queue and execute them
+ while (_retryQueue.TryDequeue(out var retryReq))
+ {
+ if (options.CancellationToken.IsCancellationRequested)
+ {
+ loopState.Stop();
+ return;
+ }
+
+ if (loopState.IsStopped)
+ return;
+
+ if (retryReq is ExecuteMultipleRequest emr)
+ await ProcessBatch(emr, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ else
+ await ExecuteSingleRequest(retryReq, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, threadLocalState, loopState);
+ }
+ }
- Interlocked.Decrement(ref threadCount);
+ Interlocked.Decrement(ref _threadCount);
+ ShowProgress(options, operationNames, meta);
if (threadLocalState.Service != dataSource.Connection && threadLocalState.Service is IDisposable disposableClient)
disposableClient.Dispose();
- });
+ })
+ .ForEach(options.CancellationToken);
}
- if (fault != null)
+ if (_fault != null)
@@ -554,21 +735,65 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
{
var originalEx = ex;
- if (ex is AggregateException agg && agg.InnerExceptions.Count == 1)
- ex = agg.InnerException;
+ if (ex is AggregateException agg)
+ {
+ if (agg.InnerExceptions.Count == 1)
+ ex = agg.InnerException;
+ else if (agg.InnerExceptions.All(inner => inner is OperationCanceledException))
+ ex = agg.InnerExceptions[0];
+ }
- if (count > 0)
- context.Log(new Sql4CdsError(1, 0, $"{count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase}"));
+ if (_successCount > 0)
+ context.Log(new Sql4CdsError(1, 0, $"{_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase}"));
if (ex == originalEx)
throw;
else
throw ex;
}
+ finally
+ {
+ completed.Cancel();
+ performanceMonitor.ConfigureAwait(false).GetAwaiter().GetResult();
+
+ _threadCountHistory = threadCountHistory.ToArray();
+ _rpmHistory = rpmHistory.ToArray();
+ _batchSizeHistory = batchSizeHistory.ToArray();
+ }
- recordsAffected = count;
- message = $"({count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase})";
- context.ParameterValues["@@ROWCOUNT"] = (SqlInt32)count;
+ recordsAffected = _successCount;
+ message = $"({_successCount:N0} {GetDisplayName(_successCount, meta)} {operationNames.CompletedLowercase})";
+ context.ParameterValues["@@ROWCOUNT"] = (SqlInt32)_successCount;
+ }
+
+ private async Task UpdateNextBatchSize(ParallelThreadState threadLocalState, Func<Task> action)
+ {
+ // Time how long the action takes
+ var timer = new Timer();
+ using (timer.Run())
+ {
+ await action();
+ }
+
+ // Adjust the batch size based on the time taken to try and keep the total time around 10sec
+ var multiplier = TimeSpan.FromSeconds(10).TotalMilliseconds / timer.Duration.TotalMilliseconds;
+ threadLocalState.NextBatchSize = Math.Max(1, Math.Min(BatchSize, (int)(threadLocalState.NextBatchSize * multiplier)));
+
+ // Update the statistics of the number of batches we have executed
+ Interlocked.Increment(ref _batchExecutionCount);
+ }
+
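+ // Faults that are transient and safe to retry after a short delay, as distinct from
+ // service protection faults, which get longer pauses and go via the retry queue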
+ private bool IsRetryableFault(OrganizationServiceFault fault)
+ {
+ if (fault == null)
+ return false;
+
+ if (fault.ErrorCode == -2147188475) // More than one concurrent {0} results detected for an Entity {1} and ObjectTypeCode {2}
+ {
+ return true;
+ }
+
+ return false;
}
protected class BulkApiErrorDetail
@@ -578,39 +803,229 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}
- private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int count, ref int inProgressCount, ref int errorCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
+ private async Task<bool> ExecuteSingleRequest(OrganizationRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
{
- var newCount = Interlocked.Add(ref inProgressCount, req.Requests.Count);
- var progress = (double)newCount / entities.Count;
- var threadCountMessage = threadCount < 2 ? "" : $" ({threadCount:N0} threads)";
- options.Progress(progress, $"{operationNames.InProgressUppercase} {GetDisplayName(0, meta)} {count + errorCount + 1:N0} - {newCount:N0} of {entities.Count:N0}{threadCountMessage}...");
- var resp = ExecuteMultiple(dataSource, org, meta, req);
+ Interlocked.Increment(ref _inProgressCount);
- if (responseHandler != null)
+ for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
- foreach (var item in resp.Responses)
+ try
{
- if (item.Response != null)
- responseHandler(item.Response);
+ ShowProgress(options, operationNames, meta);
+ var response = dataSource.Execute(org, req);
+ Interlocked.Increment(ref _successCount);
+
+ loopState.IncrementSuccessfulExecution();
+
+ responseHandler?.Invoke(response);
+ return true;
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (ex.IsThrottlingException(out var retryDelay))
+ {
+ // Handle service protection limit retries ourselves to manage multi-threading
+ // Wait for the recommended retry time, then add the request to the queue for retrying
+ // Terminate this thread so we don't continue to overload the server
+ Interlocked.Decrement(ref _inProgressCount);
+
+ // The server can report too-long delays. Wait the full 5 minutes to start with,
+ // then reduce to 2 minutes then 1 minute
+ if (retry < 1 && retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+ else if (retry >= 1 && retry < 3 && retryDelay > TimeSpan.FromMinutes(2))
+ retryDelay = TimeSpan.FromMinutes(2);
+ else if (retry >= 3 && retryDelay > TimeSpan.FromMinutes(1))
+ retryDelay = TimeSpan.FromMinutes(1);
+
+ _delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ if (Interlocked.Increment(ref _pausedThreadCount) == 1)
+ Interlocked.Increment(ref _serviceProtectionLimitHits);
+ ShowProgress(options, operationNames, meta);
+
+ loopState.RequestReduceThreadCount();
+ await Task.Delay(retryDelay, options.CancellationToken);
+ Interlocked.Decrement(ref _pausedThreadCount);
+ _delayedUntil.TryRemove(threadState, out _);
+ ShowProgress(options, operationNames, meta);
+ _retryQueue.Enqueue(req);
+ return false;
+ }
+
+ loopState.IncrementSuccessfulExecution();
+
+ if (IsRetryableFault(ex.Detail))
+ {
+ // Retry the request after a short delay
+ Thread.Sleep(TimeSpan.FromSeconds(2));
+ continue;
+ }
+
+ if (FilterErrors(context, req, ex.Detail))
+ {
+ if (ContinueOnError)
+ _fault = _fault ?? ex.Detail;
+ else
+ throw;
+ }
+
+ Interlocked.Increment(ref _errorCount);
+ return true;
}
}
- var errorResponses = resp.Responses
- .Where(r => r.Fault != null)
- .ToList();
+ return true;
+ }
+
+ private void ShowProgress(IQueryExecutionOptions options, OperationNames operationNames, EntityMetadata meta)
+ {
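+ // Build a progress message describing the current state, including any threads
+ // that are paused waiting for service protection limits to clear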
+ var progress = (double)_inProgressCount / _entityCount;
+ var threadCountMessage = _threadCount < 2 ? "" : $" ({_threadCount:N0} threads)";
+ var operationName = operationNames.InProgressUppercase;
+
+ var delayedUntilValues = _delayedUntil.Values.ToArray();
+ if (delayedUntilValues.Length > 0 && delayedUntilValues.Length == _threadCount)
+ {
+ operationName = operationNames.CompletedUppercase;
+ threadCountMessage += $" (paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
+ }
+ else if (delayedUntilValues.Length > 0)
+ {
+ threadCountMessage += $" ({delayedUntilValues.Length:N0} threads paused until {delayedUntilValues.Min().ToShortTimeString()} due to service protection limits)";
+ }
- Interlocked.Add(ref count, req.Requests.Count - errorResponses.Count);
- Interlocked.Add(ref errorCount, errorResponses.Count);
+ if (_successCount + _errorCount + 1 >= _inProgressCount)
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ else
+ options.Progress(progress, $"{operationName} {GetDisplayName(0, meta)} {_successCount + _errorCount + 1:N0} - {_inProgressCount:N0} of {_entityCount:N0}{threadCountMessage}...");
+ }
- var error = errorResponses.FirstOrDefault(item => FilterErrors(context, req.Requests[item.RequestIndex], item.Fault));
+ private async Task<bool> ProcessBatch(ExecuteMultipleRequest req, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ParallelThreadState threadState, ParallelLoopState loopState)
+ {
+ Interlocked.Add(ref _inProgressCount, req.Requests.Count);
- if (error != null)
+ for (var retry = 0; !options.CancellationToken.IsCancellationRequested; retry++)
{
- fault = fault ?? error.Fault;
+ try
+ {
+ ShowProgress(options, operationNames, meta);
+ var resp = ExecuteMultiple(dataSource, org, meta, req);
- if (!ContinueOnError)
- throw new FaultException<OrganizationServiceFault>(fault, new FaultReason(fault.Message));
+ loopState.IncrementSuccessfulExecution();
+
+ if (responseHandler != null)
+ {
+ foreach (var item in resp.Responses)
+ {
+ if (item.Response != null)
+ responseHandler(item.Response);
+ }
+ }
+
+ var errorResponses = resp.Responses
+ .Where(r => r.Fault != null)
+ .ToList();
+
+ var nonRetryableErrorResponses = errorResponses
+ .Where(r => !IsRetryableFault(r.Fault))
+ .ToList();
+
+ Interlocked.Add(ref _successCount, req.Requests.Count - errorResponses.Count);
+ Interlocked.Add(ref _errorCount, nonRetryableErrorResponses.Count);
+
+ var error = errorResponses.FirstOrDefault(item => FilterErrors(context, req.Requests[item.RequestIndex], item.Fault) && !IsRetryableFault(item.Fault));
+
+ if (error != null)
+ {
+ _fault = _fault ?? error.Fault;
+
+ if (!ContinueOnError)
+ throw new FaultException<OrganizationServiceFault>(_fault, new FaultReason(_fault.Message));
+ }
+
+ List<OrganizationRequest> retryRequests;
+
+ if (ContinueOnError)
+ {
+ // The server will already have tried to execute every request, so we can just pull the ones
+ // that have a fault that we know we can retry
+ var retryableErrors = errorResponses.Where(item => IsRetryableFault(item.Fault)).ToList();
+ retryRequests = retryableErrors.Select(item => req.Requests[item.RequestIndex]).ToList();
+ }
+ else
+ {
+ // The server will have stopped at the first fault. If that was a retryable fault, add that
+ // and all the subsequent requests to try again
+ var firstRetryableError = errorResponses.FirstOrDefault(item => IsRetryableFault(item.Fault));
+
+ if (firstRetryableError == null)
+ retryRequests = new List<OrganizationRequest>();
+ else
+ retryRequests = req.Requests.Skip(firstRetryableError.RequestIndex).ToList();
+ }
+
+ if (retryRequests.Count > 0)
+ {
+ // Create a new ExecuteMultipleRequest with all the requests that haven't been processed yet
+ var retryReq = new ExecuteMultipleRequest
+ {
+ Requests = new OrganizationRequestCollection(),
+ Settings = new ExecuteMultipleSettings
+ {
+ ContinueOnError = IgnoresSomeErrors,
+ ReturnResponses = responseHandler != null
+ }
+ };
+
+ foreach (var retryRequest in retryRequests)
+ retryReq.Requests.Add(retryRequest);
+
+ // Wait and retry
+ await Task.Delay(TimeSpan.FromSeconds(2), options.CancellationToken);
+ req = retryReq;
+ continue;
+ }
+
+ break;
+ }
+ catch (FaultException<OrganizationServiceFault> ex)
+ {
+ if (!ex.IsThrottlingException(out var retryDelay))
+ {
+ loopState.IncrementSuccessfulExecution();
+ throw;
+ }
+
+ // Handle service protection limit retries ourselves to manage multi-threading
+ // Wait for the recommended retry time, then add the request to the queue for retrying
+ // Terminate this thread so we don't continue to overload the server
+ Interlocked.Add(ref _inProgressCount, -req.Requests.Count);
+
+ // The server can report too-long delays. Wait the full 5 minutes to start with,
+ // then reduce to 2 minutes then 1 minute
+ if (retry < 1 && retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+ else if (retry >= 1 && retry < 3 && retryDelay > TimeSpan.FromMinutes(2))
+ retryDelay = TimeSpan.FromMinutes(2);
+ else if (retry >= 3 && retryDelay > TimeSpan.FromMinutes(1))
+ retryDelay = TimeSpan.FromMinutes(1);
+
+ _delayedUntil[threadState] = DateTime.Now.Add(retryDelay);
+ if (Interlocked.Increment(ref _pausedThreadCount) == 1)
+ Interlocked.Increment(ref _serviceProtectionLimitHits);
+ ShowProgress(options, operationNames, meta);
+
+ loopState.RequestReduceThreadCount();
+ await Task.Delay(retryDelay, options.CancellationToken);
+ Interlocked.Decrement(ref _pausedThreadCount);
+ _delayedUntil.TryRemove(threadState, out _);
+ ShowProgress(options, operationNames, meta);
+ _retryQueue.Enqueue(req);
+ return false;
+ }
}
+
+ return true;
}
protected virtual bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
index 8a783c52..98d82e6a 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
@@ -134,7 +134,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Deleting",
InProgressLowercase = "deleting",
- CompletedLowercase = "deleted"
+ CompletedLowercase = "deleted",
+ CompletedUppercase = "Deleted"
},
context,
out recordsAffected,
@@ -148,6 +149,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
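+ // Let cancellation propagate without being wrapped in a QueryExecutionException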
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
new file mode 100644
index 00000000..253651d3
--- /dev/null
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/DynamicParallel.cs
@@ -0,0 +1,165 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
+{
+ /// <summary>
+ /// Provides similar methods to the <see cref="Parallel"/> class, but allows the loop to indicate that the number of threads
+ /// should change during execution.
+ /// </summary>
+ class DynamicParallel<TSource, TLocal, TLoop>
+ where TLoop : DynamicParallelLoopState, new()
+ {
+ class DynamicParallelThreadState
+ {
+ public Task Task { get; set; }
+
+ public bool IsStopped { get; set; }
+
+ public CancellationToken CancellationToken { get; set; }
+ }
+
+ private readonly ConcurrentQueue<TSource> _queue;
+ private readonly List<DynamicParallelThreadState> _threads;
+ private readonly TLoop _loopState;
+ private readonly ConcurrentBag<Exception> _exceptions;
+ private readonly ParallelOptions _parallelOptions;
+ private readonly Func<TLocal> _localInit;
+ private readonly Func<TSource, TLoop, TLocal, Task> _body;
+ private readonly Func<TLoop, TLocal, Task> _localFinally;
+
+ public DynamicParallel(
+ IEnumerable<TSource> source,
+ ParallelOptions parallelOptions,
+ Func<TLocal> localInit,
+ Func<TSource, TLoop, TLocal, Task> body,
+ Func<TLoop, TLocal, Task> localFinally)
+ {
+ _queue = new ConcurrentQueue<TSource>(source);
+ _threads = new List<DynamicParallelThreadState>();
+ _loopState = new TLoop();
+ _exceptions = new ConcurrentBag<Exception>();
+ _parallelOptions = parallelOptions;
+ _localInit = localInit;
+ _body = body;
+ _localFinally = localFinally;
+ }
+
+ public void ForEach(CancellationToken cancellationToken)
+ {
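+ // Start a single worker task - the monitor will add more workers up to
+ // MaxDegreeOfParallelism as the loop votes to increase the thread count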
+ StartThread(cancellationToken);
+
+ var monitor = Task.Factory.StartNew(async () => await MonitorAsync(cancellationToken)).Unwrap();
+ monitor.Wait();
+
+ if (!_exceptions.IsEmpty)
+ throw new AggregateException(_exceptions);
+ }
+
+ private void StartThread(CancellationToken cancellationToken)
+ {
+ var threadState = new DynamicParallelThreadState
+ {
+ CancellationToken = cancellationToken
+ };
+ threadState.Task = Task.Factory.StartNew((state) => RunThread((DynamicParallelThreadState)state), threadState).Unwrap();
+ _threads.Add(threadState);
+ }
+
+ private async Task RunThread(DynamicParallelThreadState threadState)
+ {
+ var local = _localInit();
+
+ try
+ {
+ while (!_loopState.IsStopped &&
+ !threadState.IsStopped &&
+ _queue.TryDequeue(out var item))
+ {
+ await _body(item, _loopState, local);
+ }
+ }
+ catch (Exception ex)
+ {
+ _exceptions.Add(ex);
+ _loopState.Stop();
+ }
+ finally
+ {
+ await _localFinally(_loopState, local);
+ threadState.IsStopped = true;
+ }
+ }
+
+ private async Task MonitorAsync(CancellationToken cancellationToken)
+ {
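+ // Once the thread count has been reduced, don't increase it again - keep running
+ // at the reduced rate to stay under the service protection limits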
+ var hasDecreased = false;
+
+ // Periodically check the votes and adjust the number of threads
+ while (!_loopState.IsStopped)
+ {
+ var allTasks = Task.WhenAll(_threads.Select(t => t.Task).ToArray());
+ var timeout = Task.Delay(1000, cancellationToken);
+ var task = await Task.WhenAny(allTasks, timeout);
+
+ if (task == allTasks)
+ break;
+
+ if (_loopState.ResetReduceThreadCount() && _threads.Count(t => !t.IsStopped) > 1)
+ {
+ var thread = _threads.First(t => !t.IsStopped);
+ thread.IsStopped = true;
+ hasDecreased = true;
+ }
+ else if (!hasDecreased && _threads.Count(t => !t.IsStopped) < _parallelOptions.MaxDegreeOfParallelism && _loopState.ResetIncreaseThreadCount())
+ {
+ StartThread(cancellationToken);
+ }
+ }
+ }
+ }
+
+ class DynamicParallelLoopState
+ {
+ private int _reduceThreadCount;
+ private int _increaseThreadCount;
+
+ public bool IsStopped { get; private set; }
+
+ public void Stop()
+ {
+ IsStopped = true;
+ }
+
+ public virtual void RequestReduceThreadCount()
+ {
+ Interlocked.Exchange(ref _reduceThreadCount, 1);
+ }
+
+ public virtual bool ResetReduceThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _reduceThreadCount, 0, 1) == 1)
+ return true;
+
+ return false;
+ }
+
+ public virtual void RequestIncreaseThreadCount()
+ {
+ Interlocked.Exchange(ref _increaseThreadCount, 1);
+ }
+
+ public virtual bool ResetIncreaseThreadCount()
+ {
+ if (Interlocked.CompareExchange(ref _increaseThreadCount, 0, 1) == 1)
+ return true;
+
+ return false;
+ }
+ }
+}
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs
new file mode 100644
index 00000000..e5e884f1
--- /dev/null
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExceptionExtensions.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.ServiceModel;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Xrm.Sdk;
+
+namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
+{
+ static class ExceptionExtensions
+ {
+ public static bool IsThrottlingException(this FaultException<OrganizationServiceFault> ex, out TimeSpan retryDelay)
+ {
+ if (ex == null)
+ {
+ retryDelay = TimeSpan.Zero;
+ return false;
+ }
+
+ if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
+ ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
+ ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
+ ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
+ {
+ retryDelay = TimeSpan.FromSeconds(2);
+
+ if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && retryAfter is TimeSpan ts)
+ retryDelay = ts;
+
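+ // Cap the delay suggested by the server - it can report retry times far longer
+ // than the limits actually require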
+ if (retryDelay > TimeSpan.FromMinutes(5))
+ retryDelay = TimeSpan.FromMinutes(5);
+
+ return true;
+ }
+
+ retryDelay = TimeSpan.Zero;
+ return false;
+ }
+ }
+}
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
index ce738ac2..095f2a34 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
@@ -134,7 +134,7 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
var qry = new Microsoft.Xrm.Sdk.Query.QueryExpression("systemuser");
qry.Criteria.AddCondition("systemuserid", ConditionOperator.EqualUserId);
- var actualUserId = ((RetrieveMultipleResponse)dataSource.Execute(new RetrieveMultipleRequest { Query = qry })).EntityCollection.Entities.Single().Id;
+ var actualUserId = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveMultipleRequest { Query = qry }, context.Options, "Impersonating user...")).EntityCollection.Entities.Single().Id;
if (actualUserId != userId)
throw new QueryExecutionException(Sql4CdsError.ImpersonationError(username), new ApplicationException("User was found but the server could not impersonate it"));
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
index 159ddbf7..bb50e5fc 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
@@ -286,7 +286,8 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
if (!context.Session.DataSources.TryGetValue(DataSource, out var dataSource))
throw new NotSupportedQueryFragmentException("Missing datasource " + DataSource);
- context.Options.Progress(0, $"Executing {MessageName}...");
+ var progressMessage = $"Executing {MessageName}...";
+ context.Options.Progress(0, progressMessage);
// Get the first page of results
if (!context.Options.ContinueRetrieve(0))
@@ -302,7 +303,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;
- var response = dataSource.Connection.Execute(request);
+ var response = dataSource.ExecuteWithServiceProtectionLimitLogging(request, context.Options, progressMessage);
var entities = GetEntityCollection(response);
PagesRetrieved++;
@@ -325,7 +326,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
};
- response = dataSource.Connection.Execute(request);
+ response = dataSource.ExecuteWithServiceProtectionLimitLogging(request, context.Options, progressMessage);
entities = GetEntityCollection(response);
PagesRetrieved++;
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
index 48bba560..8e566438 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
@@ -297,8 +297,17 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
var meta = dataSource.Metadata[name];
_isVirtualEntity = meta.DataProviderId != null && meta.DataProviderId != DataProviders.ElasticDataProvider;
- if (!(Parent is PartitionedAggregateNode))
- context.Options.Progress(0, $"Retrieving {GetDisplayName(0, meta)}...");
+ string progressMessage;
+
+ if (Parent is PartitionedAggregateNode partitioned)
+ {
+ progressMessage = partitioned.ProgressMessage;
+ }
+ else
+ {
+ progressMessage = $"Retrieving {GetDisplayName(0, meta)}...";
+ context.Options.Progress(0, progressMessage);
+ }
// Get the first page of results
if (!context.Options.ContinueRetrieve(0))
@@ -349,21 +358,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
try
{
- var task = Task.Run(() =>
- {
- return ((RetrieveMultipleResponse)dataSource.Execute(req)).EntityCollection;
- });
-
- try
- {
- task.Wait(context.Options.CancellationToken);
- }
- catch (AggregateException ex)
- {
- throw ex.InnerException;
- }
-
- res = task.Result;
+ res = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(req, context.Options, progressMessage)).EntityCollection;
}
catch (FaultException<OrganizationServiceFault> ex)
{
@@ -400,7 +395,10 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
while (AllPages && res.MoreRecords && context.Options.ContinueRetrieve(count))
{
if (!(Parent is PartitionedAggregateNode))
- context.Options.Progress(0, $"Retrieved {count:N0} {GetDisplayName(count, meta)}...");
+ {
+ progressMessage = $"Retrieved {count:N0} {GetDisplayName(count, meta)}...";
+ context.Options.Progress(0, progressMessage);
+ }
filter pagingFilter = null;
@@ -422,22 +420,7 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
}
((FetchExpression)req.Query).Query = Serialize(FetchXml);
-
- var task = Task.Run(() =>
- {
- return ((RetrieveMultipleResponse)dataSource.Execute(req)).EntityCollection;
- });
-
- try
- {
- task.Wait(context.Options.CancellationToken);
- }
- catch (AggregateException ex)
- {
- throw ex.InnerException;
- }
-
- var nextPage = task.Result;
+ var nextPage = ((RetrieveMultipleResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(req, context.Options, progressMessage)).EntityCollection;
PagesRetrieved++;
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
index 64dc7579..9c39f1ea 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
@@ -148,7 +148,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Inserting",
InProgressLowercase = "inserting",
- CompletedLowercase = "inserted"
+ CompletedLowercase = "inserted",
+ CompletedUppercase = "Inserted",
},
context,
out recordsAffected,
@@ -164,6 +165,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
index 393c0303..d8830b8e 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
@@ -53,6 +53,9 @@ class Partition
[Description("The maximum number of partitions that will be executed in parallel")]
public int MaxDOP { get; set; }
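+ // Exposes the current progress message so the child FetchXmlScan can reuse it
+ // when reporting service protection limit delays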
+ [Browsable(false)]
+ internal string ProgressMessage { get; set; }
+
public override IDataExecutionPlanNodeInternal FoldQuery(NodeCompilationContext context, IList<OptimizerHint> hints)
{
Source = Source.FoldQuery(context, hints);
@@ -213,7 +216,9 @@ protected override IEnumerable ExecuteInternal(NodeExecutionContext cont
lock (_lock)
{
_progress += partition.Percentage;
- context.Options.Progress(0, $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...");
+ ProgressMessage = $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...";
+
+ context.Options.Progress(0, ProgressMessage);
}
if (Interlocked.Decrement(ref _pendingPartitions) == 0)
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
index b67441db..880adbdc 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlan/UpdateNode.cs
@@ -459,7 +459,8 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
{
InProgressUppercase = "Updating",
InProgressLowercase = "updating",
- CompletedLowercase = "updated"
+ CompletedLowercase = "updated",
+ CompletedUppercase = "Updated"
},
context,
out recordsAffected,
@@ -473,6 +474,10 @@ public override void Execute(NodeExecutionContext context, out int recordsAffect
throw;
}
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception ex)
{
throw new QueryExecutionException(ex.Message, ex) { Node = this };
diff --git a/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs b/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
index 06f80d16..8e740f95 100644
--- a/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExecutionPlanNodeTypeDescriptor.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
+using System.Drawing;
+using System.Drawing.Design;
using System.Globalization;
using System.Linq;
using System.Reflection;
@@ -161,6 +163,10 @@ public override TypeConverter Converter
{
get
{
+ var propTypeConverterAttr = _prop?.GetCustomAttribute<TypeConverterAttribute>();
+ if (propTypeConverterAttr != null)
+ return (TypeConverter)Activator.CreateInstance(Type.GetType(propTypeConverterAttr.ConverterTypeName));
+
var type = _value?.GetType() ?? _prop.PropertyType;
if ((type.IsClass || type.IsInterface) && type != typeof(string))
@@ -425,4 +431,46 @@ class DictionaryKeyAttribute : Attribute
class DictionaryValueAttribute : Attribute
{
}
+
+#if !NETCOREAPP
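+ // Renders the history arrays as a mini bar chart directly in the property grid
+ // (WinForms type editors are only available in the .NET Framework build)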
+ class MiniChartEditor : UITypeEditor
+ {
+ public override bool GetPaintValueSupported(ITypeDescriptorContext context)
+ {
+ return true;
+ }
+
+ public override void PaintValue(PaintValueEventArgs e)
+ {
+ var values = (float[])((ExecutionPlanNodeTypeDescriptor)e.Value).GetPropertyOwner(null);
+ var brush = Brushes.DarkBlue;
+ var barWidth = (float)e.Bounds.Width / values.Length;
+ var maxValue = values.Max();
+
+ for (var i = 0; i < values.Length && maxValue > 0; i++)
+ {
+ var height = e.Bounds.Height * values[i] / maxValue;
+ e.Graphics.FillRectangle(brush, e.Bounds.X + i * barWidth, e.Bounds.Bottom - height, barWidth, height);
+ }
+ }
+ }
+#endif
+
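+ // Shows the range of values ("min - max") as the text representation of the chart data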
+ class MiniChartConverter : DataCollectionConverter
+ {
+ public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType)
+ {
+ if (value is ICustomTypeDescriptor desc)
+ value = desc.GetPropertyOwner(null);
+
+ var values = (float[])value;
+ var maxValue = values.Max();
+ var minValue = values.Min();
+
+ if (minValue == maxValue)
+ return minValue.ToString();
+ else
+ return $"{minValue} - {maxValue}";
+ }
+ }
}
diff --git a/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs b/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
index 8a531e17..d5d7ec50 100644
--- a/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
+++ b/MarkMpn.Sql4Cds.Engine/ExpressionFunctions.cs
@@ -1435,7 +1435,7 @@ public static SqlVariant ServerProperty(SqlString propertyName, ExpressionExecut
#endif
if (orgVersion == null)
- orgVersion = ((RetrieveVersionResponse)dataSource.Execute(new RetrieveVersionRequest())).Version;
+ orgVersion = ((RetrieveVersionResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveVersionRequest(), context.Options, "Retrieving server version...")).Version;
return new SqlVariant(DataTypeHelpers.NVarChar(128, dataSource.DefaultCollation, CollationLabel.CoercibleDefault), dataSource.DefaultCollation.ToSqlString(orgVersion), context);
}
diff --git a/MarkMpn.Sql4Cds.Engine/SessionContext.cs b/MarkMpn.Sql4Cds.Engine/SessionContext.cs
index d8dda573..14a76f10 100644
--- a/MarkMpn.Sql4Cds.Engine/SessionContext.cs
+++ b/MarkMpn.Sql4Cds.Engine/SessionContext.cs
@@ -56,7 +56,7 @@ private SqlString GetVersion()
#endif
if (orgVersion == null)
- orgVersion = ((RetrieveVersionResponse)dataSource.Execute(new RetrieveVersionRequest())).Version;
+ orgVersion = ((RetrieveVersionResponse)dataSource.ExecuteWithServiceProtectionLimitLogging(new RetrieveVersionRequest(), _context._options, "Retrieving server version...")).Version;
var assembly = typeof(Sql4CdsConnection).Assembly;
var assemblyVersion = assembly.GetName().Version;