Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes issue366 #367

Merged
merged 6 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 17 additions & 35 deletions chartfx-chart/src/main/java/de/gsi/chart/XYChart.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package de.gsi.chart;

import java.security.InvalidParameterException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -52,7 +54,7 @@
public class XYChart extends Chart {
private static final Logger LOGGER = LoggerFactory.getLogger(XYChart.class);
protected static final int BURST_LIMIT_MS = 15;
protected BooleanProperty polarPlot = new SimpleBooleanProperty(this, "polarPlot", false);
protected final BooleanProperty polarPlot = new SimpleBooleanProperty(this, "polarPlot", false);
private final ObjectProperty<PolarTickStep> polarStepSize = new SimpleObjectProperty<>(PolarTickStep.THIRTY);
private final GridRenderer gridRenderer = new GridRenderer();
protected final ChangeListener<? super Boolean> gridLineVisibilitychange = (ob, o, n) -> requestLayout();
Expand All @@ -65,7 +67,7 @@ public class XYChart extends Chart {
*
*/
public XYChart() {
this(new Axis[] {});
this(new Axis[] {}); // NOPMD NOSONAR
// N.B. this constructor is needed since JavaFX seems to instantiate fxml using reflection to find the corresponding constructor
}

Expand Down Expand Up @@ -129,7 +131,6 @@ public ObservableList<DataSet> getAllDatasets() {
* that add/remove datasets from a global observable list
*/
public ObservableList<DataSet> getAllShownDatasets() {
// return allVisibleDataSets;
final ObservableList<DataSet> ret = FXCollections.observableArrayList();
ret.addAll(getDatasets());
getRenderers().stream().filter(Renderer::showInLegend).forEach(renderer -> ret.addAll(renderer.getDatasets()));
Expand Down Expand Up @@ -259,33 +260,20 @@ public void updateAxisRange() {
dataSets.parallelStream().forEach(dataset -> dataset.getAxisDescriptions().parallelStream().filter(axisD -> !axisD.isDefined()) //
.forEach(axisDescription -> dataset.lock().writeLockGuard(() -> dataset.recomputeLimits(axisDescription.getDimIndex()))));

// N.B. possible race condition on this line -> for the future to solve
// recomputeLimits holds a writeLock the following sections need a read lock (for allowing parallel axis)
// there isn't an easy way to down-grade the established write locks into read locks (yet)
// Experimental version:
// dataSets.forEach(dataset -> {
// dataset.lock().writeLock();
// dataset.getAxisDescriptions().parallelStream().filter(axisD -> !axisD.isDefined())
// .forEach(axisDescription -> {
// dataset.setAutoNotification(false);
// dataset.recomputeLimits(dataset.getAxisDescriptions().indexOf(axisDescription));
// });
// DefaultDataSetLock<DataSet> myLock = (DefaultDataSetLock<DataSet>) dataset.lock();
// myLock.downGradeWriteLock();
// });

dataSets.forEach(ds -> ds.lock().readLock());
try {
getAxes().forEach(chartAxis -> {
final List<DataSet> dataSetForAxis = getDataSetForAxis(chartAxis);
updateNumericAxis(chartAxis, dataSetForAxis);
// chartAxis.requestAxisLayout();
});
} finally {
dataSets.forEach(ds -> ds.lock().readUnLock());
}
final ArrayDeque<DataSet> lockQueue = new ArrayDeque<>(dataSets);
recursiveLockGuard(lockQueue, () -> getAxes().forEach(chartAxis -> {
final List<DataSet> dataSetForAxis = getDataSetForAxis(chartAxis);
updateNumericAxis(chartAxis, dataSetForAxis);
// chartAxis.requestAxisLayout()
}));
}

// unlock datasets again
protected void recursiveLockGuard(final Deque<DataSet> queue, final Runnable runnable) { // NOPMD
if (queue.isEmpty()) {
runnable.run();
} else {
queue.pop().lock().readLockGuard(() -> recursiveLockGuard(queue, runnable));
}
}

/**
Expand Down Expand Up @@ -455,12 +443,6 @@ protected void redrawCanvas() {
}
}

@Override
protected void rendererChanged(final ListChangeListener.Change<? extends Renderer> change) {
// reset change to allow derived classes to add additional listeners to renderer changes
super.rendererChanged(change);
}

protected static void updateNumericAxis(final Axis axis, final List<DataSet> dataSets) {
if (dataSets == null || dataSets.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

Expand All @@ -18,7 +19,7 @@
* [..] some other code [..]
* lock.writeUnLock(); // restores isAutoNotification state
* </pre>
*
* <p>
* However, the recommended usage is using the lock guard primitives, e.g.
*
* <pre>
Expand All @@ -27,7 +28,7 @@
* return retVal; // N.B. optional return - here: assumes Objects or boxed primitives
* });
* </pre>
*
* <p>
* Alternatively the best performing option for frequent simple reads without major data processing
*
* <pre>
Expand All @@ -36,24 +37,25 @@
* return retVal; // N.B. optional return - here: assumes Objects or boxed primitives
* });
* </pre>
*
* <p>
* The latter assumes infrequent writes (e.g. a single writer thread) and frequent unobstructed reads (ie. many reader
* threads). The lock internally acquires the data w/o explicitly locking, checks afterwards if the data has potentially
* changed a write-lock acquiring thread, and as a automatic fall-back uses the guaranteed (but more expensive) read
* lock to assure that the read data structure is consistent.
*
* @author rstein
* @param <D> generics reference, usually to <code>&lt;? extends DataSet&gt;</code>
* @author rstein
*/
@SuppressWarnings({ "PMD.DoNotUseThreads", "PMD.CommentSize", "PMD.TooManyMethods" }) // Runnable used as functional interface
@SuppressWarnings({ "PMD.DoNotUseThreads", "PMD.CommentSize", "PMD.TooManyMethods" })
// Runnable used as functional interface
public class DefaultDataSetLock<D extends DataSet> implements DataSetLock<D> {
private static final long serialVersionUID = 1L;
private final transient StampedLock stampedLock = new StampedLock();
private transient long lastReadStamp;
private transient long lastWriteStamp;
private transient Thread writeLockedByThread; // NOPMD
private final transient AtomicInteger readerCount = new AtomicInteger(0);
private final transient AtomicInteger writerCount = new AtomicInteger(0);
private final AtomicLong lastReadStamp = new AtomicLong(-1L);
Copy link
Collaborator

@ennerf ennerf Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these fields still be transient?

private final AtomicLong lastWriteStamp = new AtomicLong(-1L);
private final AtomicLong writerLockedByThreadId = new AtomicLong(-1L);
private final AtomicInteger readerCount = new AtomicInteger(0);
private final AtomicInteger writerCount = new AtomicInteger(0);
private final transient AtomicBoolean autoNotifyState = new AtomicBoolean(true);
private final transient D dataSet;

Expand All @@ -73,60 +75,30 @@ public DefaultDataSetLock(final D dataSet) {
* @return corresponding data set
* @deprecated do not use (yet)
*/
@Deprecated
@Deprecated(since = "still under test")
public D downGradeWriteLock() {
if (!stampedLock.isWriteLocked()) {
throw new IllegalStateException("cannot downconvert lock - lock is not write locked");
throw new IllegalStateException("cannot down-convert lock - lock is not write locked");
}
if (getWriterCount() > 1) {
throw new IllegalStateException("cannot downconvert lock - holding n writelocks = " + getWriterCount());
throw new IllegalStateException("cannot down-convert lock - holding n write locks = " + getWriterCount());
}
final long result = stampedLock.tryConvertToReadLock(lastWriteStamp);
final long result = stampedLock.tryConvertToReadLock(lastWriteStamp.get());
if (result == 0L) { // NOPMD to be expected return value from 'tryConvertToReadLock'
throw new IllegalStateException("cannot downconvert lock - tryConvertToReadLock return '0'");
throw new IllegalStateException("cannot down-convert lock - tryConvertToReadLock return '0'");
}
this.readerCount.getAndIncrement();
this.writerCount.getAndDecrement();
if ((lastReadStamp == 0) && stampedLock.isReadLocked() && (getReaderCount() > 1)) {
stampedLock.unlockRead(lastReadStamp);
readerCount.incrementAndGet();
writerCount.decrementAndGet();
if ((lastReadStamp.get() == 0) && stampedLock.isReadLocked() && (readerCount.get() > 1)) {
stampedLock.unlockRead(lastReadStamp.get());
}
lastReadStamp = result;
lastReadStamp.set(result);

return dataSet;
}

/**
* @return last reader stamp
* @see java.util.concurrent.locks.StampedLock
*/
public long getLastReadStamp() {
return lastReadStamp;
}

/**
* @return the last stored auto-notification state
*/
public boolean getLastStoredAutoNotificationState() { // NOPMD
return autoNotifyState.get();
}

/**
* @return last writer stamp
* @see java.util.concurrent.locks.StampedLock
*/
public long getLastWriteStamp() {
return lastWriteStamp;
}

/**
* @return the internal StampedLock object
*/
public StampedLock getLockObject() {
return stampedLock;
}

/**
* @return number of readers presently locked on this data set
* @return number of readers presently locked on this data set - this counts only (deprecated) readers using read(Un)Lock()
*/
public int getReaderCount() {
return readerCount.get();
Expand All @@ -141,9 +113,16 @@ public int getWriterCount() {

@Override
public D readLock() {
if (readerCount.getAndIncrement() == 0) {
lastReadStamp = stampedLock.readLock();
if (lastReadStamp.get() == -1 && readerCount.get() == 0) {
// first reader needs to acquire a lock to guard against writes
final long stamp = stampedLock.readLock();
if (lastReadStamp.compareAndExchange(-1, stamp) != -1) {
// meanwhile already locked by another thread
stampedLock.unlockRead(stamp);
}
}
// other readers just increment the reader lock
readerCount.getAndIncrement();

return dataSet;
}
Expand All @@ -161,8 +140,8 @@ public D readLockGuard(final Runnable reading) {

@Override
public <R> R readLockGuard(final Supplier<R> reading) {
readLock();
R result;
readLock();
try {
result = reading.get();
} finally {
Expand All @@ -189,9 +168,9 @@ public D readLockGuardOptimistic(final Runnable reading) { // NOPMD -- runnable

@Override
public <R> R readLockGuardOptimistic(final Supplier<R> reading) {
R result = reading.get();
// try optimistic read
final long stamp = stampedLock.tryOptimisticRead();
R result = reading.get();
if (stampedLock.validate(stamp)) {
return result;
}
Expand All @@ -207,28 +186,38 @@ public <R> R readLockGuardOptimistic(final Supplier<R> reading) {

@Override
public D readUnLock() {
if (readerCount.decrementAndGet() == 0) {
stampedLock.unlockRead(lastReadStamp);
lastReadStamp = 0L;
} else if (readerCount.get() < 0) {
throw new IllegalStateException("read lock alread unlocked");
if (readerCount.get() == 1 && lastReadStamp.get() != -1) {
final long lastReadStampLocal = lastReadStamp.get();
//noinspection StatementWithEmptyBody
if (lastReadStamp.compareAndExchange(lastReadStampLocal, -1L) != lastReadStampLocal) { // NOPMD NOSONAR - for better logic readability (humans)
// already unlocked by another thread
} else {
// last reader needs to release the lock that guards against writes
stampedLock.unlockRead(lastReadStampLocal);
}
}
if (readerCount.decrementAndGet() < 0) {
throw new IllegalStateException("read lock/unlock mismatch - already unlocked");
}

return dataSet;
}

@Override
public D writeLock() {
final Thread callingThread = Thread.currentThread();
while (unequalToLockHoldingThread(callingThread)) {
lastWriteStamp = stampedLock.writeLock();
synchronized (stampedLock) {
// copy threadID
writeLockedByThread = callingThread;
// store present auto-notify state
autoNotifyState.set(dataSet.autoNotification().getAndSet(false));
}
final long callingThreadId = Thread.currentThread().getId();
if (writerLockedByThreadId.get() != callingThreadId) {
// new/not matching existing thread holding lock - need to acquire new lock
long stamp;
do {
//stamp = stampedLock.tryWriteLock()
stamp = stampedLock.writeLock();
} while (stamp == 0);
// acquired lock
writerLockedByThreadId.set(callingThreadId);
lastWriteStamp.set(stamp);
autoNotifyState.set(dataSet.autoNotification().getAndSet(false));
}
// we acquired a new lock or are already owner of a previously acquired lock
writerCount.incrementAndGet();
return dataSet;
}
Expand All @@ -237,7 +226,6 @@ public D writeLock() {
public D writeLockGuard(final Runnable writing) { // NOPMD -- runnable not used in a thread context
writeLock();
final boolean oldAutoNotificationState = dataSet.autoNotification().getAndSet(false);

try {
writing.run();
} finally {
Expand Down Expand Up @@ -265,23 +253,16 @@ public <R> R writeLockGuard(final Supplier<R> writing) {
@Override
public D writeUnLock() {
if (writerCount.decrementAndGet() == 0) {
synchronized (stampedLock) {
final long temp = lastWriteStamp;
lastWriteStamp = 0;
// restore present auto-notify state
dataSet.autoNotification().set(autoNotifyState.get());
writeLockedByThread = null; // NOPMD
stampedLock.unlockWrite(temp);
final long callingThreadId = Thread.currentThread().getId();
if (writerLockedByThreadId.get() != callingThreadId) {
throw new IllegalStateException("unlock attempt by tid = " + callingThreadId + " (" + Thread.currentThread() + ") - but locked by " + writerLockedByThreadId.get());
}
} else if (writerCount.get() < 0) {
throw new IllegalStateException("write lock alread unlocked");
}
return dataSet;
}

protected boolean unequalToLockHoldingThread(final Thread thread1) {
synchronized (stampedLock) {
return thread1 != writeLockedByThread; // NOPMD - deliberate use of object identity
// restore present auto-notify state
dataSet.autoNotification().set(autoNotifyState.get());
writerLockedByThreadId.set(-1L);
stampedLock.unlockWrite(lastWriteStamp.getAndSet(-1L));
}
return dataSet;
}
}
Loading