Skip to content

Commit

Permalink
P2PDataStorage and FileManager improvements (#3690)
Browse files Browse the repository at this point in the history
* [PR COMMENTS] Make maxSequenceNumberBeforePurge final

Instead of using a subclass that overwrites a value, utilize Guice
to inject the real value of 10000 in the app and let the tests overwrite
it with their own.

* [TESTS] Clean up 'Analyze Code' warnings

Remove unused imports and clean up some access modifiers now that
the final test structure is complete

* [REFACTOR] HashMapListener::onAdded/onRemoved

Previously, this interface was called each time an item was changed. This
required listeners to understand performance implications of multiple
adds or removes in a short time span.

Instead, give each listener the ability to process a list of added or
removed entrys which can help them avoid performance issues.

This patch is just a refactor. Each listener is called once for each
ProtectedStorageEntry. Future patches will change this.

* [REFACTOR] removeFromMapAndDataStore can operate on Collections

Minor performance overhead for constructing MapEntry and Collections
of one element, but keeps the code cleaner and all removes can still
use the same logic to remove from map, delete from data store, signal
listeners, etc.

The MapEntry type is used instead of Pair since it will require less
operations when this is eventually used in the removeExpiredEntries path.

* Change removeFromMapAndDataStore to signal listeners at the end in a batch

All current users still call this one-at-a-time. But, it gives the ability
for the expire code path to remove in a batch.

* Update removeExpiredEntries to remove all items in a batch

This will cause HashMapChangedListeners to receive just one onRemoved()
call for the expire work instead of multiple onRemoved() calls for each
item.

This required a bit of updating for the remove validation in tests so
that it correctly compares onRemoved with multiple items.

* ProposalService::onProtectedDataRemoved signals listeners once on batch removes

#3143 identified an issue that tempProposals listeners were being
signaled once for each item that was removed during the P2PDataStore
operation that expired old TempProposal objects. Some of the listeners
are very expensive (ProposalListPresentation::updateLists()) which results
in large UI performance issues.

Now that the infrastructure is in place to receive updates from the
P2PDataStore in a batch, the ProposalService can apply all of the removes
received from the P2PDataStore at once. This results in only 1 onChanged()
callback for each listener.

The end result is that updateLists() is only called once and the performance
problems are reduced.

This removes the need for #3148 and those interfaces will be removed in
the next patch.

* Remove HashmapChangedListener::onBatch operations

Now that the only user of this interface has been removed, go ahead
and delete it. This is a partial revert of
f5d75c4 that includes the code that was
added into ProposalService that subscribed to the P2PDataStore.

* [TESTS] Regression test for #3629

Write a test that shows the incorrect behavior for #3629, the hashmap
is rebuilt from disk using the 20-byte key instead of the 32-byte key.

* [BUGFIX] Reconstruct HashMap using 32-byte key

Addresses the first half of #3629 by ensuring that the reconstructed
HashMap always has the 32-byte key for each payload.

It turns out, the TempProposalStore persists the ProtectedStorageEntrys
on-disk as a List and doesn't persist the key at all. Then, on
reconstruction, it creates the 20-byte key for its internal map.

The fix is to update the TempProposalStore to use the 32-byte key instead.
This means that all writes, reads, and reconstrution of the TempProposalStore
uses the 32-byte key which matches perfectly with the in-memory map
of the P2PDataStorage that expects 32-byte keys.

Important to note that until all seednodes receive this update, nodes
will continue to have both the 20-byte and 32-byte keys in their HashMap.

* [BUGFIX] Use 32-byte key in requestData path

Addresses the second half of #3629 by using the HashMap, not the
protectedDataStore to generate the known keys in the requestData path.

This won't have any bandwidth reduction until all seednodes have the
update and only have the 32-byte key in their HashMap.

fixes #3629

* [DEAD CODE] Remove getProtectedDataStoreMap

The only user has been migrated to getMap(). Delete it so future
development doesn't have the same 20-byte vs 32-byte key issue.

* [TESTS] Allow tests to validate SequenceNumberMap write separately

In order to implement remove-before-add behavior, we need a way to
verify that the SequenceNumberMap was the only item updated.

* Implement remove-before-add message sequence behavior

It is possible to receive a RemoveData or RemoveMailboxData message
before the relevant AddData, but the current code does not handle
it.

This results in internal state updates and signal handler's being called
when an Add is received with a lower sequence number than a previously
seen Remove.

Minor test validation changes to allow tests to specify that only the
SequenceNumberMap should be written during an operation.

* [TESTS] Allow remove() verification to be more flexible

Now that we have introduced remove-before-add, we need a way
to validate that the SequenceNumberMap was written, but nothing
else. Add this feature to the validation path.

* Broadcast remove-before-add messages to P2P network

In order to aid in propagation of remove() messages, broadcast them
in the event the remove is seen before the add.

* [TESTS] Clean up remove verification helpers

Now that there are cases where the SequenceNumberMap and Broadcast
are called, but no other internal state is updated, the existing helper
functions conflate too many decisions. Remove them in favor of explicitly
defining each state change expected.

* [BUGFIX] Fix duplicate sequence number use case (startup)

Fix a bug introduced in d484617 that
did not properly handle a valid use case for duplicate sequence numbers.

For in-memory-only ProtectedStoragePayloads, the client nodes need a way
to reconstruct the Payloads after startup from peer and seed nodes. This
involves sending a ProtectedStorageEntry with a sequence number that
is equal to the last one the client had already seen.

This patch adds tests to confirm the bug and fix as well as the changes
necessary to allow adding of Payloads that were previously seen, but
removed during a restart.

* Clean up AtomicBoolean usage in FileManager

Although the code was correct, it was hard to understand the relationship
between the to-be-written object and the savePending flag.

Trade two dependent atomics for one and comment the code to make it more
clear for the next reader.

* [DEADCODE] Clean up FileManager.java

* [BUGFIX] Shorter delay values not taking precedence

Fix a bug in the FileManager where a saveLater called with a low delay
won't execute until the delay specified by a previous saveLater call.

The trade off here is the execution of a task that returns early vs.
losing the requested delay.

* [REFACTOR] Inline saveNowInternal

Only one caller after deadcode removal.
  • Loading branch information
ripcurlx authored Nov 26, 2019
2 parents d12843a + 685824b commit 66b2306
Show file tree
Hide file tree
Showing 21 changed files with 605 additions and 344 deletions.
59 changes: 24 additions & 35 deletions common/src/main/java/bisq/common/storage/FileManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,10 +46,9 @@ public class FileManager<T extends PersistableEnvelope> {
private final File dir;
private final File storageFile;
private final ScheduledThreadPoolExecutor executor;
private final AtomicBoolean savePending;
private final long delay;
private final Callable<Void> saveFileTask;
private T persistable;
private final AtomicReference<T> nextWrite;
private final PersistenceProtoResolver persistenceProtoResolver;
private final ReentrantLock writeLock = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW).newReentrantLock("writeLock");

Expand All @@ -61,26 +60,28 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol
this.dir = dir;
this.storageFile = storageFile;
this.persistenceProtoResolver = persistenceProtoResolver;
this.nextWrite = new AtomicReference<>(null);

executor = Utilities.getScheduledThreadPoolExecutor("FileManager", 1, 10, 5);

// File must only be accessed from the auto-save executor from now on, to avoid simultaneous access.
savePending = new AtomicBoolean();
this.delay = delay;

saveFileTask = () -> {
try {
Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000));
// Runs in an auto save thread.
// TODO: this looks like it could cause corrupt data as the savePending is unset before the actual
// save. By moving to after the save there might be some persist operations that are not performed
// and data would be lost. Probably all persist operations should happen sequencially rather than
// skip one when there is already one scheduled
if (!savePending.getAndSet(false)) {
// Some other scheduled request already beat us to it.

// Atomically take the next object to write and set the value to null so concurrent saveFileTask
// won't duplicate work.
T persistable = this.nextWrite.getAndSet(null);

// If null, a concurrent saveFileTask already grabbed the data. Don't duplicate work.
if (persistable == null)
return null;
}
saveNowInternal(persistable);

long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
} catch (Throwable e) {
log.error("Error during saveFileTask", e);
}
Expand All @@ -96,26 +97,20 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol
// API
///////////////////////////////////////////////////////////////////////////////////////////

