Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate header chain in downloader #1222

Merged
merged 6 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.ethereum.core.*;
import org.ethereum.crypto.HashUtil;
import org.ethereum.net.server.Channel;
import org.ethereum.validator.BlockHeaderValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;

import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -388,20 +390,41 @@ private boolean validateAndAddHeaders(List<BlockHeader> headers, byte[] nodeId)
wrappers.add(new BlockHeaderWrapper(header, nodeId));
}

SyncQueueIfc.ValidatedHeaders res;
synchronized (this) {
List<BlockHeaderWrapper> headersReady = syncQueue.addHeaders(wrappers);
if (headersReady != null && !headersReady.isEmpty()) {
pushHeaders(headersReady);
res = syncQueue.addHeadersAndValidate(wrappers);
if (res.isValid() && !res.getHeaders().isEmpty()) {
pushHeaders(res.getHeaders());
}
}

dropIfValidationFailed(res);

receivedHeadersLatch.countDown();

logger.debug("{}: {} headers added", name, headers.size());

return true;
}

/**
* Checks whether validation has been passed correctly or not
* and drops misleading peer if it hasn't
*/
protected void dropIfValidationFailed(SyncQueueIfc.ValidatedHeaders res) {
if (!res.isValid() && res.getNodeId() != null) {
if (logger.isWarnEnabled()) logger.warn("Invalid header received: {}, reason: {}, peer: {}",
mkalinin marked this conversation as resolved.
Show resolved Hide resolved
res.getHeader() == null ? "" : res.getHeader().getShortDescr(),
res.getReason(),
Hex.toHexString(res.getNodeId()).substring(0, 8));

Channel peer = pool.getByNodeId(res.getNodeId());
if (peer != null) {
peer.dropConnection();
}
}
}

/**
* Runs checks against block's header. <br>
* All these checks make sense before block is added to queue
Expand Down
11 changes: 9 additions & 2 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.ethereum.net.server.ChannelManager;
import org.ethereum.util.ExecutorPipeline;
import org.ethereum.validator.BlockHeaderValidator;
import org.ethereum.validator.DependentBlockHeaderRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -91,6 +92,9 @@ public void accept(BlockWrapper blockWrapper) {
@Autowired
private FastSyncManager fastSyncManager;

@Autowired
private DependentBlockHeaderRule parentHeaderValidator;

ChannelManager channelManager;

private SystemProperties config;
Expand Down Expand Up @@ -161,7 +165,8 @@ void initRegularSync(EthereumListener.SyncState syncDoneType) {
logger.info("Initializing SyncManager regular sync.");
this.syncDoneType = syncDoneType;

syncQueue = new SyncQueueImpl(blockchain);
syncQueue = new SyncQueueImpl(blockchain)
.withParentHeaderValidator(parentHeaderValidator);
super.init(syncQueue, pool, "RegularSync");

Runnable queueProducer = this::produceQueue;
Expand Down Expand Up @@ -393,7 +398,9 @@ public boolean validateAndAddNewBlock(Block block, byte[] nodeId) {
}

logger.debug("Adding new block to sync queue: " + block.getShortDescr());
syncQueue.addHeaders(singletonList(new BlockHeaderWrapper(block.getHeader(), nodeId)));
SyncQueueIfc.ValidatedHeaders res = syncQueue.addHeadersAndValidate(
singletonList(new BlockHeaderWrapper(block.getHeader(), nodeId)));
dropIfValidationFailed(res);

synchronized (this) {
List<Block> newBlocks = syncQueue.addBlocks(singletonList(block));
Expand Down
67 changes: 67 additions & 0 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueIfc.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.ethereum.sync;

import org.ethereum.core.Block;
import org.ethereum.core.BlockHeader;
import org.ethereum.core.BlockHeaderWrapper;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -55,6 +59,57 @@ interface BlocksRequest {
List<BlockHeaderWrapper> getBlockHeaders();
}

/**
* Handles result of {@link #addHeadersAndValidate(Collection)} invocation.
*
* <p>
* If {@code valid} is true then validation passed successfully
* and {@code headers} list contains the same result as if {@link #addHeaders(Collection)} was called.
* Otherwise, the list contains invalid headers.
*/
class ValidatedHeaders {
public static final ValidatedHeaders Empty = new ValidatedHeaders(Collections.emptyList(), true);

private final List<BlockHeaderWrapper> headers;
private final boolean valid;
private final String reason;

public ValidatedHeaders(List<BlockHeaderWrapper> headers, boolean valid, String reason) {
this.headers = headers;
this.valid = valid;
this.reason = reason;
}

public ValidatedHeaders(List<BlockHeaderWrapper> headers, boolean valid) {
this(headers, valid, "");
}

public boolean isValid() {
return valid;
}

@Nonnull
public List<BlockHeaderWrapper> getHeaders() {
return headers;
}

public String getReason() {
return reason;
}

@Nullable
public byte[] getNodeId() {
if (headers.isEmpty()) return null;
return headers.get(0).getNodeId();
}

@Nullable
public BlockHeader getHeader() {
if (headers.isEmpty()) return null;
return headers.get(0).getHeader();
}
}

