Skip to content

Commit

Permalink
Detriplicate internal SingleProducerSingleConsumerQueue (#76932)
Browse files Browse the repository at this point in the history
System.Private.CoreLib, System.Threading.Tasks.Dataflow, and System.Threading.Channels have all ended up with their own copy of SingleProducerSingleConsumerQueue and its associated helpers.  This consolidates them down to a single shared copy.

There's no functional change here, just deleting duplicates and moving things around.
  • Loading branch information
stephentoub authored Oct 24, 2022
1 parent 0c0102d commit 70fb135
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 923 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

namespace System.Collections.Concurrent
{
/// <summary>Represents a producer/consumer queue.</summary>
/// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
internal interface IProducerConsumerQueue<T> : IEnumerable<T>
{
/// <summary>Enqueues an item into the queue.</summary>
/// <param name="item">The item to enqueue.</param>
/// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
void Enqueue(T item);

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
/// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
bool TryDequeue([MaybeNullWhen(false)] out T result);

/// <summary>Gets whether the collection is currently empty.</summary>
/// <remarks>This method may or may not be thread-safe.</remarks>
bool IsEmpty { get; }

/// <summary>Gets the number of items in the collection.</summary>
/// <remarks>In many implementations, this method will not be thread-safe.</remarks>
int Count { get; }

/// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
/// <param name="syncObj">The sync object used to lock</param>
/// <returns>The collection count</returns>
int GetCountSafe(object syncObj);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace System.Collections.Concurrent
{
/// <summary>
/// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
/// </summary>
/// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
[DebuggerDisplay("Count = {Count}")]
internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
{
/// <summary>Enqueues an item into the queue.</summary>
/// <param name="item">The item to enqueue.</param>
void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); }

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
bool IProducerConsumerQueue<T>.TryDequeue([MaybeNullWhen(false)] out T result) { return base.TryDequeue(out result); }

/// <summary>Gets whether the collection is currently empty.</summary>
bool IProducerConsumerQueue<T>.IsEmpty => base.IsEmpty;

/// <summary>Gets the number of items in the collection.</summary>
int IProducerConsumerQueue<T>.Count => base.Count;

/// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
/// <remarks>ConcurrentQueue.Count is thread safe, no need to acquire the lock.</remarks>
int IProducerConsumerQueue<T>.GetCountSafe(object syncObj) => base.Count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace System.Collections.Concurrent
/// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
[DebuggerDisplay("Count = {Count}")]
[DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
internal sealed class SingleProducerSingleConsumerQueue<T> : IEnumerable<T>
internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T>
{
// Design:
//
Expand Down Expand Up @@ -53,7 +53,7 @@ internal sealed class SingleProducerSingleConsumerQueue<T> : IEnumerable<T>
/// <summary>The initial size to use for segments (in number of elements).</summary>
private const int InitialSegmentSize = 32; // must be a power of 2
/// <summary>The maximum size to use for segments (in number of elements).</summary>
private const int MaxSegmentSize = 0x1000000; // this could be made as large as Int32.MaxValue / 2
private const int MaxSegmentSize = 0x1000000; // this could be made as large as int.MaxValue / 2

/// <summary>The head of the linked list of segments.</summary>
private volatile Segment _head;
Expand All @@ -67,7 +67,7 @@ public SingleProducerSingleConsumerQueue()
Debug.Assert(InitialSegmentSize > 0, "Initial segment size must be > 0.");
Debug.Assert((InitialSegmentSize & (InitialSegmentSize - 1)) == 0, "Initial segment size must be a power of 2");
Debug.Assert(InitialSegmentSize <= MaxSegmentSize, "Initial segment size should be <= maximum.");
Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < int.MaxValue, or else overflow could occur.");

// Initialize the queue
_head = _tail = new Segment(InitialSegmentSize);
Expand All @@ -87,9 +87,11 @@ public void Enqueue(T item)
{
array[last] = item;
segment._state._last = tail2;
return;
}

// Slow path: there may not be room in the current segment.
else EnqueueSlow(item, ref segment);
EnqueueSlow(item, ref segment);
}

/// <summary>Enqueues an item into the queue.</summary>
Expand All @@ -106,9 +108,8 @@ private void EnqueueSlow(T item, ref Segment segment)
return;
}

int newSegmentSize = _tail._array.Length << 1; // double size
int newSegmentSize = Math.Min(_tail._array.Length * 2, MaxSegmentSize);
Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
if (newSegmentSize > MaxSegmentSize) newSegmentSize = MaxSegmentSize;

var newSegment = new Segment(newSegmentSize);
newSegment._array[0] = item;
Expand All @@ -118,8 +119,8 @@ private void EnqueueSlow(T item, ref Segment segment)
try { }
finally
{
// Finally block to protect against corruption due to a thread abort
// between setting _next and setting _tail.
// Finally block to protect against corruption due to a thread abort between
// setting _next and setting _tail (this is only relevant on .NET Framework).
Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
_tail = newSegment;
}
Expand Down Expand Up @@ -197,7 +198,7 @@ private bool TryDequeueSlow(Segment segment, T[] array, bool peek, [MaybeNullWhe

if (first == segment._state._last)
{
result = default!;
result = default;
return false;
}

Expand All @@ -212,15 +213,105 @@ private bool TryDequeueSlow(Segment segment, T[] array, bool peek, [MaybeNullWhe
return true;
}

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
public bool TryDequeueIf(Predicate<T>? predicate, [MaybeNullWhen(false)] out T result)
{
Segment segment = _head;
T[] array = segment._array;
int first = segment._state._first; // local copy to avoid multiple volatile reads

// Fast path: there's obviously data available in the current segment
if (first != segment._state._lastCopy)
{
result = array[first];
if (predicate == null || predicate(result))
{
array[first] = default!; // Clear the slot to release the element
segment._state._first = (first + 1) & (array.Length - 1);
return true;
}

result = default;
return false;
}

// Slow path: there may not be data available in the current segment
return TryDequeueIfSlow(predicate, segment, array, out result);
}

/// <summary>Attempts to dequeue an item from the queue.</summary>
/// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
/// <param name="array">The array from which the item was dequeued.</param>
/// <param name="segment">The segment from which the item was dequeued.</param>
/// <param name="result">The dequeued item.</param>
/// <returns>true if an item could be dequeued; otherwise, false.</returns>
private bool TryDequeueIfSlow(Predicate<T>? predicate, Segment segment, T[] array, [MaybeNullWhen(false)] out T result)
{
Debug.Assert(segment != null, "Expected a non-null segment.");
Debug.Assert(array != null, "Expected a non-null item array.");

if (segment._state._last != segment._state._lastCopy)
{
segment._state._lastCopy = segment._state._last;
return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
}

if (segment._next != null && segment._state._first == segment._state._last)
{
segment = segment._next;
array = segment._array;
_head = segment;
}

int first = segment._state._first; // local copy to avoid extraneous volatile reads

if (first == segment._state._last)
{
result = default;
return false;
}

result = array[first];
if (predicate == null || predicate(result))
{
array[first] = default!; // Clear the slot to release the element
segment._state._first = (first + 1) & (segment._array.Length - 1);
segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
return true;
}

result = default;
return false;
}

public void Clear()
{
while (TryDequeue(out _)) ;
}

/// <summary>Gets whether the collection is currently empty.</summary>
/// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
public bool IsEmpty
{
// This implementation is optimized for calls from the consumer.
get
{
// This implementation is optimized for calls from the consumer.

Segment head = _head;
if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered
if (head._state._first != head._state._last) return false;

if (head._state._first != head._state._lastCopy)
{
return false; // _first is volatile, so the read of _lastCopy cannot get reordered
}

if (head._state._first != head._state._last)
{
return false;
}

return head._next == null;
}
}
Expand All @@ -239,12 +330,13 @@ public IEnumerator<T> GetEnumerator()
}
}
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
/// <summary>Gets an enumerable for the collection.</summary>
/// <remarks>This method is not safe to use concurrently with any other members that may mutate the collection.</remarks>
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

/// <summary>Gets the number of items in the collection.</summary>
/// <remarks>This method is not safe to use concurrently with any other members that may mutate the collection.</remarks>
internal int Count
public int Count
{
get
{
Expand All @@ -257,14 +349,29 @@ internal int Count
{
first = segment._state._first;
last = segment._state._last;
if (first == segment._state._first) break;
if (first == segment._state._first)
{
break;
}
}

count += (last - first) & (arraySize - 1);
}
return count;
}
}

/// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
/// <remarks>The Count is not thread safe, so we need to acquire the lock.</remarks>
int IProducerConsumerQueue<T>.GetCountSafe(object syncObj)
{
Debug.Assert(syncObj != null, "The syncObj parameter is null.");
lock (syncObj)
{
return Count;
}
}

/// <summary>A segment in the queue containing one or more items.</summary>
[StructLayout(LayoutKind.Sequential)]
private sealed class Segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ConcurrentExclusiveSchedulerPair.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Future.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\FutureFactory.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ProducerConsumerQueues.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\IValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\ManualResetValueTaskSourceCore.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Task.cs" />
Expand Down Expand Up @@ -1275,6 +1274,15 @@
<Compile Include="$(CommonPath)System\SR.cs">
<Link>Common\System\SR.cs</Link>
</Compile>
<Compile Include="$(CommonPath)System\Collections\Concurrent\IProducerConsumerQueue.cs">
<Link>System\Collections\Concurrent\IProducerConsumerQueue.cs</Link>
</Compile>
<Compile Include="$(CommonPath)System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs">
<Link>System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs</Link>
</Compile>
<Compile Include="$(CommonPath)System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs">
<Link>System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs</Link>
</Compile>
<Compile Include="$(CommonPath)System\Collections\Generic\EnumerableHelpers.cs">
<Link>Common\System\Collections\Generic\EnumerableHelpers.cs</Link>
</Compile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

Expand Down
Loading

0 comments on commit 70fb135

Please sign in to comment.