Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Revert "Ensure ConcurrentBag's TryTake is linearizable (#30947) (#31009
Browse files Browse the repository at this point in the history
…)"

This reverts commit b6be5d7.
  • Loading branch information
danmoseley authored Jul 17, 2018
1 parent b6be5d7 commit 3fb080c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ namespace System.Collections.Concurrent
public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
/// <summary>The per-bag, per-thread work-stealing queues.</summary>
private readonly ThreadLocal<WorkStealingQueue> _locals;
private ThreadLocal<WorkStealingQueue> _locals;
/// <summary>The head work stealing queue in a linked list of queues.</summary>
private volatile WorkStealingQueue _workStealingQueues;
/// <summary>Number of times any list transitions from empty to non-empty.</summary>
private long _emptyToNonEmptyListTransitionCount;

/// <summary>Initializes a new instance of the <see cref="ConcurrentBag{T}"/> class.</summary>
public ConcurrentBag()
Expand Down Expand Up @@ -64,7 +62,7 @@ public ConcurrentBag(IEnumerable<T> collection)
WorkStealingQueue queue = GetCurrentThreadWorkStealingQueue(forceCreate: true);
foreach (T item in collection)
{
queue.LocalPush(item, ref _emptyToNonEmptyListTransitionCount);
queue.LocalPush(item);
}
}

Expand All @@ -74,9 +72,7 @@ public ConcurrentBag(IEnumerable<T> collection)
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item) =>
GetCurrentThreadWorkStealingQueue(forceCreate: true)
.LocalPush(item, ref _emptyToNonEmptyListTransitionCount);
public void Add(T item) => GetCurrentThreadWorkStealingQueue(forceCreate: true).LocalPush(item);

/// <summary>
/// Attempts to add an object to the <see cref="ConcurrentBag{T}"/>.
Expand Down Expand Up @@ -180,55 +176,22 @@ private bool TrySteal(out T result, bool take)
CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryPeekSteals();
}

while (true)
// If there's no local queue for this thread, just start from the head queue
// and try to steal from each queue until we get a result.
WorkStealingQueue localQueue = GetCurrentThreadWorkStealingQueue(forceCreate: false);
if (localQueue == null)
{
// We need to track whether any lists transition from empty to non-empty both before
// and after we attempt the steal in case we don't get an item:
//
// If we don't get an item, we need to handle the possibility of a race condition that led to
// an item being added to a list after we already looked at it in a way that breaks
// linearizability. For example, say there are three threads 0, 1, and 2, each with their own
// list that's currently empty. We could then have the following series of operations:
// - Thread 2 adds an item, such that there's now 1 item in the bag.
// - Thread 1 sees that the count is 1 and does a Take. Its local list is empty, so it tries to
// steal from list 0, but it's empty. Before it can steal from Thread 2, it's pre-empted.
// - Thread 0 adds an item. The count is now 2.
// - Thread 2 takes an item, which comes from its local queue. The count is now 1.
// - Thread 1 continues to try to steal from 2, finds it's empty, and fails its take, even though
// at any given time during its take the count was >= 1. Oops.
// This is particularly problematic for wrapper types that track count using their own synchronization,
// e.g. BlockingCollection, and thus expect that a take will always be successful if the number of items
// is known to be > 0.
//
// We work around this by looking at the number of times any list transitions from == 0 to > 0,
// checking that before and after the steal attempts. We don't care about > 0 to > 0 transitions,
// because a steal from a list with > 0 elements would have been successful.
long initialEmptyToNonEmptyCounts = Interlocked.Read(ref _emptyToNonEmptyListTransitionCount);

// If there's no local queue for this thread, just start from the head queue
// and try to steal from each queue until we get a result. If there is a local queue from this thread,
// then start from the next queue after it, and then iterate around back from the head to this queue,
// not including it.
WorkStealingQueue localQueue = GetCurrentThreadWorkStealingQueue(forceCreate: false);
bool gotItem = localQueue == null ?
TryStealFromTo(_workStealingQueues, null, out result, take) :
(TryStealFromTo(localQueue._nextQueue, null, out result, take) || TryStealFromTo(_workStealingQueues, localQueue, out result, take));
if (gotItem)
{
return true;
}
return TryStealFromTo(_workStealingQueues, null, out result, take);
}

