validator/client: process Sync Committee roles separately (#13995)
* validator/client: process Sync Committee roles separately

In a DV (distributed validator) context, to be compatible with the proposed selection endpoint, the VC must push all partial selections to it instead of just one.

Process the sync committee role separately within the RolesAt method, so that partial selections can be pushed to the DV client when configured.

* testing/util: re-add erroneously deleted state_test.go

* validator/client: fix tests

* validator/client: always process sync committee validator

Process the sync committee duty at the slot boundary as well.

* don't fail if a validator is marked as sync committee but is not in the list

Ignore the duty and continue instead.

* address code review comments

* fix build

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
gsora and rkapka authored Jul 1, 2024
1 parent af5eb82 commit 657750b
Showing 2 changed files with 88 additions and 44 deletions.
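The commit description above boils down to one structural change in RolesAt: the duty loop still records sync committee membership, but the aggregator decision is deferred and resolved for all candidates in a single batched call, so a DV client can receive every partial selection. The following is a minimal sketch of that shape; the simplified types and the helper names (`rolesAtSketch`, `checkAggregators`) are illustrative stand-ins, not Prysm's API.

```go
package main

import "fmt"

type (
	validatorIndex uint64
	pubkey         [48]byte
	role           string
)

const (
	roleSyncCommittee           role = "SYNC_COMMITTEE"
	roleSyncCommitteeAggregator role = "SYNC_COMMITTEE_AGGREGATOR"
)

// rolesAtSketch mirrors the restructured flow: the duty loop only records sync
// committee membership, and the aggregator check runs afterwards over the whole
// candidate set (the new isSyncCommitteeAggregator signature in this commit).
func rolesAtSketch(
	duties map[validatorIndex]pubkey,
	inSyncCommittee func(validatorIndex) bool,
	checkAggregators func(map[validatorIndex]pubkey) (map[validatorIndex]bool, error),
) map[pubkey][]role {
	rolesAt := make(map[pubkey][]role)
	candidates := make(map[validatorIndex]pubkey)

	for idx, pk := range duties {
		if inSyncCommittee(idx) {
			rolesAt[pk] = append(rolesAt[pk], roleSyncCommittee)
			candidates[idx] = pk // defer the aggregator decision
		}
	}

	isAgg, err := checkAggregators(candidates) // one batched call for all candidates
	if err != nil {
		// Mirror the commit's behaviour: log and return instead of failing the slot.
		fmt.Println("could not check sync committee aggregators:", err)
		return rolesAt
	}
	for idx, agg := range isAgg {
		pk, ok := candidates[idx]
		if !agg || !ok {
			continue // unknown index: ignore the duty and continue, as the commit says
		}
		rolesAt[pk] = append(rolesAt[pk], roleSyncCommitteeAggregator)
	}
	return rolesAt
}

func main() {
	duties := map[validatorIndex]pubkey{7: {0x01}}
	inSC := func(validatorIndex) bool { return true }
	check := func(m map[validatorIndex]pubkey) (map[validatorIndex]bool, error) {
		out := make(map[validatorIndex]bool, len(m))
		for idx := range m {
			out[idx] = true // pretend every candidate is an aggregator
		}
		return out, nil
	}
	fmt.Println(rolesAtSketch(duties, inSC, check))
}
```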
108 changes: 72 additions & 36 deletions validator/client/validator.go
@@ -701,14 +701,22 @@ func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.Duties

// RolesAt slot returns the validator roles at the given slot. Returns nil if the
// validator is known to not have a roles at the slot. Returns UNKNOWN if the
// validator assignments are unknown. Otherwise returns a valid ValidatorRole map.
// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map.
func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) {
ctx, span := trace.StartSpan(ctx, "validator.RolesAt")
defer span.End()

v.dutiesLock.RLock()
defer v.dutiesLock.RUnlock()
rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

var (
rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

// store sync committee duties pubkeys and share indices in slices for
// potential DV processing
syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte)
)

for validator, duty := range v.duties.CurrentEpochDuties {
var roles []iface.ValidatorRole

@@ -723,6 +731,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
}
}
}

if duty.AttesterSlot == slot {
roles = append(roles, iface.RoleAttester)

@@ -751,15 +760,9 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
inSyncCommittee = true
}
}

if inSyncCommittee {
aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex)
if err != nil {
aggregator = false
log.WithError(err).Errorf("Could not check if validator %#x is an aggregator", bytesutil.Trunc(duty.PublicKey))
}
if aggregator {
roles = append(roles, iface.RoleSyncCommitteeAggregator)
}
syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey)
}

if len(roles) == 0 {
@@ -770,6 +773,32 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
copy(pubKey[:], duty.PublicKey)
rolesAt[pubKey] = roles
}

aggregator, err := v.isSyncCommitteeAggregator(
ctx,
slot,
syncCommitteeValidators,
)

if err != nil {
log.WithError(err).Error("Could not check if any validator is a sync committee aggregator")
return rolesAt, nil
}

for valIdx, isAgg := range aggregator {
if isAgg {
valPubkey, ok := syncCommitteeValidators[valIdx]
if !ok {
log.
WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(valPubkey[:]))).
Warn("Validator is marked as sync committee aggregator but cannot be found in sync committee validator list")
continue
}

rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator)
}
}

return rolesAt, nil
}

@@ -827,54 +856,61 @@ func (v *validator) isAggregator(
//
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) {
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) {
ctx, span := trace.StartSpan(ctx, "validator.isSyncCommitteeAggregator")
defer span.End()

res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})
if err != nil {
return false, err
}
var (
selections []iface.SyncCommitteeSelection
isAgg = make(map[primitives.ValidatorIndex]bool)
)

for valIdx, pubKey := range validators {
res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})

var selections []iface.SyncCommitteeSelection
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return false, err
return nil, errors.Wrap(err, "can't fetch sync subcommittee index")
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: validatorIndex,
})
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return nil, errors.Wrap(err, "can't sign selection data")
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: valIdx,
})
}
}

// Override selections with aggregated ones if the node is part of a Distributed Validator.
if v.distributed && len(selections) > 0 {
var err error
selections, err = v.validatorClient.AggregatedSyncSelections(ctx, selections)
if err != nil {
return false, errors.Wrap(err, "failed to get aggregated sync selections")
return nil, errors.Wrap(err, "failed to get aggregated sync selections")
}
}

for _, s := range selections {
isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof)
if err != nil {
return false, err
}
if isAggregator {
return true, nil
return nil, errors.Wrap(err, "can't detect sync committee aggregator")
}

isAgg[s.ValidatorIndex] = isAggregator
}

return false, nil
return isAgg, nil
}

// UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when
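To summarize the reshaped isSyncCommitteeAggregator above: it now builds one selection per (validator, subnet) pair, pushes all of those partial selections to the DV middleware for aggregation when running in distributed mode, and only then evaluates each selection proof, returning a map keyed by validator index. Below is a minimal sketch of that control flow, using stand-in callback types rather than Prysm's client interfaces.

```go
package main

import "fmt"

type valIdx uint64

// selection is a simplified stand-in for iface.SyncCommitteeSelection.
type selection struct {
	ValidatorIndex valIdx
	Subnet         uint64
	Proof          []byte
}

// batchAggregatorCheck sketches the new isSyncCommitteeAggregator shape:
// one call answers the aggregator question for every candidate validator.
func batchAggregatorCheck(
	validators map[valIdx][]byte,
	subnetsFor func(pub []byte) ([]uint64, error), // stands in for SyncSubcommitteeIndex
	sign func(pub []byte, subnet uint64) ([]byte, error), // stands in for signSyncSelectionData
	distributed bool,
	aggregate func([]selection) ([]selection, error), // stands in for AggregatedSyncSelections
	isAggregatorProof func(proof []byte) (bool, error), // stands in for altair.IsSyncCommitteeAggregator
) (map[valIdx]bool, error) {
	var selections []selection
	isAgg := make(map[valIdx]bool)

	for idx, pub := range validators {
		subnets, err := subnetsFor(pub)
		if err != nil {
			return nil, fmt.Errorf("can't fetch sync subcommittee index: %w", err)
		}
		for _, subnet := range subnets {
			proof, err := sign(pub, subnet)
			if err != nil {
				return nil, fmt.Errorf("can't sign selection data: %w", err)
			}
			selections = append(selections, selection{ValidatorIndex: idx, Subnet: subnet, Proof: proof})
		}
	}

	// Distributed validator: push every partial selection and use the
	// aggregated selections returned by the middleware instead.
	if distributed && len(selections) > 0 {
		var err error
		if selections, err = aggregate(selections); err != nil {
			return nil, fmt.Errorf("failed to get aggregated sync selections: %w", err)
		}
	}

	for _, s := range selections {
		ok, err := isAggregatorProof(s.Proof)
		if err != nil {
			return nil, err
		}
		isAgg[s.ValidatorIndex] = ok
	}
	return isAgg, nil
}

func main() {
	fmt.Println("see batchAggregatorCheck for the control flow")
}
```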
24 changes: 16 additions & 8 deletions validator/client/validator_test.go
@@ -1263,9 +1263,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)

aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, false, aggregator)
require.Equal(t, false, aggregator[0])

c := params.BeaconConfig().Copy()
c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64
@@ -1284,9 +1286,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/)

aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, true, aggregator)
require.Equal(t, true, aggregator[0])
})
}
}
@@ -1310,9 +1314,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)

aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, false, aggregator)
require.Equal(t, false, aggregator[0])

c := params.BeaconConfig().Copy()
c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64
@@ -1345,9 +1351,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) {
[]iface.SyncCommitteeSelection{selection},
).Return([]iface.SyncCommitteeSelection{selection}, nil)

aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 123)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
123: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, true, aggregator)
require.Equal(t, true, aggregator[123])
})
}
}
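For context on what `altair.IsSyncCommitteeAggregator` decides in the tests above, the spec rule quoted in the validator.go diff can be worked through directly. The following is a small self-contained sketch, assuming the mainnet preset values (SYNC_COMMITTEE_SIZE = 512, SYNC_COMMITTEE_SUBNET_COUNT = 4, TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE = 16) and SHA-256 as the spec hash; it is not Prysm's implementation.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// Mainnet preset values (an assumption for this sketch, not taken from the diff).
const (
	syncCommitteeSize                    = 512
	syncCommitteeSubnetCount             = 4
	targetAggregatorsPerSyncSubcommittee = 16
)

// isSyncAggregator follows the spec pseudocode quoted above the helper:
//
//	modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
//	return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func isSyncAggregator(selectionProof []byte) bool {
	modulo := uint64(syncCommitteeSize / syncCommitteeSubnetCount / targetAggregatorsPerSyncSubcommittee)
	if modulo < 1 {
		modulo = 1
	}
	digest := sha256.Sum256(selectionProof)
	// bytes_to_uint64 in the consensus spec is little-endian.
	return binary.LittleEndian.Uint64(digest[:8])%modulo == 0
}

func main() {
	// With the mainnet preset, modulo = 512/4/16 = 8, so roughly one selection
	// proof in eight marks its validator as a sync committee aggregator.
	proof := make([]byte, 96) // a BLS signature is 96 bytes
	fmt.Println(isSyncAggregator(proof))
}
```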
