Skip to content

Commit

Permalink
Merge pull request #6429 from HenrikJannsen/handle-rpc-errors-gracefully
Browse files Browse the repository at this point in the history
Handle HttpExceptions at block handler
  • Loading branch information
ripcurlx authored Dec 1, 2022
2 parents 397ec38 + 0b07c10 commit 16166c4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class BsqNode implements DaoSetupService {
protected Consumer<String> errorMessageHandler;
@Nullable
protected Consumer<String> warnMessageHandler;
private final List<RawBlock> pendingBlocks = new ArrayList<>();
protected final List<RawBlock> pendingBlocks = new ArrayList<>();

// The chain height of the latest Block we either get reported by Bitcoin Core or from the seed node
// This property should not be used in consensus code but only for retrieving blocks as it is not in sync with the
Expand Down Expand Up @@ -268,6 +268,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr

pendingBlocks.clear();
startReOrgFromLastSnapshot();
startParseBlocks();
throw new RequiredReorgFromSnapshotException(rawBlock);
}
return Optional.empty();
Expand Down
23 changes: 22 additions & 1 deletion core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class FullNode extends BsqNode {
private int blocksToParseInBatch;
private long parseInBatchStartTime;
private int parseBlocksOnHeadHeightCounter;
private int numExceptions;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -109,6 +110,8 @@ protected void startParseBlocks() {
int startBlockHeight = daoStateService.getChainHeight();
log.info("startParseBlocks: startBlockHeight={}", startBlockHeight);
rpcService.requestChainHeadHeight(chainHeight -> {
// If our persisted block is equal to the chain height we have startBlockHeight 1 block higher,
// so we do not call parseBlocksOnHeadHeight
log.info("startParseBlocks: chainHeight={}", chainHeight);
if (startBlockHeight <= chainHeight) {
parseBlocksOnHeadHeight(startBlockHeight, chainHeight);
Expand Down Expand Up @@ -257,7 +260,24 @@ private void parseBlockRecursively(int blockHeight,
}

private void handleError(Throwable throwable) {
if (throwable instanceof BlockHashNotConnectingException || throwable instanceof BlockHeightNotConnectingException) {
if (throwable instanceof com.googlecode.jsonrpc4j.HttpException) {
numExceptions++;
if (numExceptions > 10) {
log.warn("We got {} RPC HttpExceptions at our block handler.", numExceptions);
pendingBlocks.clear();
startReOrgFromLastSnapshot();
startParseBlocks();
numExceptions = 0;
}
int delayInSec = Math.min(60, numExceptions * numExceptions);
log.warn("We got a RPC HttpException at our block handler. Last persisted block height: {}. " +
"We try after a delay of {} sec to request from the last height. error={}",
daoStateService.getBlockHeightOfLastBlock(), delayInSec, throwable.toString());
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(
chainHeight -> parseBlocksOnHeadHeight(daoStateService.getBlockHeightOfLastBlock(), chainHeight),
this::handleError),
delayInSec);
} else if (throwable instanceof BlockHashNotConnectingException || throwable instanceof BlockHeightNotConnectingException) {
// We do not escalate that exception as it is handled with the snapshot manager to recover its state.
log.warn(throwable.toString());
} else {
Expand All @@ -277,6 +297,7 @@ private void handleError(Throwable throwable) {
} else if (cause instanceof NotificationHandlerException) {
log.error("Error from within block notification daemon: {}", cause.getCause().toString());
startReOrgFromLastSnapshot();
startParseBlocks();
return;
} else if (cause instanceof Error) {
throw (Error) cause;
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/bisq/core/dao/node/full/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ void addNewDtoBlockHandler(Consumer<RawBlock> dtoBlockHandler,

var block = getBlockFromRawDtoBlock(rawDtoBlock);
UserThread.execute(() -> dtoBlockHandler.accept(block));
} catch (Throwable t) {
errorHandler.accept(t);
} catch (Throwable throwable) {
log.error("Error at BlockHandler", throwable);
errorHandler.accept(throwable);
}
});
}
Expand All @@ -241,12 +242,13 @@ public void onSuccess(Integer chainHeight) {
}

public void onFailure(@NotNull Throwable throwable) {
log.error("Error at requestChainHeadHeight", throwable);
UserThread.execute(() -> errorHandler.accept(throwable));
}
}, MoreExecutors.directExecutor());
} catch (Exception e) {
if (!isShutDown || !(e instanceof RejectedExecutionException)) {
log.warn(e.toString(), e);
log.error("Exception at requestChainHeadHeight", e);
throw e;
}
}
Expand Down Expand Up @@ -274,11 +276,12 @@ public void onSuccess(RawBlock block) {

@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("Error at requestDtoBlock: blockHeight={}", blockHeight);
log.error("Error at requestDtoBlock: blockHeight={}, error={}", blockHeight, throwable);
UserThread.execute(() -> errorHandler.accept(throwable));
}
}, MoreExecutors.directExecutor());
} catch (Exception e) {
log.error("Exception at requestDtoBlock", e);
if (!isShutDown || !(e instanceof RejectedExecutionException)) {
log.warn(e.toString(), e);
throw e;
Expand Down

0 comments on commit 16166c4

Please sign in to comment.