Skip to content

Commit

Permalink
Improve TempProposal processing (fixes bisq-network#3143)
Browse files Browse the repository at this point in the history
  • Loading branch information
wiz committed Aug 27, 2019
1 parent 66e963a commit f5d75c4
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 10 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/bisq/core/dao/DaoSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import bisq.core.dao.governance.bond.role.BondedRolesRepository;
import bisq.core.dao.governance.period.CycleService;
import bisq.core.dao.governance.proofofburn.ProofOfBurnService;
import bisq.core.dao.governance.proposal.ProposalListPresentation;
import bisq.core.dao.governance.proposal.ProposalService;
import bisq.core.dao.governance.voteresult.MissingDataRequestService;
import bisq.core.dao.governance.voteresult.VoteResultService;
Expand Down Expand Up @@ -59,6 +60,7 @@ public DaoSetup(BsqNodeProvider bsqNodeProvider,
CycleService cycleService,
BallotListService ballotListService,
ProposalService proposalService,
ProposalListPresentation proposalListPresentation,
BlindVoteListService blindVoteListService,
MyBlindVoteListService myBlindVoteListService,
VoteRevealService voteRevealService,
Expand Down Expand Up @@ -90,6 +92,7 @@ public DaoSetup(BsqNodeProvider bsqNodeProvider,
daoSetupServices.add(cycleService);
daoSetupServices.add(ballotListService);
daoSetupServices.add(proposalService);
daoSetupServices.add(proposalListPresentation);
daoSetupServices.add(blindVoteListService);
daoSetupServices.add(myBlindVoteListService);
daoSetupServices.add(voteRevealService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
package bisq.core.dao.governance.proposal;

import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.UserThread;

import org.bitcoinj.core.TransactionConfidence;

import com.google.inject.Inject;
Expand All @@ -47,7 +55,8 @@
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener {
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
Expand All @@ -56,6 +65,8 @@ public class ProposalListPresentation implements DaoStateListener, MyProposalLis
private final ObservableList<Proposal> allProposals = FXCollections.observableArrayList();
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -65,6 +76,7 @@ public class ProposalListPresentation implements DaoStateListener, MyProposalLis
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
Expand All @@ -75,13 +87,30 @@ public ProposalListPresentation(ProposalService proposalService,
this.validatorProvider = validatorProvider;

daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);

proposalService.getTempProposals().addListener((ListChangeListener<Proposal>) c -> {
updateLists();
});
proposalService.getProposalPayloads().addListener((ListChangeListener<ProposalPayload>) c -> {
updateLists();
proposalListChangeListener = c -> updateLists();
}


///////////////////////////////////////////////////////////////////////////////////////////
// DaoSetupService
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void addListeners() {
}

@Override
public void start() {
// We must set the listeners initially and not on onParseBlockChainComplete as activeOrMyUnconfirmedProposals
// is used in voteResults which can be called earlier during sync.
// To avoid unneeded upDateLists calls we delay one render frame so that once the proposalService is complete we
// register out listeners.
UserThread.execute(() -> {
proposalService.getTempProposals().addListener(proposalListChangeListener);
proposalService.getProposalPayloads().addListener((ListChangeListener<ProposalPayload>) c -> updateLists());
});
}

Expand All @@ -96,6 +125,43 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
}


///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}

@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.info("We received a remove request for a TempProposalPayload and have removed the proposal " +
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
data = ((AddDataMessage) msg).getProtectedStorageEntry().getProtectedStoragePayload();
}
// Monitoring nodes have only one capability set, we don't want to log those
log.debug("We did not send the message because the peer does not support our required capabilities. " +
log.info("We did not send the message because the peer does not support our required capabilities. " +
"messageClass={}, peer={}, peers supportedCapabilities={}",
data.getClass().getSimpleName(), peersNodeAddressOptional, capabilities);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ public interface HashMapChangedListener {

@SuppressWarnings("UnusedParameters")
void onRemoved(ProtectedStorageEntry data);

// We process all expired entries after a delay (60 s) after onBootstrapComplete.
// We notify listeners of start and completion so they can optimize to only update after batch processing is done.
default void onBatchRemoveExpiredDataStarted() {
}

default void onBatchRemoveExpiredDataCompleted() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,12 @@ public void onBootstrapComplete() {
}
});

toRemoveSet.forEach(
protectedDataToRemove -> hashMapChangedListeners.forEach(
// Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying
// about start and end of iteration.
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted);
toRemoveSet.forEach(protectedDataToRemove -> hashMapChangedListeners.forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted);

if (sequenceNumberMap.size() > 1000)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
Expand Down
Binary file removed p2p/src/main/resources/TempProposalStore_BTC_MAINNET
Binary file not shown.

0 comments on commit f5d75c4

Please sign in to comment.