From 59ddd60208ff7ede417d3b623a771ab245430027 Mon Sep 17 00:00:00 2001 From: Gianguido Sora Date: Tue, 14 May 2024 10:57:48 +0200 Subject: [PATCH] validator/client: process Sync Committee roles separately In a DV context, to be compatible with the proposed selection endpoint, the VC must push all partial selections to it instead of just one. Process sync committee role separately within the RolesAt method, so that partial selections can be pushed to the DV client appropriately, if configured. --- validator/client/validator.go | 107 ++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index 83a0c431c155..c6032bec1cdc 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -682,11 +682,19 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes // RolesAt slot returns the validator roles at the given slot. Returns nil if the // validator is known to not have a roles at the slot. Returns UNKNOWN if the -// validator assignments are unknown. Otherwise returns a valid ValidatorRole map. +// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map. func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) { v.dutiesLock.RLock() defer v.dutiesLock.RUnlock() - rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + var ( + rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + // store sync committee duties pubkeys and share indices in slices for + // potential DV processing + syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) + ) + for validator, duty := range v.duties.CurrentEpochDuties { var roles []iface.ValidatorRole @@ -701,6 +709,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie } } } + if duty.AttesterSlot == slot { roles = append(roles, iface.RoleAttester) @@ -726,19 +735,11 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if duty.IsSyncCommittee { roles = append(roles, iface.RoleSyncCommittee) inSyncCommittee = true - } - } - if inSyncCommittee { - aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) - if err != nil { - return nil, errors.Wrap(err, "could not check if a validator is a sync committee aggregator") - } - if aggregator { - roles = append(roles, iface.RoleSyncCommitteeAggregator) + syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey) } } - if len(roles) == 0 { + if len(roles) == 0 && !inSyncCommittee { roles = append(roles, iface.RoleUnknown) } @@ -746,6 +747,28 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie copy(pubKey[:], duty.PublicKey) rolesAt[pubKey] = roles } + + aggregator, err := v.isSyncCommitteeAggregator( + ctx, + slot, + syncCommitteeValidators, + ) + + if err != nil { + return nil, errors.Wrap(err, "could not check if validators are a sync committee aggregator") + } + + for valIdx, isAgg := range aggregator { + if isAgg { + valPubkey, ok := syncCommitteeValidators[valIdx] + if !ok { + return nil, errors.New("validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + } + + rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator) + } + } + return rolesAt, nil } @@ -794,51 +817,59 @@ func (v *validator) isAggregator(ctx context.Context, committee []primitives.Val // // modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE) // return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0 -func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) { - res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ - PublicKey: pubKey[:], - Slot: slot, - }) - if err != nil { - return false, err - } +func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) { + var ( + selections []iface.SyncCommitteeSelection + isAgg = make(map[primitives.ValidatorIndex]bool) + ) + + for valIdx, pubKey := range validators { + res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ + PublicKey: pubKey[:], + Slot: slot, + }) - var selections []iface.SyncCommitteeSelection - for _, index := range res.Indices { - subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount - subnet := uint64(index) / subCommitteeSize - sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) if err != nil { - return false, err + return nil, errors.Wrap(err, "can't fetch sync subcommittee index") + } + + for _, index := range res.Indices { + subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount + subnet := uint64(index) / subCommitteeSize + sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) + if err != nil { + return nil, errors.Wrap(err, "can't sign selection data") + } + + selections = append(selections, iface.SyncCommitteeSelection{ + SelectionProof: sig, + Slot: slot, + SubcommitteeIndex: primitives.CommitteeIndex(subnet), + ValidatorIndex: valIdx, + }) } - selections = append(selections, iface.SyncCommitteeSelection{ - SelectionProof: sig, - Slot: slot, - SubcommitteeIndex: primitives.CommitteeIndex(subnet), - ValidatorIndex: validatorIndex, - }) } // Override selections with aggregated ones if the node is part of a Distributed Validator. if v.distributed && len(selections) > 0 { + var err error selections, err = v.validatorClient.GetAggregatedSyncSelections(ctx, selections) if err != nil { - return false, errors.Wrap(err, "failed to get aggregated sync selections") + return nil, errors.Wrap(err, "failed to get aggregated sync selections") } } for _, s := range selections { isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof) if err != nil { - return false, err - } - if isAggregator { - return true, nil + return nil, errors.Wrap(err, "can't detect sync committee aggregator") } + + isAgg[s.ValidatorIndex] = isAggregator } - return false, nil + return isAgg, nil } // UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when