Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve vc logs #13573

Merged
merged 11 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Validator struct {
proposerSettings *validatorserviceconfig.ProposerSettings
}

func (_ *Validator) LogSyncCommitteeMessagesSubmitted() {}
func (_ *Validator) LogSubmittedSyncCommitteeMessages() {}

func (_ *Validator) Done() {
panic("implement me")
Expand Down Expand Up @@ -154,7 +154,7 @@ func (_ *Validator) SubmitSignedContributionAndProof(_ context.Context, _ primit
panic("implement me")
}

func (_ *Validator) LogAttestationsSubmitted() {
func (_ *Validator) LogSubmittedAtts() {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ func (v *validator) addIndicesToLog(duty *ethpb.DutiesResponse_Duty) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, log := range v.attLogs {
if duty.CommitteeIndex == log.data.CommitteeIndex {
log.aggregatorIndices = append(log.aggregatorIndices, duty.ValidatorIndex)
for _, l := range v.submittedAtts {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using l as a variable name?
(the letter l is not in submittedAtts).

Why not using something like att?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is outdated

if duty.CommitteeIndex == l.data.CommitteeIndex {
l.aggregatorPubkeys = append(l.aggregatorPubkeys, duty.PublicKey)
}
}

Expand Down
22 changes: 13 additions & 9 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -144,6 +143,11 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
tracing.AnnotateError(span, err)
return
}
r, err := data.HashTreeRoot()
if err != nil {
return
}
logrus.Infof("Submitting attestation %#x", r)
attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation)
if err != nil {
log.WithError(err).Error("Could not submit attestation to beacon node")
Expand All @@ -154,7 +158,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}

if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil {
if err := v.saveSubmittedAtt(data, duty.PublicKey); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -228,21 +232,21 @@ func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.Att
return domain, root, nil
}

// For logging, this saves the last submitted attester index to its attestation data. The purpose of this
// is to enhance attesting logs to be readable when multiple validator keys ran in a single client.
func (v *validator) saveAttesterIndexToData(data *ethpb.AttestationData, index primitives.ValidatorIndex) error {
// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey.
// The purpose of this is to display combined attesting logs for all keys managed by the validator client.
func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

h, err := hash.Proto(data)
r, err := data.HashTreeRoot()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we benchmarked against the two?

if err != nil {
return err
}

if v.attLogs[h] == nil {
v.attLogs[h] = &attSubmitted{data, []primitives.ValidatorIndex{}, []primitives.ValidatorIndex{}}
if v.submittedAtts[r] == nil {
v.submittedAtts[r] = &submittedAtt{data, [][]byte{}, [][]byte{}}
}
v.attLogs[h] = &attSubmitted{data, append(v.attLogs[h].attesterIndices, index), []primitives.ValidatorIndex{}}
v.submittedAtts[r] = &submittedAtt{data, append(v.submittedAtts[r].attesterPubkeys, pubkey), [][]byte{}}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Validator interface {
SubmitAggregateAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSyncCommitteeMessage(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSignedContributionAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
LogAttestationsSubmitted()
LogSyncCommitteeMessagesSubmitted()
LogSubmittedAtts()
LogSubmittedSyncCommitteeMessages()
UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot)
WaitForKeymanagerInitialization(ctx context.Context) error
Keymanager() (keymanager.IKeymanager, error)
Expand Down
43 changes: 25 additions & 18 deletions validator/client/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,51 @@ import (
"fmt"
"sync/atomic"

"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "validator")

type attSubmitted struct {
type submittedAtt struct {
data *ethpb.AttestationData
attesterIndices []primitives.ValidatorIndex
aggregatorIndices []primitives.ValidatorIndex
attesterPubkeys [][]byte
aggregatorPubkeys [][]byte
}

// LogAttestationsSubmitted logs info about submitted attestations.
func (v *validator) LogAttestationsSubmitted() {
// LogSubmittedAtts logs info about submitted attestations.
func (v *validator) LogSubmittedAtts() {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, attLog := range v.attLogs {
for _, attLog := range v.submittedAtts {
attesterPubkeys := make([]string, len(attLog.attesterPubkeys))
for i, p := range attLog.attesterPubkeys {
attesterPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
aggregatorPubkeys := make([]string, len(attLog.aggregatorPubkeys))
for i, p := range attLog.aggregatorPubkeys {
aggregatorPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
log.WithFields(logrus.Fields{
"Slot": attLog.data.Slot,
"CommitteeIndex": attLog.data.CommitteeIndex,
"BeaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"SourceEpoch": attLog.data.Source.Epoch,
"SourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"TargetEpoch": attLog.data.Target.Epoch,
"TargetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"AttesterIndices": attLog.attesterIndices,
"AggregatorIndices": attLog.aggregatorIndices,
"slot": attLog.data.Slot,
"committeeIndex": attLog.data.CommitteeIndex,
"beaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"sourceEpoch": attLog.data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"targetEpoch": attLog.data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"attesterPubkeys": attesterPubkeys,
"aggregatorPubkeys": aggregatorPubkeys,
}).Info("Submitted new attestations")
}

v.attLogs = make(map[[32]byte]*attSubmitted)
v.submittedAtts = make(map[[32]byte]*submittedAtt)
}

// LogSyncCommitteeMessagesSubmitted logs info about submitted sync committee messages.
func (v *validator) LogSyncCommitteeMessagesSubmitted() {
func (v *validator) LogSubmittedSyncCommitteeMessages() {
log.WithField("messages", v.syncCommitteeStats.totalMessagesSubmitted).Debug("Submitted sync committee messages successfully to beacon node")
// Reset the amount.
atomic.StoreUint64(&v.syncCommitteeStats.totalMessagesSubmitted, 0)
Expand Down
2 changes: 1 addition & 1 deletion validator/client/propose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func setupWithKey(t *testing.T, validatorKey bls.SecretKey) (*validator, *mocks,
keyManager: newMockKeymanager(t, keypair{pub: pubKey, pri: validatorKey}),
validatorClient: m.validatorClient,
graffiti: []byte{},
attLogs: make(map[[32]byte]*attSubmitted),
submittedAtts: make(map[[32]byte]*submittedAtt),
aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache,
}

Expand Down
6 changes: 3 additions & 3 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.Validat
" should never happen! Please file a report at github.com/prysmaticlabs/prysm/issues/new")
}
}()
// Log this client performance in the previous epoch
v.LogAttestationsSubmitted()
v.LogSyncCommitteeMessagesSubmitted()
// Log performance in the previous slot
v.LogSubmittedAtts()
v.LogSubmittedSyncCommitteeMessages()
if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
log.WithError(err).Error("Could not report validator's rewards/penalties")
}
Expand Down
2 changes: 1 addition & 1 deletion validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (v *ValidatorService) Start() {
prevBalance: make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
attLogs: make(map[[32]byte]*attSubmitted),
submittedAtts: make(map[[32]byte]*submittedAtt),
domainDataCache: cache,
aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache,
voteStats: voteStats{startEpoch: primitives.Epoch(^uint64(0))},
Expand Down
6 changes: 3 additions & 3 deletions validator/client/testutil/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) erro
}

// LogSyncCommitteeMessagesSubmitted --
func (fv *FakeValidator) LogSyncCommitteeMessagesSubmitted() {}
func (fv *FakeValidator) LogSubmittedSyncCommitteeMessages() {}

// WaitForChainStart for mocking.
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
Expand Down Expand Up @@ -181,8 +181,8 @@ func (*FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Sl
func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
}

// LogAttestationsSubmitted for mocking.
func (*FakeValidator) LogAttestationsSubmitted() {}
// LogSubmittedAtts for mocking.
func (*FakeValidator) LogSubmittedAtts() {}

// UpdateDomainDataCaches for mocking.
func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {}
Expand Down
80 changes: 43 additions & 37 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type validator struct {
slashableKeysLock sync.RWMutex
eipImportBlacklistedPublicKeys map[[fieldparams.BLSPubkeyLength]byte]bool
walletInitializedFeed *event.Feed
attLogs map[[32]byte]*attSubmitted
submittedAtts map[[32]byte]*submittedAtt
startBalances map[[fieldparams.BLSPubkeyLength]byte]uint64
dutiesLock sync.RWMutex
duties *ethpb.DutiesResponse
Expand Down Expand Up @@ -540,7 +540,7 @@ func retrieveLatestRecord(recs []*kv.AttestationRecord) *kv.AttestationRecord {
// list of upcoming assignments needs to be updated. For example, at the
// beginning of a new epoch.
func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) error {
if slot%params.BeaconConfig().SlotsPerEpoch != 0 && v.duties != nil {
if !slots.IsEpochStart(slot) && v.duties != nil {
// Do nothing if not epoch start AND assignments already exist.
return nil
}
Expand Down Expand Up @@ -889,12 +889,16 @@ func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.
attesterKeys[i] = make([]string, 0)
}
proposerKeys := make([]string, params.BeaconConfig().SlotsPerEpoch)
slotOffset := slot - (slot % params.BeaconConfig().SlotsPerEpoch)
var totalAttestingKeys uint64
epochStartSlot, err := slots.EpochStart(slots.ToEpoch(slot))
if err != nil {
log.WithError(err).Error("Could not calculate epoch start. Ignoring logging duties.")
return
}
var totalProposingKeys, totalAttestingKeys uint64
for _, duty := range currentEpochDuties {
validatorNotTruncatedKey := fmt.Sprintf("%#x", duty.PublicKey)
pubkey := fmt.Sprintf("%#x", duty.PublicKey)
if v.emitAccountMetrics {
ValidatorStatusesGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(duty.Status))
ValidatorStatusesGaugeVec.WithLabelValues(pubkey).Set(float64(duty.Status))
}

// Only interested in validators who are attesting/proposing.
Expand All @@ -903,39 +907,40 @@ func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.
continue
}

validatorKey := fmt.Sprintf("%#x", bytesutil.Trunc(duty.PublicKey))
attesterIndex := duty.AttesterSlot - slotOffset
if attesterIndex >= params.BeaconConfig().SlotsPerEpoch {
truncatedPubkey := fmt.Sprintf("%#x", bytesutil.Trunc(duty.PublicKey))
attesterSlotInEpoch := duty.AttesterSlot - epochStartSlot
if attesterSlotInEpoch >= params.BeaconConfig().SlotsPerEpoch {
log.WithField("duty", duty).Warn("Invalid attester slot")
} else {
attesterKeys[duty.AttesterSlot-slotOffset] = append(attesterKeys[duty.AttesterSlot-slotOffset], validatorKey)
attesterKeys[attesterSlotInEpoch] = append(attesterKeys[attesterSlotInEpoch], truncatedPubkey)
totalAttestingKeys++
if v.emitAccountMetrics {
ValidatorNextAttestationSlotGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(duty.AttesterSlot))
ValidatorNextAttestationSlotGaugeVec.WithLabelValues(pubkey).Set(float64(duty.AttesterSlot))
}
}
if v.emitAccountMetrics && duty.IsSyncCommittee {
ValidatorInSyncCommitteeGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(1))
ValidatorInSyncCommitteeGaugeVec.WithLabelValues(pubkey).Set(float64(1))
} else if v.emitAccountMetrics && !duty.IsSyncCommittee {
// clear the metric out if the validator is not in the current sync committee anymore otherwise it will be left at 1
ValidatorInSyncCommitteeGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(0))
ValidatorInSyncCommitteeGaugeVec.WithLabelValues(pubkey).Set(float64(0))
}

for _, proposerSlot := range duty.ProposerSlots {
proposerIndex := proposerSlot - slotOffset
if proposerIndex >= params.BeaconConfig().SlotsPerEpoch {
proposerSlotInEpoch := proposerSlot - epochStartSlot
if proposerSlotInEpoch >= params.BeaconConfig().SlotsPerEpoch {
log.WithField("duty", duty).Warn("Invalid proposer slot")
} else {
proposerKeys[proposerIndex] = validatorKey
proposerKeys[proposerSlotInEpoch] = truncatedPubkey
totalProposingKeys++
}
if v.emitAccountMetrics {
ValidatorNextProposalSlotGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(proposerSlot))
ValidatorNextProposalSlotGaugeVec.WithLabelValues(pubkey).Set(float64(proposerSlot))
}
}
}
for _, duty := range nextEpochDuties {
// for the next epoch, currently we are only interested in whether the validator is in the next sync committee or not
validatorNotTruncatedKey := fmt.Sprintf("%#x", duty.PublicKey)
pubkey := fmt.Sprintf("%#x", duty.PublicKey)

// Only interested in validators who are attesting/proposing.
// Note that slashed validators will have duties but their results are ignored by the network so we don't bother with them.
Expand All @@ -944,36 +949,37 @@ func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.
}

if v.emitAccountMetrics && duty.IsSyncCommittee {
ValidatorInNextSyncCommitteeGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(1))
ValidatorInNextSyncCommitteeGaugeVec.WithLabelValues(pubkey).Set(float64(1))
} else if v.emitAccountMetrics && !duty.IsSyncCommittee {
// clear the metric out if the validator is now not in the next sync committee otherwise it will be left at 1
ValidatorInNextSyncCommitteeGaugeVec.WithLabelValues(validatorNotTruncatedKey).Set(float64(0))
ValidatorInNextSyncCommitteeGaugeVec.WithLabelValues(pubkey).Set(float64(0))
}
}

log.WithFields(logrus.Fields{
"proposerCount": totalProposingKeys,
"attesterCount": totalAttestingKeys,
}).Infof("Schedule for epoch %d", slots.ToEpoch(slot))
for i := primitives.Slot(0); i < params.BeaconConfig().SlotsPerEpoch; i++ {
startTime := slots.StartTime(v.genesisTime, slotOffset+i)
startTime := slots.StartTime(v.genesisTime, epochStartSlot+i)
durationTillDuty := (time.Until(startTime) + time.Second).Truncate(time.Second) // Round up to next second.

slotLog := log.WithFields(logrus.Fields{})
if proposerKeys[i] != "" {
slotLog = slotLog.WithField("proposerPubkey", proposerKeys[i])
}
if len(attesterKeys[i]) > 0 {
attestationLog := log.WithFields(logrus.Fields{
"slot": slotOffset + i,
"slotInEpoch": (slotOffset + i) % params.BeaconConfig().SlotsPerEpoch,
"attesterDutiesAtSlot": len(attesterKeys[i]),
"totalAttestersInEpoch": totalAttestingKeys,
"pubKeys": attesterKeys[i],
slotLog = slotLog.WithFields(logrus.Fields{
"slot": epochStartSlot + i,
"slotInEpoch": (epochStartSlot + i) % params.BeaconConfig().SlotsPerEpoch,
"attesterCount": len(attesterKeys[i]),
"attesterPubkeys": attesterKeys[i],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a proposerPubkey field too, so can't use pubkeys

})
if durationTillDuty > 0 {
attestationLog = attestationLog.WithField("timeTillDuty", durationTillDuty)
}
attestationLog.Info("Attestation schedule")
}
if proposerKeys[i] != "" {
proposerLog := log.WithField("slot", slotOffset+i).WithField("pubKey", proposerKeys[i])
if durationTillDuty > 0 {
proposerLog = proposerLog.WithField("timeTillDuty", durationTillDuty)
}
proposerLog.Info("Proposal schedule")
if durationTillDuty > 0 {
slotLog = slotLog.WithField("timeTillDuty", durationTillDuty)
}
slotLog.Infof("Duties schedule")
}
}

Expand Down
Loading