Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(7/7) FileManager.java Cleanup and Audit #3690

Merged
merged 22 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
617585d
[PR COMMENTS] Make maxSequenceNumberBeforePurge final
julianknutsen Nov 15, 2019
3bd67ba
[TESTS] Clean up 'Analyze Code' warnings
julianknutsen Nov 18, 2019
b281566
[REFACTOR] HashMapListener::onAdded/onRemoved
julianknutsen Nov 16, 2019
4f08588
[REFACTOR] removeFromMapAndDataStore can operate on Collections
julianknutsen Nov 16, 2019
489b25a
Change removeFromMapAndDataStore to signal listeners at the end in a …
julianknutsen Nov 16, 2019
eae641e
Update removeExpiredEntries to remove all items in a batch
julianknutsen Nov 16, 2019
a50e59f
ProposalService::onProtectedDataRemoved signals listeners once on bat…
julianknutsen Nov 16, 2019
a8139f3
Remove HashmapChangedListener::onBatch operations
julianknutsen Nov 16, 2019
849155a
[TESTS] Regression test for #3629
julianknutsen Nov 19, 2019
e212240
[BUGFIX] Reconstruct HashMap using 32-byte key
julianknutsen Nov 19, 2019
455f7d2
[BUGFIX] Use 32-byte key in requestData path
julianknutsen Nov 19, 2019
793e84d
[DEAD CODE] Remove getProtectedDataStoreMap
julianknutsen Nov 19, 2019
526aee5
[TESTS] Allow tests to validate SequenceNumberMap write separately
julianknutsen Nov 20, 2019
372c26d
Implement remove-before-add message sequence behavior
julianknutsen Nov 20, 2019
931c1f4
[TESTS] Allow remove() verification to be more flexible
julianknutsen Nov 22, 2019
0472ffc
Broadcast remove-before-add messages to P2P network
julianknutsen Nov 22, 2019
6e2ea6e
[TESTS] Clean up remove verification helpers
julianknutsen Nov 22, 2019
3d571c4
[BUGFIX] Fix duplicate sequence number use case (startup)
julianknutsen Nov 22, 2019
2208003
Clean up AtomicBoolean usage in FileManager
julianknutsen Nov 25, 2019
1895802
[DEADCODE] Clean up FileManager.java
julianknutsen Nov 25, 2019
d4d2f26
[BUGFIX] Shorter delay values not taking precedence
julianknutsen Nov 25, 2019
685824b
[REFACTOR] Inline saveNowInternal
julianknutsen Nov 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 24 additions & 35 deletions common/src/main/java/bisq/common/storage/FileManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,10 +46,9 @@ public class FileManager<T extends PersistableEnvelope> {
private final File dir;
private final File storageFile;
private final ScheduledThreadPoolExecutor executor;
private final AtomicBoolean savePending;
private final long delay;
private final Callable<Void> saveFileTask;
private T persistable;
private final AtomicReference<T> nextWrite;
private final PersistenceProtoResolver persistenceProtoResolver;
private final ReentrantLock writeLock = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW).newReentrantLock("writeLock");

Expand All @@ -61,26 +60,28 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol
this.dir = dir;
this.storageFile = storageFile;
this.persistenceProtoResolver = persistenceProtoResolver;
this.nextWrite = new AtomicReference<>(null);

executor = Utilities.getScheduledThreadPoolExecutor("FileManager", 1, 10, 5);

// File must only be accessed from the auto-save executor from now on, to avoid simultaneous access.
savePending = new AtomicBoolean();
this.delay = delay;

saveFileTask = () -> {
try {
Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000));
// Runs in an auto save thread.
// TODO: this looks like it could cause corrupt data as the savePending is unset before the actual
// save. By moving to after the save there might be some persist operations that are not performed
// and data would be lost. Probably all persist operations should happen sequencially rather than
// skip one when there is already one scheduled
if (!savePending.getAndSet(false)) {
// Some other scheduled request already beat us to it.

// Atomically take the next object to write and set the value to null so concurrent saveFileTask
// won't duplicate work.
T persistable = this.nextWrite.getAndSet(null);

// If null, a concurrent saveFileTask already grabbed the data. Don't duplicate work.
if (persistable == null)
return null;
}
saveNowInternal(persistable);

long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
} catch (Throwable e) {
log.error("Error during saveFileTask", e);
}
Expand All @@ -96,26 +97,20 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol
// API
///////////////////////////////////////////////////////////////////////////////////////////

