Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix initial data request handling #6947

Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onAllServicesInitialized() {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onBootstrapComplete();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void onAllServicesInitialized() {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onBootStrapped();
}
});
Expand Down
16 changes: 4 additions & 12 deletions core/src/main/java/bisq/core/app/AppStartupState.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AppStartupState {
private final BooleanProperty walletAndNetworkReady = new SimpleBooleanProperty();
private final BooleanProperty allDomainServicesInitialized = new SimpleBooleanProperty();
private final BooleanProperty applicationFullyInitialized = new SimpleBooleanProperty();
private final BooleanProperty updatedDataReceived = new SimpleBooleanProperty();
private final BooleanProperty dataReceived = new SimpleBooleanProperty();
private final BooleanProperty isBlockDownloadComplete = new SimpleBooleanProperty();
private final BooleanProperty hasSufficientPeersForBroadcast = new SimpleBooleanProperty();

Expand All @@ -57,8 +57,8 @@ public AppStartupState(WalletsSetup walletsSetup, P2PService p2PService) {

p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
updatedDataReceived.set(true);
public void onDataReceived() {
dataReceived.set(true);
}
});

Expand All @@ -72,7 +72,7 @@ public void onUpdatedDataReceived() {
hasSufficientPeersForBroadcast.set(true);
});

