Skip to content

Commit

Permalink
Log attestation journey
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Apr 6, 2022
1 parent 60fc678 commit a7769d9
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 34 deletions.
11 changes: 3 additions & 8 deletions packages/lodestar/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {validateGossipProposerSlashing} from "../../../../chain/validation/propo
import {validateGossipVoluntaryExit} from "../../../../chain/validation/voluntaryExit";
import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee";
import {ApiModules} from "../../types";
import {OpSource} from "../../../../metrics/validatorMonitor";
import {toHexString} from "@chainsafe/ssz";
import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors";

Expand Down Expand Up @@ -43,20 +42,16 @@ export function getBeaconPoolApi({
},

async submitPoolAttestations(attestations) {
const seenTimestampSec = Date.now() / 1000;
const errors: Error[] = [];

await Promise.all(
attestations.map(async (attestation, i) => {
try {
const {indexedAttestation, subnet} = await validateGossipAttestation(chain, attestation, null);

metrics?.registerUnaggregatedAttestation(OpSource.api, seenTimestampSec, indexedAttestation);

await Promise.all([
network.gossip.publishBeaconAttestation(attestation, subnet),
chain.attestationPool.add(attestation),
]);
chain.attestationPool.add(attestation);
const sentPeers = await network.gossip.publishBeaconAttestation(attestation, subnet);
metrics?.submitUnaggregatedAttestation(indexedAttestation, subnet, sentPeers);
} catch (e) {
errors.push(e as Error);
logger.error(
Expand Down
22 changes: 6 additions & 16 deletions packages/lodestar/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {toGraffitiBuffer} from "../../../util/graffiti";
import {ApiError, NodeIsSyncing} from "../errors";
import {validateSyncCommitteeGossipContributionAndProof} from "../../../chain/validation/syncCommitteeContributionAndProof";
import {CommitteeSubscription} from "../../../network/subnets";
import {OpSource} from "../../../metrics/validatorMonitor";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils";
import {ApiModules} from "../types";
import {RegenCaller} from "../../../chain/regen";
Expand Down Expand Up @@ -424,7 +423,6 @@ export function getValidatorApi({chain, config, logger, metrics, network, sync}:
async publishAggregateAndProofs(signedAggregateAndProofs) {
notWhileSyncing();

const seenTimestampSec = Date.now() / 1000;
const errors: Error[] = [];

await Promise.all(
Expand All @@ -436,21 +434,13 @@ export function getValidatorApi({chain, config, logger, metrics, network, sync}:
signedAggregateAndProof
);

metrics?.registerAggregatedAttestation(
OpSource.api,
seenTimestampSec,
signedAggregateAndProof,
indexedAttestation
chain.aggregatedAttestationPool.add(
signedAggregateAndProof.message.aggregate,
indexedAttestation.attestingIndices,
committeeIndices
);

await Promise.all([
chain.aggregatedAttestationPool.add(
signedAggregateAndProof.message.aggregate,
indexedAttestation.attestingIndices,
committeeIndices
),
network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof),
]);
const sentPeers = await network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof);
metrics?.submitAggregatedAttestation(indexedAttestation, sentPeers);
} catch (e) {
if (e instanceof AttestationError && e.type.code === AttestationErrorCode.AGGREGATOR_ALREADY_KNOWN) {
logger.debug("Ignoring known signedAggregateAndProof");
Expand Down
6 changes: 4 additions & 2 deletions packages/lodestar/src/metrics/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @module metrics
*/
import {ILogger} from "@chainsafe/lodestar-utils";
import {BeaconStateAllForks, getCurrentSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {collectDefaultMetrics, Counter, Metric, Registry} from "prom-client";
Expand All @@ -18,14 +19,15 @@ export function createMetrics(
opts: IMetricsOptions,
config: IChainForkConfig,
anchorState: BeaconStateAllForks,
externalRegistries: Registry[] = []
logger: ILogger,
externalRegistries: Registry[] = [],
): IMetrics {
const register = new RegistryMetricCreator();
const beacon = createBeaconMetrics(register);
const lodestar = createLodestarMetrics(register, opts.metadata, anchorState);

const genesisTime = anchorState.genesisTime;
const validatorMonitor = createValidatorMonitor(lodestar, config, genesisTime);
const validatorMonitor = createValidatorMonitor(lodestar, config, genesisTime, logger);
// Register a single collect() function to run all validatorMonitor metrics
lodestar.validatorMonitor.validatorsTotal.addCollect(() => {
const clockSlot = getCurrentSlot(config, genesisTime);
Expand Down
6 changes: 6 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,12 @@ export function createLodestarMetrics(
labelNames: ["index", "src"],
buckets: [0.1, 1],
}),
unaggregatedAttestationSubmittedSentPeers: register.histogram<"index">({
name: "validator_monitor_unaggregated_attestation_submited_sent_peers_total",
help: "Number of unaggregated attestations submitted by local validator that has no subnet peers",
labelNames: ["index"],
buckets: [0, 2, 5, 10],
}),
aggregatedAttestationTotal: register.gauge<"index" | "src">({
name: "validator_monitor_aggregated_attestation_total",
help: "Number of aggregated attestations seen",
Expand Down
67 changes: 66 additions & 1 deletion packages/lodestar/src/metrics/validatorMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IAttesterStatus,
parseAttesterFlags,
} from "@chainsafe/lodestar-beacon-state-transition";
import {ILogger} from "@chainsafe/lodestar-utils";
import {allForks} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {MIN_ATTESTATION_INCLUSION_DELAY, SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params";
Expand All @@ -26,11 +27,13 @@ export interface IValidatorMonitor {
registerLocalValidator(index: number): void;
registerValidatorStatuses(currentEpoch: Epoch, statuses: IAttesterStatus[], balances?: number[]): void;
registerBeaconBlock(src: OpSource, seenTimestampSec: Seconds, block: allForks.BeaconBlock): void;
submitUnaggregatedAttestation(indexedAttestation: IndexedAttestation, subnet: number, sentPeers: number): void;
registerUnaggregatedAttestation(
src: OpSource,
seenTimestampSec: Seconds,
indexedAttestation: IndexedAttestation
): void;
submitAggregatedAttestation(indexedAttestation: IndexedAttestation, sentPeers: number): void;
registerAggregatedAttestation(
src: OpSource,
seenTimestampSec: Seconds,
Expand Down Expand Up @@ -166,7 +169,8 @@ type MonitoredValidator = {
export function createValidatorMonitor(
metrics: ILodestarMetrics,
config: IChainForkConfig,
genesisTime: number
genesisTime: number,
logger: ILogger
): IValidatorMonitor {
/** The validators that require additional monitoring. */
const validators = new Map<ValidatorIndex, MonitoredValidator>();
Expand Down Expand Up @@ -201,23 +205,37 @@ export function createValidatorMonitor(
}

const summary = statusToSummary(status);
let failedAttestation = false;

if (summary.isPrevSourceAttester) {
metrics.validatorMonitor.prevEpochOnChainAttesterHit.inc({index});
} else {
failedAttestation = true;
metrics.validatorMonitor.prevEpochOnChainAttesterMiss.inc({index});
}
if (summary.isPrevHeadAttester) {
metrics.validatorMonitor.prevEpochOnChainHeadAttesterHit.inc({index});
} else {
failedAttestation = true;
metrics.validatorMonitor.prevEpochOnChainHeadAttesterMiss.inc({index});
}
if (summary.isPrevTargetAttester) {
metrics.validatorMonitor.prevEpochOnChainTargetAttesterHit.inc({index});
} else {
failedAttestation = true;
metrics.validatorMonitor.prevEpochOnChainTargetAttesterMiss.inc({index});
}

if (failedAttestation) {
logger.verbose("Failed attestation in previous epoch", {
validatorIndex: index,
currentEpoch,
isPrevSourceAttester: summary.isPrevSourceAttester,
isPrevHeadAttester: summary.isPrevHeadAttester,
isPrevTargetAttester: summary.isPrevTargetAttester,
});
}

const prevEpochSummary = monitoredValidator.summaries.get(previousEpoch);
const attestationCorrectHead = prevEpochSummary?.attestationCorrectHead;
if (attestationCorrectHead !== null && attestationCorrectHead !== undefined) {
Expand Down Expand Up @@ -257,6 +275,25 @@ export function createValidatorMonitor(
}
},

submitUnaggregatedAttestation(indexedAttestation, subnet, sentPeers) {
const data = indexedAttestation.data;
for (const index of indexedAttestation.attestingIndices) {
const validator = validators.get(index);
if (validator) {
if (sentPeers <= 0) {
metrics.validatorMonitor.unaggregatedAttestationSubmittedSentPeers.observe({index}, sentPeers);
}
logger.verbose("Local validator published unaggregated attestation", {
validatorIndex: validator.index,
slot: data.slot,
committeeIndex: data.index,
subnet,
sentPeers,
});
}
}
},

registerUnaggregatedAttestation(src, seenTimestampSec, indexedAttestation) {
const data = indexedAttestation.data;
const epoch = computeEpochAtSlot(data.slot);
Expand All @@ -276,6 +313,21 @@ export function createValidatorMonitor(
}
},

submitAggregatedAttestation(indexedAttestation, sentPeers) {
const data = indexedAttestation.data;
for (const index of indexedAttestation.attestingIndices) {
const validator = validators.get(index);
if (validator) {
logger.verbose("Local validator published aggregated attestation", {
validatorIndex: validator.index,
slot: data.slot,
committeeIndex: data.index,
sentPeers,
});
}
}
},

registerAggregatedAttestation(src, seenTimestampSec, signedAggregateAndProof, indexedAttestation) {
const data = indexedAttestation.data;
const epoch = computeEpochAtSlot(data.slot);
Expand All @@ -302,6 +354,11 @@ export function createValidatorMonitor(
withEpochSummary(validator, epoch, (summary) => {
summary.attestationAggregateIncusions += 1;
});
logger.verbose("Local validator attestation is included in AggregatedAndProof", {
validatorIndex: validator.index,
slot: data.slot,
committeeIndex: data.index,
});
}
}
},
Expand Down Expand Up @@ -336,6 +393,14 @@ export function createValidatorMonitor(
}
summary.attestationCorrectHead = correctHead;
});

logger.verbose("Local validator attestation is included in block", {
validatorIndex: validator.index,
slot: data.slot,
committeeIndex: data.index,
inclusionDistance,
correctHead,
});
}
}
},
Expand Down
11 changes: 6 additions & 5 deletions packages/lodestar/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,13 @@ export class Eth2Gossipsub extends Gossipsub {
/**
* Publish a `GossipObject` on a `GossipTopic`
*/
async publishObject<K extends GossipType>(topic: GossipTopicMap[K], object: GossipTypeMap[K]): Promise<void> {
async publishObject<K extends GossipType>(topic: GossipTopicMap[K], object: GossipTypeMap[K]): Promise<number> {
const topicStr = this.getGossipTopicString(topic);
const sszType = getGossipSSZType(topic);
const messageData = (sszType.serialize as (object: GossipTypeMap[GossipType]) => Uint8Array)(object);
const sentPeers = await this.publish(topicStr, messageData);
this.logger.verbose("Publish to topic", {topic: topicStr, sentPeers});
return sentPeers;
}

/**
Expand Down Expand Up @@ -182,17 +183,17 @@ export class Eth2Gossipsub extends Gossipsub {
await this.publishObject<GossipType.beacon_block>({type: GossipType.beacon_block, fork}, signedBlock);
}

async publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<void> {
async publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number> {
const fork = this.config.getForkName(aggregateAndProof.message.aggregate.data.slot);
await this.publishObject<GossipType.beacon_aggregate_and_proof>(
return await this.publishObject<GossipType.beacon_aggregate_and_proof>(
{type: GossipType.beacon_aggregate_and_proof, fork},
aggregateAndProof
);
}

async publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<void> {
async publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number> {
const fork = this.config.getForkName(attestation.data.slot);
await this.publishObject<GossipType.beacon_attestation>(
return await this.publishObject<GossipType.beacon_attestation>(
{type: GossipType.beacon_attestation, fork, subnet},
attestation
);
Expand Down
4 changes: 3 additions & 1 deletion packages/lodestar/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ export class BeaconNode {
// start db if not already started
await db.start();

const metrics = opts.metrics.enabled ? createMetrics(opts.metrics, config, anchorState, metricsRegistries) : null;
const metrics = opts.metrics.enabled
? createMetrics(opts.metrics, config, anchorState, logger.child({module: "VMON"}), metricsRegistries)
: null;
if (metrics) {
initBeaconMetrics(metrics, anchorState);
}
Expand Down
4 changes: 3 additions & 1 deletion packages/lodestar/test/unit/metrics/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {config} from "@chainsafe/lodestar-config/default";
import {ssz} from "@chainsafe/lodestar-types";
import {WinstonLogger} from "@chainsafe/lodestar-utils";
import {createMetrics, IMetrics} from "../../../src/metrics";

export function createMetricsTest(): IMetrics {
const state = ssz.phase0.BeaconState.defaultViewDU();
return createMetrics({enabled: true, timeout: 12000}, config, state);
const logger = new WinstonLogger();
return createMetrics({enabled: true, timeout: 12000}, config, state, logger);
}

0 comments on commit a7769d9

Please sign in to comment.