From 2043e01430be991df037cb94d9f1b4b0fd8713e7 Mon Sep 17 00:00:00 2001 From: Sam Harwell Date: Thu, 16 Apr 2020 22:54:12 -0700 Subject: [PATCH] Implement AsyncQueue.TryDequeueAsync This method uses Optional instead of T for the return value of TryDequeueAsync, allowing the method to support normal termination of the queue without throwing cancellation exceptions. --- .../Core/CodeAnalysisTest/AsyncQueueTests.cs | 3 +- .../DiagnosticAnalyzer/AnalyzerDriver.cs | 35 ++++++----- .../Portable/DiagnosticAnalyzer/AsyncQueue.cs | 60 ++++++++++++++----- 3 files changed, 63 insertions(+), 35 deletions(-) diff --git a/src/Compilers/Core/CodeAnalysisTest/AsyncQueueTests.cs b/src/Compilers/Core/CodeAnalysisTest/AsyncQueueTests.cs index 097b7ba7f5c88..18b4e497dfdcd 100644 --- a/src/Compilers/Core/CodeAnalysisTest/AsyncQueueTests.cs +++ b/src/Compilers/Core/CodeAnalysisTest/AsyncQueueTests.cs @@ -158,13 +158,14 @@ public async Task DequeueAfterCompleteWithData() } [Fact] - public void DequeueAsyncWithCancellation() + public async Task DequeueAsyncWithCancellation() { var queue = new AsyncQueue(); var cts = new CancellationTokenSource(); var task = queue.DequeueAsync(cts.Token); Assert.False(task.IsCanceled); cts.Cancel(); + await Assert.ThrowsAsync(() => task); Assert.Equal(TaskStatus.Canceled, task.Status); } diff --git a/src/Compilers/Core/Portable/DiagnosticAnalyzer/AnalyzerDriver.cs b/src/Compilers/Core/Portable/DiagnosticAnalyzer/AnalyzerDriver.cs index 069dbe0e341e7..62d2af97f11b6 100644 --- a/src/Compilers/Core/Portable/DiagnosticAnalyzer/AnalyzerDriver.cs +++ b/src/Compilers/Core/Portable/DiagnosticAnalyzer/AnalyzerDriver.cs @@ -1199,29 +1199,28 @@ private async Task ProcessCompilationEventsCoreAsync( } CompilationEvent e; - try + if (!CompilationEventQueue.TryDequeue(out e)) { - if (!CompilationEventQueue.TryDequeue(out e)) + if (!prePopulatedEventQueue) { - if (!prePopulatedEventQueue) - { - e = await CompilationEventQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - } - else + var optionalEvent = await CompilationEventQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); + if (!optionalEvent.HasValue) { - return completedEvent; + // When the queue is completed with a pending TryDequeueAsync return, the + // the Optional will not have a value. This signals the queue has reached + // completion and no more items will be added to it. + + // This failure is being tracked by https://github.com/dotnet/roslyn/issues/5962 + // Debug.Assert(CompilationEventQueue.IsCompleted, "TryDequeueAsync should provide a value unless the AsyncQueue is completed."); + break; } - } - } - catch (TaskCanceledException) when (!prePopulatedEventQueue) - { - // When the queue is completed with a pending DequeueAsync return then a - // TaskCanceledException will be thrown. This just signals the queue is - // complete and we should finish processing it. - // This failure is being tracked by https://github.com/dotnet/roslyn/issues/5962 - // Debug.Assert(CompilationEventQueue.IsCompleted, "DequeueAsync should never throw unless the AsyncQueue is completed."); - break; + e = optionalEvent.Value; + } + else + { + return completedEvent; + } } // Don't process the compilation completed event as other worker threads might still be processing other compilation events. diff --git a/src/Compilers/Core/Portable/DiagnosticAnalyzer/AsyncQueue.cs b/src/Compilers/Core/Portable/DiagnosticAnalyzer/AsyncQueue.cs index cf53533147abd..4d344edd23d8c 100644 --- a/src/Compilers/Core/Portable/DiagnosticAnalyzer/AsyncQueue.cs +++ b/src/Compilers/Core/Portable/DiagnosticAnalyzer/AsyncQueue.cs @@ -22,7 +22,7 @@ internal sealed class AsyncQueue // Note: All of the below fields are accessed in parallel and may only be accessed // when protected by lock (SyncObject) private readonly Queue _data = new Queue(); - private Queue> _waiters; + private Queue>> _waiters; private bool _completed; private bool _disallowEnqueue; @@ -76,7 +76,7 @@ private bool EnqueueCore(TElement value) throw new InvalidOperationException($"Cannot enqueue data after PromiseNotToEnqueue."); } - TaskCompletionSource waiter; + TaskCompletionSource> waiter; lock (SyncObject) { if (_completed) @@ -163,7 +163,7 @@ public bool TryComplete() private bool CompleteCore() { - Queue> existingWaiters; + Queue>> existingWaiters; lock (SyncObject) { if (_completed) @@ -191,7 +191,7 @@ private bool CompleteCore() Debug.Assert(this.Count == 0, "we should not be cancelling the waiters when we have items in the queue"); foreach (var tcs in existingWaiters) { - tcs.SetCanceled(); + tcs.SetResult(default); } } @@ -224,7 +224,36 @@ public Task WhenCompletedTask [PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)] public Task DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { - return WithCancellationAsync(DequeueCoreAsync(), cancellationToken); + var optionalResult = TryDequeueAsync(cancellationToken); + if (optionalResult.IsCompletedSuccessfully) + { + var result = optionalResult.Result; + return result.HasValue + ? Task.FromResult(result.Value) + : Task.FromCanceled(new CancellationToken(canceled: true)); + } + + return dequeueSlowAsync(optionalResult); + + static async Task dequeueSlowAsync(ValueTask> optionalResult) + { + var result = await optionalResult.ConfigureAwait(false); + if (!result.HasValue) + new CancellationToken(canceled: true).ThrowIfCancellationRequested(); + + return result.Value; + } + } + + /// + /// Gets a task whose result is the element at the head of the queue. If the queue + /// is empty, the returned task waits for an element to be enqueued. If + /// is called before an element becomes available, the returned task is completed and + /// will be . + /// + public ValueTask> TryDequeueAsync(CancellationToken cancellationToken) + { + return WithCancellationAsync(TryDequeueCoreAsync(), cancellationToken); } /// @@ -232,7 +261,7 @@ public Task WhenCompletedTask /// Note: The early cancellation behavior is intentional. /// [PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)] - private static Task WithCancellationAsync(Task task, CancellationToken cancellationToken) + private static ValueTask WithCancellationAsync(ValueTask task, CancellationToken cancellationToken) { if (task.IsCompleted || !cancellationToken.CanBeCanceled) { @@ -241,14 +270,15 @@ private static Task WithCancellationAsync(Task task, CancellationToken if (cancellationToken.IsCancellationRequested) { - return new Task(() => default(T), cancellationToken); + task.Preserve(); + return new ValueTask(Task.FromCanceled(cancellationToken)); } - return task.ContinueWith(t => t, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap(); + return new ValueTask(task.AsTask().ContinueWith(t => t, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap()); } [PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)] - private Task DequeueCoreAsync() + private ValueTask> TryDequeueCoreAsync() { lock (SyncObject) { @@ -256,25 +286,23 @@ private Task DequeueCoreAsync() // in the queue. This keeps the behavior in line with TryDequeue if (_data.Count > 0) { - return Task.FromResult(_data.Dequeue()); + return new ValueTask>(_data.Dequeue()); } if (_completed) { - var tcs = new TaskCompletionSource(); - tcs.SetCanceled(); - return tcs.Task; + return new ValueTask>(default(Optional)); } else { if (_waiters == null) { - _waiters = new Queue>(); + _waiters = new Queue>>(); } - var waiter = new TaskCompletionSource(); + var waiter = new TaskCompletionSource>(); _waiters.Enqueue(waiter); - return waiter.Task; + return new ValueTask>(waiter.Task); } } }