Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Fix state download race condition by creating a TaskQueue API #853

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.BytesQueue;
import tech.pegasys.pantheon.services.queue.BytesQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;

import java.io.File;
import java.io.IOException;
Expand All @@ -50,14 +50,14 @@ class FastSynchronizer<C> {

private final FastSyncDownloader<C> fastSyncDownloader;
private final Path fastSyncDataDirectory;
private final BigQueue<NodeDataRequest> stateQueue;
private final TaskQueue<NodeDataRequest> stateQueue;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncState initialSyncState;

private FastSynchronizer(
final FastSyncDownloader<C> fastSyncDownloader,
final Path fastSyncDataDirectory,
final BigQueue<NodeDataRequest> stateQueue,
final TaskQueue<NodeDataRequest> stateQueue,
final WorldStateDownloader worldStateDownloader,
final FastSyncState initialSyncState) {
this.fastSyncDownloader = fastSyncDownloader;
Expand Down Expand Up @@ -94,7 +94,7 @@ public static <C> Optional<FastSynchronizer<C>> create(
return Optional.empty();
}

final BigQueue<NodeDataRequest> stateQueue =
final TaskQueue<NodeDataRequest> stateQueue =
createWorldStateDownloaderQueue(getStateQueueDirectory(dataDirectory), metricsSystem);
final WorldStateDownloader worldStateDownloader =
new WorldStateDownloader(
Expand Down Expand Up @@ -166,10 +166,10 @@ private static void ensureDirectoryExists(final File dir) {
}
}

private static BigQueue<NodeDataRequest> createWorldStateDownloaderQueue(
private static TaskQueue<NodeDataRequest> createWorldStateDownloaderQueue(
final Path dataDirectory, final MetricsSystem metricsSystem) {
final BytesQueue bytesQueue = RocksDbQueue.create(dataDirectory, metricsSystem);
return new BytesQueueAdapter<>(
final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem);
return new BytesTaskQueueAdapter<>(
bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.time.Duration;
Expand Down Expand Up @@ -55,7 +56,7 @@ private enum Status {
}

private final EthContext ethContext;
private final BigQueue<NodeDataRequest> pendingRequests;
private final TaskQueue<NodeDataRequest> pendingRequests;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
Expand All @@ -69,7 +70,7 @@ private enum Status {
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final BigQueue<NodeDataRequest> pendingRequests,
final TaskQueue<NodeDataRequest> pendingRequests,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final LabelledMetric<OperationTimer> ethTasksTimer,
Expand Down Expand Up @@ -140,28 +141,31 @@ private void requestNodeData(final BlockHeader header) {
EthPeer peer = maybePeer.get();

// Collect data to be requested
List<NodeDataRequest> toRequest = new ArrayList<>();
List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
NodeDataRequest pendingRequest = pendingRequests.dequeue();
if (pendingRequest == null) {
Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
pendingRequestTask.markCompleted();
continue;
}
toRequest.add(pendingRequest);
toRequest.add(pendingRequestTask);
}

// Request and process node data
outstandingRequests.incrementAndGet();
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
if (outstandingRequests.decrementAndGet() == 0
&& pendingRequests.allTasksCompleted()) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
Expand Down Expand Up @@ -201,9 +205,15 @@ private CompletableFuture<?> waitForNewPeer() {
}

private CompletableFuture<?> sendAndProcessRequests(
final EthPeer peer, final List<NodeDataRequest> requests, final BlockHeader blockHeader) {
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
List<Hash> hashes =
requests.stream().map(NodeDataRequest::getHash).distinct().collect(Collectors.toList());
requestTasks.stream()
.map(Task::getData)
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer)
.assignPeer(peer)
.run()
Expand All @@ -213,11 +223,12 @@ private CompletableFuture<?> sendAndProcessRequests(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (NodeDataRequest request : requests) {
for (Task<NodeDataRequest> task : requestTasks) {
NodeDataRequest request = task.getData();
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
pendingRequests.enqueue(request);
task.markFailed();
} else {
completedRequestsCounter.inc();
// Persist request data
Expand All @@ -229,6 +240,7 @@ private CompletableFuture<?> sendAndProcessRequests(
}

queueChildRequests(request);
task.markCompleted();
}
}
storageUpdater.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
Expand Down Expand Up @@ -117,7 +117,7 @@ public void downloadEmptyWorldState() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
Expand Down Expand Up @@ -164,7 +164,7 @@ public void downloadAlreadyAvailableWorldState() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
Expand Down Expand Up @@ -210,7 +210,7 @@ public void canRecoverFromTimeouts() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
Expand Down Expand Up @@ -271,7 +271,7 @@ public void doesNotRequestKnownCodeFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -349,7 +349,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -441,7 +441,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -606,7 +606,7 @@ private void downloadAvailableWorldStateFromPeers(
.getHeader();
assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

import tech.pegasys.pantheon.util.bytes.BytesValue;

public interface BytesQueue extends BigQueue<BytesValue> {}
public interface BytesTaskQueue extends TaskQueue<BytesValue> {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import java.io.IOException;
import java.util.function.Function;

public class BytesQueueAdapter<T> implements BigQueue<T> {
public class BytesTaskQueueAdapter<T> implements TaskQueue<T> {

private final BytesQueue queue;
private final BytesTaskQueue queue;
private final Function<T, BytesValue> serializer;
private final Function<BytesValue, T> deserializer;

public BytesQueueAdapter(
final BytesQueue queue,
public BytesTaskQueueAdapter(
final BytesTaskQueue queue,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer) {
this.queue = queue;
Expand All @@ -33,23 +33,63 @@ public BytesQueueAdapter(
}

@Override
public void enqueue(final T value) {
queue.enqueue(serializer.apply(value));
public void enqueue(final T taskData) {
queue.enqueue(serializer.apply(taskData));
}

@Override
public T dequeue() {
BytesValue value = queue.dequeue();
return value == null ? null : deserializer.apply(value);
public Task<T> dequeue() {
Task<BytesValue> task = queue.dequeue();
if (task == null) {
return null;
}

T data = deserializer.apply(task.getData());
return new AdapterTask<>(task, data);
}

@Override
public long size() {
return queue.size();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}

@Override
public boolean allTasksCompleted() {
return queue.allTasksCompleted();
}

@Override
public void close() throws IOException {
queue.close();
}

private static class AdapterTask<T> implements Task<T> {
private final Task<BytesValue> wrappedTask;
private final T data;

public AdapterTask(final Task<BytesValue> wrappedTask, final T data) {
this.wrappedTask = wrappedTask;
this.data = data;
}

@Override
public T getData() {
return data;
}

@Override
public void markCompleted() {
wrappedTask.markCompleted();
}

@Override
public void markFailed() {
wrappedTask.markFailed();
}
}
}
Loading