/**
* Returns wanted headers requests
* @param maxSize Maximum number of headers in a singles request
Expand All @@ -76,6 +131,18 @@ interface BlocksRequest {
*/
List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWrapper> headers);

/**
* In general, does the same work as {@link #addHeaders(Collection)} does.
* But before trimming, the longest chain is checked with parent header validator.
* If validation is failed, the chain is erased from the queue.
*
* <p>
* <b>Note:</b> in reverse queue falls to {@link #addHeaders(Collection)} invocation
*
* @return check {@link ValidatedHeaders} for details
*/
ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers);

/**
* Returns wanted blocks hashes
*/
Expand Down
92 changes: 92 additions & 0 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ethereum.core.Blockchain;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.util.ByteArrayMap;
import org.ethereum.validator.DependentBlockHeaderRule;

import java.util.*;
import java.util.function.Function;
Expand Down Expand Up @@ -183,6 +184,8 @@ public List<HeaderElement> getChildren() {

Random rnd = new Random(); // ;)

DependentBlockHeaderRule parentHeaderValidator = null;

public SyncQueueImpl(List<Block> initBlocks) {
init(initBlocks);
}
Expand Down Expand Up @@ -264,6 +267,10 @@ private boolean hasGaps() {

private void trimChain() {
List<HeaderElement> longestChain = getLongestChain();
trimChainImpl(longestChain);
}

private void trimChainImpl(List<HeaderElement> longestChain) {
if (longestChain.size() > MAX_CHAIN_LEN) {
long newTrimNum = getLongestChain().get(longestChain.size() - MAX_CHAIN_LEN).header.getNumber();
for (int i = 0; darkZoneNum < newTrimNum; darkZoneNum++, i++) {
Expand Down Expand Up @@ -360,6 +367,86 @@ public synchronized List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWr
return null;
}

@Override
public ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers) {
for (BlockHeaderWrapper header : headers) {
addHeader(header);
}

List<HeaderElement> longestChain = getLongestChain();

// do not run the payload if chain is too short
if (longestChain.size() > MAX_CHAIN_LEN) {
ValidatedHeaders result = validateChain(longestChain);
if (result.isValid()) {
trimChainImpl(longestChain);
} else {
// erase chain starting from first invalid header
eraseChain(longestChain, result.getHeaders().get(0).getNumber());
}

return result;
}

return ValidatedHeaders.Empty;
}

/**
* Runs parent header validation and returns after first occurrence of invalid header
*/
ValidatedHeaders validateChain(List<HeaderElement> chain) {
if (parentHeaderValidator == null)
return ValidatedHeaders.Empty;

for (int i = 1; i < chain.size(); i++) {
BlockHeaderWrapper parent = chain.get(i - 1).header;
BlockHeaderWrapper header = chain.get(i).header;
if (!parentHeaderValidator.validate(header.getHeader(), parent.getHeader())) {
return new ValidatedHeaders(Collections.singletonList(header), false,
parentHeaderValidator.getErrors().isEmpty() ? "" : parentHeaderValidator.getErrors().get(0));
}
}

return ValidatedHeaders.Empty;
}

void eraseChain(List<HeaderElement> chain, long startFrom) {
if (chain.isEmpty())
return;

// prevent from going beyond dark zone
startFrom = Math.max(darkZoneNum + 1, startFrom);

HeaderElement head = chain.get(chain.size() - 1);
for (int i = chain.size() - 1; i >= 0; i--) {
HeaderElement el = chain.get(i);
if (el.header.getNumber() < startFrom) break; // erase up to startFrom number
Map<ByteArrayWrapper, HeaderElement> gen = headers.get(el.header.getNumber());
gen.remove(new ByteArrayWrapper(el.header.getHash()));
// clean empty gens
if (gen.isEmpty()) {
headers.remove(el.header.getNumber());
}
}

// adjust maxNum
if (head.header.getNumber() == maxNum) {
Map<ByteArrayWrapper, HeaderElement> lastValidatedGen = headers.get(darkZoneNum);
assert lastValidatedGen.size() == 1;
long maxNotEmptyGen = lastValidatedGen.values().iterator().next().header.getNumber();

// find new maxNum after chain has been erased
for (long num = head.header.getNumber(); num >= darkZoneNum; num--) {
Map<ByteArrayWrapper, HeaderElement> gen = headers.get(num);
if (gen != null && !gen.isEmpty() && num > maxNotEmptyGen) {
maxNotEmptyGen = num;
break;
}
}
maxNum = maxNotEmptyGen;
}
}

@Override
public synchronized int getHeadersCount() {
return (int) (maxNum - minNum);
Expand Down Expand Up @@ -438,6 +525,11 @@ public synchronized List<Block> pollBlocks() {
return null;
}

public SyncQueueImpl withParentHeaderValidator(DependentBlockHeaderRule validator) {
this.parentHeaderValidator = validator;
return this;
}


interface Visitor<T> {
T visit(HeaderElement el, List<T> childrenRes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ public synchronized List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWr
}
}

@Override
public ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers) {
List<BlockHeaderWrapper> added = addHeaders(headers);
return new ValidatedHeaders(added, true);
}

@Override
public synchronized BlocksRequest requestBlocks(int maxSize) {
List<BlockHeaderWrapper> reqHeaders = new ArrayList<>();
Expand Down
Loading