Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Pipeline chain download - fetch and import data (#1207)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Apr 4, 2019
1 parent 46d2af7 commit db4f60c
Show file tree
Hide file tree
Showing 23 changed files with 1,006 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;

import java.util.function.Function;
import java.util.stream.Stream;

public class CheckpointHeaderValidationStep<C>
implements Function<CheckpointRangeHeaders, Stream<BlockHeader>> {

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final ValidationPolicy validationPolicy;

public CheckpointHeaderValidationStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final ValidationPolicy validationPolicy) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.validationPolicy = validationPolicy;
}

@Override
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) {
final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport();

if (isValid(expectedParent, firstHeaderToImport)) {
return checkpointRangeHeaders.getHeadersToImport().stream();
} else {
throw new InvalidBlockException(
"Provided first header does not connect to last header.",
expectedParent.getNumber(),
expectedParent.getHash());
}
}

private boolean isValid(final BlockHeader expectedParent, final BlockHeader firstHeaderToImport) {
final BlockHeaderValidator<C> validator =
protocolSchedule
.getByBlockNumber(firstHeaderToImport.getNumber())
.getBlockHeaderValidator();
return validator.validateHeader(
firstHeaderToImport,
expectedParent,
protocolContext,
validationPolicy.getValidationModeForNextBlock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.lang.Math.toIntExact;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.Objects;
Expand Down Expand Up @@ -59,4 +61,8 @@ public String toString() {
.add("end", end.getNumber())
.toString();
}

public int getSegmentLength() {
return toIntExact(end.getNumber() - start.getNumber());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static com.google.common.base.Preconditions.checkArgument;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.List;
import java.util.Objects;

import com.google.common.base.MoreObjects;

public class CheckpointRangeHeaders {
private final CheckpointRange checkpointRange;
private final List<BlockHeader> headersToImport;

public CheckpointRangeHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headersToImport) {
checkArgument(!headersToImport.isEmpty(), "Must have at least one header to import");
this.checkpointRange = checkpointRange;
this.headersToImport = headersToImport;
}

public CheckpointRange getCheckpointRange() {
return checkpointRange;
}

public List<BlockHeader> getHeadersToImport() {
return headersToImport;
}

public BlockHeader getFirstHeaderToImport() {
return headersToImport.get(0);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CheckpointRangeHeaders that = (CheckpointRangeHeaders) o;
return Objects.equals(checkpointRange, that.checkpointRange)
&& Objects.equals(headersToImport, that.headersToImport);
}

@Override
public int hashCode() {
return Objects.hash(checkpointRange, headersToImport);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("checkpointRange", checkpointRange)
.add("headersToImport", headersToImport)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class DownloadBodiesStep<C>
implements Function<List<BlockHeader>, CompletableFuture<List<Block>>> {

private final ProtocolSchedule<C> protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;

public DownloadBodiesStep(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.FutureUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class DownloadHeadersStep<C>
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> {

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final ValidationPolicy validationPolicy;
private final MetricsSystem metricsSystem;

public DownloadHeadersStep(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final ValidationPolicy validationPolicy,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.validationPolicy = validationPolicy;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<CheckpointRangeHeaders> apply(final CheckpointRange checkpointRange) {
final CompletableFuture<List<BlockHeader>> taskFuture = downloadHeaders(checkpointRange);
final CompletableFuture<CheckpointRangeHeaders> processedFuture =
taskFuture.thenApply(headers -> processHeaders(checkpointRange, headers));
FutureUtils.propagateCancellation(processedFuture, taskFuture);
return processedFuture;
}

private CompletableFuture<List<BlockHeader>> downloadHeaders(
final CheckpointRange checkpointRange) {
return DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
// -1 because we don't want to request the range starting header
checkpointRange.getSegmentLength() - 1,
validationPolicy,
metricsSystem)
.run();
}

private CheckpointRangeHeaders processHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headers) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;

import java.util.Optional;
Expand All @@ -37,15 +41,27 @@ public class PipelineChainDownloader<C> implements ChainDownloader {

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final Counter pipelineCompleteCounter;
private final Counter pipelineErrorCounter;
private Pipeline<?> currentDownloadPipeline;

public PipelineChainDownloader(
final SyncTargetManager<C> syncTargetManager,
final DownloadPipelineFactory downloadPipelineFactory,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final MetricsSystem metricsSystem) {
this.syncTargetManager = syncTargetManager;
this.downloadPipelineFactory = downloadPipelineFactory;
this.scheduler = scheduler;

final LabelledMetric<Counter> labelledCounter =
metricsSystem.createLabelledCounter(
MetricCategory.SYNCHRONIZER,
"chain_download_pipeline_restarts",
"Number of times the chain download pipeline has been restarted",
"reason");
pipelineCompleteCounter = labelledCounter.labels("complete");
pipelineErrorCounter = labelledCounter.labels("error");
}

@Override
Expand All @@ -72,7 +88,8 @@ private CompletableFuture<Void> performDownload() {
private CompletableFuture<Void> selectSyncTargetAndDownload() {
return syncTargetManager
.findSyncTarget(Optional.empty())
.thenCompose(this::startDownloadForSyncTarget);
.thenCompose(this::startDownloadForSyncTarget)
.thenRun(pipelineCompleteCounter::inc);
}

private CompletionStage<Void> repeatUnlessDownloadComplete(
Expand All @@ -87,6 +104,7 @@ private CompletionStage<Void> repeatUnlessDownloadComplete(

private CompletionStage<Void> handleFailedDownload(final Throwable error) {
LOG.debug("Chain download failed. Will restart if required.", error);
pipelineErrorCounter.inc();
if (!cancelled.get() && syncTargetManager.shouldContinueDownloading()) {
// Drop the error, allowing the normal looping logic to retry.
return completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;

import java.util.List;
import java.util.Objects;

import com.google.common.base.MoreObjects;

class BlockWithReceipts {
private final Block block;
Expand All @@ -38,4 +42,37 @@ public Block getBlock() {
public List<TransactionReceipt> getReceipts() {
return receipts;
}

public long getNumber() {
return block.getHeader().getNumber();
}

public Hash getHash() {
return block.getHash();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BlockWithReceipts that = (BlockWithReceipts) o;
return Objects.equals(block, that.block) && Objects.equals(receipts, that.receipts);
}

@Override
public int hashCode() {
return Objects.hash(block, receipts);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("block", block)
.add("receipts", receipts)
.toString();
}
}
Loading

0 comments on commit db4f60c

Please sign in to comment.