Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐇 Implement AsyncQueue<T>.TryDequeueAsync #43440

Merged
merged 1 commit into from
Apr 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Compilers/Core/CodeAnalysisTest/AsyncQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ public async Task DequeueAfterCompleteWithData()
}

[Fact]
public void DequeueAsyncWithCancellation()
public async Task DequeueAsyncWithCancellation()
{
var queue = new AsyncQueue<int>();
var cts = new CancellationTokenSource();
var task = queue.DequeueAsync(cts.Token);
Assert.False(task.IsCanceled);
cts.Cancel();
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
Assert.Equal(TaskStatus.Canceled, task.Status);
}

Expand Down
35 changes: 17 additions & 18 deletions src/Compilers/Core/Portable/DiagnosticAnalyzer/AnalyzerDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,29 +1199,28 @@ private async Task<CompilationCompletedEvent> ProcessCompilationEventsCoreAsync(
}

CompilationEvent e;
try
if (!CompilationEventQueue.TryDequeue(out e))
{
if (!CompilationEventQueue.TryDequeue(out e))
if (!prePopulatedEventQueue)
{
if (!prePopulatedEventQueue)
{
e = await CompilationEventQueue.DequeueAsync(cancellationToken).ConfigureAwait(false);
}
else
var optionalEvent = await CompilationEventQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
if (!optionalEvent.HasValue)
{
return completedEvent;
// When the queue is completed with a pending TryDequeueAsync return, the
// the Optional<T> will not have a value. This signals the queue has reached
// completion and no more items will be added to it.

// This failure is being tracked by https://github.com/dotnet/roslyn/issues/5962
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This failure is being tracked by [](start = 35, length = 32)

This comment appears pretty out of date. Can this assertion be uncommented? At the very least we should have an open issue tracking this potential failure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll submit this as a follow-up

// Debug.Assert(CompilationEventQueue.IsCompleted, "TryDequeueAsync should provide a value unless the AsyncQueue<T> is completed.");
break;
}
}
}
catch (TaskCanceledException) when (!prePopulatedEventQueue)
{
// When the queue is completed with a pending DequeueAsync return then a
// TaskCanceledException will be thrown. This just signals the queue is
// complete and we should finish processing it.

// This failure is being tracked by https://github.com/dotnet/roslyn/issues/5962
// Debug.Assert(CompilationEventQueue.IsCompleted, "DequeueAsync should never throw unless the AsyncQueue<T> is completed.");
break;
e = optionalEvent.Value;
}
else
{
return completedEvent;
}
}

// Don't process the compilation completed event as other worker threads might still be processing other compilation events.
Expand Down
60 changes: 44 additions & 16 deletions src/Compilers/Core/Portable/DiagnosticAnalyzer/AsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal sealed class AsyncQueue<TElement>
// Note: All of the below fields are accessed in parallel and may only be accessed
// when protected by lock (SyncObject)
private readonly Queue<TElement> _data = new Queue<TElement>();
private Queue<TaskCompletionSource<TElement>> _waiters;
private Queue<TaskCompletionSource<Optional<TElement>>> _waiters;
private bool _completed;
private bool _disallowEnqueue;

Expand Down Expand Up @@ -76,7 +76,7 @@ private bool EnqueueCore(TElement value)
throw new InvalidOperationException($"Cannot enqueue data after PromiseNotToEnqueue.");
}

TaskCompletionSource<TElement> waiter;
TaskCompletionSource<Optional<TElement>> waiter;
lock (SyncObject)
{
if (_completed)
Expand Down Expand Up @@ -163,7 +163,7 @@ public bool TryComplete()

private bool CompleteCore()
{
Queue<TaskCompletionSource<TElement>> existingWaiters;
Queue<TaskCompletionSource<Optional<TElement>>> existingWaiters;
lock (SyncObject)
{
if (_completed)
Expand Down Expand Up @@ -191,7 +191,7 @@ private bool CompleteCore()
Debug.Assert(this.Count == 0, "we should not be cancelling the waiters when we have items in the queue");
foreach (var tcs in existingWaiters)
{
tcs.SetCanceled();
tcs.SetResult(default);
}
}

Expand Down Expand Up @@ -224,15 +224,44 @@ public Task WhenCompletedTask
[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)]
public Task<TElement> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return WithCancellationAsync(DequeueCoreAsync(), cancellationToken);
var optionalResult = TryDequeueAsync(cancellationToken);
if (optionalResult.IsCompletedSuccessfully)
{
var result = optionalResult.Result;
return result.HasValue
? Task.FromResult(result.Value)
: Task.FromCanceled<TElement>(new CancellationToken(canceled: true));
}

return dequeueSlowAsync(optionalResult);

static async Task<TElement> dequeueSlowAsync(ValueTask<Optional<TElement>> optionalResult)
{
var result = await optionalResult.ConfigureAwait(false);
if (!result.HasValue)
new CancellationToken(canceled: true).ThrowIfCancellationRequested();

return result.Value;
}
}

/// <summary>
/// Gets a task whose result is the element at the head of the queue. If the queue
/// is empty, the returned task waits for an element to be enqueued. If <see cref="Complete"/>
/// is called before an element becomes available, the returned task is completed and
/// <see cref="Optional{T}.HasValue"/> will be <see langword="false"/>.
/// </summary>
public ValueTask<Optional<TElement>> TryDequeueAsync(CancellationToken cancellationToken)
{
return WithCancellationAsync(TryDequeueCoreAsync(), cancellationToken);
}

/// <summary>
///
/// Note: The early cancellation behavior is intentional.
/// </summary>
[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)]
private static Task<T> WithCancellationAsync<T>(Task<T> task, CancellationToken cancellationToken)
private static ValueTask<T> WithCancellationAsync<T>(ValueTask<T> task, CancellationToken cancellationToken)
{
if (task.IsCompleted || !cancellationToken.CanBeCanceled)
{
Expand All @@ -241,40 +270,39 @@ private static Task<T> WithCancellationAsync<T>(Task<T> task, CancellationToken

if (cancellationToken.IsCancellationRequested)
{
return new Task<T>(() => default(T), cancellationToken);
task.Preserve();
return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
}

return task.ContinueWith(t => t, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
return new ValueTask<T>(task.AsTask().ContinueWith(t => t, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap());
}

[PerformanceSensitive("https://github.com/dotnet/roslyn/issues/23582", OftenCompletesSynchronously = true)]
private Task<TElement> DequeueCoreAsync()
private ValueTask<Optional<TElement>> TryDequeueCoreAsync()
{
lock (SyncObject)
{
// No matter what the state we allow DequeueAsync to drain the existing items
// in the queue. This keeps the behavior in line with TryDequeue
if (_data.Count > 0)
{
return Task.FromResult(_data.Dequeue());
return new ValueTask<Optional<TElement>>(_data.Dequeue());
}

if (_completed)
{
var tcs = new TaskCompletionSource<TElement>();
tcs.SetCanceled();
return tcs.Task;
return new ValueTask<Optional<TElement>>(default(Optional<TElement>));
}
else
{
if (_waiters == null)
{
_waiters = new Queue<TaskCompletionSource<TElement>>();
_waiters = new Queue<TaskCompletionSource<Optional<TElement>>>();
}

var waiter = new TaskCompletionSource<TElement>();
var waiter = new TaskCompletionSource<Optional<TElement>>();
_waiters.Enqueue(waiter);
return waiter.Task;
return new ValueTask<Optional<TElement>>(waiter.Task);
}
}
}
Expand Down