/**
* Actually write the wallet file to disk, using an atomic rename when possible. Runs on the current thread.
*/
public void saveNow(T persistable) {
saveNowInternal(persistable);
}

/**
* Queues up a save in the background. Useful for not very important wallet changes.
*/
public void saveLater(T persistable) {
void saveLater(T persistable) {
saveLater(persistable, delay);
}

public void saveLater(T persistable, long delayInMilli) {
this.persistable = persistable;

if (savePending.getAndSet(true))
return; // Already pending.
// Atomically set the value of the next write. This allows batching of multiple writes of the same data
// structure if there are multiple calls to saveLater within a given `delayInMillis`.
this.nextWrite.set(persistable);

// Always schedule a write. It is possible that a previous saveLater was called with a larger `delayInMilli`
// and we want the lower delay to execute. The saveFileTask handles concurrent operations.
executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS);
}

Expand All @@ -134,7 +129,7 @@ public synchronized T read(File file) {
}
}

public synchronized void removeFile(String fileName) {
synchronized void removeFile(String fileName) {
File file = new File(dir, fileName);
boolean result = file.delete();
if (!result)
Expand All @@ -155,7 +150,7 @@ public synchronized void removeFile(String fileName) {
/**
* Shut down auto-saving.
*/
void shutDown() {
private void shutDown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
Expand All @@ -175,24 +170,18 @@ public static void removeAndBackupFile(File dbDir, File storageFile, String file
FileUtil.renameFile(storageFile, corruptedFile);
}

public synchronized void removeAndBackupFile(String fileName) throws IOException {
synchronized void removeAndBackupFile(String fileName) throws IOException {
removeAndBackupFile(dir, storageFile, fileName, "backup_of_corrupted_data");
}

public synchronized void backupFile(String fileName, int numMaxBackupFiles) {
synchronized void backupFile(String fileName, int numMaxBackupFiles) {
FileUtil.rollingBackup(dir, fileName, numMaxBackupFiles);
}

///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private void saveNowInternal(T persistable) {
long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
}

private synchronized void saveToFile(T persistable, File dir, File storageFile) {
File tempFile = null;
FileOutputStream fileOutputStream = null;
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/java/bisq/core/alert/AlertManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.math.BigInteger;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService,
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.UserThread;

import org.bitcoinj.core.TransactionConfidence;
Expand All @@ -55,8 +50,7 @@
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
Expand All @@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
Expand All @@ -87,7 +79,6 @@ public ProposalListPresentation(ProposalService proposalService,
this.validatorProvider = validatorProvider;

daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);

proposalListChangeListener = c -> updateLists();
Expand Down Expand Up @@ -124,44 +115,6 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
updateLists();
}


///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}

@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,13 +135,15 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
onProtectedDataRemoved(protectedStorageEntries);
}


Expand Down Expand Up @@ -266,30 +270,39 @@ private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroad
}
}

private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
private void onProtectedDataRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {

// The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the
// updates at once using retainAll which will cause all listeners to be updated only once.
ArrayList<Proposal> tempProposalsWithUpdates = new ArrayList<>(tempProposals);

protectedStorageEntries.forEach(protectedStorageEntry -> {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposalsWithUpdates.contains(proposal)) {
tempProposalsWithUpdates.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
}
});

tempProposals.retainAll(tempProposalsWithUpdates);
}

private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TempProposalStore implements PersistableEnvelope {
///////////////////////////////////////////////////////////////////////////////////////////

private TempProposalStore(List<ProtectedStorageEntry> list) {
list.forEach(entry -> map.put(P2PDataStorage.getCompactHashAsByteArray(entry.getProtectedStoragePayload()), entry));
list.forEach(entry -> map.put(P2PDataStorage.get32ByteHashAsByteArray(entry.getProtectedStoragePayload()), entry));
}

public Message toProtoMessage() {
Expand Down
Loading

0 comments on commit 66b2306

Please sign in to comment.