diff --git a/src/libraries/Common/src/System/Collections/Concurrent/IProducerConsumerQueue.cs b/src/libraries/Common/src/System/Collections/Concurrent/IProducerConsumerQueue.cs
new file mode 100644
index 0000000000000..7207f03e8222e
--- /dev/null
+++ b/src/libraries/Common/src/System/Collections/Concurrent/IProducerConsumerQueue.cs
@@ -0,0 +1,37 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+namespace System.Collections.Concurrent
+{
+ /// Represents a producer/consumer queue.
+ /// Specifies the type of data contained in the queue.
+ internal interface IProducerConsumerQueue : IEnumerable
+ {
+ /// Enqueues an item into the queue.
+ /// The item to enqueue.
+ /// This method is meant to be thread-safe subject to the particular nature of the implementation.
+ void Enqueue(T item);
+
+ /// Attempts to dequeue an item from the queue.
+ /// The dequeued item.
+ /// true if an item could be dequeued; otherwise, false.
+ /// This method is meant to be thread-safe subject to the particular nature of the implementation.
+ bool TryDequeue([MaybeNullWhen(false)] out T result);
+
+ /// Gets whether the collection is currently empty.
+ /// This method may or may not be thread-safe.
+ bool IsEmpty { get; }
+
+ /// Gets the number of items in the collection.
+ /// In many implementations, this method will not be thread-safe.
+ int Count { get; }
+
+ /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
+ /// The sync object used to lock
+ /// The collection count
+ int GetCountSafe(object syncObj);
+ }
+}
diff --git a/src/libraries/Common/src/System/Collections/Concurrent/MultiProducerMultiConsumerQueue.cs b/src/libraries/Common/src/System/Collections/Concurrent/MultiProducerMultiConsumerQueue.cs
new file mode 100644
index 0000000000000..e9da29081a318
--- /dev/null
+++ b/src/libraries/Common/src/System/Collections/Concurrent/MultiProducerMultiConsumerQueue.cs
@@ -0,0 +1,35 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+
+namespace System.Collections.Concurrent
+{
+ ///
+ /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
+ ///
+ /// Specifies the type of data contained in the queue.
+ [DebuggerDisplay("Count = {Count}")]
+ internal sealed class MultiProducerMultiConsumerQueue : ConcurrentQueue, IProducerConsumerQueue
+ {
+ /// Enqueues an item into the queue.
+ /// The item to enqueue.
+ void IProducerConsumerQueue.Enqueue(T item) { base.Enqueue(item); }
+
+ /// Attempts to dequeue an item from the queue.
+ /// The dequeued item.
+ /// true if an item could be dequeued; otherwise, false.
+ bool IProducerConsumerQueue.TryDequeue([MaybeNullWhen(false)] out T result) { return base.TryDequeue(out result); }
+
+ /// Gets whether the collection is currently empty.
+ bool IProducerConsumerQueue.IsEmpty => base.IsEmpty;
+
+ /// Gets the number of items in the collection.
+ int IProducerConsumerQueue.Count => base.Count;
+
+ /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
+ /// ConcurrentQueue.Count is thread safe, no need to acquire the lock.
+ int IProducerConsumerQueue.GetCountSafe(object syncObj) => base.Count;
+ }
+}
diff --git a/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs b/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerSingleConsumerQueue.cs
similarity index 74%
rename from src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs
rename to src/libraries/Common/src/System/Collections/Concurrent/SingleProducerSingleConsumerQueue.cs
index 37ddfdd2811d3..751c1a33a9900 100644
--- a/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs
+++ b/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerSingleConsumerQueue.cs
@@ -16,7 +16,7 @@ namespace System.Collections.Concurrent
/// Specifies the type of data contained in the queue.
[DebuggerDisplay("Count = {Count}")]
[DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
- internal sealed class SingleProducerSingleConsumerQueue : IEnumerable
+ internal sealed class SingleProducerSingleConsumerQueue : IProducerConsumerQueue
{
// Design:
//
@@ -53,7 +53,7 @@ internal sealed class SingleProducerSingleConsumerQueue : IEnumerable
/// The initial size to use for segments (in number of elements).
private const int InitialSegmentSize = 32; // must be a power of 2
/// The maximum size to use for segments (in number of elements).
- private const int MaxSegmentSize = 0x1000000; // this could be made as large as Int32.MaxValue / 2
+ private const int MaxSegmentSize = 0x1000000; // this could be made as large as int.MaxValue / 2
/// The head of the linked list of segments.
private volatile Segment _head;
@@ -67,7 +67,7 @@ public SingleProducerSingleConsumerQueue()
Debug.Assert(InitialSegmentSize > 0, "Initial segment size must be > 0.");
Debug.Assert((InitialSegmentSize & (InitialSegmentSize - 1)) == 0, "Initial segment size must be a power of 2");
Debug.Assert(InitialSegmentSize <= MaxSegmentSize, "Initial segment size should be <= maximum.");
- Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
+ Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < int.MaxValue, or else overflow could occur.");
// Initialize the queue
_head = _tail = new Segment(InitialSegmentSize);
@@ -87,9 +87,11 @@ public void Enqueue(T item)
{
array[last] = item;
segment._state._last = tail2;
+ return;
}
+
// Slow path: there may not be room in the current segment.
- else EnqueueSlow(item, ref segment);
+ EnqueueSlow(item, ref segment);
}
/// Enqueues an item into the queue.
@@ -106,9 +108,8 @@ private void EnqueueSlow(T item, ref Segment segment)
return;
}
- int newSegmentSize = _tail._array.Length << 1; // double size
+ int newSegmentSize = Math.Min(_tail._array.Length * 2, MaxSegmentSize);
Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
- if (newSegmentSize > MaxSegmentSize) newSegmentSize = MaxSegmentSize;
var newSegment = new Segment(newSegmentSize);
newSegment._array[0] = item;
@@ -118,8 +119,8 @@ private void EnqueueSlow(T item, ref Segment segment)
try { }
finally
{
- // Finally block to protect against corruption due to a thread abort
- // between setting _next and setting _tail.
+ // Finally block to protect against corruption due to a thread abort between
+ // setting _next and setting _tail (this is only relevant on .NET Framework).
Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
_tail = newSegment;
}
@@ -197,7 +198,7 @@ private bool TryDequeueSlow(Segment segment, T[] array, bool peek, [MaybeNullWhe
if (first == segment._state._last)
{
- result = default!;
+ result = default;
return false;
}
@@ -212,15 +213,105 @@ private bool TryDequeueSlow(Segment segment, T[] array, bool peek, [MaybeNullWhe
return true;
}
+ /// Attempts to dequeue an item from the queue.
+ /// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
+ /// The dequeued item.
+ /// true if an item could be dequeued; otherwise, false.
+ public bool TryDequeueIf(Predicate? predicate, [MaybeNullWhen(false)] out T result)
+ {
+ Segment segment = _head;
+ T[] array = segment._array;
+ int first = segment._state._first; // local copy to avoid multiple volatile reads
+
+ // Fast path: there's obviously data available in the current segment
+ if (first != segment._state._lastCopy)
+ {
+ result = array[first];
+ if (predicate == null || predicate(result))
+ {
+ array[first] = default!; // Clear the slot to release the element
+ segment._state._first = (first + 1) & (array.Length - 1);
+ return true;
+ }
+
+ result = default;
+ return false;
+ }
+
+ // Slow path: there may not be data available in the current segment
+ return TryDequeueIfSlow(predicate, segment, array, out result);
+ }
+
+ /// Attempts to dequeue an item from the queue.
+ /// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
+ /// The array from which the item was dequeued.
+ /// The segment from which the item was dequeued.
+ /// The dequeued item.
+ /// true if an item could be dequeued; otherwise, false.
+ private bool TryDequeueIfSlow(Predicate? predicate, Segment segment, T[] array, [MaybeNullWhen(false)] out T result)
+ {
+ Debug.Assert(segment != null, "Expected a non-null segment.");
+ Debug.Assert(array != null, "Expected a non-null item array.");
+
+ if (segment._state._last != segment._state._lastCopy)
+ {
+ segment._state._lastCopy = segment._state._last;
+ return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
+ }
+
+ if (segment._next != null && segment._state._first == segment._state._last)
+ {
+ segment = segment._next;
+ array = segment._array;
+ _head = segment;
+ }
+
+ int first = segment._state._first; // local copy to avoid extraneous volatile reads
+
+ if (first == segment._state._last)
+ {
+ result = default;
+ return false;
+ }
+
+ result = array[first];
+ if (predicate == null || predicate(result))
+ {
+ array[first] = default!; // Clear the slot to release the element
+ segment._state._first = (first + 1) & (segment._array.Length - 1);
+ segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
+ return true;
+ }
+
+ result = default;
+ return false;
+ }
+
+ public void Clear()
+ {
+ while (TryDequeue(out _)) ;
+ }
+
/// Gets whether the collection is currently empty.
+ /// WARNING: This should not be used concurrently without further vetting.
public bool IsEmpty
{
- // This implementation is optimized for calls from the consumer.
get
{
+ // This implementation is optimized for calls from the consumer.
+
Segment head = _head;
- if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered
- if (head._state._first != head._state._last) return false;
+
+ if (head._state._first != head._state._lastCopy)
+ {
+ return false; // _first is volatile, so the read of _lastCopy cannot get reordered
+ }
+
+ if (head._state._first != head._state._last)
+ {
+ return false;
+ }
+
return head._next == null;
}
}
@@ -239,12 +330,13 @@ public IEnumerator GetEnumerator()
}
}
}
-
- IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ /// Gets an enumerable for the collection.
+ /// This method is not safe to use concurrently with any other members that may mutate the collection.
+ IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
/// Gets the number of items in the collection.
/// This method is not safe to use concurrently with any other members that may mutate the collection.
- internal int Count
+ public int Count
{
get
{
@@ -257,14 +349,29 @@ internal int Count
{
first = segment._state._first;
last = segment._state._last;
- if (first == segment._state._first) break;
+ if (first == segment._state._first)
+ {
+ break;
+ }
}
+
count += (last - first) & (arraySize - 1);
}
return count;
}
}
+ /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
+ /// The Count is not thread safe, so we need to acquire the lock.
+ int IProducerConsumerQueue.GetCountSafe(object syncObj)
+ {
+ Debug.Assert(syncObj != null, "The syncObj parameter is null.");
+ lock (syncObj)
+ {
+ return Count;
+ }
+ }
+
/// A segment in the queue containing one or more items.
[StructLayout(LayoutKind.Sequential)]
private sealed class Segment
diff --git a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems
index 9d02fee390685..24208f62b29c4 100644
--- a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems
+++ b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems
@@ -1115,7 +1115,6 @@
-
@@ -1275,6 +1274,15 @@
Common\System\SR.cs
+
+ System\Collections\Concurrent\IProducerConsumerQueue.cs
+
+
+ System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs
+
+
+ System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs
+
Common\System\Collections\Generic\EnumerableHelpers.cs
diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ConcurrentExclusiveSchedulerPair.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ConcurrentExclusiveSchedulerPair.cs
index 29a9d1e4bfa3f..a8c15f84f19a9 100644
--- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ConcurrentExclusiveSchedulerPair.cs
+++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ConcurrentExclusiveSchedulerPair.cs
@@ -13,6 +13,7 @@
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ProducerConsumerQueues.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ProducerConsumerQueues.cs
deleted file mode 100644
index 83bef50e8b204..0000000000000
--- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ProducerConsumerQueues.cs
+++ /dev/null
@@ -1,362 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
-//
-//
-//
-// Specialized producer/consumer queues.
-//
-//
-// *************************
-//
-// src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
-// src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
-// Keep both of them consistent by changing the other file when you change this one, also avoid:
-// 1- To reference interneal types in mscorlib
-// 2- To reference any dataflow specific types
-// This should be fixed post Dev11 when this class becomes public.
-//
-// *************************
-// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-
-using System.Collections;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Runtime.InteropServices;
-
-namespace System.Threading.Tasks
-{
- /// Represents a producer/consumer queue used internally by dataflow blocks.
- /// Specifies the type of data contained in the queue.
- internal interface IProducerConsumerQueue : IEnumerable
- {
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- /// This method is meant to be thread-safe subject to the particular nature of the implementation.
- void Enqueue(T item);
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- /// This method is meant to be thread-safe subject to the particular nature of the implementation.
- bool TryDequeue([MaybeNullWhen(false)] out T result);
-
- /// Gets whether the collection is currently empty.
- /// This method may or may not be thread-safe.
- bool IsEmpty { get; }
-
- /// Gets the number of items in the collection.
- /// In many implementations, this method will not be thread-safe.
- int Count { get; }
- }
-
- ///
- /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
- ///
- /// Specifies the type of data contained in the queue.
- [DebuggerDisplay("Count = {Count}")]
- internal sealed class MultiProducerMultiConsumerQueue : ConcurrentQueue, IProducerConsumerQueue
- {
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- void IProducerConsumerQueue.Enqueue(T item) { base.Enqueue(item); }
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- bool IProducerConsumerQueue.TryDequeue([MaybeNullWhen(false)] out T result) { return base.TryDequeue(out result); }
-
- /// Gets whether the collection is currently empty.
- bool IProducerConsumerQueue.IsEmpty => base.IsEmpty;
-
- /// Gets the number of items in the collection.
- int IProducerConsumerQueue.Count => base.Count;
- }
-
- ///
- /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
- ///
- /// Specifies the type of data contained in the queue.
- [DebuggerDisplay("Count = {Count}")]
- [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
- internal sealed class SingleProducerSingleConsumerQueue : IProducerConsumerQueue
- {
- // Design:
- //
- // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
- // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
- // multiple producer threads concurrently or multiple consumer threads concurrently.
- //
- // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
- // as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer
- // to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when
- // (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first).
- //
- // Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can
- // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
- // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
- // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
- // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
- // empty out the old buffer and only then follow the producer into the new (larger) buffer.
- //
- // As described above, the enqueue operation on the fast path only modifies the m_first field of the current segment.
- // However, it also needs to read m_last in order to verify that there is room in the current segment. Similarly, the
- // dequeue operation on the fast path only needs to modify m_last, but also needs to read m_first to verify that the
- // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
- //
- // The cache line sharing issue can be mitigating by having a possibly stale copy of m_first that is owned by the producer,
- // and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using
- // (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from
- // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines.
- // m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is room in the buffer
- // simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only
- // when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer
- // in the same way to avoid reading m_first on the hot path.
-
- /// The initial size to use for segments (in number of elements).
- private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2
- /// The maximum size to use for segments (in number of elements).
- private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as int.MaxValue / 2
-
- /// The head of the linked list of segments.
- private volatile Segment m_head;
- /// The tail of the linked list of segments.
- private volatile Segment m_tail;
-
- /// Initializes the queue.
- internal SingleProducerSingleConsumerQueue()
- {
- // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
- Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
- Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
- Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
- Debug.Assert(MAX_SEGMENT_SIZE < int.MaxValue / 2, "Max segment size * 2 must be < int.MaxValue, or else overflow could occur.");
-
- // Initialize the queue
- m_head = m_tail = new Segment(INIT_SEGMENT_SIZE);
- }
-
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- public void Enqueue(T item)
- {
- Segment segment = m_tail;
- T[] array = segment.m_array;
- int last = segment.m_state.m_last; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously room in the current segment
- int tail2 = (last + 1) & (array.Length - 1);
- if (tail2 != segment.m_state.m_firstCopy)
- {
- array[last] = item;
- segment.m_state.m_last = tail2;
- }
- // Slow path: there may not be room in the current segment.
- else EnqueueSlow(item, ref segment);
- }
-
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- /// The segment in which to first attempt to store the item.
- private void EnqueueSlow(T item, ref Segment segment)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
-
- if (segment.m_state.m_firstCopy != segment.m_state.m_first)
- {
- segment.m_state.m_firstCopy = segment.m_state.m_first;
- Enqueue(item); // will only recur once for this enqueue operation
- return;
- }
-
- int newSegmentSize = m_tail.m_array.Length << 1; // double size
- Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
- if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE;
-
- var newSegment = new Segment(newSegmentSize);
- newSegment.m_array[0] = item;
- newSegment.m_state.m_last = 1;
- newSegment.m_state.m_lastCopy = 1;
-
- Volatile.Write(ref m_tail.m_next, newSegment); // ensure segment not published until item is fully stored
- m_tail = newSegment;
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- public bool TryDequeue([MaybeNullWhen(false)] out T result)
- {
- Segment segment = m_head;
- T[] array = segment.m_array;
- int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously data available in the current segment
- if (first != segment.m_state.m_lastCopy)
- {
- result = array[first];
- array[first] = default!; // Clear the slot to release the element
- segment.m_state.m_first = (first + 1) & (array.Length - 1);
- return true;
- }
- // Slow path: there may not be data available in the current segment
- else return TryDequeueSlow(ref segment, ref array, out result);
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The array from which the item was dequeued.
- /// The segment from which the item was dequeued.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- private bool TryDequeueSlow(ref Segment segment, ref T[] array, [MaybeNullWhen(false)] out T result)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
- Debug.Assert(array != null, "Expected a non-null item array.");
-
- if (segment.m_state.m_last != segment.m_state.m_lastCopy)
- {
- segment.m_state.m_lastCopy = segment.m_state.m_last;
- return TryDequeue(out result); // will only recur once for this dequeue operation
- }
-
- if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
- {
- segment = segment.m_next;
- array = segment.m_array;
- m_head = segment;
- }
-
- int first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads
-
- if (first == segment.m_state.m_last)
- {
- result = default;
- return false;
- }
-
- result = array[first];
- array[first] = default!; // Clear the slot to release the element
- segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
- segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
-
- return true;
- }
-
- /// Gets whether the collection is currently empty.
- /// WARNING: This should not be used concurrently without further vetting.
- public bool IsEmpty
- {
- // This implementation is optimized for calls from the consumer.
- get
- {
- Segment head = m_head;
- if (head.m_state.m_first != head.m_state.m_lastCopy) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered
- if (head.m_state.m_first != head.m_state.m_last) return false;
- return head.m_next == null;
- }
- }
-
- /// Gets an enumerable for the collection.
- /// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
- public IEnumerator GetEnumerator()
- {
- for (Segment? segment = m_head; segment != null; segment = segment.m_next)
- {
- for (int pt = segment.m_state.m_first;
- pt != segment.m_state.m_last;
- pt = (pt + 1) & (segment.m_array.Length - 1))
- {
- yield return segment.m_array[pt];
- }
- }
- }
- /// Gets an enumerable for the collection.
- /// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
- IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
-
- /// Gets the number of items in the collection.
- /// WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.
- public int Count
- {
- get
- {
- int count = 0;
- for (Segment? segment = m_head; segment != null; segment = segment.m_next)
- {
- int arraySize = segment.m_array.Length;
- int first, last;
- while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
- {
- first = segment.m_state.m_first;
- last = segment.m_state.m_last;
- if (first == segment.m_state.m_first) break;
- }
- count += (last - first) & (arraySize - 1);
- }
- return count;
- }
- }
-
- /// A segment in the queue containing one or more items.
- [StructLayout(LayoutKind.Sequential)]
- private sealed class Segment
- {
- /// The next segment in the linked list of segments.
- internal Segment? m_next;
- /// The data stored in this segment.
- internal readonly T[] m_array;
- /// Details about the segment.
- internal SegmentState m_state; // separated out to enable StructLayout attribute to take effect
-
- /// Initializes the segment.
- /// The size to use for this segment.
- internal Segment(int size)
- {
- Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2");
- m_array = new T[size];
- }
- }
-
- /// Stores information about a segment.
- [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
- private struct SegmentState
- {
- /// Padding to reduce false sharing between the segment's array and m_first.
- internal Internal.PaddingFor32 m_pad0;
-
- /// The index of the current head in the segment.
- internal volatile int m_first;
- /// A copy of the current tail index.
- internal int m_lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first
-
- /// Padding to reduce false sharing between the first and last.
- internal Internal.PaddingFor32 m_pad1;
-
- /// A copy of the current head index.
- internal int m_firstCopy; // not voliatle as only read and written by the consumer thread
- /// The index of the current tail in the segment.
- internal volatile int m_last;
-
- /// Padding to reduce false sharing with the last and what's after the segment.
- internal Internal.PaddingFor32 m_pad2;
- }
-
- /// Debugger type proxy for a SingleProducerSingleConsumerQueue of T.
- private sealed class SingleProducerSingleConsumerQueue_DebugView
- {
- /// The queue being visualized.
- private readonly SingleProducerSingleConsumerQueue m_queue;
-
- /// Initializes the debug view.
- /// The queue being debugged.
- public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue queue)
- {
- Debug.Assert(queue != null, "Expected a non-null queue.");
- m_queue = queue;
- }
- }
- }
-}
diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
index 95daa7a0ce7ee..ed2e1a9fa0077 100644
--- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
+++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
@@ -37,8 +37,12 @@ System.Threading.Channel<T>
-
+
+
+
diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ProducerConsumerQueues.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ProducerConsumerQueues.cs
deleted file mode 100644
index 2017ef2274e27..0000000000000
--- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/ProducerConsumerQueues.cs
+++ /dev/null
@@ -1,540 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
-//
-// ProducerConsumerQueues.cs
-//
-//
-// Specialized producer/consumer queues.
-//
-//
-// *************************
-//
-// There are two exact copies of this file:
-// src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
-// src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
-// Keep both of them consistent by changing the other file when you change this one, also avoid:
-// 1- To reference internal types in mscorlib
-// 2- To reference any dataflow specific types
-// This should be fixed post Dev11 when this class becomes public.
-//
-// *************************
-// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-
-using System.Collections;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Runtime.InteropServices;
-using Internal;
-
-namespace System.Threading.Tasks
-{
- /// Represents a producer/consumer queue used internally by dataflow blocks.
- /// Specifies the type of data contained in the queue.
- internal interface IProducerConsumerQueue : IEnumerable
- {
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- /// This method is meant to be thread-safe subject to the particular nature of the implementation.
- void Enqueue(T item);
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- /// This method is meant to be thread-safe subject to the particular nature of the implementation.
- bool TryDequeue([MaybeNullWhen(false)] out T result);
-
- /// Gets whether the collection is currently empty.
- /// This method may or may not be thread-safe.
- bool IsEmpty { get; }
-
- /// Gets the number of items in the collection.
- /// In many implementations, this method will not be thread-safe.
- int Count { get; }
-
- /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
- /// The sync object used to lock
- /// The collection count
- int GetCountSafe(object syncObj);
- }
-
- ///
- /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
- ///
- /// Specifies the type of data contained in the queue.
- [DebuggerDisplay("Count = {Count}")]
- internal sealed class MultiProducerMultiConsumerQueue : ConcurrentQueue, IProducerConsumerQueue
- {
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- void IProducerConsumerQueue.Enqueue(T item) { base.Enqueue(item); }
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- bool IProducerConsumerQueue.TryDequeue([MaybeNullWhen(false)] out T result) { return base.TryDequeue(out result); }
-
- /// Gets whether the collection is currently empty.
- bool IProducerConsumerQueue.IsEmpty { get { return base.IsEmpty; } }
-
- /// Gets the number of items in the collection.
- int IProducerConsumerQueue.Count { get { return base.Count; } }
-
- /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
- /// ConcurrentQueue.Count is thread safe, no need to acquire the lock.
- int IProducerConsumerQueue.GetCountSafe(object syncObj) { return base.Count; }
- }
-
- ///
- /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
- ///
- /// Specifies the type of data contained in the queue.
- [DebuggerDisplay("Count = {Count}")]
- [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
- internal sealed class SingleProducerSingleConsumerQueue : IProducerConsumerQueue
- {
- // Design:
- //
- // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
- // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
- // multiple producer threads concurrently or multiple consumer threads concurrently.
- //
- // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
- // as an array with two indexes: _first and _last. _first is the index of the array slot for the consumer
- // to read next, and _last is the slot for the producer to write next. The circular buffer is empty when
- // (_first == _last), and full when ((_last+1) % _array.Length == _first).
- //
- // Since _first is only ever modified by the consumer thread and _last by the producer, the two indices can
- // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
- // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
- // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
- // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
- // empty out the old buffer and only then follow the producer into the new (larger) buffer.
- //
- // As described above, the enqueue operation on the fast path only modifies the _first field of the current segment.
- // However, it also needs to read _last in order to verify that there is room in the current segment. Similarly, the
- // dequeue operation on the fast path only needs to modify _last, but also needs to read _first to verify that the
- // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
- //
- // The cache line sharing issue can be mitigating by having a possibly stale copy of _first that is owned by the producer,
- // and a possibly stale copy of _last that is owned by the consumer. So, the consumer state is described using
- // (_first, _lastCopy) and the producer state using (_firstCopy, _last). The consumer state is separated from
- // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines.
- // _lastCopy is the consumer's copy of _last. Whenever the consumer can tell that there is room in the buffer
- // simply by observing _lastCopy, the consumer thread does not need to read _last and thus encounter a cache miss. Only
- // when the buffer appears to be empty will the consumer refresh _lastCopy from _last. _firstCopy is used by the producer
- // in the same way to avoid reading _first on the hot path.
-
- /// The initial size to use for segments (in number of elements).
- private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2
- /// The maximum size to use for segments (in number of elements).
- private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as Int32.MaxValue / 2
-
- /// The head of the linked list of segments.
- private volatile Segment _head;
- /// The tail of the linked list of segments.
- private volatile Segment _tail;
-
- /// Initializes the queue.
- internal SingleProducerSingleConsumerQueue()
- {
- // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
- Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
- Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
- Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
- Debug.Assert(MAX_SEGMENT_SIZE < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
-
- // Initialize the queue
- _head = _tail = new Segment(INIT_SEGMENT_SIZE);
- }
-
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- public void Enqueue(T item)
- {
- Segment segment = _tail;
- T[] array = segment._array;
- int last = segment._state._last; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously room in the current segment
- int tail2 = (last + 1) & (array.Length - 1);
- if (tail2 != segment._state._firstCopy)
- {
- array[last] = item;
- segment._state._last = tail2;
- }
- // Slow path: there may not be room in the current segment.
- else EnqueueSlow(item, ref segment);
- }
-
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- /// The segment in which to first attempt to store the item.
- private void EnqueueSlow(T item, ref Segment segment)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
-
- if (segment._state._firstCopy != segment._state._first)
- {
- segment._state._firstCopy = segment._state._first;
- Enqueue(item); // will only recur once for this enqueue operation
- return;
- }
-
- int newSegmentSize = _tail._array.Length << 1; // double size
- Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
- if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE;
-
- var newSegment = new Segment(newSegmentSize);
- newSegment._array[0] = item;
- newSegment._state._last = 1;
- newSegment._state._lastCopy = 1;
-
- try { }
- finally
- {
- // Finally block to protect against corruption due to a thread abort
- // between setting _next and setting _tail.
- Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
- _tail = newSegment;
- }
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- public bool TryDequeue([MaybeNullWhen(false)] out T result)
- {
- Segment segment = _head;
- T[] array = segment._array;
- int first = segment._state._first; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously data available in the current segment
- if (first != segment._state._lastCopy)
- {
- result = array[first];
- array[first] = default(T)!; // Clear the slot to release the element
- segment._state._first = (first + 1) & (array.Length - 1);
- return true;
- }
- // Slow path: there may not be data available in the current segment
- else return TryDequeueSlow(ref segment, ref array, out result);
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The array from which the item was dequeued.
- /// The segment from which the item was dequeued.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- private bool TryDequeueSlow(ref Segment segment, ref T[] array, [MaybeNullWhen(false)] out T result)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
- Debug.Assert(array != null, "Expected a non-null item array.");
-
- if (segment._state._last != segment._state._lastCopy)
- {
- segment._state._lastCopy = segment._state._last;
- return TryDequeue(out result); // will only recur once for this dequeue operation
- }
-
- if (segment._next != null && segment._state._first == segment._state._last)
- {
- segment = segment._next;
- array = segment._array;
- _head = segment;
- }
-
- int first = segment._state._first; // local copy to avoid extraneous volatile reads
-
- if (first == segment._state._last)
- {
- result = default(T);
- return false;
- }
-
- result = array[first];
- array[first] = default(T)!; // Clear the slot to release the element
- segment._state._first = (first + 1) & (segment._array.Length - 1);
- segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
-
- return true;
- }
-
- /// Attempts to peek at an item in the queue.
- /// The peeked item.
- /// true if an item could be peeked; otherwise, false.
- public bool TryPeek([MaybeNullWhen(false)] out T result)
- {
- Segment segment = _head;
- T[] array = segment._array;
- int first = segment._state._first; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously data available in the current segment
- if (first != segment._state._lastCopy)
- {
- result = array[first];
- return true;
- }
- // Slow path: there may not be data available in the current segment
- else return TryPeekSlow(ref segment, ref array, out result);
- }
-
- /// Attempts to peek at an item in the queue.
- /// The array from which the item is peeked.
- /// The segment from which the item is peeked.
- /// The peeked item.
- /// true if an item could be peeked; otherwise, false.
- private bool TryPeekSlow(ref Segment segment, ref T[] array, [MaybeNullWhen(false)] out T result)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
- Debug.Assert(array != null, "Expected a non-null item array.");
-
- if (segment._state._last != segment._state._lastCopy)
- {
- segment._state._lastCopy = segment._state._last;
- return TryPeek(out result); // will only recur once for this peek operation
- }
-
- if (segment._next != null && segment._state._first == segment._state._last)
- {
- segment = segment._next;
- array = segment._array;
- _head = segment;
- }
-
- int first = segment._state._first; // local copy to avoid extraneous volatile reads
-
- if (first == segment._state._last)
- {
- result = default(T);
- return false;
- }
-
- result = array[first];
- return true;
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- public bool TryDequeueIf(Predicate? predicate, [MaybeNullWhen(false)] out T result)
- {
- Segment segment = _head;
- T[] array = segment._array;
- int first = segment._state._first; // local copy to avoid multiple volatile reads
-
- // Fast path: there's obviously data available in the current segment
- if (first != segment._state._lastCopy)
- {
- result = array[first];
- if (predicate == null || predicate(result))
- {
- array[first] = default(T)!; // Clear the slot to release the element
- segment._state._first = (first + 1) & (array.Length - 1);
- return true;
- }
- else
- {
- result = default(T);
- return false;
- }
- }
- // Slow path: there may not be data available in the current segment
- else return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
- }
-
- /// Attempts to dequeue an item from the queue.
- /// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
- /// The array from which the item was dequeued.
- /// The segment from which the item was dequeued.
- /// The dequeued item.
- /// true if an item could be dequeued; otherwise, false.
- private bool TryDequeueIfSlow(Predicate? predicate, ref Segment segment, ref T[] array, [MaybeNullWhen(false)] out T result)
- {
- Debug.Assert(segment != null, "Expected a non-null segment.");
- Debug.Assert(array != null, "Expected a non-null item array.");
-
- if (segment._state._last != segment._state._lastCopy)
- {
- segment._state._lastCopy = segment._state._last;
- return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
- }
-
- if (segment._next != null && segment._state._first == segment._state._last)
- {
- segment = segment._next;
- array = segment._array;
- _head = segment;
- }
-
- int first = segment._state._first; // local copy to avoid extraneous volatile reads
-
- if (first == segment._state._last)
- {
- result = default(T);
- return false;
- }
-
- result = array[first];
- if (predicate == null || predicate(result))
- {
- array[first] = default(T)!; // Clear the slot to release the element
- segment._state._first = (first + 1) & (segment._array.Length - 1);
- segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
- return true;
- }
- else
- {
- result = default(T);
- return false;
- }
- }
-
- public void Clear()
- {
- while (TryDequeue(out _)) ;
- }
-
- /// Gets whether the collection is currently empty.
- /// WARNING: This should not be used concurrently without further vetting.
- public bool IsEmpty
- {
- // This implementation is optimized for calls from the consumer.
- get
- {
- Segment head = _head;
- if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered
- if (head._state._first != head._state._last) return false;
- return head._next == null;
- }
- }
-
- /// Gets an enumerable for the collection.
- /// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
- public IEnumerator GetEnumerator()
- {
- for (Segment? segment = _head; segment != null; segment = segment._next)
- {
- for (int pt = segment._state._first;
- pt != segment._state._last;
- pt = (pt + 1) & (segment._array.Length - 1))
- {
- yield return segment._array[pt];
- }
- }
- }
- /// Gets an enumerable for the collection.
- /// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
- IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
-
- /// Gets the number of items in the collection.
- /// WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.
- public int Count
- {
- get
- {
- int count = 0;
- for (Segment? segment = _head; segment != null; segment = segment._next)
- {
- int arraySize = segment._array.Length;
- int first, last;
- while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
- {
- first = segment._state._first;
- last = segment._state._last;
- if (first == segment._state._first) break;
- }
- count += (last - first) & (arraySize - 1);
- }
- return count;
- }
- }
-
- /// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
- /// The Count is not thread safe, so we need to acquire the lock.
- int IProducerConsumerQueue.GetCountSafe(object syncObj)
- {
- Debug.Assert(syncObj != null, "The syncObj parameter is null.");
- lock (syncObj)
- {
- return Count;
- }
- }
-
- /// A segment in the queue containing one or more items.
- [StructLayout(LayoutKind.Sequential)]
- private sealed class Segment
- {
- /// The next segment in the linked list of segments.
- internal Segment? _next;
- /// The data stored in this segment.
- internal readonly T[] _array;
- /// Details about the segment.
- internal SegmentState _state; // separated out to enable StructLayout attribute to take effect
-
- /// Initializes the segment.
- /// The size to use for this segment.
- internal Segment(int size)
- {
- Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2");
- _array = new T[size];
- }
- }
-
- /// Stores information about a segment.
- [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
- private struct SegmentState
- {
- /// Padding to reduce false sharing between the segment's array and _first.
- internal PaddingFor32 _pad0;
-
- /// The index of the current head in the segment.
- internal volatile int _first;
- /// A copy of the current tail index.
- internal int _lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there _lastCopy is only read after reading the volatile _first
-
- /// Padding to reduce false sharing between the first and last.
- internal PaddingFor32 _pad1;
-
- /// A copy of the current head index.
- internal int _firstCopy; // not volatile as only read and written by the consumer thread
- /// The index of the current tail in the segment.
- internal volatile int _last;
-
- /// Padding to reduce false sharing with the last and what's after the segment.
- internal PaddingFor32 _pad2;
- }
-
- /// Debugger type proxy for a SingleProducerSingleConsumerQueue of T.
- private sealed class SingleProducerSingleConsumerQueue_DebugView
- {
- /// The queue being visualized.
- private readonly SingleProducerSingleConsumerQueue _queue;
-
- /// Initializes the debug view.
- /// The queue being debugged.
- public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue queue)
- {
- Debug.Assert(queue != null, "Expected a non-null queue.");
- _queue = queue;
- }
-
- /// Gets the contents of the list.
- [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
- public T[] Items
- {
- get
- {
- List list = new List();
- foreach (T item in _queue)
- list.Add(item);
- return list.ToArray();
- }
- }
- }
- }
-}
diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SourceCore.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SourceCore.cs
index 8d1d6250e5c24..c39bf97d3cf81 100644
--- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SourceCore.cs
+++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SourceCore.cs
@@ -10,6 +10,7 @@
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SpscTargetCore.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SpscTargetCore.cs
index 804dc77ac0bfe..d610978a54dbe 100644
--- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SpscTargetCore.cs
+++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/SpscTargetCore.cs
@@ -10,6 +10,7 @@
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs
index cd7a24d44133d..393c9ca68c2e4 100644
--- a/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs
+++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/TargetCore.cs
@@ -10,6 +10,7 @@
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj
index 079bac7b48fc1..68f71b14c8500 100644
--- a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj
+++ b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj
@@ -1,4 +1,4 @@
-
+$(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.1;netstandard2.0;$(NetFrameworkMinimum)true
@@ -50,7 +50,6 @@ System.Threading.Tasks.Dataflow.WriteOnceBlock<T>
-
@@ -59,6 +58,12 @@ System.Threading.Tasks.Dataflow.WriteOnceBlock<T>
+
+
+