Skip to content

Commit

Permalink
7549 - VC aggregation (#8300)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored May 8, 2024
1 parent c55f69e commit 1991869
Show file tree
Hide file tree
Showing 26 changed files with 431 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,15 @@ private AttestationData createAttestationData(

@Override
public SafeFuture<Optional<Attestation>> createAggregate(
final UInt64 slot, final Bytes32 attestationHashTreeRoot) {
final UInt64 slot,
final Bytes32 attestationHashTreeRoot,
final Optional<UInt64> committeeIndex) {
if (isSyncActive()) {
return NodeSyncingException.failedFuture();
}
return SafeFuture.completedFuture(
attestationPool
.createAggregateFor(attestationHashTreeRoot)
.createAggregateFor(attestationHashTreeRoot, committeeIndex)
.filter(attestation -> attestation.getData().getSlot().equals(slot))
.map(ValidatableAttestation::getAttestation));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ public void createAggregate_shouldFailWhenNodeIsSyncing() {
nodeIsSyncing();
final SafeFuture<Optional<Attestation>> result =
validatorApiHandler.createAggregate(
ONE, dataStructureUtil.randomAttestationData().hashTreeRoot());
ONE, dataStructureUtil.randomAttestationData().hashTreeRoot(), Optional.empty());

assertThat(result).isCompletedExceptionally();
assertThatThrownBy(result::get).hasRootCauseInstanceOf(NodeSyncingException.class);
Expand All @@ -712,12 +712,15 @@ public void createSyncCommitteeContribution() {
public void createAggregate_shouldReturnAggregateFromAttestationPool() {
final AttestationData attestationData = dataStructureUtil.randomAttestationData();
final Optional<Attestation> aggregate = Optional.of(dataStructureUtil.randomAttestation());
when(attestationPool.createAggregateFor(eq(attestationData.hashTreeRoot())))
when(attestationPool.createAggregateFor(
eq(attestationData.hashTreeRoot()), eq(Optional.empty())))
.thenReturn(aggregate.map(attestation -> ValidatableAttestation.from(spec, attestation)));

assertThat(
validatorApiHandler.createAggregate(
aggregate.get().getData().getSlot(), attestationData.hashTreeRoot()))
aggregate.get().getData().getSlot(),
attestationData.hashTreeRoot(),
Optional.empty()))
.isCompletedWithValue(aggregate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void handleRequest(final RestApiRequest request) throws JsonProcessingExc
final UInt64 slot = request.getQueryParameter(SLOT_PARAM);

final SafeFuture<Optional<Attestation>> future =
provider.createAggregate(slot, beaconBlockRoot);
provider.createAggregate(slot, beaconBlockRoot, Optional.empty());

request.respondAsync(
future.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public void shouldReturnAttestationInformation() throws JsonProcessingException
request.setQueryParameter("attestation_data_root", attestationDataRoot.toHexString());

Attestation attestation = dataStructureUtil.randomAttestation();
when(validatorDataProvider.createAggregate(eq(UInt64.valueOf(1)), eq(attestationDataRoot)))
when(validatorDataProvider.createAggregate(
eq(UInt64.valueOf(1)), eq(attestationDataRoot), eq(Optional.empty())))
.thenReturn(SafeFuture.completedFuture(Optional.of(attestation)));

handler.handleRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ private Optional<SyncCommitteeMessage> checkInternalCommitteeSignature(
}

public SafeFuture<Optional<Attestation>> createAggregate(
final UInt64 slot, final Bytes32 attestationHashTreeRoot) {
return validatorApiChannel.createAggregate(slot, attestationHashTreeRoot);
final UInt64 slot,
final Bytes32 attestationHashTreeRoot,
final Optional<UInt64> committeeIndex) {
return validatorApiChannel.createAggregate(slot, attestationHashTreeRoot, committeeIndex);
}

public SafeFuture<List<SubmitDataError>> sendAggregateAndProofs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public IndexedAttestation getIndexedAttestation(

final IndexedAttestationSchema indexedAttestationSchema =
schemaDefinitions.getIndexedAttestationSchema();
specConfig.getMaxCommitteesPerSlot();

return indexedAttestationSchema.create(
attestingIndices.stream()
.sorted()
Expand All @@ -123,11 +123,9 @@ public IndexedAttestation getIndexedAttestation(
* <a>https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#get_attesting_indices</a>
*/
public IntList getAttestingIndices(final BeaconState state, final Attestation attestation) {
return IntList.of(streamAttestingIndices(state, attestation).toArray());
}

public IntStream streamAttestingIndices(final BeaconState state, final Attestation attestation) {
return streamAttestingIndices(state, attestation.getData(), attestation.getAggregationBits());
return IntList.of(
streamAttestingIndices(state, attestation.getData(), attestation.getAggregationBits())
.toArray());
}

public IntStream streamAttestingIndices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public void assumeMilestoneActive(SpecMilestone milestone) {
Assumptions.assumeTrue(specMilestone.isGreaterThanOrEqualTo(milestone), "Milestone skipped");
}

public void assumeElectraActive() {
assumeMilestoneActive(SpecMilestone.ELECTRA);
}

public void assumeDenebActive() {
assumeMilestoneActive(SpecMilestone.DENEB);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,13 @@ public BeaconState randomBeaconState(final int validatorCount, final int numItem
.build();
}

public BeaconState randomBeaconState(
final int validatorCount, final int numItemsInSSZLists, final UInt64 slot) {
return stateBuilder(spec.getGenesisSpec().getMilestone(), validatorCount, numItemsInSSZLists)
.slot(slot)
.build();
}

public AbstractBeaconStateBuilder<
? extends BeaconState,
? extends MutableBeaconState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import tech.pegasys.teku.bls.BLS;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
Expand Down Expand Up @@ -62,21 +66,45 @@ public void aggregate(final ValidatableAttestation attestation) {

public ValidatableAttestation buildAggregate() {
checkState(currentAggregateBits != null, "Must aggregate at least one attestation");
final SpecVersion specVersion = spec.atSlot(attestationData.getSlot());
final Supplier<SszBitvector> committeeBitsSupplier = buildCommitteeBitsSupplier(specVersion);
return ValidatableAttestation.from(
spec,
spec.atSlot(attestationData.getSlot())
specVersion
.getSchemaDefinitions()
.getAttestationSchema()
.create(
currentAggregateBits,
attestationData,
committeeBitsSupplier,
BLS.aggregate(
includedAttestations.stream()
.map(ValidatableAttestation::getAttestation)
.map(Attestation::getAggregateSignature)
.toList())));
}

/*
TODO Electra, we currently assume we aggregate attestations having the same committee bits
*/
private Supplier<SszBitvector> buildCommitteeBitsSupplier(final SpecVersion specVersion) {
final Supplier<SszBitvector> committeeBitsSupplier;
if (specVersion.getMilestone().isGreaterThanOrEqualTo(SpecMilestone.ELECTRA)) {
committeeBitsSupplier =
() ->
includedAttestations.stream()
.map(
includedAttestation ->
includedAttestation.getAttestation().getCommitteeBitsRequired())
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Error while aggregating committee bit"));
} else {
committeeBitsSupplier = () -> null;
}
return committeeBitsSupplier;
}

public Collection<ValidatableAttestation> getIncludedAttestations() {
return includedAttestations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public synchronized List<Attestation> getAttestations(
final Predicate<Map.Entry<UInt64, Set<Bytes>>> filterForSlot =
(entry) -> maybeSlot.map(slot -> entry.getKey().equals(slot)).orElse(true);

// TODO fix for electra
final Predicate<MatchingDataAttestationGroup> filterForCommitteeIndex =
(group) ->
maybeCommitteeIndex
Expand All @@ -238,9 +239,9 @@ private boolean isValid(
}

public synchronized Optional<ValidatableAttestation> createAggregateFor(
final Bytes32 attestationHashTreeRoot) {
final Bytes32 attestationHashTreeRoot, final Optional<UInt64> committeeIndex) {
return Optional.ofNullable(attestationGroupByDataHash.get(attestationHashTreeRoot))
.flatMap(attestations -> attestations.stream().findFirst());
.flatMap(attestations -> attestations.stream(committeeIndex).findFirst());
}

public synchronized void onReorg(final UInt64 commonAncestorSlot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
Expand Down Expand Up @@ -112,6 +115,9 @@ public boolean add(final ValidatableAttestation attestation) {
* Iterates through the aggregation of attestations in this group. The iterator attempts to create
* the minimum number of attestations that include all attestations in the group.
*
* <p>committeeIndex is an optional parameter that enables aggregation over a specified committee
* (applies to Electra only)
*
* <p>While it is guaranteed that every validator from an attestation in this group is included in
* an aggregate produced by this iterator, there is no guarantee that the added attestation
* instances themselves will be included.
Expand All @@ -120,11 +126,23 @@ public boolean add(final ValidatableAttestation attestation) {
*/
@Override
public Iterator<ValidatableAttestation> iterator() {
return new AggregatingIterator();
return new AggregatingIterator(Optional.empty());
}

public Iterator<ValidatableAttestation> iterator(final Optional<UInt64> committeeIndex) {
return new AggregatingIterator(committeeIndex);
}

public Stream<ValidatableAttestation> stream() {
return StreamSupport.stream(spliterator(), false);
return StreamSupport.stream(spliterator(Optional.empty()), false);
}

public Stream<ValidatableAttestation> stream(final Optional<UInt64> committeeIndex) {
return StreamSupport.stream(spliterator(committeeIndex), false);
}

public Spliterator<ValidatableAttestation> spliterator(final Optional<UInt64> committeeIndex) {
return Spliterators.spliteratorUnknownSize(iterator(committeeIndex), 0);
}

/**
Expand Down Expand Up @@ -200,6 +218,11 @@ public boolean matchesCommitteeShufflingSeed(final Set<Bytes32> validSeeds) {

private class AggregatingIterator implements Iterator<ValidatableAttestation> {
private SszBitlist includedValidators = MatchingDataAttestationGroup.this.includedValidators;
private final Optional<UInt64> maybeCommitteeIndex;

private AggregatingIterator(final Optional<UInt64> committeeIndex) {
this.maybeCommitteeIndex = committeeIndex;
}

@Override
public boolean hasNext() {
Expand All @@ -226,10 +249,28 @@ public ValidatableAttestation next() {
public Stream<ValidatableAttestation> streamRemainingAttestations() {
return attestationsByValidatorCount.values().stream()
.flatMap(Set::stream)
.filter(this::maybeFilterOnCommitteeIndex)
.filter(
candidate ->
!includedValidators.isSuperSetOf(
candidate.getAttestation().getAggregationBits()));
}

/*
If we have attestations with committeeBits (Electra) then, if maybeCommitteeIndex is specified, we will consider attestation related to that committee only
*/
private boolean maybeFilterOnCommitteeIndex(ValidatableAttestation candidate) {
final Optional<SszBitvector> maybeCommitteeBits =
candidate.getAttestation().getCommitteeBits();
if (maybeCommitteeBits.isEmpty() || maybeCommitteeIndex.isEmpty()) {
return true;
}

final SszBitvector committeeBits = maybeCommitteeBits.get();
if (committeeBits.getBitCount() != 1) {
return false;
}
return committeeBits.isSet(maybeCommitteeIndex.get().intValue());
}
}
}
Loading

0 comments on commit 1991869

Please sign in to comment.