Skip to content

Commit

Permalink
modified read-lock and re-introduced atomic guards
Browse files Browse the repository at this point in the history
only the first reader acquires a lock, subsequent readers increase atomic counter, and only the last remaining reader unlocks.

detects race-condition on initial lock and final unlock
  • Loading branch information
RalphSteinhagen committed Feb 22, 2021
1 parent 56e7c9c commit e0a2e9f
Showing 1 changed file with 37 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@
public class DefaultDataSetLock<D extends DataSet> implements DataSetLock<D> {
private static final long serialVersionUID = 1L;
private final transient StampedLock stampedLock = new StampedLock();
private transient long lastReadStamp;
private final AtomicLong lastReadStamp = new AtomicLong(-1L);
private final AtomicLong lastWriteStamp = new AtomicLong(-1L);
private final AtomicLong writerLockedByThreadId = new AtomicLong(-1L);
private final transient Object readerCountLock = new Object();
private int readerCount;
private final AtomicInteger readerCount = new AtomicInteger(0);
private final AtomicInteger writerCount = new AtomicInteger(0);
private final transient AtomicBoolean autoNotifyState = new AtomicBoolean(true);
private final transient D dataSet;
Expand Down Expand Up @@ -88,12 +87,12 @@ public D downGradeWriteLock() {
if (result == 0L) { // NOPMD to be expected return value from 'tryConvertToReadLock'
throw new IllegalStateException("cannot down-convert lock - tryConvertToReadLock return '0'");
}
readerCount++;
readerCount.incrementAndGet();
writerCount.decrementAndGet();
if ((lastReadStamp == 0) && stampedLock.isReadLocked() && (getReaderCount() > 1)) {
stampedLock.unlockRead(lastReadStamp);
if ((lastReadStamp.get() == 0) && stampedLock.isReadLocked() && (readerCount.get() > 1)) {
stampedLock.unlockRead(lastReadStamp.get());
}
lastReadStamp = result;
lastReadStamp.set(result);

return dataSet;
}
Expand All @@ -102,9 +101,7 @@ public D downGradeWriteLock() {
* @return number of readers presently locked on this data set - this counts only (deprecated) readers using read(Un)Lock()
*/
public int getReaderCount() {
synchronized (readerCountLock) {
return readerCount;
}
return readerCount.get();
}

/**
Expand All @@ -116,35 +113,39 @@ public int getWriterCount() {

@Override
public D readLock() {
synchronized (readerCountLock) {
if (readerCount == 0) {
lastReadStamp = stampedLock.readLock();
if (lastReadStamp.get() == -1 && readerCount.get() == 0) {
// first reader needs to acquire a lock to guard against writes
final long stamp = stampedLock.readLock();
if (lastReadStamp.compareAndExchange(-1, stamp) != -1) {
// meanwhile already locked by another thread
stampedLock.unlockRead(stamp);
}
readerCount++;
}
// other readers just increment the reader lock
readerCount.getAndIncrement();

return dataSet;
}

@Override
public D readLockGuard(final Runnable reading) {
final long stamp = stampedLock.readLock();
readLock();
try {
reading.run();
} finally {
stampedLock.unlockRead(stamp);
readUnLock();
}
return dataSet;
}

@Override
public <R> R readLockGuard(final Supplier<R> reading) {
R result;
final long stamp = stampedLock.readLock();
readLock();
try {
result = reading.get();
} finally {
stampedLock.unlockRead(stamp);
readUnLock();
}
return result;
}
Expand All @@ -156,11 +157,11 @@ public D readLockGuardOptimistic(final Runnable reading) { // NOPMD -- runnable
if (stampedLock.validate(stamp)) {
return dataSet;
}
final long stampHard = stampedLock.readLock();
readLock();
try {
reading.run();
} finally {
stampedLock.unlockRead(stampHard);
readUnLock();
}
return dataSet;
}
Expand All @@ -174,39 +175,42 @@ public <R> R readLockGuardOptimistic(final Supplier<R> reading) {
return result;
}
// fallback to blocking read
final long stampHard = stampedLock.readLock();
readLock();
try {
result = reading.get();
} finally {
stampedLock.unlockRead(stampHard);
readUnLock();
}
return result;
}

@Override
public D readUnLock() {
synchronized (readerCountLock) {
readerCount--;
if (readerCount == 0) {
stampedLock.unlockRead(lastReadStamp);
lastReadStamp = 0L;
} else if (readerCount < 0) {
throw new IllegalStateException("read lock already unlocked");
if (readerCount.get() == 1 && lastReadStamp.get() != -1) {
final long lastReadStampLocal = lastReadStamp.get();
//noinspection StatementWithEmptyBody
if (lastReadStamp.compareAndExchange(lastReadStampLocal, -1L) != lastReadStampLocal) { // NOPMD NOSONAR - for better logic readability (humans)
// already unlocked by another thread
} else {
// last reader needs to release the lock that guards against writes
stampedLock.unlockRead(lastReadStampLocal);
}
}

if (readerCount.decrementAndGet() < 0) {
throw new IllegalStateException("read lock/unlock mismatch - already unlocked");
}
return dataSet;
}

@Override
public D writeLock() {
final long callingThreadId = Thread.currentThread().getId();
if (writerLockedByThreadId.get() != callingThreadId) {
// wrong not matching thread - need to acquire lock
// new/not matching existing thread holding lock - need to acquire new lock
long stamp;
do {
stamp = stampedLock.tryWriteLock();
// stamp = stampedLock.writeLock() - would be the natural choice (performance) but blocks in write-lock racing conditions
//stamp = stampedLock.tryWriteLock()
stamp = stampedLock.writeLock();
} while (stamp == 0);
// acquired lock
writerLockedByThreadId.set(callingThreadId);
Expand All @@ -222,7 +226,6 @@ public D writeLock() {
public D writeLockGuard(final Runnable writing) { // NOPMD -- runnable not used in a thread context
writeLock();
final boolean oldAutoNotificationState = dataSet.autoNotification().getAndSet(false);

try {
writing.run();
} finally {
Expand Down

0 comments on commit e0a2e9f

Please sign in to comment.