p2pNetworkAndWalletInitialized = EasyBind.combine(updatedDataReceived,
p2pNetworkAndWalletInitialized = EasyBind.combine(dataReceived,
isBlockDownloadComplete,
hasSufficientPeersForBroadcast,
allDomainServicesInitialized,
Expand Down Expand Up @@ -122,14 +122,6 @@ public ReadOnlyBooleanProperty applicationFullyInitializedProperty() {
return applicationFullyInitialized;
}

public boolean isUpdatedDataReceived() {
return updatedDataReceived.get();
}

public ReadOnlyBooleanProperty updatedDataReceivedProperty() {
return updatedDataReceived;
}

public boolean isBlockDownloadComplete() {
return isBlockDownloadComplete.get();
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/app/BisqSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ public StringProperty getP2PNetworkStatusIconId() {
return p2PNetworkSetup.getP2PNetworkStatusIconId();
}

public BooleanProperty getUpdatedDataReceived() {
return p2PNetworkSetup.getUpdatedDataReceived();
public BooleanProperty getDataReceived() {
return p2PNetworkSetup.getDataReceived();
}

public StringProperty getP2pNetworkLabelId() {
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/java/bisq/core/app/P2PNetworkSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class P2PNetworkSetup {
@Getter
final StringProperty p2pNetworkWarnMsg = new SimpleStringProperty();
@Getter
final BooleanProperty updatedDataReceived = new SimpleBooleanProperty();
final BooleanProperty dataReceived = new SimpleBooleanProperty();
@Getter
final BooleanProperty p2pNetworkFailed = new SimpleBooleanProperty();
final FilterManager filterManager;
Expand All @@ -97,12 +97,11 @@ BooleanProperty init(Runnable initWalletServiceHandler,
StringProperty bootstrapState = new SimpleStringProperty();
StringProperty bootstrapWarning = new SimpleStringProperty();
BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
BooleanProperty initialP2PNetworkDataReceived = new SimpleBooleanProperty();

addP2PMessageFilter();

p2PNetworkInfoBinding = EasyBind.combine(bootstrapState, bootstrapWarning, p2PService.getNumConnectedPeers(),
walletsSetup.numPeersProperty(), hiddenServicePublished, initialP2PNetworkDataReceived,
walletsSetup.numPeersProperty(), hiddenServicePublished, dataReceived,
(state, warning, numP2pPeers, numBtcPeers, hiddenService, dataReceived) -> {
String result;
String daoFullNode = preferences.isDaoFullNode() ? Res.get("mainView.footer.daoFullNode") + " / " : "";
Expand Down Expand Up @@ -171,8 +170,8 @@ public void onHiddenServicePublished() {
@Override
public void onDataReceived() {
log.debug("onRequestingDataCompleted");
initialP2PNetworkDataReceived.set(true);
bootstrapState.set(Res.get("mainView.bootstrapState.initialDataReceived"));
dataReceived.set(true);
splashP2PNetworkAnimationVisible.set(false);
p2pNetworkInitialized.set(true);
}
Expand Down Expand Up @@ -208,7 +207,6 @@ public void onNoPeersAvailable() {
public void onUpdatedDataReceived() {
log.debug("onUpdatedDataReceived");
splashP2PNetworkAnimationVisible.set(false);
updatedDataReceived.set(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void onNoSeedNodeAvailable() {
}

@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onP2PNetworkReady();
}
};
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void onRequestCustomBridges() {

@Override
public void onDataReceived() {
onP2PNetworkReady();
}

@Override
Expand All @@ -126,7 +127,6 @@ public void onNoPeersAvailable() {

@Override
public void onUpdatedDataReceived() {
onP2PNetworkReady();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState

public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 3000 which is about 3 weeks.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 3000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
// We limit number of blocks to 3000 which is about 3 weeks and about 5 MB on data
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight());
List<RawBlock> rawBlocks = new LinkedList<>(blocks).stream()
.map(RawBlock::fromBlock)
.limit(3000)
.collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
Expand Down
20 changes: 18 additions & 2 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public LiteNode(BlockParser blockParser,
blockDownloadListener = (observable, oldValue, newValue) -> {
if ((double) newValue == 1) {
setupWalletBestBlockListener();
maybeStartRequestingBlocks();
}
};
}
Expand Down Expand Up @@ -176,8 +177,13 @@ public void onFault(String errorMessage, @Nullable Connection connection) {
}
});

if (!parseBlockchainComplete)
maybeStartRequestingBlocks();
}

private void maybeStartRequestingBlocks() {
if (walletsSetup.isDownloadComplete() && p2pNetworkReady && !parseBlockchainComplete) {
startParseBlocks();
}
}

// First we request the blocks from a full node
Expand All @@ -192,7 +198,8 @@ protected void startParseBlocks() {
return;
}

// If we request blocks we increment the ConnectionState counter.
// If we request blocks we increment the ConnectionState counter so that the connection does not get reset from
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
ConnectionState.incrementExpectedInitialDataResponses();

if (chainHeight == daoStateService.getGenesisBlockHeight()) {
Expand Down Expand Up @@ -229,6 +236,11 @@ private void onRequestedBlocksReceived(List<RawBlock> blockList, Runnable onPars
return;
}

if (walletsSetup.isDownloadComplete() && chainTipHeight < bsqWalletService.getBestChainHeight()) {
// We need to request more blocks and increment the ConnectionState counter so that the connection does not get reset from
// INITIAL_DATA_EXCHANGE to PEER and therefore lower priority for getting closed
ConnectionState.incrementExpectedInitialDataResponses();
}
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
double duration = System.currentTimeMillis() - ts;
Expand All @@ -239,8 +251,12 @@ private void onRequestedBlocksReceived(List<RawBlock> blockList, Runnable onPars
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
// We deal with that case at the setupWalletBestBlockListener method above.
if (walletsSetup.isDownloadComplete() && daoStateService.getChainHeight() < bsqWalletService.getBestChainHeight()) {
log.info("We have completed batch processing of {} blocks but we have still {} missing blocks and request again.",
blockList.size(), bsqWalletService.getBestChainHeight() - daoStateService.getChainHeight());

liteNodeNetworkService.requestBlocks(daoStateService.getChainHeight() + 1);
} else {
log.info("We have completed batch processing of {} blocks and we have reached the chain tip of the wallet.", blockList.size());
onParsingComplete.run();
onParseBlockChainComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ public void requestBlocks() {
},
TIMEOUT_MIN, TimeUnit.MINUTES);

log.info("We request blocks from peer {} from block height {}.", nodeAddress, getBlocksRequest.getFromBlockHeight());
log.info("\n\n>> We request blocks from peer {} from block height {}.\n", nodeAddress, getBlocksRequest.getFromBlockHeight());

networkNode.addMessageListener(this);

SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getBlocksRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
log.info("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
log.debug("Sending of GetBlocksRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
}

@Override
Expand Down Expand Up @@ -190,7 +190,9 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
}

terminate();
log.info("We received from peer {} a BlocksResponse with {} blocks",
log.info("\n#################################################################\n" +
"We received from peer {} a BlocksResponse with {} blocks" +
"\n#################################################################\n",
nodeAddress.getFullAddress(), getBlocksResponse.getBlocks().size());
listener.onComplete(getBlocksResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static NetworkEnvelope fromProto(protobuf.GetBlocksResponse proto, int me
List<RawBlock> list = proto.getRawBlocksList().stream()
.map(RawBlock::fromProto)
.collect(Collectors.toList());
log.info("Received a GetBlocksResponse with {} blocks and {} kB size", list.size(), proto.getSerializedSize() / 1000d);
log.info("\n\n<< Received a GetBlocksResponse with {} blocks and {} kB size\n", list.size(), proto.getSerializedSize() / 1000d);
return new GetBlocksResponse(proto.getRawBlocksList().isEmpty() ?
new ArrayList<>() :
list,
Expand Down
7 changes: 0 additions & 7 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,8 @@ public long getBlockTime(int height) {
}

public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) {
return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE);
}

public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) {
// We limit requests to numMaxBlocks blocks, to avoid performance issues and too
// large network data in case a node requests too far back in history.
return getBlocks().stream()
.filter(block -> block.getHeight() >= fromBlockHeight)
.limit(numMaxBlocks)
.collect(Collectors.toList());
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries)
p2PService.addP2PServiceListener(new P2PServiceListener() {
@Override
public void onDataReceived() {
// We should have received all data at that point and if the filters were not set we
// clean up the persisted banned nodes in the options file as it might be that we missed the filter
// remove message if we have not been online.
if (filterProperty.get() == null) {
clearBannedNodes();
}
}

@Override
Expand All @@ -200,12 +206,6 @@ public void onNoPeersAvailable() {

@Override
public void onUpdatedDataReceived() {
// We should have received all data at that point and if the filters were not set we
// clean up the persisted banned nodes in the options file as it might be that we missed the filter
// remove message if we have not been online.
if (filterProperty.get() == null) {
clearBannedNodes();
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/offer/OfferBookService.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries)
if (dumpStatistics) {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
addOfferBookChangedListener(new OfferBookChangedListener() {
@Override
public void onAdded(Offer offer) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/offer/OpenOfferManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void onAllServicesInitialized() {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onBootstrapComplete();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void onAllServicesInitialized() {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onBootstrapComplete();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public OpenBsqSwapOfferService(OpenOfferManager openOfferManager,
};
bootstrapListener = new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
onP2PServiceReady();
p2PService.removeP2PServiceListener(bootstrapListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void onAllServicesInitialized() {

p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
tryApplyMessages();
checkDisputesForUpdates();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries)
else
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
startRepublishDisputeAgent();
}
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public void onAllServicesInitialized() {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
initPersistedTrades();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void handleTrades(List<Trade> trades) {
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
cleanupMailboxMessages(trades);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void onTorNodeReady() {
}

@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private BooleanProperty isP2pBootstrapped() {
} else {
bootstrapListener = new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
public void onDataReceived() {
p2PService.removeP2PServiceListener(bootstrapListener);
result.set(true);
}
Expand Down
2 changes: 1 addition & 1 deletion desktop/src/main/java/bisq/desktop/main/MainView.java
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ private AnchorPane createFooter() {
}
});

model.getUpdatedDataReceived().addListener((observable, oldValue, newValue) -> {
model.getDataReceived().addListener((observable, oldValue, newValue) -> {
p2PNetworkIcon.setOpacity(1);
p2pNetworkProgressBar.setProgress(0);
});
Expand Down
Loading