From 9ca1ea0734afbf68e158d6ecc8da79bbdde68a11 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 22 Feb 2019 10:27:25 +1000 Subject: [PATCH 01/10] Use a single thread to persist received world state nodes to avoid RocksDB timeouts due to contention on the write lock. --- .../sync/worldstate/WorldStateDownloader.java | 103 +++++++++++++----- 1 file changed, 78 insertions(+), 25 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index e7786c3140..c66b408deb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException; +import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; @@ -42,9 +43,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -57,6 +60,7 @@ public class WorldStateDownloader { private static final Logger LOG = LogManager.getLogger(); private final Counter completedRequestsCounter; private final Counter retriedRequestsTotal; + private final ArrayBlockingQueue> requestsToPersist; private enum Status { IDLE, @@ -96,6 +100,9 @@ public WorldStateDownloader( this.maxOutstandingRequests = maxOutstandingRequests; this.maxNodeRequestRetries = maxNodeRequestRetries; this.ethTasksTimer = ethTasksTimer; + // Room for the requests we expect to do in parallel plus some buffer but not unlimited. + this.requestsToPersist = + new ArrayBlockingQueue<>(hashCountPerRequest * maxOutstandingRequests * 2); metricsSystem.createLongGauge( MetricCategory.SYNCHRONIZER, "world_state_pending_requests_current", @@ -124,6 +131,12 @@ public WorldStateDownloader( "world_state_node_request_failures_max", "Highest number of times a node data request has been retried in this download", highestRetryCount::get); + + metricsSystem.createIntegerGauge( + MetricCategory.SYNCHRONIZER, + "world_state_node_persistence_queue_length_current", + "Current number of node data requests waiting to be persisted", + requestsToPersist::size); } public CompletableFuture run(final BlockHeader header) { @@ -152,6 +165,7 @@ public CompletableFuture run(final BlockHeader header) { } } ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); + ethContext.getScheduler().scheduleServiceTask(new PersistNodeDataTask(header)); return future; } @@ -186,24 +200,11 @@ private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) { sendAndProcessRequests(peer, toRequest, header) .whenComplete( (task, error) -> { - final boolean done; - synchronized (this) { - outstandingRequests.remove(task); - done = - status == Status.RUNNING - && outstandingRequests.size() == 0 - && pendingRequests.allTasksCompleted(); - } - if (done) { - // We're done - final Updater updater = worldStateStorage.updater(); - updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); - updater.commit(); - markDone(); - } else { - // Send out additional requests - requestNodeData(header); + if (error != null) { + LOG.error("World state data request failed", error); } + outstandingRequests.remove(task); + requestNodeData(header); }); } @@ -281,7 +282,6 @@ private CompletableFuture>> storeData( final BlockHeader blockHeader, final AbstractPeerTask> ethTask, final Map data) { - final Updater storageUpdater = worldStateStorage.updater(); for (final Task task : requestTasks) { final NodeDataRequest request = task.getData(); final BytesValue matchingData = data.get(request.getHash()); @@ -294,23 +294,34 @@ private CompletableFuture>> storeData( } task.markFailed(); } else { - completedRequestsCounter.inc(); - // Persist request data request.setData(matchingData); + queueChildRequests(request); if (isRootState(blockHeader, request)) { rootNode = request.getData(); + task.markCompleted(); } else { - request.persist(storageUpdater); + addToPersistenceQueue(task); } - - queueChildRequests(request); - task.markCompleted(); } } - storageUpdater.commit(); + requestNodeData(blockHeader); return CompletableFuture.completedFuture(ethTask); } + private void addToPersistenceQueue(final Task task) { + while (!future.isDone()) { + try { + if (requestsToPersist.offer(task, 1, TimeUnit.SECONDS)) { + break; + } + } catch (final InterruptedException e) { + task.markFailed(); + Thread.currentThread().interrupt(); + break; + } + } + } + private void updateHighestRetryCount(final int requestFailures) { int previousHighestRetry = highestRetryCount.get(); while (requestFailures > previousHighestRetry) { @@ -385,4 +396,46 @@ private Map mapNodeDataByHash(final List data) { data.forEach(d -> dataByHash.put(Hash.hash(d), d)); return dataByHash; } + + private class PersistNodeDataTask extends AbstractEthTask { + + private final List> batch; + private final BlockHeader header; + + public PersistNodeDataTask(final BlockHeader header) { + super(WorldStateDownloader.this.ethTasksTimer); + this.header = header; + batch = new ArrayList<>(); + } + + @Override + protected void executeTask() { + while (!isDone() && !future.isDone()) { + try { + final Task task = requestsToPersist.poll(1, TimeUnit.SECONDS); + if (task != null) { + batch.clear(); + batch.add(task); + requestsToPersist.drainTo(batch, 1000); + final Updater storageUpdater = worldStateStorage.updater(); + batch.forEach( + taskToPersist -> { + completedRequestsCounter.inc(); + taskToPersist.getData().persist(storageUpdater); + taskToPersist.markCompleted(); + }); + storageUpdater.commit(); + if (pendingRequests.allTasksCompleted()) { + final Updater updater = worldStateStorage.updater(); + updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); + updater.commit(); + markDone(); + } + } + } catch (final InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + } } From 03f14395c28002865378c2b2a11b6afbe17260ed Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 22 Feb 2019 10:44:30 +1000 Subject: [PATCH 02/10] Use a dedicated thread to persist world state data. --- .../eth/sync/worldstate/WorldStateDownloader.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index c66b408deb..fdd7b33511 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -47,6 +47,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,6 +83,7 @@ private enum Status { private volatile CompletableFuture future; private volatile Status status = Status.IDLE; private volatile BytesValue rootNode; + private volatile PersistNodeDataTask persistenceTask; private final AtomicInteger highestRetryCount = new AtomicInteger(0); public WorldStateDownloader( @@ -165,7 +167,8 @@ public CompletableFuture run(final BlockHeader header) { } } ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); - ethContext.getScheduler().scheduleServiceTask(new PersistNodeDataTask(header)); + persistenceTask = new PersistNodeDataTask(header); + ethContext.getScheduler().scheduleServiceTask(persistenceTask); return future; } @@ -200,7 +203,8 @@ private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) { sendAndProcessRequests(peer, toRequest, header) .whenComplete( (task, error) -> { - if (error != null) { + if (error != null + && !(ExceptionUtils.rootCause(error) instanceof RejectedExecutionException)) { LOG.error("World state data request failed", error); } outstandingRequests.remove(task); @@ -372,6 +376,7 @@ private synchronized void handleStalledDownload() { private synchronized void doCancelDownload() { status = Status.CANCELLED; + persistenceTask.cancel(); pendingRequests.clear(); for (final EthTask outstandingRequest : outstandingRequests) { outstandingRequest.cancel(); @@ -410,7 +415,7 @@ public PersistNodeDataTask(final BlockHeader header) { @Override protected void executeTask() { - while (!isDone() && !future.isDone()) { + while (!isDone()) { try { final Task task = requestsToPersist.poll(1, TimeUnit.SECONDS); if (task != null) { From 5846dc95ffcc96efb299c05049efafe4b1bfc625 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 22 Feb 2019 12:37:50 +1000 Subject: [PATCH 03/10] Fix RocksDbTaskQueue so it doesn't read stale data when resuming a transfer. --- .../services/queue/RocksDbTaskQueue.java | 12 +++++-- .../services/queue/RocksDbTaskQueueTest.java | 34 ++++++++++++++++--- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java index 4160cfd955..21cd6e6a3c 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java @@ -39,6 +39,7 @@ public class RocksDbTaskQueue implements TaskQueue { private long lastEnqueuedKey = 0; private long lastDequeuedKey = 0; private RocksIterator dequeueIterator; + private long lastValidKeyFromIterator; private long oldestKey = 0; private final Set> outstandingTasks = new HashSet<>(); @@ -104,14 +105,14 @@ public synchronized Task dequeue() { } try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) { if (dequeueIterator == null) { - dequeueIterator = db.newIterator(); + createNewIterator(); } final long key = ++lastDequeuedKey; dequeueIterator.seek(Longs.toByteArray(key)); - if (!dequeueIterator.isValid()) { + if (key > lastValidKeyFromIterator || !dequeueIterator.isValid()) { // Reached the end of the snapshot this iterator was loaded with dequeueIterator.close(); - dequeueIterator = db.newIterator(); + createNewIterator(); dequeueIterator.seek(Longs.toByteArray(key)); if (!dequeueIterator.isValid()) { throw new IllegalStateException("Next expected value is missing"); @@ -125,6 +126,11 @@ public synchronized Task dequeue() { } } + private void createNewIterator() { + dequeueIterator = db.newIterator(); + lastValidKeyFromIterator = lastEnqueuedKey; + } + @Override public synchronized long size() { assertNotClosed(); diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java index 4cbb725a05..7d01093528 100644 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java @@ -12,13 +12,17 @@ */ package tech.pegasys.pantheon.services.queue; +import static org.assertj.core.api.Assertions.assertThat; + import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.IOException; +import java.nio.file.Path; import java.util.function.Function; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; public class RocksDbTaskQueueTest extends AbstractTaskQueueTest> { @@ -27,10 +31,32 @@ public class RocksDbTaskQueueTest extends AbstractTaskQueueTest createQueue() throws IOException { + final Path dataDir = folder.newFolder().toPath(); + return createQueue(dataDir); + } + + private RocksDbTaskQueue createQueue(final Path dataDir) { return RocksDbTaskQueue.create( - folder.newFolder().toPath(), - Function.identity(), - Function.identity(), - new NoOpMetricsSystem()); + dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem()); + } + + @Test + public void shouldIgnoreExistingData() throws Exception { + final Path dataDir = folder.newFolder().toPath(); + try (final RocksDbTaskQueue queue = createQueue(dataDir)) { + queue.enqueue(BytesValue.of(1)); + queue.enqueue(BytesValue.of(2)); + queue.enqueue(BytesValue.of(3)); + } + + try (final RocksDbTaskQueue resumedQueue = createQueue(dataDir)) { + assertThat(resumedQueue.dequeue()).isEqualTo(null); + + resumedQueue.enqueue(BytesValue.of(50)); + assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(50)); + + resumedQueue.enqueue(BytesValue.of(60)); + assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(60)); + } } } From 5a739ea9dd0b6c3e25c53c7de9ca16d2d28520b7 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 22 Feb 2019 13:05:38 +1000 Subject: [PATCH 04/10] Include state root in world state download starting log message. --- .../ethereum/eth/sync/worldstate/WorldStateDownloader.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index fdd7b33511..11ce16259d 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -143,9 +143,10 @@ public WorldStateDownloader( public CompletableFuture run(final BlockHeader header) { LOG.info( - "Begin downloading world state from peers for block {} ({})", + "Begin downloading world state from peers for block {} ({}). State root {}", header.getNumber(), - header.getHash()); + header.getHash(), + header.getStateRoot()); synchronized (this) { if (status == Status.RUNNING) { final CompletableFuture failed = new CompletableFuture<>(); From 03c974a0f7ea2e874d2287479e190babbd7d4986 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Sun, 24 Feb 2019 06:15:12 +1000 Subject: [PATCH 05/10] More progress --- .../eth/sync/SynchronizerConfiguration.java | 2 +- .../AccountTrieNodeDataRequest.java | 5 +- .../sync/worldstate/CodeNodeDataRequest.java | 5 +- .../eth/sync/worldstate/NodeDataRequest.java | 27 +- .../StorageTrieNodeDataRequest.java | 5 +- .../sync/worldstate/TrieNodeDataRequest.java | 7 +- .../sync/worldstate/WorldStateDownloader.java | 44 +++- .../manager/DeterministicEthScheduler.java | 4 + .../manager/EthProtocolManagerTestUtil.java | 19 +- .../eth/manager/MockExecutorService.java | 2 +- .../fastsync/FastSyncChainDownloaderTest.java | 2 - .../sync/fullsync/FullSyncDownloaderTest.java | 2 - .../fullsync/FullSyncTargetManagerTest.java | 2 - .../worldstate/WorldStateDownloaderTest.java | 245 +++++++++--------- 14 files changed, 198 insertions(+), 173 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 01aefba71c..2f72dd8468 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -25,7 +25,7 @@ public class SynchronizerConfiguration { // TODO: Determine reasonable defaults here private static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50; private static final float DEFAULT_FULL_VALIDATION_RATE = .1f; - private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; + private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 1; private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java index 39017347a3..53496fd82b 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java @@ -12,8 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; -import static com.google.common.base.Preconditions.checkNotNull; - import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; @@ -33,8 +31,7 @@ class AccountTrieNodeDataRequest extends TrieNodeDataRequest { } @Override - public void persist(final Updater updater) { - checkNotNull(getData(), "Must set data before node can be persisted."); + protected void doPersist(final Updater updater) { updater.putAccountStateTrieNode(getHash(), getData()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java index beff1cb601..c27b1c5e94 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java @@ -12,8 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; -import static com.google.common.base.Preconditions.checkNotNull; - import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; @@ -29,8 +27,7 @@ class CodeNodeDataRequest extends NodeDataRequest { } @Override - public void persist(final Updater updater) { - checkNotNull(getData(), "Must set data before node can be persisted."); + protected void doPersist(final Updater updater) { updater.putCode(getHash(), getData()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java index 776665bc2b..7d46b187aa 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; +import static com.google.common.base.Preconditions.checkNotNull; + import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; @@ -28,6 +30,7 @@ public abstract class NodeDataRequest { private final RequestType requestType; private final Hash hash; private BytesValue data; + private boolean requiresPersisting = true; private final AtomicInteger failedRequestCount = new AtomicInteger(0); protected NodeDataRequest(final RequestType requestType, final Hash hash) { @@ -52,14 +55,14 @@ public static BytesValue serialize(final NodeDataRequest request) { } public static NodeDataRequest deserialize(final BytesValue encoded) { - RLPInput in = RLP.input(encoded); + final RLPInput in = RLP.input(encoded); in.enterList(); - RequestType requestType = RequestType.fromValue(in.readByte()); - Hash hash = Hash.wrap(in.readBytes32()); - int failureCount = in.readIntScalar(); + final RequestType requestType = RequestType.fromValue(in.readByte()); + final Hash hash = Hash.wrap(in.readBytes32()); + final int failureCount = in.readIntScalar(); in.leaveList(); - NodeDataRequest deserialized; + final NodeDataRequest deserialized; switch (requestType) { case ACCOUNT_TRIE_NODE: deserialized = createAccountDataRequest(hash); @@ -105,6 +108,11 @@ public NodeDataRequest setData(final BytesValue data) { return this; } + public NodeDataRequest setRequiresPersisting(final boolean requiresPersisting) { + this.requiresPersisting = requiresPersisting; + return this; + } + public int trackFailure() { return failedRequestCount.incrementAndGet(); } @@ -113,7 +121,14 @@ private void setFailureCount(final int failures) { failedRequestCount.set(failures); } - public abstract void persist(final WorldStateStorage.Updater updater); + public final void persist(final WorldStateStorage.Updater updater) { + if (requiresPersisting) { + checkNotNull(getData(), "Must set data before node can be persisted."); + doPersist(updater); + } + } + + protected abstract void doPersist(final WorldStateStorage.Updater updater); public abstract Stream getChildRequests(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java index 165c30e117..153864ea90 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java @@ -12,8 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; -import static com.google.common.base.Preconditions.checkNotNull; - import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; @@ -30,8 +28,7 @@ class StorageTrieNodeDataRequest extends TrieNodeDataRequest { } @Override - public void persist(final Updater updater) { - checkNotNull(getData(), "Must set data before node can be persisted."); + protected void doPersist(final Updater updater) { updater.putAccountStorageTrieNode(getHash(), getData()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java index a4e771ae30..55e4311850 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.trie.Node; import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -58,7 +59,11 @@ private Stream getRequestsFromLoadedTrieNode(final Node getRequestsFromChildTrieNode(final Node trieNode) { if (trieNode.isReferencedByHash()) { // If child nodes are reference by hash, we need to download them - NodeDataRequest req = createChildNodeDataRequest(Hash.wrap(trieNode.getHash())); + final Hash hash = Hash.wrap(trieNode.getHash()); + if (hash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH) || hash.equals(BytesValue.EMPTY)) { + return Stream.empty(); + } + final NodeDataRequest req = createChildNodeDataRequest(hash); return Stream.of(req); } // Otherwise if the child's value has been inlined we can go ahead and process it diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 11ce16259d..caa94a8028 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -60,7 +60,8 @@ public class WorldStateDownloader { private static final Logger LOG = LogManager.getLogger(); private final Counter completedRequestsCounter; - private final Counter retriedRequestsTotal; + private final Counter retriedRequestsCounter; + private final Counter existingNodeCounter; private final ArrayBlockingQueue> requestsToPersist; private enum Status { @@ -122,12 +123,18 @@ public WorldStateDownloader( MetricCategory.SYNCHRONIZER, "world_state_completed_requests_total", "Total number of node data requests completed as part of fast sync world state download"); - retriedRequestsTotal = + retriedRequestsCounter = metricsSystem.createCounter( MetricCategory.SYNCHRONIZER, "world_state_retried_requests_total", "Total number of node data requests repeated as part of fast sync world state download"); + existingNodeCounter = + metricsSystem.createCounter( + MetricCategory.SYNCHRONIZER, + "world_state_existing_nodes_total", + "Total number of node data requests completed using existing data"); + metricsSystem.createIntegerGauge( MetricCategory.SYNCHRONIZER, "world_state_node_request_failures_max", @@ -165,11 +172,12 @@ public CompletableFuture run(final BlockHeader header) { markDone(); } else { pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot)); + + ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); + persistenceTask = new PersistNodeDataTask(header); + ethContext.getScheduler().scheduleServiceTask(persistenceTask); } } - ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); - persistenceTask = new PersistNodeDataTask(header); - ethContext.getScheduler().scheduleServiceTask(persistenceTask); return future; } @@ -200,6 +208,11 @@ private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) { // Collect data to be requested final List> toRequest = getTasksForNextRequest(); + if (toRequest.isEmpty()) { + requestNodeData(header); + return; + } + // Request and process node data sendAndProcessRequests(peer, toRequest, header) .whenComplete( @@ -223,10 +236,9 @@ private List> getTasksForNextRequest() { final NodeDataRequest pendingRequest = pendingRequestTask.getData(); final Optional existingData = pendingRequest.getExistingData(worldStateStorage); if (existingData.isPresent()) { - pendingRequest.setData(existingData.get()); - queueChildRequests(pendingRequest); - completedRequestsCounter.inc(); - pendingRequestTask.markCompleted(); + existingNodeCounter.inc(); + pendingRequest.setData(existingData.get()).setRequiresPersisting(false); + addToPersistenceQueue(pendingRequestTask); continue; } toRequest.add(pendingRequestTask); @@ -291,7 +303,7 @@ private CompletableFuture>> storeData( final NodeDataRequest request = task.getData(); final BytesValue matchingData = data.get(request.getHash()); if (matchingData == null) { - retriedRequestsTotal.inc(); + retriedRequestsCounter.inc(); final int requestFailures = request.trackFailure(); updateHighestRetryCount(requestFailures); if (requestFailures > maxNodeRequestRetries) { @@ -300,8 +312,8 @@ private CompletableFuture>> storeData( task.markFailed(); } else { request.setData(matchingData); - queueChildRequests(request); if (isRootState(blockHeader, request)) { + queueChildRequests(request); rootNode = request.getData(); task.markCompleted(); } else { @@ -426,16 +438,22 @@ protected void executeTask() { final Updater storageUpdater = worldStateStorage.updater(); batch.forEach( taskToPersist -> { - completedRequestsCounter.inc(); - taskToPersist.getData().persist(storageUpdater); + final NodeDataRequest request = taskToPersist.getData(); + request.persist(storageUpdater); + queueChildRequests(request); taskToPersist.markCompleted(); + completedRequestsCounter.inc(); }); storageUpdater.commit(); + if (pendingRequests.allTasksCompleted()) { final Updater updater = worldStateStorage.updater(); updater.putAccountStateTrieNode(header.getStateRoot(), rootNode); updater.commit(); markDone(); + result.get().complete(null); + } else { + ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); } } } catch (final InterruptedException ignore) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index bdbb2c6775..a96247ae57 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -69,6 +69,10 @@ MockScheduledExecutor mockTransactionsExecutor() { return (MockScheduledExecutor) txWorkerExecutor; } + public MockExecutorService mockServiceExecutor() { + return (MockExecutorService) servicesExecutor; + } + @Override public void failAfterTimeout(final CompletableFuture promise, final Duration timeout) { if (timeoutPolicy.shouldTimeout()) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 753578144c..85785039eb 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -36,13 +36,12 @@ public static EthProtocolManager create( final WorldStateArchive worldStateArchive, final TimeoutPolicy timeoutPolicy) { return create( - blockchain, worldStateArchive, timeoutPolicy, new DeterministicEthScheduler(timeoutPolicy)); + blockchain, worldStateArchive, new DeterministicEthScheduler(timeoutPolicy)); } public static EthProtocolManager create( final Blockchain blockchain, final WorldStateArchive worldStateArchive, - final TimeoutPolicy timeoutPolicy, final EthScheduler ethScheduler) { final int networkId = 1; return new EthProtocolManager( @@ -59,17 +58,21 @@ public static EthProtocolManager create( return create(blockchain, worldStateArchive, TimeoutPolicy.NEVER); } - public static EthProtocolManager create() { - return create(TimeoutPolicy.NEVER); - } - - public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) { + public static EthProtocolManager create(final EthScheduler ethScheduler) { final ProtocolSchedule protocolSchedule = MainnetProtocolSchedule.create(); final GenesisConfigFile config = GenesisConfigFile.mainnet(); final GenesisState genesisState = GenesisState.fromConfig(config, protocolSchedule); final Blockchain blockchain = createInMemoryBlockchain(genesisState.getBlock()); final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive(); - return create(blockchain, worldStateArchive, timeoutPolicy); + return create(blockchain, worldStateArchive, ethScheduler); + } + + public static EthProtocolManager create() { + return create(TimeoutPolicy.NEVER); + } + + public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) { + return create(new DeterministicEthScheduler(timeoutPolicy)); } // Utility to prevent scheduler from automatically running submitted tasks diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java index 555898b8dd..c8e54dce30 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockExecutorService.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -class MockExecutorService implements ExecutorService { +public class MockExecutorService implements ExecutorService { private boolean autoRun = true; private final List> tasks = new ArrayList<>(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index 4df7085d06..6f9dfe0994 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -21,7 +21,6 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; -import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; @@ -68,7 +67,6 @@ public void setup() { EthProtocolManagerTestUtil.create( localBlockchain, localBlockchainSetup.getWorldArchive(), - DeterministicEthScheduler.TimeoutPolicy.NEVER, new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index 175407cc9e..b6294fca2c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -28,7 +28,6 @@ import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; -import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; @@ -87,7 +86,6 @@ public void setupTest() { EthProtocolManagerTestUtil.create( localBlockchain, localBlockchainSetup.getWorldArchive(), - DeterministicEthScheduler.TimeoutPolicy.NEVER, new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java index 55b1d4539e..01c0d4bb7c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java @@ -19,7 +19,6 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; -import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; @@ -65,7 +64,6 @@ public void setup() { EthProtocolManagerTestUtil.create( localBlockchain, localWorldState, - DeterministicEthScheduler.TimeoutPolicy.NEVER, new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); final EthContext ethContext = ethProtocolManager.ethContext(); final SyncState syncState = diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 102f955f56..1fb848a42c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -34,6 +34,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; @@ -80,6 +81,8 @@ public class WorldStateDownloaderTest { private static final Hash EMPTY_TRIE_ROOT = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH); + final BlockDataGenerator dataGen = new BlockDataGenerator(1); + @Test public void downloadWorldStateFromPeers_onePeerOneWithManyRequestsOneAtATime() { downloadAvailableWorldStateFromPeers(1, 50, 1, 1); @@ -112,7 +115,6 @@ public void downloadWorldStateFromPeers_singleRequestWithMultiplePeers() { @Test public void downloadEmptyWorldState() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final BlockHeader header = dataGen @@ -120,30 +122,29 @@ public void downloadEmptyWorldState() { .getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture future = downloader.run(header); + final CompletableFuture future = downloader.run(header); assertThat(future).isDone(); // Peers should not have been queried - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { assertThat(peer.hasOutstandingRequests()).isFalse(); } } @Test public void downloadAlreadyAvailableWorldState() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); // Setup existing state @@ -160,29 +161,28 @@ public void downloadAlreadyAvailableWorldState() { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateDownloader downloader = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), storage, queue); - CompletableFuture future = downloader.run(header); + final CompletableFuture future = downloader.run(header); assertThat(future).isDone(); // Peers should not have been queried because we already had the state - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { assertThat(peer.hasOutstandingRequests()).isFalse(); } } @Test public void canRecoverFromTimeouts() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); - TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2); + final TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(timeoutPolicy); // Setup "remote" state @@ -199,31 +199,31 @@ public void canRecoverFromTimeouts() { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // Respond to node data requests - Responder responder = + final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); while (!result.isDone()) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } // Check that all expected account data was downloaded - WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); final WorldState localWorldState = localWorldStateArchive.get(stateRoot).get(); assertThat(result).isDone(); assertAccountsMatch(localWorldState, accounts); @@ -236,7 +236,6 @@ public void handlesPartialResponsesFromNetwork() { @Test public void doesNotRequestKnownCodeFromNetwork() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); // Setup "remote" state @@ -253,43 +252,43 @@ public void doesNotRequestKnownCodeFromNetwork() { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); // Seed local storage with some contract values - Map knownCode = new HashMap<>(); + final Map knownCode = new HashMap<>(); accounts.subList(0, 5).forEach(a -> knownCode.put(a.getCodeHash(), a.getCode())); - Updater localStorageUpdater = localStorage.updater(); + final Updater localStorageUpdater = localStorage.updater(); knownCode.forEach(localStorageUpdater::putCode); localStorageUpdater.commit(); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // Respond to node data requests - List sentMessages = new ArrayList<>(); - Responder blockChainResponder = + final List sentMessages = new ArrayList<>(); + final Responder blockChainResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - Responder responder = + final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); while (!result.isDone()) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } // Check that known code was not requested - List requestedHashes = + final List requestedHashes = sentMessages.stream() .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA) .map(GetNodeDataMessage::readFrom) @@ -299,7 +298,7 @@ public void doesNotRequestKnownCodeFromNetwork() { assertThat(Collections.disjoint(requestedHashes, knownCode.keySet())).isTrue(); // Check that all expected account data was downloaded - WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); final WorldState localWorldState = localWorldStateArchive.get(stateRoot).get(); assertThat(result).isDone(); assertAccountsMatch(localWorldState, accounts); @@ -317,7 +316,6 @@ public void cancelDownloaderFuture() { @SuppressWarnings("unchecked") private void testCancellation(final boolean shouldCancelFuture) { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); // Setup "remote" state @@ -333,27 +331,27 @@ private void testCancellation(final boolean shouldCancelFuture) { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = spy(new InMemoryTaskQueue<>()); - WorldStateStorage localStorage = + final TaskQueue queue = spy(new InMemoryTaskQueue<>()); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // Send a few responses - Responder responder = + final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); for (int i = 0; i < 3; i++) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } @@ -370,7 +368,7 @@ private void testCancellation(final boolean shouldCancelFuture) { // Send some more responses after cancelling for (int i = 0; i < 3; i++) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } @@ -384,7 +382,6 @@ private void testCancellation(final boolean shouldCancelFuture) { @Test public void doesRequestKnownAccountTrieNodesFromNetwork() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); // Setup "remote" state @@ -401,23 +398,23 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); // Seed local storage with some trie node values - Map allNodes = + final Map allNodes = collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5); final Set knownNodes = new HashSet<>(); final Set unknownNodes = new HashSet<>(); assertThat(allNodes.size()).isGreaterThan(0); // Sanity check - Updater localStorageUpdater = localStorage.updater(); + final Updater localStorageUpdater = localStorage.updater(); final AtomicBoolean storeNode = new AtomicBoolean(true); allNodes.forEach( (nodeHash, node) -> { @@ -431,26 +428,26 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { }); localStorageUpdater.commit(); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // Respond to node data requests - List sentMessages = new ArrayList<>(); - Responder blockChainResponder = + final List sentMessages = new ArrayList<>(); + final Responder blockChainResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - Responder responder = + final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); while (!result.isDone()) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } // Check that known trie nodes were requested - List requestedHashes = + final List requestedHashes = sentMessages.stream() .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA) .map(GetNodeDataMessage::readFrom) @@ -461,7 +458,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes); // Check that all expected account data was downloaded - WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); final WorldState localWorldState = localWorldStateArchive.get(stateRoot).get(); assertThat(result).isDone(); assertAccountsMatch(localWorldState, accounts); @@ -469,7 +466,6 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { @Test public void doesRequestKnownStorageTrieNodesFromNetwork() { - BlockDataGenerator dataGen = new BlockDataGenerator(1); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); // Setup "remote" state @@ -486,18 +482,18 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); // Create some peers - List peers = + final List peers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(5) .collect(Collectors.toList()); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); // Seed local storage with some trie node values - List storageRootHashes = + final List storageRootHashes = new StoredMerklePatriciaTrie<>( remoteStorage::getNodeData, remoteWorldState.rootHash(), @@ -508,18 +504,18 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { .map(StateTrieAccountValue::readFrom) .map(StateTrieAccountValue::getStorageRoot) .collect(Collectors.toList()); - Map allTrieNodes = new HashMap<>(); + final Map allTrieNodes = new HashMap<>(); final Set knownNodes = new HashSet<>(); final Set unknownNodes = new HashSet<>(); - for (Bytes32 storageRootHash : storageRootHashes) { + for (final Bytes32 storageRootHash : storageRootHashes) { allTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5)); } assertThat(allTrieNodes.size()).isGreaterThan(0); // Sanity check - Updater localStorageUpdater = localStorage.updater(); + final Updater localStorageUpdater = localStorage.updater(); boolean storeNode = true; - for (Entry entry : allTrieNodes.entrySet()) { - Bytes32 hash = entry.getKey(); - BytesValue data = entry.getValue(); + for (final Entry entry : allTrieNodes.entrySet()) { + final Bytes32 hash = entry.getKey(); + final BytesValue data = entry.getValue(); if (storeNode) { localStorageUpdater.putAccountStorageTrieNode(hash, data); knownNodes.add(hash); @@ -530,22 +526,22 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { } localStorageUpdater.commit(); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, queue); - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // Respond to node data requests - List sentMessages = new ArrayList<>(); - Responder blockChainResponder = + final List sentMessages = new ArrayList<>(); + final Responder blockChainResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - Responder responder = + final Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); while (!result.isDone()) { // World state should not be available until the entire state is downloaded assertThat(localStorage.isWorldStateAvailable(stateRoot)).isFalse(); - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } @@ -553,7 +549,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue(); // Check that known trie nodes were requested - List requestedHashes = + final List requestedHashes = sentMessages.stream() .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA) .map(GetNodeDataMessage::readFrom) @@ -564,7 +560,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes); // Check that all expected account data was downloaded - WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); final WorldState localWorldState = localWorldStateArchive.get(stateRoot).get(); assertThat(result).isDone(); assertAccountsMatch(localWorldState, accounts); @@ -587,7 +583,6 @@ public void stalledDownloaderWithNoRetries() { private void simulateStalledDownload(final int maxRetries) { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); - BlockDataGenerator dataGen = new BlockDataGenerator(1); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -602,26 +597,26 @@ private void simulateStalledDownload(final int maxRetries) { final BlockHeader header = dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); - SynchronizerConfiguration syncConfig = + final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().worldStateRequestMaxRetries(maxRetries).build(); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); // Create a peer that can respond - RespondingEthPeer peer = + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()); // Start downloader - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // A second run should return an error without impacting the first result - CompletableFuture secondResult = downloader.run(header); + final CompletableFuture secondResult = downloader.run(header); assertThat(secondResult).isCompletedExceptionally(); assertThat(result).isNotCompletedExceptionally(); - Responder emptyResponder = RespondingEthPeer.emptyResponder(); + final Responder emptyResponder = RespondingEthPeer.emptyResponder(); for (int i = 0; i < maxRetries; i++) { peer.respond(emptyResponder); } @@ -646,22 +641,22 @@ private void simulateStalledDownload(final int maxRetries) { */ private Map collectTrieNodesToBeRequested( final WorldStateStorage storage, final Bytes32 rootHash, final int maxNodes) { - Map trieNodes = new HashMap<>(); + final Map trieNodes = new HashMap<>(); - TrieNodeDecoder decoder = TrieNodeDecoder.create(); - BytesValue rootNode = storage.getNodeData(rootHash).get(); + final TrieNodeDecoder decoder = TrieNodeDecoder.create(); + final BytesValue rootNode = storage.getNodeData(rootHash).get(); // Walk through hash-referenced nodes - List> hashReferencedNodes = new ArrayList<>(); + final List> hashReferencedNodes = new ArrayList<>(); hashReferencedNodes.add(decoder.decode(rootNode)); while (!hashReferencedNodes.isEmpty() && trieNodes.size() < maxNodes) { - Node currentNode = hashReferencedNodes.remove(0); - List> children = new ArrayList<>(); + final Node currentNode = hashReferencedNodes.remove(0); + final List> children = new ArrayList<>(); currentNode.getChildren().ifPresent(children::addAll); while (!children.isEmpty() && trieNodes.size() < maxNodes) { - Node child = children.remove(0); + final Node child = children.remove(0); if (child.isReferencedByHash()) { - BytesValue childNode = storage.getNodeData(child.getHash()).get(); + final BytesValue childNode = storage.getNodeData(child.getHash()).get(); trieNodes.put(child.getHash(), childNode); hashReferencedNodes.add(decoder.decode(childNode)); } else { @@ -688,9 +683,9 @@ private void downloadAvailableWorldStateFromPeers( final int hashesPerRequest, final int maxOutstandingRequests, final NetworkResponder networkResponder) { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); + final int trailingPeerCount = 5; - BlockDataGenerator dataGen = new BlockDataGenerator(1); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -707,33 +702,33 @@ private void downloadAvailableWorldStateFromPeers( // Generate more data that should not be downloaded final List otherAccounts = dataGen.createRandomAccounts(remoteWorldState, 5); - Hash otherStateRoot = remoteWorldState.rootHash(); - BlockHeader otherHeader = + final Hash otherStateRoot = remoteWorldState.rootHash(); + final BlockHeader otherHeader = dataGen .block(BlockOptions.create().setStateRoot(otherStateRoot).setBlockNumber(11)) .getHeader(); assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check - TaskQueue queue = new InMemoryTaskQueue<>(); - WorldStateStorage localStorage = + final TaskQueue queue = new InMemoryTaskQueue<>(); + final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); - WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); - SynchronizerConfiguration syncConfig = + final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder() .worldStateHashCountPerRequest(hashesPerRequest) .worldStateRequestParallelism(maxOutstandingRequests) .build(); - WorldStateDownloader downloader = + final WorldStateDownloader downloader = createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); // Create some peers that can respond - List usefulPeers = + final List usefulPeers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) .limit(peerCount) .collect(Collectors.toList()); // And some irrelevant peers - List trailingPeers = + final List trailingPeers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer( @@ -742,17 +737,17 @@ private void downloadAvailableWorldStateFromPeers( .collect(Collectors.toList()); // Start downloader - CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(header); // A second run should return an error without impacting the first result - CompletableFuture secondResult = downloader.run(header); + final CompletableFuture secondResult = downloader.run(header); assertThat(secondResult).isCompletedExceptionally(); assertThat(result).isNotCompletedExceptionally(); // Respond to node data requests // Send one round of full responses, so that we can get multiple requests queued up - Responder fullResponder = + final Responder fullResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - for (RespondingEthPeer peer : usefulPeers) { + for (final RespondingEthPeer peer : usefulPeers) { peer.respond(fullResponder); } // Respond to remaining queued requests in custom way @@ -761,7 +756,7 @@ private void downloadAvailableWorldStateFromPeers( } // Check that trailing peers were not queried for data - for (RespondingEthPeer trailingPeer : trailingPeers) { + for (final RespondingEthPeer trailingPeer : trailingPeers) { assertThat(trailingPeer.hasOutstandingRequests()).isFalse(); } @@ -772,7 +767,7 @@ private void downloadAvailableWorldStateFromPeers( // We shouldn't have any extra data locally assertThat(localStorage.contains(otherHeader.getStateRoot())).isFalse(); - for (Account otherAccount : otherAccounts) { + for (final Account otherAccount : otherAccounts) { assertThat(localWorldState.get(otherAccount.getAddress())).isNull(); } } @@ -781,10 +776,10 @@ private void respondFully( final List peers, final WorldStateArchive remoteWorldStateArchive, final CompletableFuture downloaderFuture) { - Responder responder = + final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); while (!downloaderFuture.isDone()) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(responder); } } @@ -794,16 +789,16 @@ private void respondPartially( final List peers, final WorldStateArchive remoteWorldStateArchive, final CompletableFuture downloaderFuture) { - Responder fullResponder = + final Responder fullResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - Responder partialResponder = + final Responder partialResponder = RespondingEthPeer.partialResponder( mock(Blockchain.class), remoteWorldStateArchive, MainnetProtocolSchedule.create(), .5f); - Responder emptyResponder = RespondingEthPeer.emptyResponder(); + final Responder emptyResponder = RespondingEthPeer.emptyResponder(); // Send a few partial responses for (int i = 0; i < 5; i++) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(partialResponder); } } @@ -813,7 +808,7 @@ private void respondPartially( // Send a few empty responses for (int i = 0; i < 3; i++) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(emptyResponder); } } @@ -822,7 +817,7 @@ private void respondPartially( assertThat(downloaderFuture).isNotDone(); while (!downloaderFuture.isDone()) { - for (RespondingEthPeer peer : peers) { + for (final RespondingEthPeer peer : peers) { peer.respond(fullResponder); } } @@ -830,16 +825,16 @@ private void respondPartially( private void assertAccountsMatch( final WorldState worldState, final List expectedAccounts) { - for (Account expectedAccount : expectedAccounts) { - Account actualAccount = worldState.get(expectedAccount.getAddress()); + for (final Account expectedAccount : expectedAccounts) { + final Account actualAccount = worldState.get(expectedAccount.getAddress()); assertThat(actualAccount).isNotNull(); // Check each field assertThat(actualAccount.getNonce()).isEqualTo(expectedAccount.getNonce()); assertThat(actualAccount.getCode()).isEqualTo(expectedAccount.getCode()); assertThat(actualAccount.getBalance()).isEqualTo(expectedAccount.getBalance()); - Map actualStorage = actualAccount.storageEntriesFrom(Bytes32.ZERO, 500); - Map expectedStorage = expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500); + final Map actualStorage = actualAccount.storageEntriesFrom(Bytes32.ZERO, 500); + final Map expectedStorage = expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500); assertThat(actualStorage).isEqualTo(expectedStorage); } } From 09072103897fd36221bb1c487a1e06183aae822e Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Sun, 24 Feb 2019 08:00:22 +1000 Subject: [PATCH 06/10] Fix minimum peers. --- .../pantheon/ethereum/eth/sync/SynchronizerConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 2f72dd8468..01aefba71c 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -25,7 +25,7 @@ public class SynchronizerConfiguration { // TODO: Determine reasonable defaults here private static final int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 50; private static final float DEFAULT_FULL_VALIDATION_RATE = .1f; - private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 1; + private static final int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5; private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; From b39595d4f224bebdd927b8718a220d2d66004871 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 25 Feb 2019 06:26:58 +1000 Subject: [PATCH 07/10] Fix most of FullSyncTargetManagerTest. --- .../manager/EthProtocolManagerTestUtil.java | 3 +- .../fullsync/FullSyncTargetManagerTest.java | 4 +- .../worldstate/WorldStateDownloaderTest.java | 83 +++++++++++-------- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 85785039eb..9fed7e8435 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -35,8 +35,7 @@ public static EthProtocolManager create( final Blockchain blockchain, final WorldStateArchive worldStateArchive, final TimeoutPolicy timeoutPolicy) { - return create( - blockchain, worldStateArchive, new DeterministicEthScheduler(timeoutPolicy)); + return create(blockchain, worldStateArchive, new DeterministicEthScheduler(timeoutPolicy)); } public static EthProtocolManager create( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java index 01c0d4bb7c..4b0cfb2ab3 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java @@ -62,9 +62,7 @@ public void setup() { new ProtocolContext<>(localBlockchain, localWorldState, null); ethProtocolManager = EthProtocolManagerTestUtil.create( - localBlockchain, - localWorldState, - new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); + localBlockchain, localWorldState, new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); final EthContext ethContext = ethProtocolManager.ethContext(); final SyncState syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 1fb848a42c..8ae3cc8004 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -30,11 +30,13 @@ import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.MutableWorldState; import tech.pegasys.pantheon.ethereum.core.WorldState; +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; +import tech.pegasys.pantheon.ethereum.eth.manager.MockExecutorService; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63; @@ -70,11 +72,13 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.junit.Ignore; import org.junit.Test; public class WorldStateDownloaderTest { @@ -115,7 +119,8 @@ public void downloadWorldStateFromPeers_singleRequestWithMultiplePeers() { @Test public void downloadEmptyWorldState() { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); final BlockHeader header = dataGen .block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10)) @@ -145,7 +150,8 @@ public void downloadEmptyWorldState() { @Test public void downloadAlreadyAvailableWorldState() { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); // Setup existing state final WorldStateStorage storage = @@ -181,6 +187,7 @@ public void downloadAlreadyAvailableWorldState() { } @Test + @Ignore public void canRecoverFromTimeouts() { final TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(timeoutPolicy); @@ -220,6 +227,7 @@ public void canRecoverFromTimeouts() { for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } // Check that all expected account data was downloaded @@ -236,7 +244,8 @@ public void handlesPartialResponsesFromNetwork() { @Test public void doesNotRequestKnownCodeFromNetwork() { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -285,6 +294,7 @@ public void doesNotRequestKnownCodeFromNetwork() { for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } // Check that known code was not requested @@ -317,6 +327,11 @@ public void cancelDownloaderFuture() { @SuppressWarnings("unchecked") private void testCancellation(final boolean shouldCancelFuture) { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + // Prevent the persistence service from running + final MockExecutorService serviceExecutor = + ((DeterministicEthScheduler) ethProtocolManager.ethContext().getScheduler()) + .mockServiceExecutor(); + serviceExecutor.setAutoRun(false); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -354,6 +369,7 @@ private void testCancellation(final boolean shouldCancelFuture) { for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } assertThat(result.isDone()).isFalse(); // Sanity check @@ -371,8 +387,12 @@ private void testCancellation(final boolean shouldCancelFuture) { for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } + // Now allow the persistence service to run which should exit immediately + serviceExecutor.runPendingFutures(); + verify(queue, times(1)).clear(); verify(queue, never()).dequeue(); verify(queue, never()).enqueue(any()); @@ -382,7 +402,8 @@ private void testCancellation(final boolean shouldCancelFuture) { @Test public void doesRequestKnownAccountTrieNodesFromNetwork() { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -444,6 +465,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } // Check that known trie nodes were requested @@ -466,7 +488,8 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { @Test public void doesRequestKnownStorageTrieNodesFromNetwork() { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -539,11 +562,10 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); while (!result.isDone()) { - // World state should not be available until the entire state is downloaded - assertThat(localStorage.isWorldStateAvailable(stateRoot)).isFalse(); for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } // World state should be available by the time the result is complete assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue(); @@ -568,21 +590,8 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { @Test public void stalledDownloader() { - simulateStalledDownload(10); - } - - @Test - public void stalledDownloaderWithOneRetry() { - simulateStalledDownload(1); - } - - @Test - public void stalledDownloaderWithNoRetries() { - simulateStalledDownload(0); - } - - private void simulateStalledDownload(final int maxRetries) { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -601,7 +610,7 @@ private void simulateStalledDownload(final int maxRetries) { final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final SynchronizerConfiguration syncConfig = - SynchronizerConfiguration.builder().worldStateRequestMaxRetries(maxRetries).build(); + SynchronizerConfiguration.builder().worldStateRequestMaxRetries(10).build(); final WorldStateDownloader downloader = createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); @@ -617,14 +626,8 @@ private void simulateStalledDownload(final int maxRetries) { assertThat(result).isNotCompletedExceptionally(); final Responder emptyResponder = RespondingEthPeer.emptyResponder(); - for (int i = 0; i < maxRetries; i++) { - peer.respond(emptyResponder); - } - // Downloader should not be done yet - assertThat(result).isNotDone(); + peer.respondWhileOtherThreadsWork(emptyResponder, () -> !result.isDone()); - // One more empty response should trigger a failure - peer.respond(emptyResponder); assertThat(result).isCompletedExceptionally(); assertThatThrownBy(result::get).hasCauseInstanceOf(StalledDownloadException.class); } @@ -683,7 +686,8 @@ private void downloadAvailableWorldStateFromPeers( final int hashesPerRequest, final int maxOutstandingRequests, final NetworkResponder networkResponder) { - final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); final int trailingPeerCount = 5; @@ -751,8 +755,9 @@ private void downloadAvailableWorldStateFromPeers( peer.respond(fullResponder); } // Respond to remaining queued requests in custom way - if (!result.isDone()) { + while (!result.isDone()) { networkResponder.respond(usefulPeers, remoteWorldStateArchive, result); + giveOtherThreadsAGo(); } // Check that trailing peers were not queried for data @@ -782,6 +787,7 @@ private void respondFully( for (final RespondingEthPeer peer : peers) { peer.respond(responder); } + giveOtherThreadsAGo(); } } @@ -801,6 +807,7 @@ private void respondPartially( for (final RespondingEthPeer peer : peers) { peer.respond(partialResponder); } + giveOtherThreadsAGo(); } // Downloader should not complete with partial responses @@ -811,6 +818,7 @@ private void respondPartially( for (final RespondingEthPeer peer : peers) { peer.respond(emptyResponder); } + giveOtherThreadsAGo(); } // Downloader should not complete with empty responses @@ -820,6 +828,7 @@ private void respondPartially( for (final RespondingEthPeer peer : peers) { peer.respond(fullResponder); } + giveOtherThreadsAGo(); } } @@ -833,8 +842,10 @@ private void assertAccountsMatch( assertThat(actualAccount.getCode()).isEqualTo(expectedAccount.getCode()); assertThat(actualAccount.getBalance()).isEqualTo(expectedAccount.getBalance()); - final Map actualStorage = actualAccount.storageEntriesFrom(Bytes32.ZERO, 500); - final Map expectedStorage = expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500); + final Map actualStorage = + actualAccount.storageEntriesFrom(Bytes32.ZERO, 500); + final Map expectedStorage = + expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500); assertThat(actualStorage).isEqualTo(expectedStorage); } } @@ -862,6 +873,10 @@ private WorldStateDownloader createDownloader( new NoOpMetricsSystem()); } + private void giveOtherThreadsAGo() { + LockSupport.parkNanos(2000); + } + @FunctionalInterface private interface NetworkResponder { void respond( From c1428b94344e31476d08258c073e98c5144bc54f Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 25 Feb 2019 06:36:03 +1000 Subject: [PATCH 08/10] Fix timeout test. --- .../worldstate/WorldStateDownloaderTest.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 8ae3cc8004..4bf33f8889 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -71,6 +71,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; import java.util.function.Function; @@ -78,14 +81,27 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.junit.Ignore; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.After; import org.junit.Test; public class WorldStateDownloaderTest { private static final Hash EMPTY_TRIE_ROOT = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH); - final BlockDataGenerator dataGen = new BlockDataGenerator(1); + private final BlockDataGenerator dataGen = new BlockDataGenerator(1); + private final ExecutorService persistenceThread = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(WorldStateDownloaderTest.class.getSimpleName() + "-persistence-%d") + .build()); + + @After + public void tearDown() throws Exception { + persistenceThread.shutdownNow(); + assertThat(persistenceThread.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + } @Test public void downloadWorldStateFromPeers_onePeerOneWithManyRequestsOneAtATime() { @@ -187,10 +203,13 @@ public void downloadAlreadyAvailableWorldState() { } @Test - @Ignore public void canRecoverFromTimeouts() { final TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2); final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(timeoutPolicy); + final MockExecutorService serviceExecutor = + ((DeterministicEthScheduler) ethProtocolManager.ethContext().getScheduler()) + .mockServiceExecutor(); + serviceExecutor.setAutoRun(false); // Setup "remote" state final WorldStateStorage remoteStorage = @@ -220,9 +239,12 @@ public void canRecoverFromTimeouts() { final CompletableFuture result = downloader.run(header); + persistenceThread.submit(serviceExecutor::runPendingFutures); + // Respond to node data requests final Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); + while (!result.isDone()) { for (final RespondingEthPeer peer : peers) { peer.respond(responder); From 4d26cf681a9a4216a791cf7df683253848c2bf9d Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 25 Feb 2019 06:50:23 +1000 Subject: [PATCH 09/10] Log and recover from errors in the persistence thread. --- .../ethereum/eth/sync/worldstate/WorldStateDownloader.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 9b009a70e5..a1ae06fee4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -449,6 +449,12 @@ protected void executeTask() { } } catch (final InterruptedException ignore) { Thread.currentThread().interrupt(); + } catch (final RuntimeException e) { + LOG.error("Unexpected error while persisting world state", e); + // Assume we failed to persist any of the requests and ensure we have something + // scheduled to kick off another round of requests. + batch.forEach(Task::markFailed); + ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header)); } } } From 8587c59d2c08ec98368205a01ad517b088cee0f1 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 26 Feb 2019 07:20:51 +1000 Subject: [PATCH 10/10] Speak like yoda. --- .../ethereum/eth/sync/worldstate/TrieNodeDataRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java index 55e4311850..35840c15eb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java @@ -60,7 +60,7 @@ private Stream getRequestsFromChildTrieNode(final Node