From ddc56105523efb1576bcb8242561f6837c5cdfa7 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 13 Feb 2024 16:28:53 +0100 Subject: [PATCH 01/11] `getChunk` ==> `getChunkFromDatabase`. --- beacon-chain/slasher/detect_attestations.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index a96d17d2a764..937c094d75cf 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -290,7 +290,7 @@ func (s *Service) epochUpdateForValidator( currentChunk, ok := updatedChunks[chunkIndex] if !ok { - currentChunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) + currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { return errors.Wrap(err, "could not get chunk") } @@ -409,7 +409,7 @@ func (s *Service) applyAttestationForValidator( chunk, ok := chunksByChunkIdx[chunkIndex] if !ok { - chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) + chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex) } @@ -451,7 +451,7 @@ func (s *Service) applyAttestationForValidator( chunk, ok := chunksByChunkIdx[chunkIndex] if !ok { - chunk, err = s.getChunk(ctx, chunkKind, validatorChunkIndex, chunkIndex) + chunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIndex) } @@ -490,8 +490,8 @@ func (s *Service) applyAttestationForValidator( return nil, nil } -// Retrieve a chunk from database from database. -func (s *Service) getChunk( +// Retrieve a chunk from database. +func (s *Service) getChunkFromDatabase( ctx context.Context, chunkKind slashertypes.ChunkKind, validatorChunkIndex uint64, From 40fe599ff9f4984a7c328a4b107f54b19bff7483 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 13 Feb 2024 16:29:33 +0100 Subject: [PATCH 02/11] `loadChunks`: Rename variables. --- beacon-chain/slasher/detect_attestations.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 937c094d75cf..2b3f89b5d695 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -517,14 +517,14 @@ func (s *Service) loadChunks( ctx context.Context, validatorChunkIndex uint64, chunkKind slashertypes.ChunkKind, - chunkIndices []uint64, + chunkIndexes []uint64, ) (map[uint64]Chunker, error) { ctx, span := trace.StartSpan(ctx, "Slasher.loadChunks") defer span.End() - chunkKeys := make([][]byte, 0, len(chunkIndices)) - for _, chunkIdx := range chunkIndices { - chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIdx)) + chunkKeys := make([][]byte, 0, len(chunkIndexes)) + for _, chunkIndex := range chunkIndexes { + chunkKeys = append(chunkKeys, s.params.flatSliceID(validatorChunkIndex, chunkIndex)) } rawChunks, chunksExist, err := s.serviceCfg.Database.LoadSlasherChunks(ctx, chunkKind, chunkKeys) @@ -563,7 +563,7 @@ func (s *Service) loadChunks( return nil, errors.Wrap(err, "could not initialize chunk") } - chunksByChunkIdx[chunkIndices[i]] = chunk + chunksByChunkIdx[chunkIndexes[i]] = chunk } return chunksByChunkIdx, nil From e138851543a7918fdcb9de6301929f2735dadf12 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 13 Feb 2024 18:01:32 +0100 Subject: [PATCH 03/11] `Update`: Use explicit arguments. --- beacon-chain/slasher/chunks.go | 27 ++++---- beacon-chain/slasher/chunks_test.go | 68 ++++++--------------- beacon-chain/slasher/detect_attestations.go | 6 +- 3 files changed, 31 insertions(+), 70 deletions(-) diff --git a/beacon-chain/slasher/chunks.go b/beacon-chain/slasher/chunks.go index 7639ba14e832..6d34c864774d 100644 --- a/beacon-chain/slasher/chunks.go +++ b/beacon-chain/slasher/chunks.go @@ -14,14 +14,6 @@ import ( "github.com/sirupsen/logrus" ) -// A struct encapsulating input arguments to -// functions used for attester slashing detection and -// loading, saving, and updating min/max span chunks. -type chunkUpdateArgs struct { - chunkIndex uint64 - currentEpoch primitives.Epoch -} - // Chunker defines a struct which represents a slice containing a chunk for K different validator's // min/max spans used for surround vote detection in slasher. The interface defines methods used to check // if an attestation is slashable for a validator index based on the contents of @@ -36,7 +28,8 @@ type Chunker interface { attestation *slashertypes.IndexedAttestationWrapper, ) (*ethpb.AttesterSlashing, error) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, @@ -384,21 +377,22 @@ func (m *MaxSpanChunksSlice) CheckSlashable( // to update. In our example, we stop at 2, which is still part of chunk 0, so no need // to jump to another min span chunks slice to perform updates. func (m *MinSpanChunksSlice) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, ) (keepGoing bool, err error) { // The lowest epoch we need to update. minEpoch := primitives.Epoch(0) - if args.currentEpoch > (m.params.historyLength - 1) { - minEpoch = args.currentEpoch - (m.params.historyLength - 1) + if currentEpoch > (m.params.historyLength - 1) { + minEpoch = currentEpoch - (m.params.historyLength - 1) } epochInChunk := startEpoch // We go down the chunk for the validator, updating every value starting at startEpoch down to minEpoch. // As long as the epoch, e, in the same chunk index and e >= minEpoch, we proceed with // a for loop. - for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk >= minEpoch { + for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk >= minEpoch { var chunkTarget primitives.Epoch chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk) if err != nil { @@ -433,7 +427,8 @@ func (m *MinSpanChunksSlice) Update( // more about how update exactly works, refer to the detailed documentation for the Update function for // MinSpanChunksSlice. func (m *MaxSpanChunksSlice) Update( - args *chunkUpdateArgs, + chunkIndex uint64, + currentEpoch primitives.Epoch, validatorIndex primitives.ValidatorIndex, startEpoch, newTargetEpoch primitives.Epoch, @@ -442,7 +437,7 @@ func (m *MaxSpanChunksSlice) Update( // We go down the chunk for the validator, updating every value starting at startEpoch up to // and including the current epoch. As long as the epoch, e, is in the same chunk index and e <= currentEpoch, // we proceed with a for loop. - for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk <= args.currentEpoch { + for m.params.chunkIndex(epochInChunk) == chunkIndex && epochInChunk <= currentEpoch { var chunkTarget primitives.Epoch chunkTarget, err = chunkDataAtEpoch(m.params, m.data, validatorIndex, epochInChunk) if err != nil { @@ -465,7 +460,7 @@ func (m *MaxSpanChunksSlice) Update( } // If the epoch to update now lies beyond the current chunk, then // continue to the next chunk to update it. - keepGoing = epochInChunk <= args.currentEpoch + keepGoing = epochInChunk <= currentEpoch return } diff --git a/beacon-chain/slasher/chunks_test.go b/beacon-chain/slasher/chunks_test.go index 2385ae27dba7..a9519b8a2b77 100644 --- a/beacon-chain/slasher/chunks_test.go +++ b/beacon-chain/slasher/chunks_test.go @@ -127,14 +127,10 @@ func TestMinSpanChunksSlice_CheckSlashable(t *testing.T) { source = primitives.Epoch(1) target = primitives.Epoch(2) att = createAttestationWrapperEmptySig(t, source, target, nil, nil) - chunkIdx := uint64(0) + chunkIndex := uint64(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - _, err = chunk.Update(args, validatorIdx, startEpoch, target) + _, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // Next up, we create a surrounding vote, but it should NOT be slashable @@ -209,14 +205,10 @@ func TestMaxSpanChunksSlice_CheckSlashable(t *testing.T) { source = primitives.Epoch(0) target = primitives.Epoch(3) att = createAttestationWrapperEmptySig(t, source, target, nil, nil) - chunkIdx := uint64(0) + chunkIndex := uint64(0) startEpoch := source currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - _, err = chunk.Update(args, validatorIdx, startEpoch, target) + _, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // Next up, we create a surrounded vote, but it should NOT be slashable @@ -288,15 +280,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) { } chunk := EmptyMinSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(1) - validatorIdx := primitives.ValidatorIndex(0) + chunkIndex := uint64(1) + validatorIndex := primitives.ValidatorIndex(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target) require.NoError(t, err) // We should keep going! We still have to update the data for chunk index 0. @@ -306,15 +294,11 @@ func TestMinSpanChunksSlice_Update_MultipleChunks(t *testing.T) { // Now we update for chunk index 0. chunk = EmptyMinSpanChunksSlice(params) - chunkIdx = uint64(0) - validatorIdx = primitives.ValidatorIndex(0) + chunkIndex = uint64(0) + validatorIndex = primitives.ValidatorIndex(0) startEpoch = primitives.Epoch(1) currentEpoch = target - args = &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIndex, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want = []uint16{3, 2, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16} @@ -329,15 +313,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) { } chunk := EmptyMaxSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := primitives.Epoch(0) currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) // We should keep going! We still have to update the data for chunk index 1. @@ -347,15 +327,11 @@ func TestMaxSpanChunksSlice_Update_MultipleChunks(t *testing.T) { // Now we update for chunk index 1. chunk = EmptyMaxSpanChunksSlice(params) - chunkIdx = uint64(1) + chunkIndex = uint64(1) validatorIdx = primitives.ValidatorIndex(0) startEpoch = primitives.Epoch(2) currentEpoch = target - args = &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err = chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err = chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want = []uint16{1, 0, 0, 0, 0, 0} @@ -393,15 +369,11 @@ func TestMinSpanChunksSlice_Update_SingleChunk(t *testing.T) { } chunk := EmptyMinSpanChunksSlice(params) target := primitives.Epoch(1) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := target currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want := []uint16{1, 0, math.MaxUint16, math.MaxUint16, math.MaxUint16, math.MaxUint16} @@ -416,15 +388,11 @@ func TestMaxSpanChunksSlice_Update_SingleChunk(t *testing.T) { } chunk := EmptyMaxSpanChunksSlice(params) target := primitives.Epoch(3) - chunkIdx := uint64(0) + chunkIndex := uint64(0) validatorIdx := primitives.ValidatorIndex(0) startEpoch := primitives.Epoch(0) currentEpoch := target - args := &chunkUpdateArgs{ - chunkIndex: chunkIdx, - currentEpoch: currentEpoch, - } - keepGoing, err := chunk.Update(args, validatorIdx, startEpoch, target) + keepGoing, err := chunk.Update(chunkIndex, currentEpoch, validatorIdx, startEpoch, target) require.NoError(t, err) require.Equal(t, false, keepGoing) want := []uint16{3, 2, 1, 0, 0, 0, 0, 0} diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 2b3f89b5d695..3acef5020a6e 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -458,10 +458,8 @@ func (s *Service) applyAttestationForValidator( } keepGoing, err := chunk.Update( - &chunkUpdateArgs{ - chunkIndex: chunkIndex, - currentEpoch: currentEpoch, - }, + chunkIndex, + currentEpoch, validatorIndex, startEpoch, targetEpoch, From 4c90b2c3c9720f8018963a14d4f14d31c5aa997f Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 13 Feb 2024 19:13:54 +0100 Subject: [PATCH 04/11] `detect_attestations.go`: Reduce abstraction layers. --- beacon-chain/slasher/detect_attestations.go | 146 +++++++++----------- 1 file changed, 64 insertions(+), 82 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 3acef5020a6e..aaf01b1067ff 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -20,14 +20,11 @@ import ( func (s *Service) checkSlashableAttestations( ctx context.Context, currentEpoch primitives.Epoch, atts []*slashertypes.IndexedAttestationWrapper, ) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) { - totalStart := time.Now() + start := time.Now() slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{} // Double votes - log.Debug("Checking for double votes") - start := time.Now() - doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts) if err != nil { return nil, errors.Wrap(err, "could not check slashable double votes") @@ -47,40 +44,20 @@ func (s *Service) checkSlashableAttestations( } // Surrounding / surrounded votes - groupedByValidatorChunkIndexAtts := s.groupByValidatorChunkIndex(atts) - log.WithField("numBatches", len(groupedByValidatorChunkIndexAtts)).Debug("Batching attestations by validator chunk index") - groupsCount := len(groupedByValidatorChunkIndexAtts) - - surroundStart := time.Now() - - for validatorChunkIndex, attestations := range groupedByValidatorChunkIndexAtts { - surroundSlashings, err := s.checkSurrounds(ctx, attestations, currentEpoch, validatorChunkIndex) - if err != nil { - return nil, err - } - - for root, slashing := range surroundSlashings { - slashings[root] = slashing - } + surroundSlashings, err := s.checkSurroundVotes(ctx, atts, currentEpoch) + if err != nil { + return nil, errors.Wrap(err, "could not check slashable surround votes") + } - indices := s.params.validatorIndexesInChunk(validatorChunkIndex) - for _, idx := range indices { - s.latestEpochWrittenForValidator[idx] = currentEpoch - } + for root, slashing := range surroundSlashings { + slashings[root] = slashing } - surroundElapsed := time.Since(surroundStart) - totalElapsed := time.Since(totalStart) + elapsed := time.Since(start) fields := logrus.Fields{ - "numAttestations": len(atts), - "numBatchesByValidatorChunkIndex": groupsCount, - "elapsed": totalElapsed, - } - - if groupsCount > 0 { - avgProcessingTimePerBatch := surroundElapsed / time.Duration(groupsCount) - fields["avgBatchProcessingTime"] = avgProcessingTimePerBatch + "numAttestations": len(atts), + "elapsed": elapsed, } log.WithFields(fields).Info("Done checking slashable attestations") @@ -92,70 +69,75 @@ func (s *Service) checkSlashableAttestations( return slashings, nil } -// Given a list of attestations all corresponding to a validator chunk index as well -// as the current epoch in time, we perform slashing detection. -// The process is as follows given a list of attestations: -// -// 1. Group the attestations by chunk index. -// 2. Update the min and max spans for those grouped attestations, check if any slashings are -// found in the process -// 3. Update the latest written epoch for all validators involved to the current epoch. -// -// This function performs a lot of critical actions and is split into smaller helpers for cleanliness. -func (s *Service) checkSurrounds( +// Check for surrounding and surrounded votes in our database given a list of incoming attestations. +func (s *Service) checkSurroundVotes( ctx context.Context, - attestations []*slashertypes.IndexedAttestationWrapper, + attWrappers []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch, - validatorChunkIndex uint64, ) (map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing, error) { - // Map of updated chunks by chunk index, which will be saved at the end. - updatedMinChunks, updatedMaxChunks := map[uint64]Chunker{}, map[uint64]Chunker{} + slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{} - groupedByChunkIndexAtts := s.groupByChunkIndex(attestations) - validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) + // Group attestation wrappers by validator chunk index. + attWrappersByValidatorChunkIndex := s.groupByValidatorChunkIndex(attWrappers) - slashings := map[[fieldparams.RootLength]byte]*ethpb.AttesterSlashing{} + for validatorChunkIndex, attWrappers := range attWrappersByValidatorChunkIndex { + // Map of updated chunks by chunk index, which will be saved at the end. + updatedMinChunks, updatedMaxChunks := map[uint64]Chunker{}, map[uint64]Chunker{} + + // Get all validator indexes corresponding to this validator chunk index. + validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) + + // For all validators in the chunk corresponding to this validator chunk index, + // update `updatedMinChunks` and `updatedMaxChunks` with default values up to the current epoch. + for _, validatorIndex := range validatorIndexes { + // This function modifies `updatedMinChunks` in place. + if err := s.epochUpdateForValidator(ctx, updatedMinChunks, validatorChunkIndex, slashertypes.MinSpan, currentEpoch, validatorIndex); err != nil { + return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex) + } - // Update epoch for validators. - for _, validatorIndex := range validatorIndexes { - // This function modifies `updatedMinChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMinChunks, validatorChunkIndex, slashertypes.MinSpan, currentEpoch, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex) + // This function modifies `updatedMaxChunks` in place. + if err := s.epochUpdateForValidator(ctx, updatedMaxChunks, validatorChunkIndex, slashertypes.MaxSpan, currentEpoch, validatorIndex); err != nil { + return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex) + } } - // This function modifies `updatedMaxChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMaxChunks, validatorChunkIndex, slashertypes.MaxSpan, currentEpoch, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex) + // Group (already grouped by validator chunk index) attestation wrappers by chunk index. + attWrappersByChunkIndex := s.groupByChunkIndex(attWrappers) + + // Check for surrounding votes. + surroundingSlashings, err := s.updateSpans(ctx, updatedMinChunks, attWrappersByChunkIndex, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) + if err != nil { + return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex) } - } - // Check for surrounding votes. - surroundingSlashings, err := s.updateSpans(ctx, updatedMinChunks, groupedByChunkIndexAtts, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) - if err != nil { - return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex) - } + for root, slashing := range surroundingSlashings { + slashings[root] = slashing + } - for root, slashing := range surroundingSlashings { - slashings[root] = slashing - } + // Check for surrounded votes. + surroundedSlashings, err := s.updateSpans(ctx, updatedMaxChunks, attWrappersByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) + if err != nil { + return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex) + } - // Check for surrounded votes. - surroundedSlashings, err := s.updateSpans(ctx, updatedMaxChunks, groupedByChunkIndexAtts, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) - if err != nil { - return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex) - } + for root, slashing := range surroundedSlashings { + slashings[root] = slashing + } - for root, slashing := range surroundedSlashings { - slashings[root] = slashing - } + // Save updated chunks into the database. + if err := s.saveUpdatedChunks(ctx, updatedMinChunks, slashertypes.MinSpan, validatorChunkIndex); err != nil { + return nil, errors.Wrap(err, "could not save chunks for min spans") + } - // Save updated chunks into the database. - if err := s.saveUpdatedChunks(ctx, updatedMinChunks, slashertypes.MinSpan, validatorChunkIndex); err != nil { - return nil, errors.Wrap(err, "could not save chunks for min spans") - } + if err := s.saveUpdatedChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, validatorChunkIndex); err != nil { + return nil, errors.Wrap(err, "could not save chunks for max spans") + } - if err := s.saveUpdatedChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, validatorChunkIndex); err != nil { - return nil, errors.Wrap(err, "could not save chunks for max spans") + // Update the latest written epoch for all validators involved to the current chunk. + indices := s.params.validatorIndexesInChunk(validatorChunkIndex) + for _, idx := range indices { + s.latestEpochWrittenForValidator[idx] = currentEpoch + } } return slashings, nil From 8622e42fc4d0e55d27a997a9510bc8661e4d7743 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 13 Feb 2024 23:10:09 +0100 Subject: [PATCH 05/11] `loadAndUpdateChunks`: Change arguments order. --- beacon-chain/slasher/detect_attestations.go | 14 +++++++------- beacon-chain/slasher/detect_attestations_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index aaf01b1067ff..9dbb9b7f6beb 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -91,12 +91,12 @@ func (s *Service) checkSurroundVotes( // update `updatedMinChunks` and `updatedMaxChunks` with default values up to the current epoch. for _, validatorIndex := range validatorIndexes { // This function modifies `updatedMinChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMinChunks, validatorChunkIndex, slashertypes.MinSpan, currentEpoch, validatorIndex); err != nil { + if err := s.loadAndUpdateChunks(ctx, updatedMinChunks, slashertypes.MinSpan, currentEpoch, validatorChunkIndex, validatorIndex); err != nil { return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex) } // This function modifies `updatedMaxChunks` in place. - if err := s.epochUpdateForValidator(ctx, updatedMaxChunks, validatorChunkIndex, slashertypes.MaxSpan, currentEpoch, validatorIndex); err != nil { + if err := s.loadAndUpdateChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, currentEpoch, validatorChunkIndex, validatorIndex); err != nil { return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex) } } @@ -252,12 +252,12 @@ func (s *Service) checkDoubleVotes( // we update entries in the slashing spans with their neutral element for epochs N+1 to N+4. // This also puts any loaded chunks in a map used as a cache for further processing and minimizing // database reads later on. -func (s *Service) epochUpdateForValidator( +func (s *Service) loadAndUpdateChunks( ctx context.Context, - updatedChunks map[uint64]Chunker, - validatorChunkIndex uint64, + chunkByChunkIndex map[uint64]Chunker, chunkKind slashertypes.ChunkKind, currentEpoch primitives.Epoch, + validatorChunkIndex uint64, validatorIndex primitives.ValidatorIndex, ) error { var err error @@ -270,7 +270,7 @@ func (s *Service) epochUpdateForValidator( for latestEpochWritten <= currentEpoch { chunkIndex := s.params.chunkIndex(latestEpochWritten) - currentChunk, ok := updatedChunks[chunkIndex] + currentChunk, ok := chunkByChunkIndex[chunkIndex] if !ok { currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) if err != nil { @@ -289,7 +289,7 @@ func (s *Service) epochUpdateForValidator( return err } - updatedChunks[chunkIndex] = currentChunk + chunkByChunkIndex[chunkIndex] = currentChunk latestEpochWritten++ } } diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index 34782d93e901..dbab3e9d002b 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -860,12 +860,12 @@ func Test_epochUpdateForValidators(t *testing.T) { // no chunks to be loaded nor updated to. updatedChunks := make(map[uint64]Chunker) for _, valIdx := range validators { - err := s.epochUpdateForValidator( + err := s.loadAndUpdateChunks( ctx, updatedChunks, - 0, // validatorChunkIndex slashertypes.MinSpan, currentEpoch, + 0, // validatorChunkIndex valIdx, ) require.NoError(t, err) @@ -891,12 +891,12 @@ func Test_epochUpdateForValidators(t *testing.T) { // safe contained in chunk index 1. updatedChunks := make(map[uint64]Chunker) for _, valIdx := range validators { - err := s.epochUpdateForValidator( + err := s.loadAndUpdateChunks( ctx, updatedChunks, - 0, // validatorChunkIndex, slashertypes.MinSpan, currentEpoch, + 0, // validatorChunkIndex, valIdx, ) require.NoError(t, err) From 22ea44f685a5f5c5b3aeca7d3ce18381e5e8b351 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 00:37:07 +0100 Subject: [PATCH 06/11] `updatedChunkByChunkIndex`: Update all known validators in the chunk. --- beacon-chain/slasher/detect_attestations.go | 106 +++++++++--------- .../slasher/detect_attestations_test.go | 38 ++----- 2 files changed, 60 insertions(+), 84 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 9dbb9b7f6beb..04bba33bd4c1 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -81,31 +81,21 @@ func (s *Service) checkSurroundVotes( attWrappersByValidatorChunkIndex := s.groupByValidatorChunkIndex(attWrappers) for validatorChunkIndex, attWrappers := range attWrappersByValidatorChunkIndex { - // Map of updated chunks by chunk index, which will be saved at the end. - updatedMinChunks, updatedMaxChunks := map[uint64]Chunker{}, map[uint64]Chunker{} - - // Get all validator indexes corresponding to this validator chunk index. - validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) - - // For all validators in the chunk corresponding to this validator chunk index, - // update `updatedMinChunks` and `updatedMaxChunks` with default values up to the current epoch. - for _, validatorIndex := range validatorIndexes { - // This function modifies `updatedMinChunks` in place. - if err := s.loadAndUpdateChunks(ctx, updatedMinChunks, slashertypes.MinSpan, currentEpoch, validatorChunkIndex, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for min chunks %d", validatorIndex) - } + minChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MinSpan, currentEpoch, validatorChunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not update updatedMinChunks") + } - // This function modifies `updatedMaxChunks` in place. - if err := s.loadAndUpdateChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, currentEpoch, validatorChunkIndex, validatorIndex); err != nil { - return nil, errors.Wrapf(err, "could not update validator index for max chunks %d", validatorIndex) - } + maxChunkByChunkIndex, err := s.updatedChunkByChunkIndex(ctx, slashertypes.MaxSpan, currentEpoch, validatorChunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not update updatedMaxChunks") } // Group (already grouped by validator chunk index) attestation wrappers by chunk index. attWrappersByChunkIndex := s.groupByChunkIndex(attWrappers) // Check for surrounding votes. - surroundingSlashings, err := s.updateSpans(ctx, updatedMinChunks, attWrappersByChunkIndex, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) + surroundingSlashings, err := s.updateSpans(ctx, minChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MinSpan, validatorChunkIndex, currentEpoch) if err != nil { return nil, errors.Wrapf(err, "could not update min attestation spans for validator chunk index %d", validatorChunkIndex) } @@ -115,7 +105,7 @@ func (s *Service) checkSurroundVotes( } // Check for surrounded votes. - surroundedSlashings, err := s.updateSpans(ctx, updatedMaxChunks, attWrappersByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) + surroundedSlashings, err := s.updateSpans(ctx, maxChunkByChunkIndex, attWrappersByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex, currentEpoch) if err != nil { return nil, errors.Wrapf(err, "could not update max attestation spans for validator chunk index %d", validatorChunkIndex) } @@ -125,11 +115,11 @@ func (s *Service) checkSurroundVotes( } // Save updated chunks into the database. - if err := s.saveUpdatedChunks(ctx, updatedMinChunks, slashertypes.MinSpan, validatorChunkIndex); err != nil { + if err := s.saveUpdatedChunks(ctx, minChunkByChunkIndex, slashertypes.MinSpan, validatorChunkIndex); err != nil { return nil, errors.Wrap(err, "could not save chunks for min spans") } - if err := s.saveUpdatedChunks(ctx, updatedMaxChunks, slashertypes.MaxSpan, validatorChunkIndex); err != nil { + if err := s.saveUpdatedChunks(ctx, maxChunkByChunkIndex, slashertypes.MaxSpan, validatorChunkIndex); err != nil { return nil, errors.Wrap(err, "could not save chunks for max spans") } @@ -246,55 +236,63 @@ func (s *Service) checkDoubleVotes( return slashings, nil } -// This function updates `updatedChunks`, representing the slashing spans for a given validator for -// a change in epoch since the last epoch we have recorded for the validator. -// For example, if the last epoch a validator has written is N, and the current epoch is N+5, -// we update entries in the slashing spans with their neutral element for epochs N+1 to N+4. -// This also puts any loaded chunks in a map used as a cache for further processing and minimizing -// database reads later on. -func (s *Service) loadAndUpdateChunks( +// updatedChunkByChunkIndex loads the chunks from the database for validators corresponding to +// the `validatorChunkIndex` and which have an entry in `s.latestEpochWrittenForValidator`. +// It then updates the chunks with the neutral element for corresponding validators from +// the latest epoch written to the current epoch. +// A mapping between chunk index and chunk is returned to the caller. +func (s *Service) updatedChunkByChunkIndex( ctx context.Context, - chunkByChunkIndex map[uint64]Chunker, chunkKind slashertypes.ChunkKind, currentEpoch primitives.Epoch, validatorChunkIndex uint64, - validatorIndex primitives.ValidatorIndex, -) error { +) (map[uint64]Chunker, error) { var err error - latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] - if !ok { - return nil - } - - for latestEpochWritten <= currentEpoch { - chunkIndex := s.params.chunkIndex(latestEpochWritten) + chunkByChunkIndex := map[uint64]Chunker{} - currentChunk, ok := chunkByChunkIndex[chunkIndex] + validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) + for _, validatorIndex := range validatorIndexes { + // Retrieve the latest epoch written for the validator. + latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] if !ok { - currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) - if err != nil { - return errors.Wrap(err, "could not get chunk") - } + continue } - for s.params.chunkIndex(latestEpochWritten) == chunkIndex && latestEpochWritten <= currentEpoch { - if err := setChunkRawDistance( - s.params, - currentChunk.Chunk(), - validatorIndex, - latestEpochWritten, - currentChunk.NeutralElement(), - ); err != nil { - return err + for latestEpochWritten <= currentEpoch { + // Get the chunk index for the latest epoch written. + chunkIndex := s.params.chunkIndex(latestEpochWritten) + + // Get the chunk corresponding to the chunk index from the `chunkByChunkIndex` map. + currentChunk, ok := chunkByChunkIndex[chunkIndex] + if !ok { + // If the chunk is not in the map, retrieve it from the database. + currentChunk, err = s.getChunkFromDatabase(ctx, chunkKind, validatorChunkIndex, chunkIndex) + if err != nil { + return nil, errors.Wrap(err, "could not get chunk") + } + } + + // Update the current chunk with the neutral element for the validator index for the latest epoch written. + for s.params.chunkIndex(latestEpochWritten) == chunkIndex && latestEpochWritten <= currentEpoch { + if err := setChunkRawDistance( + s.params, + currentChunk.Chunk(), + validatorIndex, + latestEpochWritten, + currentChunk.NeutralElement(), + ); err != nil { + return nil, err + } + + latestEpochWritten++ } chunkByChunkIndex[chunkIndex] = currentChunk - latestEpochWritten++ } } - return nil + return chunkByChunkIndex, nil } // Updates spans and detects any slashable attester offenses along the way. diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index dbab3e9d002b..bbb063047229 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -849,34 +849,20 @@ func Test_epochUpdateForValidators(t *testing.T) { } t.Run("no update if no latest written epoch", func(t *testing.T) { - validators := []primitives.ValidatorIndex{ - 1, 2, - } currentEpoch := primitives.Epoch(3) // No last written epoch for both validators. s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{} // Because the validators have no recorded latest epoch written, we expect // no chunks to be loaded nor updated to. - updatedChunks := make(map[uint64]Chunker) - for _, valIdx := range validators { - err := s.loadAndUpdateChunks( - ctx, - updatedChunks, - slashertypes.MinSpan, - currentEpoch, - 0, // validatorChunkIndex - valIdx, - ) - require.NoError(t, err) - } + updatedChunks, err := s.updatedChunkByChunkIndex( + ctx, slashertypes.MinSpan, currentEpoch, 0, // validatorChunkIndex + ) + require.NoError(t, err) require.Equal(t, 0, len(updatedChunks)) }) t.Run("update from latest written epoch", func(t *testing.T) { - validators := []primitives.ValidatorIndex{ - 1, 2, - } currentEpoch := primitives.Epoch(3) // Set the latest written epoch for validators to current epoch - 1. @@ -889,18 +875,10 @@ func Test_epochUpdateForValidators(t *testing.T) { // Because the latest written epoch for the input validators is == 2, we expect // that we will update all epochs from 2 up to 3 (the current epoch). This is all // safe contained in chunk index 1. - updatedChunks := make(map[uint64]Chunker) - for _, valIdx := range validators { - err := s.loadAndUpdateChunks( - ctx, - updatedChunks, - slashertypes.MinSpan, - currentEpoch, - 0, // validatorChunkIndex, - valIdx, - ) - require.NoError(t, err) - } + updatedChunks, err := s.updatedChunkByChunkIndex( + ctx, slashertypes.MinSpan, currentEpoch, 0, // validatorChunkIndex, + ) + require.NoError(t, err) require.Equal(t, 1, len(updatedChunks)) _, ok := updatedChunks[1] require.Equal(t, true, ok) From 0e893606d059f2c924386bd80aef25328a2f13a4 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 09:17:56 +0100 Subject: [PATCH 07/11] `LastEpochWrittenForValidators`: Avoid avoidable `for`loop. --- beacon-chain/db/slasherkv/slasher.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index 8a73f2b398d4..7d92be7fb8c0 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -34,18 +34,15 @@ func (s *Store) LastEpochWrittenForValidators( defer span.End() attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0) - encodedIndexes := make([][]byte, len(validatorIndexes)) - - for i, validatorIndex := range validatorIndexes { - encodedIndexes[i] = encodeValidatorIndex(validatorIndex) - } err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestedEpochsByValidator) - for i, encodedIndex := range encodedIndexes { + for _, validatorIndex := range validatorIndexes { var epoch primitives.Epoch + encodedIndex := encodeValidatorIndex(validatorIndex) + if epochBytes := bkt.Get(encodedIndex); epochBytes != nil { if err := epoch.UnmarshalSSZ(epochBytes); err != nil { return err @@ -53,7 +50,7 @@ func (s *Store) LastEpochWrittenForValidators( } attestedEpoch := &slashertypes.AttestedEpochForValidator{ - ValidatorIndex: validatorIndexes[i], + ValidatorIndex: validatorIndex, Epoch: epoch, } From d36fb77ba83913acfa1ae7bfe07435cb857c38e4 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 16:38:09 +0100 Subject: [PATCH 08/11] `chunks.go`: Ensure implementations respect the interface. --- beacon-chain/slasher/chunks.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/beacon-chain/slasher/chunks.go b/beacon-chain/slasher/chunks.go index 6d34c864774d..32fc9a858369 100644 --- a/beacon-chain/slasher/chunks.go +++ b/beacon-chain/slasher/chunks.go @@ -81,6 +81,8 @@ type MinSpanChunksSlice struct { data []uint16 } +var _ Chunker = (*MinSpanChunksSlice)(nil) + // MaxSpanChunksSlice represents the same data structure as MinSpanChunksSlice however // keeps track of validator max spans for slashing detection instead. type MaxSpanChunksSlice struct { @@ -88,6 +90,8 @@ type MaxSpanChunksSlice struct { data []uint16 } +var _ Chunker = (*MaxSpanChunksSlice)(nil) + // EmptyMinSpanChunksSlice initializes a min span chunk of length C*K for // C = chunkSize and K = validatorChunkSize filled with neutral elements. // For min spans, the neutral element is `undefined`, represented by MaxUint16. From ab7b67cb17b82f0d79f6354a2d8490790dbe67cb Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 16:39:56 +0100 Subject: [PATCH 09/11] `LastEpochWrittenForValidators`: Stop considering lack of epoch as genesis epoch. --- beacon-chain/db/slasherkv/slasher.go | 15 +++++++++------ beacon-chain/db/slasherkv/slasher_test.go | 13 +++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index 7d92be7fb8c0..9a523a93eabc 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -39,14 +39,17 @@ func (s *Store) LastEpochWrittenForValidators( bkt := tx.Bucket(attestedEpochsByValidator) for _, validatorIndex := range validatorIndexes { - var epoch primitives.Epoch - encodedIndex := encodeValidatorIndex(validatorIndex) - if epochBytes := bkt.Get(encodedIndex); epochBytes != nil { - if err := epoch.UnmarshalSSZ(epochBytes); err != nil { - return err - } + epochBytes := bkt.Get(encodedIndex) + if epochBytes == nil { + // If there is no epoch for this validator, skip to the next validator. + continue + } + + var epoch primitives.Epoch + if err := epoch.UnmarshalSSZ(epochBytes); err != nil { + return err } attestedEpoch := &slashertypes.AttestedEpochForValidator{ diff --git a/beacon-chain/db/slasherkv/slasher_test.go b/beacon-chain/db/slasherkv/slasher_test.go index ad1282e0a055..aaa2dfa39115 100644 --- a/beacon-chain/db/slasherkv/slasher_test.go +++ b/beacon-chain/db/slasherkv/slasher_test.go @@ -56,19 +56,16 @@ func TestStore_LastEpochWrittenForValidators(t *testing.T) { epochs[i] = primitives.Epoch(i) } - attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) - require.NoError(t, err) - require.Equal(t, true, len(attestedEpochs) == len(indices)) - - for _, item := range attestedEpochs { - require.Equal(t, primitives.Epoch(0), item.Epoch) - } - epochsByValidator := make(map[primitives.ValidatorIndex]primitives.Epoch, validatorsCount) for i := 0; i < validatorsCount; i++ { epochsByValidator[indices[i]] = epochs[i] } + // No epochs written for any validators, should return empty list. + attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) + require.NoError(t, err) + require.Equal(t, 0, len(attestedEpochs)) + err = beaconDB.SaveLastEpochsWrittenForValidators(ctx, epochsByValidator) require.NoError(t, err) From 112ab9af5a79682ba881d0f35f0006e073233d70 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 16:41:05 +0100 Subject: [PATCH 10/11] `updatedChunkByChunkIndex`: Don't update latest updated epoch. And add a bunch of tests. --- beacon-chain/slasher/detect_attestations.go | 19 +- .../slasher/detect_attestations_test.go | 287 +++++++++++++++--- 2 files changed, 257 insertions(+), 49 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 04bba33bd4c1..4f59b461b07c 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -237,9 +237,9 @@ func (s *Service) checkDoubleVotes( } // updatedChunkByChunkIndex loads the chunks from the database for validators corresponding to -// the `validatorChunkIndex` and which have an entry in `s.latestEpochWrittenForValidator`. +// the `validatorChunkIndex`. // It then updates the chunks with the neutral element for corresponding validators from -// the latest epoch written to the current epoch. +// the epoch just after the latest epoch written to the current epoch. // A mapping between chunk index and chunk is returned to the caller. func (s *Service) updatedChunkByChunkIndex( ctx context.Context, @@ -255,13 +255,16 @@ func (s *Service) updatedChunkByChunkIndex( for _, validatorIndex := range validatorIndexes { // Retrieve the latest epoch written for the validator. latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] + + // Start from the epoch just after the latest epoch written. + epochToWrite := latestEpochWritten + 1 if !ok { - continue + epochToWrite = 0 } - for latestEpochWritten <= currentEpoch { + for epochToWrite <= currentEpoch { // Get the chunk index for the latest epoch written. - chunkIndex := s.params.chunkIndex(latestEpochWritten) + chunkIndex := s.params.chunkIndex(epochToWrite) // Get the chunk corresponding to the chunk index from the `chunkByChunkIndex` map. currentChunk, ok := chunkByChunkIndex[chunkIndex] @@ -274,18 +277,18 @@ func (s *Service) updatedChunkByChunkIndex( } // Update the current chunk with the neutral element for the validator index for the latest epoch written. - for s.params.chunkIndex(latestEpochWritten) == chunkIndex && latestEpochWritten <= currentEpoch { + for s.params.chunkIndex(epochToWrite) == chunkIndex && epochToWrite <= currentEpoch { if err := setChunkRawDistance( s.params, currentChunk.Chunk(), validatorIndex, - latestEpochWritten, + epochToWrite, currentChunk.NeutralElement(), ); err != nil { return nil, err } - latestEpochWritten++ + epochToWrite++ } chunkByChunkIndex[chunkIndex] = currentChunk diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index bbb063047229..3d2b78de6509 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -834,55 +834,260 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) { } func Test_epochUpdateForValidators(t *testing.T) { - ctx := context.Background() - slasherDB := dbtest.SetupSlasherDB(t) + neutralMin, neutralMax := uint16(65535), uint16(0) + + testCases := []struct { + name string + chunkSize uint64 + validatorChunkSize uint64 + historyLength primitives.Epoch + currentEpoch primitives.Epoch + validatorChunkIndex uint64 + latestUpdatedEpochByValidatorIndex map[primitives.ValidatorIndex]primitives.Epoch + initialMinChunkByChunkIndex map[uint64][]uint16 + expectedMinChunkByChunkIndex map[uint64][]uint16 + initialMaxChunkByChunkIndex map[uint64][]uint16 + expectedMaxChunkByChunkIndex map[uint64][]uint16 + }{ + { + name: "start with no data - first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 2, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: nil, + initialMinChunkByChunkIndex: nil, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: nil, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with no data - second chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 5, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: nil, + initialMinChunkByChunkIndex: nil, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: nil, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 2, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 0, 43: 1}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 9999, 9999, 9999, 15, 16, 9999, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, neutralMin, neutralMin, 9999, 15, 16, neutralMin, 9999}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 9999, 9999, 9999, 71, 72, 9999, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, neutralMax, neutralMax, 9999, 71, 72, neutralMax, 9999}, + }, + }, + { + name: "start with some data - second chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 8, + currentEpoch: 5, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 1, 43: 2}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 13, 9999, 9999, 15, 16, 17, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin}, - // Check if the chunk at chunk index already exists in-memory. - s := &Service{ - params: &Parameters{ - chunkSize: 2, // 2 epochs in a chunk. - validatorChunkSize: 2, // 2 validators in a chunk. - historyLength: 4, + // | validator 42 | validator 43 | + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 69, 9999, 9999, 71, 72, 73, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax}, + + // | validator 42 | validator 43 | + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - third chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 9, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 5, 43: 6}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {14, 13, 9999, 9999, 15, 16, 17, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {14, 13, neutralMin, neutralMin, 15, 16, 17, neutralMin}, + + // | validator 42 | validator 43 | + 2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {70, 69, 9999, 9999, 71, 72, 73, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 1: {70, 69, neutralMax, neutralMax, 71, 72, 73, neutralMax}, + + // | validator 42 | validator 43 | + 2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, + { + name: "start with some data - third chunk - wrap to first chunk", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 14, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 9, 43: 10}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + + // | validator 42 | validator 43 | + 2: {77, 77, 9999, 9999, 77, 77, 77, 9999}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 2: {77, 77, neutralMin, neutralMin, 77, 77, 77, neutralMin}, + + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, 55, neutralMin, neutralMin, neutralMin, 55}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + + // | validator 42 | validator 43 | + 2: {77, 77, 9999, 9999, 77, 77, 77, 9999}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 2: {77, 77, neutralMax, neutralMax, 77, 77, 77, neutralMax}, + + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, 55, neutralMax, neutralMax, neutralMax, 55}, + }, }, - serviceCfg: &ServiceConfig{Database: slasherDB}, - latestEpochWrittenForValidator: map[primitives.ValidatorIndex]primitives.Epoch{}, } - t.Run("no update if no latest written epoch", func(t *testing.T) { - currentEpoch := primitives.Epoch(3) - // No last written epoch for both validators. - s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{} + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // Create context. + ctx := context.Background() - // Because the validators have no recorded latest epoch written, we expect - // no chunks to be loaded nor updated to. - updatedChunks, err := s.updatedChunkByChunkIndex( - ctx, slashertypes.MinSpan, currentEpoch, 0, // validatorChunkIndex - ) - require.NoError(t, err) - require.Equal(t, 0, len(updatedChunks)) - }) + // Initialize the slasher database. + slasherDB := dbtest.SetupSlasherDB(t) - t.Run("update from latest written epoch", func(t *testing.T) { - currentEpoch := primitives.Epoch(3) + // Intialize the slasher service. + service := &Service{ + params: &Parameters{ + chunkSize: tt.chunkSize, + validatorChunkSize: tt.validatorChunkSize, + historyLength: tt.historyLength, + }, + serviceCfg: &ServiceConfig{Database: slasherDB}, + latestEpochWrittenForValidator: tt.latestUpdatedEpochByValidatorIndex, + } - // Set the latest written epoch for validators to current epoch - 1. - latestWrittenEpoch := currentEpoch - 1 - s.latestEpochWrittenForValidator = map[primitives.ValidatorIndex]primitives.Epoch{ - 1: latestWrittenEpoch, - 2: latestWrittenEpoch, - } + // Save min initial chunks if they exist. + if tt.initialMinChunkByChunkIndex != nil { + minChunkerByChunkerIndex := map[uint64]Chunker{} + for chunkIndex, minChunk := range tt.initialMinChunkByChunkIndex { + minChunkerByChunkerIndex[chunkIndex] = &MinSpanChunksSlice{data: minChunk} + } - // Because the latest written epoch for the input validators is == 2, we expect - // that we will update all epochs from 2 up to 3 (the current epoch). This is all - // safe contained in chunk index 1. - updatedChunks, err := s.updatedChunkByChunkIndex( - ctx, slashertypes.MinSpan, currentEpoch, 0, // validatorChunkIndex, - ) - require.NoError(t, err) - require.Equal(t, 1, len(updatedChunks)) - _, ok := updatedChunks[1] - require.Equal(t, true, ok) - }) + err := service.saveUpdatedChunks(ctx, minChunkerByChunkerIndex, slashertypes.MinSpan, tt.validatorChunkIndex) + require.NoError(t, err) + } + + // Save max initial chunks if they exist. + if tt.initialMaxChunkByChunkIndex != nil { + maxChunkerByChunkerIndex := map[uint64]Chunker{} + for chunkIndex, maxChunk := range tt.initialMaxChunkByChunkIndex { + maxChunkerByChunkerIndex[chunkIndex] = &MaxSpanChunksSlice{data: maxChunk} + } + + err := service.saveUpdatedChunks(ctx, maxChunkerByChunkerIndex, slashertypes.MaxSpan, tt.validatorChunkIndex) + require.NoError(t, err) + } + + // Get chunks. + actualMinChunkByChunkIndex, err := service.updatedChunkByChunkIndex( + ctx, slashertypes.MinSpan, tt.currentEpoch, tt.validatorChunkIndex, + ) + + // Compare the actual and expected chunks. + require.NoError(t, err) + require.Equal(t, len(tt.expectedMinChunkByChunkIndex), len(actualMinChunkByChunkIndex)) + for chunkIndex, expectedMinChunk := range tt.expectedMinChunkByChunkIndex { + actualMinChunk, ok := actualMinChunkByChunkIndex[chunkIndex] + require.Equal(t, true, ok) + require.Equal(t, len(expectedMinChunk), len(actualMinChunk.Chunk())) + require.DeepSSZEqual(t, expectedMinChunk, actualMinChunk.Chunk()) + } + + actualMaxChunkByChunkIndex, err := service.updatedChunkByChunkIndex( + ctx, slashertypes.MaxSpan, tt.currentEpoch, tt.validatorChunkIndex, + ) + + require.NoError(t, err) + require.Equal(t, len(tt.expectedMaxChunkByChunkIndex), len(actualMaxChunkByChunkIndex)) + for chunkIndex, expectedMaxChunk := range tt.expectedMaxChunkByChunkIndex { + actualMaxChunk, ok := actualMaxChunkByChunkIndex[chunkIndex] + require.Equal(t, true, ok) + require.Equal(t, len(expectedMaxChunk), len(actualMaxChunk.Chunk())) + require.DeepSSZEqual(t, expectedMaxChunk, actualMaxChunk.Chunk()) + } + + }) + } } func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) { From f559e633325b6dcaf3b630a1bf6a890e3359b3ef Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 14 Feb 2024 18:28:08 +0100 Subject: [PATCH 11/11] Improve slasher cold boot duration. Before this commit, on a slasher cold boot (aka, without any db), the `updatedChunkByChunkIndex` function looped for all validators AND for all epochs between the genesis epoch and the current epoch. This could take several dozen of minutes, and it is useless since the min/max spans are actually a circular buffer with a limited lenght. Cells of min/max spans can be overwritten (with the same value) plenty of times. After this commit, the `updatedChunkByChunkIndex` function loops for all validators AND AT most 'historyLength' lenght. Every cell of min/max spans are written AT MOST once. Time needed for slasher boot goes from `O(nm)` to "only" `O(m)`, where: - `n` is the number of epochs since the genesis. - `m` is the number of validators. --- beacon-chain/slasher/detect_attestations.go | 14 ++++++-- .../slasher/detect_attestations_test.go | 33 +++++++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 4f59b461b07c..8f4643d71bbd 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -247,8 +247,6 @@ func (s *Service) updatedChunkByChunkIndex( currentEpoch primitives.Epoch, validatorChunkIndex uint64, ) (map[uint64]Chunker, error) { - var err error - chunkByChunkIndex := map[uint64]Chunker{} validatorIndexes := s.params.validatorIndexesInChunk(validatorChunkIndex) @@ -257,11 +255,21 @@ func (s *Service) updatedChunkByChunkIndex( latestEpochWritten, ok := s.latestEpochWrittenForValidator[validatorIndex] // Start from the epoch just after the latest epoch written. - epochToWrite := latestEpochWritten + 1 + epochToWrite, err := latestEpochWritten.SafeAdd(1) + if err != nil { + return nil, errors.Wrap(err, "could not add 1 to latest epoch written") + } + if !ok { epochToWrite = 0 } + // It is useless to update more than `historyLength` epochs, since + // the chunks are circular and we will be overwritten at least one. + if currentEpoch-epochToWrite >= s.params.historyLength { + epochToWrite = currentEpoch + 1 - s.params.historyLength + } + for epochToWrite <= currentEpoch { // Get the chunk index for the latest epoch written. chunkIndex := s.params.chunkIndex(epochToWrite) diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index 3d2b78de6509..1d87e776d545 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -1015,6 +1015,39 @@ func Test_epochUpdateForValidators(t *testing.T) { 0: {neutralMax, neutralMax, neutralMax, 55, neutralMax, neutralMax, neutralMax, 55}, }, }, + { + name: "start with some data - high latest updated epoch", + chunkSize: 4, + validatorChunkSize: 2, + historyLength: 12, + currentEpoch: 16, + validatorChunkIndex: 21, + latestUpdatedEpochByValidatorIndex: map[primitives.ValidatorIndex]primitives.Epoch{42: 2, 43: 3}, + initialMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + 2: {77, 77, 77, 77, 77, 77, 77, 77}, + }, + expectedMinChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 1: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + 2: {neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin, neutralMin}, + }, + initialMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {55, 55, 55, 55, 55, 55, 55, 55}, + 1: {66, 66, 66, 66, 66, 66, 66, 66}, + 2: {77, 77, 77, 77, 77, 77, 77, 77}, + }, + expectedMaxChunkByChunkIndex: map[uint64][]uint16{ + // | validator 42 | validator 43 | + 0: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 1: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + 2: {neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax, neutralMax}, + }, + }, } for _, tt := range testCases {