Skip to content

Commit

Permalink
Merge fbad895 into 1bd730a
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Jan 16, 2022
2 parents 1bd730a + fbad895 commit d635395
Show file tree
Hide file tree
Showing 13 changed files with 411 additions and 56 deletions.
15 changes: 14 additions & 1 deletion packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import fs from "fs";
import {CachedBeaconState, computeStartSlotAtEpoch} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {IForkChoice} from "@chainsafe/lodestar-fork-choice";
import {allForks, Number64, Root, phase0, Slot} from "@chainsafe/lodestar-types";
import {allForks, Number64, Root, phase0, Slot, RootHex} from "@chainsafe/lodestar-types";
import {ILogger} from "@chainsafe/lodestar-utils";
import {fromHexString, TreeBacked} from "@chainsafe/ssz";
import {AbortController} from "@chainsafe/abort-controller";
Expand Down Expand Up @@ -43,6 +43,7 @@ import {Archiver} from "./archiver";
import {IEth1ForBlockProduction} from "../eth1";
import {IExecutionEngine} from "../executionEngine";
import {PrecomputeNextEpochTransitionScheduler} from "./precomputeNextEpochTransition";
import {ReprocessController} from "./reprocess";

export class BeaconChain implements IBeaconChain {
readonly genesisTime: Number64;
Expand All @@ -61,6 +62,7 @@ export class BeaconChain implements IBeaconChain {
checkpointStateCache: CheckpointStateCache;
regen: IStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;

// Ops pool
readonly attestationPool = new AttestationPool();
Expand Down Expand Up @@ -142,6 +144,8 @@ export class BeaconChain implements IBeaconChain {
{genesisTime: this.genesisTime, genesisValidatorsRoot: this.genesisValidatorsRoot as Uint8Array}
);

this.reprocessController = new ReprocessController(this.metrics);

this.blockProcessor = new BlockProcessor(
{
clock,
Expand Down Expand Up @@ -252,6 +256,15 @@ export class BeaconChain implements IBeaconChain {
};
}

/**
* Returns Promise that resolves either on block found or once 1 slot passes.
* Used to handle unknown block root for both unaggregated and aggregated attestations.
* @returns true if blockFound
*/
waitForBlockOfAttestation(slot: Slot, root: RootHex): Promise<boolean> {
return this.reprocessController.waitForBlockOfAttestation(slot, root);
}

persistInvalidSszObject(type: SSZObjectType, bytes: Uint8Array, suffix = ""): string | null {
const now = new Date();
// yyyy-MM-dd
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/src/chain/errors/attestationError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ export type AttestationErrorType =
| {code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN; targetEpoch: Epoch; validatorIndex: number}
| {code: AttestationErrorCode.AGGREGATOR_ALREADY_KNOWN; targetEpoch: Epoch; aggregatorIndex: number}
| {code: AttestationErrorCode.AGGREGATOR_INDEX_TOO_HIGH; aggregatorIndex: ValidatorIndex}
| {code: AttestationErrorCode.UNKNOWN_BEACON_BLOCK_ROOT; root: Uint8Array}
| {code: AttestationErrorCode.UNKNOWN_BEACON_BLOCK_ROOT; root: RootHex}
| {code: AttestationErrorCode.BAD_TARGET_EPOCH}
| {code: AttestationErrorCode.HEAD_NOT_TARGET_DESCENDANT}
| {code: AttestationErrorCode.UNKNOWN_TARGET_ROOT; root: Uint8Array}
Expand Down
10 changes: 8 additions & 2 deletions packages/lodestar/src/chain/eventHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {CachedBeaconState, computeStartSlotAtEpoch} from "@chainsafe/lodestar-be
import {AttestationError, BlockError, BlockErrorCode} from "./errors";
import {ChainEvent, IChainEvents} from "./emitter";
import {BeaconChain} from "./chain";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "./reprocess";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyCallback = () => Promise<void>;
Expand Down Expand Up @@ -86,6 +87,7 @@ export async function onClockSlot(this: BeaconChain, slot: Slot): Promise<void>
this.aggregatedAttestationPool.prune(slot);
this.syncCommitteeMessagePool.prune(slot);
this.seenSyncCommitteeMessages.prune(slot);
this.reprocessController.onSlot(slot);
}

export function onClockEpoch(this: BeaconChain, currentEpoch: Epoch): void {
Expand Down Expand Up @@ -181,10 +183,14 @@ export async function onBlock(
block: allForks.SignedBeaconBlock,
_postState: CachedBeaconState<allForks.BeaconState>
): Promise<void> {
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRoot = toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
const advancedSlot = this.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);

this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRoot}, advancedSlot);

this.logger.verbose("Block processed", {
slot: block.message.slot,
root: toHexString(blockRoot),
root: blockRoot,
});
}

Expand Down
6 changes: 5 additions & 1 deletion packages/lodestar/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {allForks, Number64, Root, phase0, Slot} from "@chainsafe/lodestar-types";
import {allForks, Number64, Root, phase0, Slot, RootHex} from "@chainsafe/lodestar-types";
import {CachedBeaconState} from "@chainsafe/lodestar-beacon-state-transition";
import {IForkChoice} from "@chainsafe/lodestar-fork-choice";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
Expand All @@ -21,6 +21,7 @@ import {AttestationPool, OpPool, SyncCommitteeMessagePool, SyncContributionAndPr
import {LightClientServer} from "./lightClient";
import {AggregatedAttestationPool} from "./opPools/aggregatedAttestationPool";
import {PartiallyVerifiedBlockFlags} from "./blocks/types";
import {ReprocessController} from "./reprocess";

export type Eth2Context = {
activeValidatorCount: number;
Expand Down Expand Up @@ -51,6 +52,7 @@ export interface IBeaconChain {
checkpointStateCache: CheckpointStateCache;
regen: IStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -92,6 +94,8 @@ export interface IBeaconChain {

getStatus(): phase0.Status;

waitForBlockOfAttestation(slot: Slot, root: RootHex): Promise<boolean>;

/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszObject(type: SSZObjectType, bytes: Uint8Array, suffix: string): string | null;
}
Expand Down
159 changes: 159 additions & 0 deletions packages/lodestar/src/chain/reprocess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import {Slot, RootHex} from "@chainsafe/lodestar-types";
import {IMetrics} from "../metrics";
import {MapDef} from "../util/map";

/**
* To prevent our node from having to reprocess while struggling to sync,
* we only want to reprocess attestations if block reaches our node before this time.
*/
export const REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC = 2;

/**
* Reprocess status for metrics
*/
enum ReprocessStatus {
/**
* There are too many attestations that have unknown block root.
*/
reached_limit = "reached_limit",
/**
* The awaiting attestation is pruned per clock slot.
*/
expired = "expired",
}

type AwaitingAttestationPromise = {
resolve: (foundBlock: boolean) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
promise: Promise<boolean>;
// there are multiple subnet/aggregated attestations waiting for same promise
awaitingAttestationsCount: number;
// we only resolve to true or false to make the code simpler so no need to keep reject here
addedTimeMs: number;
};

// How many attestations (aggregate + unaggregate) we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS = 16_384;

type SlotRoot = {slot: Slot; root: RootHex};

/**
* Some attestations may reach our node before the voted block, so we manage a cache to reprocess them
* when the block come.
* (n) (n + 1)
* |----------------|----------------|----------|------|
* | | |
* att agg att |
* block
* Since the gossip handler has to return validation result to js-libp2p-gossipsub, this class should not
* reprocess attestations, it should control when the attestations are ready to reprocess instead.
*/
export class ReprocessController {
private readonly awaitingPromisesByRootBySlot: MapDef<Slot, Map<RootHex, AwaitingAttestationPromise>>;
private awaitingPromisesCount = 0;

constructor(private readonly metrics: IMetrics | null) {
this.awaitingPromisesByRootBySlot = new MapDef(() => new Map<RootHex, AwaitingAttestationPromise>());
}

/**
* Returns Promise that resolves either on block found or once 1 slot passes.
* Used to handle unknown block root for both unaggregated and aggregated attestations.
* @returns true if blockFound
*/
waitForBlockOfAttestation(slot: Slot, root: RootHex): Promise<boolean> {
this.metrics?.reprocessAttestations.total.inc();

if (this.awaitingPromisesCount >= MAXIMUM_QUEUED_ATTESTATIONS) {
this.metrics?.reprocessAttestations.reject.inc({reason: ReprocessStatus.reached_limit});
return Promise.resolve(false);
}

this.awaitingPromisesCount++;
const awaitingPromisesByRoot = this.awaitingPromisesByRootBySlot.getOrDefault(slot);
const promiseCached = awaitingPromisesByRoot.get(root);
if (promiseCached) {
promiseCached.awaitingAttestationsCount++;
return promiseCached.promise;
}

// Capture both the promise and its callbacks.
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolve: AwaitingAttestationPromise["resolve"] | null = null;
const promise = new Promise<boolean>((resolveCB) => {
resolve = resolveCB;
});

if (resolve === null) {
throw Error("Promise Constructor was not executed immediately");
}

awaitingPromisesByRoot.set(root, {
promise,
awaitingAttestationsCount: 1,
resolve,
addedTimeMs: Date.now(),
});

return promise;
}

/**
* It's important to make sure our node is synced before we reprocess,
* it means the processed slot is same to clock slot
* Note that we want to use clock advanced by REPROCESS_MIN_TIME_TO_NEXT_SLOT instead of
* clockSlot because we want to make sure our node is healthy while reprocessing attestations.
* If a block reach our node 1s before the next slot, for example, then probably node
* is struggling and we don't want to reprocess anything at that time.
*/
onBlockImported({slot: blockSlot, root}: SlotRoot, advancedSlot: Slot): void {
// we are probably resyncing, don't want to reprocess attestations here
if (blockSlot < advancedSlot) return;

// resolve all related promises
const awaitingPromisesBySlot = this.awaitingPromisesByRootBySlot.getOrDefault(blockSlot);
const awaitingPromise = awaitingPromisesBySlot.get(root);
if (awaitingPromise) {
const {resolve, addedTimeMs, awaitingAttestationsCount} = awaitingPromise;
resolve(true);
this.awaitingPromisesCount -= awaitingAttestationsCount;
this.metrics?.reprocessAttestations.resolve.inc(awaitingAttestationsCount);
this.metrics?.reprocessAttestations.waitTimeBeforeResolve.set((Date.now() - addedTimeMs) / 1000);
}

// prune
awaitingPromisesBySlot.delete(root);
}

/**
* It's important to make sure our node is synced before reprocessing attestations,
* it means clockSlot is the same to last processed block's slot, and we don't reprocess
* attestations of old slots.
* So we reject and prune all old awaiting promises per clock slot.
* @param clockSlot
*/
onSlot(clockSlot: Slot): void {
const now = Date.now();

for (const [key, awaitingPromisesByRoot] of this.awaitingPromisesByRootBySlot.entries()) {
if (key < clockSlot) {
// reject all related promises
for (const awaitingPromise of awaitingPromisesByRoot.values()) {
const {resolve, addedTimeMs} = awaitingPromise;
resolve(false);
this.metrics?.reprocessAttestations.waitTimeBeforeReject.set((now - addedTimeMs) / 1000);
this.metrics?.reprocessAttestations.reject.inc({reason: ReprocessStatus.expired});
}

// prune
this.awaitingPromisesByRootBySlot.delete(key);
} else {
break;
}
}

// in theory there are maybe some awaiting promises waiting for a slot > clockSlot
// in reality this never happens so reseting awaitingPromisesCount to 0 to make it simple
this.awaitingPromisesCount = 0;
}
}
2 changes: 1 addition & 1 deletion packages/lodestar/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ function verifyHeadBlockIsKnown(chain: IBeaconChain, beaconBlockRoot: Root): IPr
if (headBlock === null) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.UNKNOWN_BEACON_BLOCK_ROOT,
root: beaconBlockRoot.valueOf() as Uint8Array,
root: toHexString(beaconBlockRoot.valueOf() as typeof beaconBlockRoot),
});
}

Expand Down
25 changes: 25 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -710,5 +710,30 @@ export function createLodestarMetrics(
help: "Total number of precomputing next epoch transition wasted",
}),
},

// reprocess attestations
reprocessAttestations: {
total: register.gauge({
name: "lodestar_reprocess_attestations_total",
help: "Total number of attestations waiting to reprocess",
}),
resolve: register.gauge({
name: "lodestar_reprocess_attestations_resolve_total",
help: "Total number of attestations are reprocessed",
}),
waitTimeBeforeResolve: register.gauge({
name: "lodestar_reprocess_attestations_wait_time_resolve_seconds",
help: "Time to wait for unknown block in seconds",
}),
reject: register.gauge<"reason">({
name: "lodestar_reprocess_attestations_reject_total",
help: "Total number of attestations are rejected to reprocess",
labelNames: ["reason"],
}),
waitTimeBeforeReject: register.gauge<"reason">({
name: "lodestar_reprocess_attestations_wait_time_reject_seconds",
help: "Time to wait for unknown block before being rejected",
}),
},
};
}
Loading

0 comments on commit d635395

Please sign in to comment.