if (Interlocked.Read(ref _emptyToNonEmptyListTransitionCount) == initialEmptyToNonEmptyCounts)
{
// The version number matched, so we didn't get an item and we're confident enough
// in our steal attempt to say so.
return false;
}
// If there is a local queue from this thread, then start from the next queue
// after it, and then iterate around back from the head to this queue, not including it.
return
TryStealFromTo(localQueue._nextQueue, null, out result, take) ||
TryStealFromTo(_workStealingQueues, localQueue, out result, take);

// Some list transitioned from empty to non-empty between just before the steal and now.
// Since we don't know if it caused a race condition like the above description, we
// have little choice but to try to steal again.
}
// TODO: Investigate storing the queues in an array instead of a linked list, and then
// randomly choosing a starting location from which to start iterating.
}

/// <summary>
Expand Down Expand Up @@ -721,7 +684,7 @@ internal bool IsEmpty
/// Add new item to the tail of the queue.
/// </summary>
/// <param name="item">The item to add.</param>
internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
internal void LocalPush(T item)
{
Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
bool lockTaken = false;
Expand All @@ -738,7 +701,7 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
_currentOp = (int)Operation.None; // set back to None temporarily to avoid a deadlock
lock (this)
{
Debug.Assert(_tailIndex == tail, "No other thread should be changing _tailIndex");
Debug.Assert(_tailIndex == int.MaxValue, "No other thread should be changing _tailIndex");

// Rather than resetting to zero, we'll just mask off the bits we don't care about.
// This way we don't need to rearrange the items already in the queue; they'll be found
Expand All @@ -748,31 +711,22 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
// bits are set, so all of the bits we're keeping will also be set. Thus it's impossible
// for the head to end up > than the tail, since you can't set any more bits than all of them.
_headIndex = _headIndex & _mask;
_tailIndex = tail = tail & _mask;
_tailIndex = tail = _tailIndex & _mask;
Debug.Assert(_headIndex <= _tailIndex);

Interlocked.Exchange(ref _currentOp, (int)Operation.Add); // ensure subsequent reads aren't reordered before this
_currentOp = (int)Operation.Add;
}
}