/**
* Actually write the wallet file to disk, using an atomic rename when possible. Runs on the current thread.
*/
public void saveNow(T persistable) {
saveNowInternal(persistable);
}

/**
* Queues up a save in the background. Useful for not very important wallet changes.
*/
public void saveLater(T persistable) {
void saveLater(T persistable) {
saveLater(persistable, delay);
}

public void saveLater(T persistable, long delayInMilli) {
this.persistable = persistable;

if (savePending.getAndSet(true))
return; // Already pending.
// Atomically set the value of the next write. This allows batching of multiple writes of the same data
// structure if there are multiple calls to saveLater within a given `delayInMillis`.
this.nextWrite.set(persistable);

// Always schedule a write. It is possible that a previous saveLater was called with a larger `delayInMilli`
// and we want the lower delay to execute. The saveFileTask handles concurrent operations.
executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS);
}

Expand All @@ -134,7 +129,7 @@ public synchronized T read(File file) {
}
}

public synchronized void removeFile(String fileName) {
synchronized void removeFile(String fileName) {
File file = new File(dir, fileName);
boolean result = file.delete();
if (!result)
Expand All @@ -155,7 +150,7 @@ public synchronized void removeFile(String fileName) {
/**
* Shut down auto-saving.
*/
void shutDown() {
private void shutDown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
Expand All @@ -175,24 +170,18 @@ public static void removeAndBackupFile(File dbDir, File storageFile, String file
FileUtil.renameFile(storageFile, corruptedFile);
}

public synchronized void removeAndBackupFile(String fileName) throws IOException {
synchronized void removeAndBackupFile(String fileName) throws IOException {
removeAndBackupFile(dir, storageFile, fileName, "backup_of_corrupted_data");
}

public synchronized void backupFile(String fileName, int numMaxBackupFiles) {
synchronized void backupFile(String fileName, int numMaxBackupFiles) {
FileUtil.rollingBackup(dir, fileName, numMaxBackupFiles);
}

///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private void saveNowInternal(T persistable) {
long now = System.currentTimeMillis();
saveToFile(persistable, dir, storageFile);
log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now);
}

private synchronized void saveToFile(T persistable, File dir, File storageFile) {
File tempFile = null;
FileOutputStream fileOutputStream = null;
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/java/bisq/core/alert/AlertManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.math.BigInteger;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService,
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}

@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@
import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.UserThread;

import org.bitcoinj.core.TransactionConfidence;
Expand All @@ -55,8 +50,7 @@
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
Expand All @@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
Expand All @@ -87,7 +79,6 @@ public ProposalListPresentation(ProposalService proposalService,
this.validatorProvider = validatorProvider;

daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);

proposalListChangeListener = c -> updateLists();
Expand Down Expand Up @@ -124,44 +115,6 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
updateLists();
}


///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}

@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,13 +135,15 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
onProtectedDataRemoved(protectedStorageEntries);
}


Expand Down Expand Up @@ -266,30 +270,39 @@ private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroad
}
}

private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
private void onProtectedDataRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {

// The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the
// updates at once using retainAll which will cause all listeners to be updated only once.
ArrayList<Proposal> tempProposalsWithUpdates = new ArrayList<>(tempProposals);

protectedStorageEntries.forEach(protectedStorageEntry -> {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We allow removal only if we are in the proposal phase.
boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL);
boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight());
Optional<Tx> tx = daoStateService.getTx(proposal.getTxId());
boolean unconfirmedOrNonBsqTx = !tx.isPresent();
// if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed.
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposalsWithUpdates.contains(proposal)) {
tempProposalsWithUpdates.remove(proposal);
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
} else {
log.warn("We received a remove request outside the PROPOSAL phase. " +
"Proposal creation date={}, proposal.txId={}, current blockHeight={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight());
}
}
});

tempProposals.retainAll(tempProposalsWithUpdates);
}

private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TempProposalStore implements PersistableEnvelope {
///////////////////////////////////////////////////////////////////////////////////////////

private TempProposalStore(List<ProtectedStorageEntry> list) {
list.forEach(entry -> map.put(P2PDataStorage.getCompactHashAsByteArray(entry.getProtectedStoragePayload()), entry));
list.forEach(entry -> map.put(P2PDataStorage.get32ByteHashAsByteArray(entry.getProtectedStoragePayload()), entry));
}

public Message toProtoMessage() {
Expand Down
Loading