From b4e0169bfe10cfe69f6e7a8952b8f80fdfe9e31e Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 23 Apr 2024 09:11:55 -0400 Subject: [PATCH] Dedup UnboundedChannel and UnboundedPriorityChannel (#101396) * Dedup UnboundedChannel and UnboundedPriorityChannel We can use generic specialization to avoid duplicating all the code for the different queue types. This should also make it much simpler to add other queue types in the future. * Address PR feedback --- .../src/System.Threading.Channels.csproj | 2 +- .../src/System/Threading/Channels/Channel.cs | 35 +- .../Threading/Channels/Channel.netcoreapp.cs | 44 ++- .../Threading/Channels/IDebugEnumerator.cs | 4 +- .../Channels/IUnboundedChannelQueue.cs | 35 ++ .../Threading/Channels/UnboundedChannel.cs | 92 +++-- .../Channels/UnboundedPriorityChannel.cs | 369 ------------------ 7 files changed, 179 insertions(+), 402 deletions(-) create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs delete mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index c058b0b216b17..a5030bcee6561 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -25,6 +25,7 @@ System.Threading.Channel<T> + @@ -44,7 +45,6 @@ System.Threading.Channel<T> - diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs index 317636579a05f..834d8ad88ed1d 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs @@ -1,6 +1,10 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + namespace System.Threading.Channels { /// Provides static methods for creating channels. @@ -9,7 +13,7 @@ public static partial class Channel /// Creates an unbounded channel usable by any number of readers and writers concurrently. /// The created channel. public static Channel CreateUnbounded() => - new UnboundedChannel(runContinuationsAsynchronously: true); + new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true); /// Creates an unbounded channel subject to the provided options. /// Specifies the type of data in the channel. @@ -27,7 +31,7 @@ public static Channel CreateUnbounded(UnboundedChannelOptions options) return new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations); } - return new UnboundedChannel(!options.AllowSynchronousContinuations); + return new UnboundedChannel>(new(new()), !options.AllowSynchronousContinuations); } /// Creates a channel with the specified maximum capacity. @@ -71,5 +75,32 @@ public static Channel CreateBounded(BoundedChannelOptions options, Action< return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped); } + + /// Provides an for a . + private readonly struct UnboundedChannelConcurrentQueue(ConcurrentQueue queue) : IUnboundedChannelQueue + { + private readonly ConcurrentQueue _queue = queue; + + /// + public bool IsThreadSafe => true; + + /// + public void Enqueue(T item) => _queue.Enqueue(item); + + /// + public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out item); + + /// + public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out item); + + /// + public int Count => _queue.Count; + + /// + public bool IsEmpty => _queue.IsEmpty; + + /// + public IEnumerator GetEnumerator() => _queue.GetEnumerator(); + } } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs index 6c24b3e41ec7b..c8fc9baebae7d 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; namespace System.Threading.Channels { @@ -9,13 +10,14 @@ namespace System.Threading.Channels public static partial class Channel { /// Creates an unbounded prioritized channel usable by any number of readers and writers concurrently. + /// Specifies the type of data in the channel. /// The created channel. /// /// is used to determine priority of elements. /// The next item read from the channel will be the element available in the channel with the lowest priority value. /// public static Channel CreateUnboundedPrioritized() => - new UnboundedPrioritizedChannel(runContinuationsAsynchronously: true, comparer: null); + new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true); /// Creates an unbounded prioritized channel subject to the provided options. /// Specifies the type of data in the channel. @@ -30,7 +32,45 @@ public static Channel CreateUnboundedPrioritized(UnboundedPrioritizedChann { ArgumentNullException.ThrowIfNull(options); - return new UnboundedPrioritizedChannel(!options.AllowSynchronousContinuations, options.Comparer); + return new UnboundedChannel>(new(new(options.Comparer)), !options.AllowSynchronousContinuations); + } + + /// Provides an for a . + private readonly struct UnboundedChannelPriorityQueue(PriorityQueue queue) : IUnboundedChannelQueue + { + private readonly PriorityQueue _queue = queue; + + /// + public bool IsThreadSafe => false; + + /// + public void Enqueue(T item) => _queue.Enqueue(true, item); + + /// + public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out _, out item); + + /// + public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out _, out item); + + /// + public int Count => _queue.Count; + + /// + public bool IsEmpty => _queue.Count == 0; + + /// + public IEnumerator GetEnumerator() + { + List list = []; + foreach ((bool _, T Priority) item in _queue.UnorderedItems) + { + list.Add(item.Priority); + } + + list.Sort(_queue.Comparer); + + return list.GetEnumerator(); + } } } } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs index a3d072ee9f7cb..af2a77bb1bf77 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs @@ -11,7 +11,7 @@ internal interface IDebugEnumerable IEnumerator GetEnumerator(); } - internal sealed class DebugEnumeratorDebugView + internal class DebugEnumeratorDebugView { public DebugEnumeratorDebugView(IDebugEnumerable enumerable) { @@ -26,4 +26,6 @@ public DebugEnumeratorDebugView(IDebugEnumerable enumerable) [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get; } } + + internal sealed class DebugEnumeratorDebugView(IDebugEnumerable enumerable) : DebugEnumeratorDebugView(enumerable); } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs new file mode 100644 index 0000000000000..b1b65a1dffeb1 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs @@ -0,0 +1,35 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +namespace System.Threading.Channels +{ + /// Representation of the queue data structure used by . + internal interface IUnboundedChannelQueue : IDebugEnumerable + { + /// Gets whether the other members are safe to use concurrently with each other and themselves. + bool IsThreadSafe { get; } + + /// Enqueues an item into the queue. + /// The item to enqueue. + void Enqueue(T item); + + /// Dequeues an item from the queue, if possible. + /// The dequeued item, or default if the queue was empty. + /// Whether an item was dequeued. + bool TryDequeue([MaybeNullWhen(false)] out T item); + + /// Peeks at the next item from the queue that would be dequeued, if possible. + /// The peeked item, or default if the queue was empty. + /// Whether an item was peeked. + bool TryPeek([MaybeNullWhen(false)] out T item); + + /// Gets the number of elements in the queue. + int Count { get; } + + /// Gets whether the queue is empty. + bool IsEmpty { get; } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs index fb3facf83dc47..ad7ee0e3608d3 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs @@ -5,19 +5,20 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace System.Threading.Channels { /// Provides a buffered channel of unbounded capacity. [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] - internal sealed class UnboundedChannel : Channel, IDebugEnumerable + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] + internal sealed class UnboundedChannel : Channel, IDebugEnumerable where TQueue : struct, IUnboundedChannelQueue { /// Task that indicates the channel has completed. private readonly TaskCompletionSource _completion; /// The items in the channel. - private readonly ConcurrentQueue _items = new ConcurrentQueue(); + private readonly TQueue _items; /// Readers blocked reading from the channel. private readonly Deque> _blockedReaders = new Deque>(); /// Whether to force continuations to be executed asynchronously from producer writes. @@ -29,8 +30,9 @@ internal sealed class UnboundedChannel : Channel, IDebugEnumerable private Exception? _doneWriting; /// Initialize the channel. - internal UnboundedChannel(bool runContinuationsAsynchronously) + internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously) { + _items = items; _runContinuationsAsynchronously = runContinuationsAsynchronously; _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); Reader = new UnboundedChannelReader(this); @@ -38,14 +40,14 @@ internal UnboundedChannel(bool runContinuationsAsynchronously) } [DebuggerDisplay("Items = {Count}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable { - internal readonly UnboundedChannel _parent; + internal readonly UnboundedChannel _parent; private readonly AsyncOperation _readerSingleton; private readonly AsyncOperation _waiterSingleton; - internal UnboundedChannelReader(UnboundedChannel parent) + internal UnboundedChannelReader(UnboundedChannel parent) { _parent = parent; _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); @@ -68,8 +70,8 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken) } // Dequeue an item if we can. - UnboundedChannel parent = _parent; - if (parent._items.TryDequeue(out T? item)) + UnboundedChannel parent = _parent; + if (parent._items.IsThreadSafe && parent._items.TryDequeue(out T? item)) { CompleteIfDone(parent); return new ValueTask(item); @@ -112,24 +114,60 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken) public override bool TryRead([MaybeNullWhen(false)] out T item) { - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; + return parent._items.IsThreadSafe ? + LockFree(parent, out item) : + Locked(parent, out item); - // Dequeue an item if we can - if (parent._items.TryDequeue(out item)) + static bool LockFree(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) { - CompleteIfDone(parent); - return true; + if (parent._items.TryDequeue(out item)) + { + CompleteIfDone(parent); + return true; + } + + item = default; + return false; } - item = default; - return false; + static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) + { + lock (parent.SyncObj) + { + if (parent._items.TryDequeue(out item)) + { + CompleteIfDone(parent); + return true; + } + } + + item = default; + return false; + } } - public override bool TryPeek([MaybeNullWhen(false)] out T item) => - _parent._items.TryPeek(out item); + public override bool TryPeek([MaybeNullWhen(false)] out T item) + { + UnboundedChannel parent = _parent; + return parent._items.IsThreadSafe ? + parent._items.TryPeek(out item) : + Locked(parent, out item); + + // Separated out to keep the try/finally from preventing TryPeek from being inlined + static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item) + { + lock (parent.SyncObj) + { + return parent._items.TryPeek(out item); + } + } + } - private static void CompleteIfDone(UnboundedChannel parent) + private static void CompleteIfDone(UnboundedChannel parent) { + Debug.Assert(parent._items.IsThreadSafe || Monitor.IsEntered(parent.SyncObj)); + if (parent._doneWriting != null && parent._items.IsEmpty) { // If we've now emptied the items queue and we're not getting any more, complete. @@ -144,12 +182,12 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo return new ValueTask(Task.FromCanceled(cancellationToken)); } - if (!_parent._items.IsEmpty) + if (_parent._items.IsThreadSafe && !_parent._items.IsEmpty) { return new ValueTask(true); } - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; lock (parent.SyncObj) { @@ -192,15 +230,15 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo } [DebuggerDisplay("Items = {ItemsCountForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))] private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable { - internal readonly UnboundedChannel _parent; - internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent; + internal readonly UnboundedChannel _parent; + internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent; public override bool TryComplete(Exception? error) { - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; bool completeTask; lock (parent.SyncObj) @@ -240,7 +278,7 @@ public override bool TryComplete(Exception? error) public override bool TryWrite(T item) { - UnboundedChannel parent = _parent; + UnboundedChannel parent = _parent; while (true) { AsyncOperation? blockedReader = null; @@ -321,7 +359,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken } /// Gets the object used to synchronize access to all state on this instance. - private object SyncObj => _items; + private object SyncObj => _blockedReaders; [Conditional("DEBUG")] private void AssertInvariants() diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs deleted file mode 100644 index 7af18b9413ee2..0000000000000 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs +++ /dev/null @@ -1,369 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Threading.Tasks; - -// This file is primarily a copy of UnboundedChannel, subsequently tweaked to account for differences -// between ConcurrentQueue and PriorityQueue, e.g. that PQ isn't thread safe and so fast -// paths outside of locks need to be removed, that Enqueue/Dequeue methods take priorities, etc. Any -// changes made to this or that file should largely be kept in sync. - -namespace System.Threading.Channels -{ - /// Provides a buffered channel of unbounded capacity. - [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] - internal sealed class UnboundedPrioritizedChannel : Channel, IDebugEnumerable - { - /// Task that indicates the channel has completed. - private readonly TaskCompletionSource _completion; - /// The items in the channel. - /// To avoid double storing of a potentially large struct T, the priority doubles as the element and the element is ignored. - private readonly PriorityQueue _items; - /// Readers blocked reading from the channel. - private readonly Deque> _blockedReaders = new Deque>(); - /// Whether to force continuations to be executed asynchronously from producer writes. - private readonly bool _runContinuationsAsynchronously; - - /// Readers waiting for a notification that data is available. - private AsyncOperation? _waitingReadersTail; - /// Set to non-null once Complete has been called. - private Exception? _doneWriting; - - /// Initialize the channel. - internal UnboundedPrioritizedChannel(bool runContinuationsAsynchronously, IComparer? comparer) - { - _runContinuationsAsynchronously = runContinuationsAsynchronously; - _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); - Reader = new UnboundedPrioritizedChannelReader(this); - Writer = new UnboundedPrioritizedChannelWriter(this); - _items = new PriorityQueue(comparer); - } - - [DebuggerDisplay("Items = {Count}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] - private sealed class UnboundedPrioritizedChannelReader : ChannelReader, IDebugEnumerable - { - internal readonly UnboundedPrioritizedChannel _parent; - private readonly AsyncOperation _readerSingleton; - private readonly AsyncOperation _waiterSingleton; - - internal UnboundedPrioritizedChannelReader(UnboundedPrioritizedChannel parent) - { - _parent = parent; - _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); - _waiterSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); - } - - public override Task Completion => _parent._completion.Task; - - public override bool CanCount => true; - - public override bool CanPeek => true; - - public override int Count => _parent._items.Count; - - public override ValueTask ReadAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - return ValueTask.FromCanceled(cancellationToken); - } - - // Dequeue an item if we can. - UnboundedPrioritizedChannel parent = _parent; - lock (parent.SyncObj) - { - parent.AssertInvariants(); - - // Try to dequeue again, now that we hold the lock. - if (parent._items.TryDequeue(out _, out T? item)) - { - CompleteIfDone(parent); - return new ValueTask(item); - } - - // There are no items, so if we're done writing, fail. - if (parent._doneWriting != null) - { - return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); - } - - // If we're able to use the singleton reader, do so. - if (!cancellationToken.CanBeCanceled) - { - AsyncOperation singleton = _readerSingleton; - if (singleton.TryOwnAndReset()) - { - parent._blockedReaders.EnqueueTail(singleton); - return singleton.ValueTaskOfT; - } - } - - // Otherwise, create and queue a reader. - var reader = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); - parent._blockedReaders.EnqueueTail(reader); - return reader.ValueTaskOfT; - } - } - - public override bool TryRead([MaybeNullWhen(false)] out T item) - { - UnboundedPrioritizedChannel parent = _parent; - lock (parent.SyncObj) - { - // Dequeue an item if we can - if (parent._items.TryDequeue(out _, out item)) - { - CompleteIfDone(parent); - return true; - } - - item = default; - return false; - } - } - - public override bool TryPeek([MaybeNullWhen(false)] out T item) => - _parent._items.TryPeek(out _, out item); - - private static void CompleteIfDone(UnboundedPrioritizedChannel parent) - { - Debug.Assert(Monitor.IsEntered(parent.SyncObj)); - - if (parent._doneWriting != null && parent._items.Count == 0) - { - // If we've now emptied the items queue and we're not getting any more, complete. - ChannelUtilities.Complete(parent._completion, parent._doneWriting); - } - } - - public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) - { - return ValueTask.FromCanceled(cancellationToken); - } - - UnboundedPrioritizedChannel parent = _parent; - lock (parent.SyncObj) - { - parent.AssertInvariants(); - - // Try again to read now that we're synchronized with writers. - if (parent._items.Count != 0) - { - return new ValueTask(true); - } - - // There are no items, so if we're done writing, there's never going to be data available. - if (parent._doneWriting != null) - { - return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? - ValueTask.FromException(parent._doneWriting) : - default; - } - - // If we're able to use the singleton waiter, do so. - if (!cancellationToken.CanBeCanceled) - { - AsyncOperation singleton = _waiterSingleton; - if (singleton.TryOwnAndReset()) - { - ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton); - return singleton.ValueTaskOfT; - } - } - - // Otherwise, create and queue a waiter. - var waiter = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); - ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiter); - return waiter.ValueTaskOfT; - } - } - - /// Gets an enumerator the debugger can use to show the contents of the channel. - IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); - } - - [DebuggerDisplay("Items = {ItemsCountForDebugger}")] - [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] - private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable - { - internal readonly UnboundedPrioritizedChannel _parent; - internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent; - - public override bool TryComplete(Exception? error) - { - UnboundedPrioritizedChannel parent = _parent; - bool completeTask; - - lock (parent.SyncObj) - { - parent.AssertInvariants(); - - // If we've already marked the channel as completed, bail. - if (parent._doneWriting != null) - { - return false; - } - - // Mark that we're done writing. - parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; - completeTask = parent._items.Count == 0; - } - - // If there are no items in the queue, complete the channel's task, - // as no more data can possibly arrive at this point. We do this outside - // of the lock in case we'll be running synchronous completions, and we - // do it before completing blocked/waiting readers, so that when they - // wake up they'll see the task as being completed. - if (completeTask) - { - ChannelUtilities.Complete(parent._completion, error); - } - - // At this point, _blockedReaders and _waitingReaders will not be mutated: - // they're only mutated by readers while holding the lock, and only if _doneWriting is null. - // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns. - ChannelUtilities.FailOperations, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error)); - ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error); - - // Successfully transitioned to completed. - return true; - } - - public override bool TryWrite(T item) - { - UnboundedPrioritizedChannel parent = _parent; - while (true) - { - AsyncOperation? blockedReader = null; - AsyncOperation? waitingReadersTail = null; - lock (parent.SyncObj) - { - // If writing has already been marked as done, fail the write. - parent.AssertInvariants(); - if (parent._doneWriting != null) - { - return false; - } - - // If there aren't any blocked readers, just add the data to the queue, - // and let any waiting readers know that they should try to read it. - // We can only complete such waiters here under the lock if they run - // continuations asynchronously (otherwise the synchronous continuations - // could be invoked under the lock). If we don't complete them here, we - // need to do so outside of the lock. - if (parent._blockedReaders.IsEmpty) - { - parent._items.Enqueue(true, item); - waitingReadersTail = parent._waitingReadersTail; - if (waitingReadersTail == null) - { - return true; - } - parent._waitingReadersTail = null; - } - else - { - // There were blocked readers. Grab one, and then complete it outside of the lock. - blockedReader = parent._blockedReaders.DequeueHead(); - } - } - - if (blockedReader != null) - { - // Complete the reader. It's possible the reader was canceled, in which - // case we loop around to try everything again. - if (blockedReader.TrySetResult(item)) - { - return true; - } - } - else - { - // Wake up all of the waiters. Since we've released the lock, it's possible - // we could cause some spurious wake-ups here, if we tell a waiter there's - // something available but all data has already been removed. It's a benign - // race condition, though, as consumers already need to account for such things. - ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true); - return true; - } - } - } - - public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) - { - Exception? doneWriting = _parent._doneWriting; - return - cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : - doneWriting == null ? new ValueTask(true) : // unbounded writing can always be done if we haven't completed - doneWriting != ChannelUtilities.s_doneWritingSentinel ? ValueTask.FromException(doneWriting) : - default; - } - - public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) => - cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : - TryWrite(item) ? default : - ValueTask.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)); - - /// Gets the number of items in the channel. This should only be used by the debugger. - private int ItemsCountForDebugger => _parent._items.Count; - - /// Gets an enumerator the debugger can use to show the contents of the channel. - IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); - } - - /// Gets the object used to synchronize access to all state on this instance. - private object SyncObj => _items; - - [Conditional("DEBUG")] - private void AssertInvariants() - { - Debug.Assert(SyncObj != null, "The sync obj must not be null."); - Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); - - if (_items.Count != 0) - { - if (_runContinuationsAsynchronously) - { - Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers."); - Debug.Assert(_waitingReadersTail == null, "There's data available, so there shouldn't be any waiting readers."); - } - Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed."); - } - if ((!_blockedReaders.IsEmpty || _waitingReadersTail != null) && _runContinuationsAsynchronously) - { - Debug.Assert(_items.Count == 0, "There are blocked/waiting readers, so there shouldn't be any data available."); - } - if (_completion.Task.IsCompleted) - { - Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing."); - } - } - - /// Gets the number of items in the channel. This should only be used by the debugger. - private int ItemsCountForDebugger => _items.Count; - - /// Report if the channel is closed or not. This should only be used by the debugger. - private bool ChannelIsClosedForDebugger => _doneWriting != null; - - /// Gets an enumerator the debugger can use to show the contents of the channel. - public IEnumerator GetEnumerator() - { - List list = []; - foreach ((bool _, T Priority) item in _items.UnorderedItems) - { - list.Add(item.Priority); - } - - list.Sort(_items.Comparer); - - return list.GetEnumerator(); - } - } -}