// We'd like to take the fast path that doesn't require locking, if possible. It's not possible if:
// - another thread is currently requesting that the whole bag synchronize, e.g. a ToArray operation
// - if there are fewer than two spaces available. One space is necessary for obvious reasons:
// to store the element we're trying to push. The other is necessary due to synchronization with steals.
// A stealing thread first increments _headIndex to reserve the slot at its old value, and then tries to
// read from that slot. We could potentially have a race condition whereby _headIndex is incremented just
// before this check, in which case we could overwrite the element being stolen as that slot would appear
// to be empty. Thus, we only allow the fast path if there are two empty slots.
// - if there <= 1 elements in the list. We need to be able to successfully track transitions from
// empty to non-empty in a way that other threads can check, and we can only do that tracking
// correctly if we synchronize with steals when it's possible a steal could take the last item
// in the list just as we're adding. It's possible at this point that there's currently an active steal
// operation happening but that it hasn't yet incremented the head index, such that we could read a smaller
// than accurate by 1 value for the head. However, only one steal could possibly be doing so, as steals
// take the lock, and another steal couldn't then increment the header further because it'll see that
// there's currently an add operation in progress and wait until the add completes.
int head = _headIndex; // read after _currentOp set to Add
if (!_frozen && head < tail - 1 & tail < (head + _mask))
// We'd like to take the fast path that doesn't require locking, if possible. It's not possible if another
// thread is currently requesting that the whole bag synchronize, e.g. a ToArray operation. It's also
// not possible if there are fewer than two spaces available. One space is necessary for obvious reasons:
// to store the element we're trying to push. The other is necessary due to synchronization with steals.
// A stealing thread first increments _headIndex to reserve the slot at its old value, and then tries to
// read from that slot. We could potentially have a race condition whereby _headIndex is incremented just
// before this check, in which case we could overwrite the element being stolen as that slot would appear
// to be empty. Thus, we only allow the fast path if there are two empty slots.
if (!_frozen && tail < (_headIndex + _mask))
{
_array[tail & _mask] = item;
_tailIndex = tail + 1;
Expand All @@ -783,8 +737,8 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
_currentOp = (int)Operation.None; // set back to None to avoid a deadlock
Monitor.Enter(this, ref lockTaken);

head = _headIndex;
int count = tail - head; // this count is stable, as we're holding the lock
int head = _headIndex;
int count = _tailIndex - _headIndex;

// If we're full, expand the array.
if (count >= _mask)
Expand Down Expand Up @@ -813,14 +767,6 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
_array[tail & _mask] = item;
_tailIndex = tail + 1;

// Now that the item has been added, if we were at 0 (now at 1) item,
// increase the empty to non-empty transition count.
if (count == 0)
{
// We just transitioned from empty to non-empty, so increment the transition count.
Interlocked.Increment(ref emptyToNonEmptyListTransitionCount);
}

// Update the count to avoid overflow. We can trust _stealCount here,
// as we're inside the lock and it's only manipulated there.
_addTakeCount -= _stealCount;
Expand Down Expand Up @@ -962,50 +908,41 @@ internal bool TryLocalPeek(out T result)
/// <param name="take">true to take the item; false to simply peek at it</param>
internal bool TrySteal(out T result, bool take)
{
lock (this)
// Fast-path check to see if the queue is empty.
if (_headIndex < _tailIndex)
{
int head = _headIndex; // _headIndex is only manipulated under the lock
if (take)
// Anything other than empty requires synchronization.
lock (this)
{
// If there are <= 2 items in the list, we need to ensure no add operation
// is in progress, as add operations need to accurately count transitions
// from empty to non-empty, and they can only do that if there are no concurrent
// steal operations happening at the time.
if (head < _tailIndex - 1 && _currentOp != (int)Operation.Add)
int head = _headIndex;
if (take)
{
var spinner = new SpinWait();
do
// Increment head to tentatively take an element: a full fence is used to ensure the read
// of _tailIndex doesn't move earlier, as otherwise we could potentially end up stealing
// the same element that's being popped locally.
Interlocked.Exchange(ref _headIndex, unchecked(head + 1));

// If there's an element to steal, do it.
if (head < _tailIndex)
{
spinner.SpinOnce();
int idx = head & _mask;
result = _array[idx];
_array[idx] = default(T);
_stealCount++;
return true;
}
else
{
// We contended with the local thread and lost the race, so restore the head.
_headIndex = head;
}
while (_currentOp == (int)Operation.Add);
}

// Increment head to tentatively take an element: a full fence is used to ensure the read
// of _tailIndex doesn't move earlier, as otherwise we could potentially end up stealing
// the same element that's being popped locally.
Interlocked.Exchange(ref _headIndex, unchecked(head + 1));

// If there's an element to steal, do it.
if (head < _tailIndex)
else if (head < _tailIndex)
{
int idx = head & _mask;
result = _array[idx];
_array[idx] = default(T);
_stealCount++;
// Peek, if there's an element available
result = _array[head & _mask];
return true;
}
else
{
// We contended with the local thread and lost the race, so restore the head.
_headIndex = head;
}
}
else if (head < _tailIndex)
{
// Peek, if there's an element available
result = _array[head & _mask];
return true;
}
}

Expand Down
56 changes: 0 additions & 56 deletions src/System.Collections.Concurrent/tests/ConcurrentBagTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,62 +92,6 @@ public static void AddManyItems_ThenTakeOnDifferentThread_ItemsOutputInExpectedO
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).GetAwaiter().GetResult();
}

[Fact]
public static void SingleProducerAdding_MultiConsumerTaking_SemaphoreThrottling_AllTakesSucceed()
{
var bag = new ConcurrentBag<int>();
var s = new SemaphoreSlim(0);
CountdownEvent ce = null;
const int ItemCount = 200_000;

int producerNextValue = 0;
Action producer = null;
producer = delegate
{
ThreadPool.QueueUserWorkItem(delegate
{
bag.Add(producerNextValue++);
s.Release();
if (producerNextValue < ItemCount)
{
producer();
}
else
{
ce.Signal();
}
});
};

int consumed = 0;
Action consumer = null;
consumer = delegate
{
ThreadPool.QueueUserWorkItem(delegate
{
if (s.Wait(0))
{
Assert.True(bag.TryTake(out _), "There's an item available, but we couldn't take it.");
Interlocked.Increment(ref consumed);
}
else if (Volatile.Read(ref consumed) >= ItemCount)
{
ce.Signal();
return;
}

consumer();
});
};

// one producer, two consumers
ce = new CountdownEvent(3);
producer();
consumer();
consumer();
ce.Wait();
}

[Theory]
[InlineData(0)]
[InlineData(1)]
Expand Down

0 comments on commit 3fb080c

Please sign in to comment.