Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix committee test race #12338

Merged
merged 8 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
Expand Down
13 changes: 10 additions & 3 deletions beacon-chain/cache/active_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,16 @@ type BalanceCache struct {

// NewEffectiveBalanceCache creates a new effective balance cache for storing/accessing total balance by epoch.
func NewEffectiveBalanceCache() *BalanceCache {
return &BalanceCache{
cache: lruwrpr.New(maxBalanceCacheSize),
}
c := &BalanceCache{}
c.Clear()
return c
}

// Clear resets the SyncCommitteeCache to its initial state
func (c *BalanceCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.cache = lruwrpr.New(maxBalanceCacheSize)
}

// AddTotalEffectiveBalance adds a new total effective balance entry for current balance for state `st` into the cache.
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/cache/active_balance_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@
package cache

import (
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
)

// FakeBalanceCache is a fake struct with 1 LRU cache for looking up balance by epoch.
type FakeBalanceCache struct {
cache *lru.Cache
lock sync.RWMutex
}

// NewEffectiveBalanceCache creates a new effective balance cache for storing/accessing total balance by epoch.
Expand All @@ -29,3 +24,8 @@ func (c *FakeBalanceCache) AddTotalEffectiveBalance(st state.ReadOnlyBeaconState
func (c *FakeBalanceCache) Get(st state.ReadOnlyBeaconState) (uint64, error) {
return 0, nil
}

// Clear is a stub.
func (c *FakeBalanceCache) Clear() {
return
}
15 changes: 11 additions & 4 deletions beacon-chain/cache/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,17 @@ func committeeKeyFn(obj interface{}) (string, error) {

// NewCommitteesCache creates a new committee cache for storing/accessing shuffled indices of a committee.
func NewCommitteesCache() *CommitteeCache {
return &CommitteeCache{
CommitteeCache: lruwrpr.New(maxCommitteesCacheSize),
inProgress: make(map[string]bool),
}
cc := &CommitteeCache{}
cc.Clear()
return cc
}

// Clear resets the CommitteeCache to its initial state
func (c *CommitteeCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.CommitteeCache = lruwrpr.New(maxCommitteesCacheSize)
c.inProgress = make(map[string]bool)
}

// Committee fetches the shuffled indices by slot and committee index. Every list of indices
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/cache/committee_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ func (c *FakeCommitteeCache) MarkInProgress(seed [32]byte) error {
func (c *FakeCommitteeCache) MarkNotInProgress(seed [32]byte) error {
return nil
}

// Clear is a stub.
func (c *FakeCommitteeCache) Clear() {
return nil
}
13 changes: 10 additions & 3 deletions beacon-chain/cache/proposer_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,16 @@ func proposerIndicesKeyFn(obj interface{}) (string, error) {

// NewProposerIndicesCache creates a new proposer indices cache for storing/accessing proposer index assignments of an epoch.
func NewProposerIndicesCache() *ProposerIndicesCache {
return &ProposerIndicesCache{
proposerIndicesCache: cache.NewFIFO(proposerIndicesKeyFn),
}
c := &ProposerIndicesCache{}
c.Clear()
return c
}

// Clear resets the ProposerIndicesCache to its initial state
func (c *ProposerIndicesCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.proposerIndicesCache = cache.NewFIFO(proposerIndicesKeyFn)
}

// AddProposerIndices adds ProposerIndices object to the cache.
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/cache/proposer_indices_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func (c *FakeProposerIndicesCache) HasProposerIndices(r [32]byte) (bool, error)
func (c *FakeProposerIndicesCache) Len() int {
return 0
}

// Clear is a stub.
func (c *FakeProposerIndicesCache) Clear() {
}
29 changes: 24 additions & 5 deletions beacon-chain/cache/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package cache

import (
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -31,8 +33,9 @@ var (
// SyncCommitteeCache utilizes a FIFO cache to sufficiently cache validator position within sync committee.
// It is thread safe with concurrent read write.
type SyncCommitteeCache struct {
cache *cache.FIFO
lock sync.RWMutex
cache *cache.FIFO
lock sync.RWMutex
cleared *atomic.Uint64
}

// Index position of all validators in sync committee where `currentSyncCommitteeRoot` is the
Expand All @@ -51,9 +54,17 @@ type positionInCommittee struct {

// NewSyncCommittee initializes and returns a new SyncCommitteeCache.
func NewSyncCommittee() *SyncCommitteeCache {
return &SyncCommitteeCache{
cache: cache.NewFIFO(keyFn),
}
c := &SyncCommitteeCache{cleared: &atomic.Uint64{}}
c.Clear()
return c
}

// Clear resets the SyncCommitteeCache to its initial state
func (s *SyncCommitteeCache) Clear() {
s.lock.Lock()
defer s.lock.Unlock()
s.cleared.Add(1)
s.cache = cache.NewFIFO(keyFn)
}

// CurrentPeriodIndexPosition returns current period index position of a validator index with respect with
Expand Down Expand Up @@ -123,6 +134,10 @@ func (s *SyncCommitteeCache) idxPositionInCommittee(
// current epoch and next epoch. This should be called when `current_sync_committee` and `next_sync_committee`
// change and that happens every `EPOCHS_PER_SYNC_COMMITTEE_PERIOD`.
func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, st state.BeaconState) error {
// since we call UpdatePositionsInCommittee asynchronously, keep track of the cache value
// seen at the beginning of the routine and compare at the end before updating. If the underlying value has been
// cycled (new address), don't update it.
clearCount := s.cleared.Load()
csc, err := st.CurrentSyncCommittee()
if err != nil {
return err
Expand Down Expand Up @@ -162,6 +177,10 @@ func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoo

s.lock.Lock()
defer s.lock.Unlock()
if clearCount != s.cleared.Load() {
log.Warn("cache rotated during async committee update operation - abandoning cache update")
return nil
}

if err := s.cache.Add(&syncCommitteeIndexPosition{
currentSyncCommitteeRoot: syncCommitteeBoundaryRoot,
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/cache/sync_committee_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func (s *FakeSyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx p
func (s *FakeSyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, state state.BeaconState) error {
return nil
}

// Clear -- fake.
func (s *FakeSyncCommitteeCache) Clear() {
return
}
1 change: 1 addition & 0 deletions beacon-chain/core/helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_test(
"weak_subjectivity_test.go",
],
embed = [":go_default_library"],
race = "on",
shard_count = 2,
deps = [
"//beacon-chain/cache:go_default_library",
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/core/helpers/beacon_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,10 @@ func UpdateProposerIndicesInCache(ctx context.Context, state state.ReadOnlyBeaco

// ClearCache clears the beacon committee cache and sync committee cache.
func ClearCache() {
committeeCache = cache.NewCommitteesCache()
proposerIndicesCache = cache.NewProposerIndicesCache()
syncCommitteeCache = cache.NewSyncCommittee()
balanceCache = cache.NewEffectiveBalanceCache()
committeeCache.Clear()
proposerIndicesCache.Clear()
syncCommitteeCache.Clear()
balanceCache.Clear()
}

// computeCommittee returns the requested shuffled committee out of the total committees using
Expand Down
10 changes: 8 additions & 2 deletions beacon-chain/core/helpers/beacon_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestVerifyBitfieldLength_OK(t *testing.T) {

func TestCommitteeAssignments_CannotRetrieveFutureEpoch(t *testing.T) {
ClearCache()
defer ClearCache()
epoch := primitives.Epoch(1)
state, err := state_native.InitializeFromProtoPhase0(&ethpb.BeaconState{
Slot: 0, // Epoch 0.
Expand All @@ -101,6 +102,8 @@ func TestCommitteeAssignments_CannotRetrieveFutureEpoch(t *testing.T) {
}

func TestCommitteeAssignments_NoProposerForSlot0(t *testing.T) {
ClearCache()
defer ClearCache()
validators := make([]*ethpb.Validator, 4*params.BeaconConfig().SlotsPerEpoch)
for i := 0; i < len(validators); i++ {
var activationEpoch primitives.Epoch
Expand All @@ -118,7 +121,6 @@ func TestCommitteeAssignments_NoProposerForSlot0(t *testing.T) {
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
})
require.NoError(t, err)
ClearCache()
_, proposerIndexToSlots, err := CommitteeAssignments(context.Background(), state, 0)
require.NoError(t, err, "Failed to determine CommitteeAssignments")
for _, ss := range proposerIndexToSlots {
Expand Down Expand Up @@ -188,6 +190,7 @@ func TestCommitteeAssignments_CanRetrieve(t *testing.T) {
},
}

defer ClearCache()
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ClearCache()
Expand Down Expand Up @@ -255,6 +258,8 @@ func TestCommitteeAssignments_CannotRetrieveOlderThanSlotsPerHistoricalRoot(t *t
}

func TestCommitteeAssignments_EverySlotHasMin1Proposer(t *testing.T) {
ClearCache()
defer ClearCache()
// Initialize test with 256 validators, each slot and each index gets 4 validators.
validators := make([]*ethpb.Validator, 4*params.BeaconConfig().SlotsPerEpoch)
for i := 0; i < len(validators); i++ {
Expand All @@ -269,7 +274,6 @@ func TestCommitteeAssignments_EverySlotHasMin1Proposer(t *testing.T) {
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
})
require.NoError(t, err)
ClearCache()
epoch := primitives.Epoch(1)
_, proposerIndexToSlots, err := CommitteeAssignments(context.Background(), state, epoch)
require.NoError(t, err, "Failed to determine CommitteeAssignments")
Expand Down Expand Up @@ -376,6 +380,7 @@ func TestVerifyAttestationBitfieldLengths_OK(t *testing.T) {
},
}

defer ClearCache()
for i, tt := range tests {
ClearCache()
require.NoError(t, state.SetSlot(tt.stateSlot))
Expand All @@ -390,6 +395,7 @@ func TestVerifyAttestationBitfieldLengths_OK(t *testing.T) {

func TestUpdateCommitteeCache_CanUpdate(t *testing.T) {
ClearCache()
defer ClearCache()
validatorCount := params.BeaconConfig().MinGenesisActiveValidatorCount
validators := make([]*ethpb.Validator, validatorCount)
indices := make([]primitives.ValidatorIndex, validatorCount)
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/core/helpers/rewards_penalties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func TestTotalActiveBalance(t *testing.T) {
}

func TestTotalActiveBal_ReturnMin(t *testing.T) {
ClearCache()
defer ClearCache()
tests := []struct {
vCount int
}{
Expand All @@ -96,6 +98,8 @@ func TestTotalActiveBal_ReturnMin(t *testing.T) {
}

func TestTotalActiveBalance_WithCache(t *testing.T) {
ClearCache()
defer ClearCache()
tests := []struct {
vCount int
wantCount int
Expand Down
4 changes: 1 addition & 3 deletions beacon-chain/core/helpers/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ var (
// along with the sync committee root.
// 1. Checks if the public key exists in the sync committee cache
// 2. If 1 fails, checks if the public key exists in the input current sync committee object
func IsCurrentPeriodSyncCommittee(
st state.BeaconState, valIdx primitives.ValidatorIndex,
) (bool, error) {
func IsCurrentPeriodSyncCommittee(st state.BeaconState, valIdx primitives.ValidatorIndex) (bool, error) {
root, err := syncPeriodBoundaryRoot(st)
if err != nil {
return false, err
Expand Down
Loading