From 6799e3964ee48b9ef2f9d82f763ac9b3c22f4637 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 2 Apr 2024 15:58:37 -0400 Subject: [PATCH] Add Channel.CreateUnboundedPrioritized --- .../ref/System.Threading.Channels.csproj | 6 +- .../System.Threading.Channels.netcoreapp.cs | 15 +- ...System.Threading.Channels.netstandard21.cs | 17 + .../src/System.Threading.Channels.csproj | 42 +- .../src/System/Threading/Channels/Channel.cs | 2 +- .../Threading/Channels/Channel.netcoreapp.cs | 36 ++ .../Threading/Channels/ChannelOptions.cs | 4 +- .../Channels/ChannelOptions.netcoreapp.cs | 14 + .../Channels/UnboundedPriorityChannel.cs | 369 ++++++++++++++++++ .../tests/PriorityUnboundedChannelTests.cs | 108 +++++ .../System.Threading.Channels.Tests.csproj | 5 +- 11 files changed, 587 insertions(+), 31 deletions(-) create mode 100644 src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netstandard21.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.netcoreapp.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs create mode 100644 src/libraries/System.Threading.Channels/tests/PriorityUnboundedChannelTests.cs diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj index fae9b57404976..eb088734e55ab 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj @@ -5,13 +5,17 @@ - + + + + + diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs index c591bf9888138..37492efd644ba 100644 --- a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs @@ -4,14 +4,17 @@ // Changes to this file must follow the https://aka.ms/api-review process. // ------------------------------------------------------------------------------ +using System.Collections.Generic; + namespace System.Threading.Channels { - public partial class ChannelClosedException : System.InvalidOperationException + public partial class Channel + { + public static System.Threading.Channels.Channel CreateUnboundedPrioritized() { throw null; } + public static System.Threading.Channels.Channel CreateUnboundedPrioritized(System.Threading.Channels.UnboundedPrioritizedChannelOptions options) { throw null; } + } + public sealed partial class UnboundedPrioritizedChannelOptions : System.Threading.Channels.ChannelOptions { -#if NET8_0_OR_GREATER - [System.ObsoleteAttribute("This API supports obsolete formatter-based serialization. It should not be called or extended by application code.", DiagnosticId = "SYSLIB0051", UrlFormat = "https://aka.ms/dotnet-warnings/{0}")] - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] -#endif - protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } + public System.Collections.Generic.IComparer? Comparer { get; set; } } } diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netstandard21.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netstandard21.cs new file mode 100644 index 0000000000000..c591bf9888138 --- /dev/null +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netstandard21.cs @@ -0,0 +1,17 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the https://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Threading.Channels +{ + public partial class ChannelClosedException : System.InvalidOperationException + { +#if NET8_0_OR_GREATER + [System.ObsoleteAttribute("This API supports obsolete formatter-based serialization. It should not be called or extended by application code.", DiagnosticId = "SYSLIB0051", UrlFormat = "https://aka.ms/dotnet-warnings/{0}")] + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] +#endif + protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index ac68cda0be662..21e101ecd6d94 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -14,16 +14,10 @@ System.Threading.Channel<T> - - - @@ -32,21 +26,25 @@ System.Threading.Channel<T> - - - - - - - + + + + + + + + + + + + + + + + + + @@ -57,6 +55,10 @@ System.Threading.Channel<T> + + + + diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs index 1e260f4c871a0..317636579a05f 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs @@ -4,7 +4,7 @@ namespace System.Threading.Channels { /// Provides static methods for creating channels. - public static class Channel + public static partial class Channel { /// Creates an unbounded channel usable by any number of readers and writers concurrently. /// The created channel. diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs new file mode 100644 index 0000000000000..6c24b3e41ec7b --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs @@ -0,0 +1,36 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; + +namespace System.Threading.Channels +{ + /// Provides static methods for creating channels. + public static partial class Channel + { + /// Creates an unbounded prioritized channel usable by any number of readers and writers concurrently. + /// The created channel. + /// + /// is used to determine priority of elements. + /// The next item read from the channel will be the element available in the channel with the lowest priority value. + /// + public static Channel CreateUnboundedPrioritized() => + new UnboundedPrioritizedChannel(runContinuationsAsynchronously: true, comparer: null); + + /// Creates an unbounded prioritized channel subject to the provided options. + /// Specifies the type of data in the channel. + /// Options that guide the behavior of the channel. + /// The created channel. + /// + /// The supplied ' is used to determine priority of elements, + /// or if the provided comparer is null. + /// The next item read from the channel will be the element available in the channel with the lowest priority value. + /// + public static Channel CreateUnboundedPrioritized(UnboundedPrioritizedChannelOptions options) + { + ArgumentNullException.ThrowIfNull(options); + + return new UnboundedPrioritizedChannel(!options.AllowSynchronousContinuations, options.Comparer); + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs index bbde42638c845..ab3a722bb909d 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs @@ -39,7 +39,7 @@ public abstract class ChannelOptions public bool AllowSynchronousContinuations { get; set; } } - /// Provides options that control the behavior of instances. + /// Provides options that control the behavior of instances created by . public sealed class BoundedChannelOptions : ChannelOptions { /// The maximum number of items the bounded channel may store. @@ -94,7 +94,7 @@ public BoundedChannelFullMode FullMode } } - /// Provides options that control the behavior of instances. + /// Provides options that control the behavior of instances created by . public sealed class UnboundedChannelOptions : ChannelOptions { } diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.netcoreapp.cs new file mode 100644 index 0000000000000..bfa672fa8a931 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.netcoreapp.cs @@ -0,0 +1,14 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; + +namespace System.Threading.Channels +{ + /// Provides options that control the behavior of instances created by . + public sealed class UnboundedPrioritizedChannelOptions : ChannelOptions + { + /// Gets or sets the comparer used by the channel to prioritize elements. + public IComparer? Comparer { get; set; } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs new file mode 100644 index 0000000000000..7af18b9413ee2 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs @@ -0,0 +1,369 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +// This file is primarily a copy of UnboundedChannel, subsequently tweaked to account for differences +// between ConcurrentQueue and PriorityQueue, e.g. that PQ isn't thread safe and so fast +// paths outside of locks need to be removed, that Enqueue/Dequeue methods take priorities, etc. Any +// changes made to this or that file should largely be kept in sync. + +namespace System.Threading.Channels +{ + /// Provides a buffered channel of unbounded capacity. + [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class UnboundedPrioritizedChannel : Channel, IDebugEnumerable + { + /// Task that indicates the channel has completed. + private readonly TaskCompletionSource _completion; + /// The items in the channel. + /// To avoid double storing of a potentially large struct T, the priority doubles as the element and the element is ignored. + private readonly PriorityQueue _items; + /// Readers blocked reading from the channel. + private readonly Deque> _blockedReaders = new Deque>(); + /// Whether to force continuations to be executed asynchronously from producer writes. + private readonly bool _runContinuationsAsynchronously; + + /// Readers waiting for a notification that data is available. + private AsyncOperation? _waitingReadersTail; + /// Set to non-null once Complete has been called. + private Exception? _doneWriting; + + /// Initialize the channel. + internal UnboundedPrioritizedChannel(bool runContinuationsAsynchronously, IComparer? comparer) + { + _runContinuationsAsynchronously = runContinuationsAsynchronously; + _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + Reader = new UnboundedPrioritizedChannelReader(this); + Writer = new UnboundedPrioritizedChannelWriter(this); + _items = new PriorityQueue(comparer); + } + + [DebuggerDisplay("Items = {Count}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + private sealed class UnboundedPrioritizedChannelReader : ChannelReader, IDebugEnumerable + { + internal readonly UnboundedPrioritizedChannel _parent; + private readonly AsyncOperation _readerSingleton; + private readonly AsyncOperation _waiterSingleton; + + internal UnboundedPrioritizedChannelReader(UnboundedPrioritizedChannel parent) + { + _parent = parent; + _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); + _waiterSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true); + } + + public override Task Completion => _parent._completion.Task; + + public override bool CanCount => true; + + public override bool CanPeek => true; + + public override int Count => _parent._items.Count; + + public override ValueTask ReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + // Dequeue an item if we can. + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try to dequeue again, now that we hold the lock. + if (parent._items.TryDequeue(out _, out T? item)) + { + CompleteIfDone(parent); + return new ValueTask(item); + } + + // There are no items, so if we're done writing, fail. + if (parent._doneWriting != null) + { + return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting); + } + + // If we're able to use the singleton reader, do so. + if (!cancellationToken.CanBeCanceled) + { + AsyncOperation singleton = _readerSingleton; + if (singleton.TryOwnAndReset()) + { + parent._blockedReaders.EnqueueTail(singleton); + return singleton.ValueTaskOfT; + } + } + + // Otherwise, create and queue a reader. + var reader = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); + parent._blockedReaders.EnqueueTail(reader); + return reader.ValueTaskOfT; + } + } + + public override bool TryRead([MaybeNullWhen(false)] out T item) + { + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + // Dequeue an item if we can + if (parent._items.TryDequeue(out _, out item)) + { + CompleteIfDone(parent); + return true; + } + + item = default; + return false; + } + } + + public override bool TryPeek([MaybeNullWhen(false)] out T item) => + _parent._items.TryPeek(out _, out item); + + private static void CompleteIfDone(UnboundedPrioritizedChannel parent) + { + Debug.Assert(Monitor.IsEntered(parent.SyncObj)); + + if (parent._doneWriting != null && parent._items.Count == 0) + { + // If we've now emptied the items queue and we're not getting any more, complete. + ChannelUtilities.Complete(parent._completion, parent._doneWriting); + } + } + + public override ValueTask WaitToReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + UnboundedPrioritizedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try again to read now that we're synchronized with writers. + if (parent._items.Count != 0) + { + return new ValueTask(true); + } + + // There are no items, so if we're done writing, there's never going to be data available. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + ValueTask.FromException(parent._doneWriting) : + default; + } + + // If we're able to use the singleton waiter, do so. + if (!cancellationToken.CanBeCanceled) + { + AsyncOperation singleton = _waiterSingleton; + if (singleton.TryOwnAndReset()) + { + ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton); + return singleton.ValueTaskOfT; + } + } + + // Otherwise, create and queue a waiter. + var waiter = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken); + ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiter); + return waiter.ValueTaskOfT; + } + } + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); + } + + [DebuggerDisplay("Items = {ItemsCountForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable + { + internal readonly UnboundedPrioritizedChannel _parent; + internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception? error) + { + UnboundedPrioritizedChannel parent = _parent; + bool completeTask; + + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we've already marked the channel as completed, bail. + if (parent._doneWriting != null) + { + return false; + } + + // Mark that we're done writing. + parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; + completeTask = parent._items.Count == 0; + } + + // If there are no items in the queue, complete the channel's task, + // as no more data can possibly arrive at this point. We do this outside + // of the lock in case we'll be running synchronous completions, and we + // do it before completing blocked/waiting readers, so that when they + // wake up they'll see the task as being completed. + if (completeTask) + { + ChannelUtilities.Complete(parent._completion, error); + } + + // At this point, _blockedReaders and _waitingReaders will not be mutated: + // they're only mutated by readers while holding the lock, and only if _doneWriting is null. + // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns. + ChannelUtilities.FailOperations, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error)); + ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error); + + // Successfully transitioned to completed. + return true; + } + + public override bool TryWrite(T item) + { + UnboundedPrioritizedChannel parent = _parent; + while (true) + { + AsyncOperation? blockedReader = null; + AsyncOperation? waitingReadersTail = null; + lock (parent.SyncObj) + { + // If writing has already been marked as done, fail the write. + parent.AssertInvariants(); + if (parent._doneWriting != null) + { + return false; + } + + // If there aren't any blocked readers, just add the data to the queue, + // and let any waiting readers know that they should try to read it. + // We can only complete such waiters here under the lock if they run + // continuations asynchronously (otherwise the synchronous continuations + // could be invoked under the lock). If we don't complete them here, we + // need to do so outside of the lock. + if (parent._blockedReaders.IsEmpty) + { + parent._items.Enqueue(true, item); + waitingReadersTail = parent._waitingReadersTail; + if (waitingReadersTail == null) + { + return true; + } + parent._waitingReadersTail = null; + } + else + { + // There were blocked readers. Grab one, and then complete it outside of the lock. + blockedReader = parent._blockedReaders.DequeueHead(); + } + } + + if (blockedReader != null) + { + // Complete the reader. It's possible the reader was canceled, in which + // case we loop around to try everything again. + if (blockedReader.TrySetResult(item)) + { + return true; + } + } + else + { + // Wake up all of the waiters. Since we've released the lock, it's possible + // we could cause some spurious wake-ups here, if we tell a waiter there's + // something available but all data has already been removed. It's a benign + // race condition, though, as consumers already need to account for such things. + ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true); + return true; + } + } + } + + public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken) + { + Exception? doneWriting = _parent._doneWriting; + return + cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : + doneWriting == null ? new ValueTask(true) : // unbounded writing can always be done if we haven't completed + doneWriting != ChannelUtilities.s_doneWritingSentinel ? ValueTask.FromException(doneWriting) : + default; + } + + public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) => + cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) : + TryWrite(item) ? default : + ValueTask.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)); + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _parent._items.Count; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator(); + } + + /// Gets the object used to synchronize access to all state on this instance. + private object SyncObj => _items; + + [Conditional("DEBUG")] + private void AssertInvariants() + { + Debug.Assert(SyncObj != null, "The sync obj must not be null."); + Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); + + if (_items.Count != 0) + { + if (_runContinuationsAsynchronously) + { + Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers."); + Debug.Assert(_waitingReadersTail == null, "There's data available, so there shouldn't be any waiting readers."); + } + Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed."); + } + if ((!_blockedReaders.IsEmpty || _waitingReadersTail != null) && _runContinuationsAsynchronously) + { + Debug.Assert(_items.Count == 0, "There are blocked/waiting readers, so there shouldn't be any data available."); + } + if (_completion.Task.IsCompleted) + { + Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing."); + } + } + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _items.Count; + + /// Report if the channel is closed or not. This should only be used by the debugger. + private bool ChannelIsClosedForDebugger => _doneWriting != null; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + public IEnumerator GetEnumerator() + { + List list = []; + foreach ((bool _, T Priority) item in _items.UnorderedItems) + { + list.Add(item.Priority); + } + + list.Sort(_items.Comparer); + + return list.GetEnumerator(); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/PriorityUnboundedChannelTests.cs b/src/libraries/System.Threading.Channels/tests/PriorityUnboundedChannelTests.cs new file mode 100644 index 0000000000000..1280db17b3fb1 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/PriorityUnboundedChannelTests.cs @@ -0,0 +1,108 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public class UnboundedPrioritizedChannelTests : UnboundedChannelTests + { + protected override Channel CreateChannel() => Channel.CreateUnboundedPrioritized(new() { AllowSynchronousContinuations = AllowSynchronousContinuations }); + + [Fact] + public void ItemsReadInExpectedOrder_NoComparer() + { + Channel c = CreateChannel(); + + for (int trial = 0; trial < 2; trial++) + { + Assert.True(c.Writer.WriteAsync(new Person { Age = 20 }).IsCompletedSuccessfully); + Assert.Equal(1, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 21 }).IsCompletedSuccessfully); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 19 }).IsCompletedSuccessfully); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out Person p)); + Assert.Equal(19, p.Age); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 18 }).IsCompletedSuccessfully); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 22 }).IsCompletedSuccessfully); + Assert.Equal(4, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(18, p.Age); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(20, p.Age); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(21, p.Age); + Assert.Equal(1, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(22, p.Age); + Assert.Equal(0, c.Reader.Count); + } + } + + [Fact] + public void ItemsReadInExpectedOrder_Comparer() + { + Channel c = Channel.CreateUnboundedPrioritized(new UnboundedPrioritizedChannelOptions { Comparer = Comparer.Create((p1, p2) => p2.Age.CompareTo(p1.Age)) }); + + for (int trial = 0; trial < 2; trial++) + { + Assert.True(c.Writer.WriteAsync(new Person { Age = 20 }).IsCompletedSuccessfully); + Assert.Equal(1, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 21 }).IsCompletedSuccessfully); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 19 }).IsCompletedSuccessfully); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out Person p)); + Assert.Equal(21, p.Age); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 18 }).IsCompletedSuccessfully); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Writer.WriteAsync(new Person { Age = 22 }).IsCompletedSuccessfully); + Assert.Equal(4, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(22, p.Age); + Assert.Equal(3, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(20, p.Age); + Assert.Equal(2, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(19, p.Age); + Assert.Equal(1, c.Reader.Count); + + Assert.True(c.Reader.TryRead(out p)); + Assert.Equal(18, p.Age); + Assert.Equal(0, c.Reader.Count); + } + } + + private struct Person : IComparable + { + public int Age { get; set; } + + public int CompareTo(Person other) => Age.CompareTo(other.Age); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj index 7d929a96eade6..7d22b2afdee09 100644 --- a/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj +++ b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj @@ -6,7 +6,6 @@ - @@ -15,6 +14,10 @@ + + + +