Skip to content

Commit

Permalink
fix(healthmanager)_: extract subscriber logic from RPC Health Manager (
Browse files Browse the repository at this point in the history
…#6147)

- Subscription common logic is extracted to a separate type.
- Fix a race condition where one goroutine reads a subscriber channel from the sync.Map, and another goroutine then calls Unsubscribe and closes that channel before the first goroutine writes to it.
- Moved TestInterleavedChainStatusChanges and TestDelayedChainUpdate to the correct file.
- Renamed test suites with duplicate names.

updates CODEOWNERS
closes #6139

Co-authored-by: Igor Sirotin <sirotin@status.im>
  • Loading branch information
friofry and igor-sirotin authored Dec 4, 2024
1 parent 233f2f9 commit f3eed58
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 157 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ shell.nix @status-im/devops
# Feel free to add yourself for any new packages you implement.
/cmd/status-backend @igor-sirotin
/internal/sentry @igor-sirotin
/healthmanager @friofry
34 changes: 10 additions & 24 deletions healthmanager/blockchain_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ type BlockchainStatus struct {
StatusPerChain map[uint64]rpcstatus.ProviderStatus `json:"statusPerChain"`
}

// BlockchainHealthManager manages the state of all providers and aggregates their statuses.
type BlockchainHealthManager struct {
mu sync.RWMutex
aggregator *aggregator.Aggregator
subscribers sync.Map // thread-safe
mu sync.RWMutex
aggregator *aggregator.Aggregator
subscriptionManager *SubscriptionManager

providers map[uint64]*ProvidersHealthManager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
Expand All @@ -38,9 +37,10 @@ type BlockchainHealthManager struct {
// NewBlockchainHealthManager returns a BlockchainHealthManager with an
// aggregator named "blockchain", empty providers and cancelFuncs maps, and a
// subscription manager with an empty subscriber set.
func NewBlockchainHealthManager() *BlockchainHealthManager {
agg := aggregator.NewAggregator("blockchain")
return &BlockchainHealthManager{
aggregator: agg,
providers: make(map[uint64]*ProvidersHealthManager),
cancelFuncs: make(map[uint64]context.CancelFunc),
aggregator: agg,
providers: make(map[uint64]*ProvidersHealthManager),
cancelFuncs: make(map[uint64]context.CancelFunc),
// NOTE(review): NewProvidersHealthManager initializes the same field with
// NewSubscriptionManager(); presumably that constructor is equivalent and
// could be used here for consistency — TODO confirm.
subscriptionManager: &SubscriptionManager{subscribers: make(map[chan struct{}]struct{})},
}
}

Expand Down Expand Up @@ -109,15 +109,12 @@ func (b *BlockchainHealthManager) Stop() {

// Subscribe allows clients to receive notifications about changes.
func (b *BlockchainHealthManager) Subscribe() chan struct{} {
ch := make(chan struct{}, 1)
b.subscribers.Store(ch, struct{}{})
return ch
return b.subscriptionManager.Subscribe()
}

// Unsubscribe removes a subscriber from receiving notifications.
func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) {
b.subscribers.Delete(ch) // Remove the subscriber from sync.Map
close(ch)
b.subscriptionManager.Unsubscribe(ch)
}

// aggregateAndUpdateStatus collects statuses from all providers and updates the overall and short status.
Expand Down Expand Up @@ -185,18 +182,7 @@ func compareShortStatus(newStatus, previousStatus BlockchainStatus) bool {

// emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status.
func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) {
b.subscribers.Range(func(key, value interface{}) bool {
subscriber := key.(chan struct{})
select {
case <-ctx.Done():
// Stop sending notifications when the context is cancelled
return false
case subscriber <- struct{}{}:
default:
// Skip notification if the subscriber's channel is full (non-blocking)
}
return true
})
b.subscriptionManager.Emit(ctx)
}

func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus {
Expand Down
96 changes: 84 additions & 12 deletions healthmanager/blockchain_health_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,9 @@ func (s *BlockchainHealthManagerSuite) TestConcurrentSubscriptionUnsubscription(
}

wg.Wait()

activeSubscribersCount := 0
s.manager.subscribers.Range(func(key, value interface{}) bool {
activeSubscribersCount++
return true
})

// After all subscribers are removed, there should be no active subscribers
s.Equal(0, activeSubscribersCount, "Expected no subscribers after unsubscription")
s.Equal(0, len(s.manager.subscriptionManager.subscribers), "Expected no subscribers after unsubscription")
}

func (s *BlockchainHealthManagerSuite) TestConcurrency() {
var wg sync.WaitGroup
chainsCount := 10
Expand All @@ -137,21 +129,27 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() {
}

ch := s.manager.Subscribe()
defer s.manager.Unsubscribe(ch)

for i := 1; i <= chainsCount; i++ {
wg.Add(1)
go func(chainID uint64) {
defer wg.Done()
phm := s.manager.providers[chainID]
for j := 0; j < providersCount; j++ {
wg.Add(1)
err := errors.New("connection error")
if j == providersCount-1 {
err = nil
}
name := fmt.Sprintf("provider-%d", j)
go phm.Update(ctx, []rpcstatus.RpcProviderCallStatus{
{Name: name, Timestamp: time.Now(), Err: err},
})

go func(name string, err error) {
defer wg.Done()
phm.Update(ctx, []rpcstatus.RpcProviderCallStatus{
{Name: name, Timestamp: time.Now(), Err: err},
})
}(name, err)
}
}(uint64(i))
}
Expand Down Expand Up @@ -232,6 +230,80 @@ func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() {
s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as up
}

// TestInterleavedChainStatusChanges brings three chains up, then takes chains
// 1 and 3 down, and checks that the overall status stays Up while the
// per-chain statuses report the mixed state.
func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() {
	const propagationTimeout = 100 * time.Millisecond

	// report builds a single-provider call status stamped with the current time.
	report := func(provider string, callErr error) []rpcstatus.RpcProviderCallStatus {
		return []rpcstatus.RpcProviderCallStatus{{Name: provider, Timestamp: time.Now(), Err: callErr}}
	}

	// One providers health manager per chain, registered in chain order.
	chain1 := NewProvidersHealthManager(1)
	chain2 := NewProvidersHealthManager(2)
	chain3 := NewProvidersHealthManager(3)
	for _, phm := range []*ProvidersHealthManager{chain1, chain2, chain3} {
		registerErr := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
		s.Require().NoError(registerErr)
	}

	// Listen for blockchain-level notifications for the duration of the test.
	updates := s.manager.Subscribe()
	defer s.manager.Unsubscribe(updates)

	// Every chain starts out healthy.
	chain1.Update(s.ctx, report("provider_chain1", nil))
	chain2.Update(s.ctx, report("provider_chain2", nil))
	chain3.Update(s.ctx, report("provider_chain3", nil))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chains 1 and 3 fail back to back; chain 2 is left untouched.
	chain1.Update(s.ctx, report("provider_chain1", errors.New("connection error")))
	chain3.Update(s.ctx, report("provider_chain3", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// The aggregate is still Up, while each chain reports its own state.
	snapshot := s.manager.GetStatusPerChain()
	perChain := snapshot.StatusPerChain
	s.Equal(rpcstatus.StatusUp, snapshot.Status.Status)
	s.Equal(rpcstatus.StatusDown, perChain[1].Status) // chain 1 failed
	s.Equal(rpcstatus.StatusUp, perChain[2].Status)   // chain 2 never changed
	s.Equal(rpcstatus.StatusDown, perChain[3].Status) // chain 3 failed
}

// TestDelayedChainUpdate brings two chains up one at a time, then takes them
// down one at a time, and checks that the overall status only becomes Down
// once both chains are down.
func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() {
	const propagationTimeout = 100 * time.Millisecond

	// report builds a single-provider call status stamped with the current time.
	report := func(provider string, callErr error) []rpcstatus.RpcProviderCallStatus {
		return []rpcstatus.RpcProviderCallStatus{{Name: provider, Timestamp: time.Now(), Err: callErr}}
	}

	// One providers health manager per chain, registered in chain order.
	chain1 := NewProvidersHealthManager(1)
	chain2 := NewProvidersHealthManager(2)
	for _, phm := range []*ProvidersHealthManager{chain1, chain2} {
		registerErr := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
		s.Require().NoError(registerErr)
	}

	// Listen for blockchain-level notifications for the duration of the test.
	updates := s.manager.Subscribe()
	defer s.manager.Unsubscribe(updates)

	// Bring each chain up, waiting for the notification after every update.
	chain1.Update(s.ctx, report("provider1_chain1", nil))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)
	chain2.Update(s.ctx, report("provider1_chain2", nil))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chain 2 fails first; the overall status is still expected to be Up.
	chain2.Update(s.ctx, report("provider1_chain2", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chain 1 fails afterwards; now the overall status is expected to be Down.
	chain1.Update(s.ctx, report("provider1_chain1", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusDown, propagationTimeout)

	// Final state: everything is down.
	snapshot := s.manager.GetStatusPerChain()
	perChain := snapshot.StatusPerChain
	s.Equal(rpcstatus.StatusDown, snapshot.Status.Status)
	s.Equal(rpcstatus.StatusDown, perChain[1].Status) // chain 1 failed
	s.Equal(rpcstatus.StatusDown, perChain[2].Status) // chain 2 failed
}

func TestBlockchainHealthManagerSuite(t *testing.T) {
suite.Run(t, new(BlockchainHealthManagerSuite))
}
46 changes: 12 additions & 34 deletions healthmanager/providers_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ import (
)

type ProvidersHealthManager struct {
mu sync.RWMutex
chainID uint64
aggregator *aggregator.Aggregator
subscribers sync.Map // Use sync.Map for concurrent access to subscribers
lastStatus *rpcstatus.ProviderStatus
mu sync.RWMutex
chainID uint64
aggregator *aggregator.Aggregator
subscriptionManager *SubscriptionManager
lastStatus *rpcstatus.ProviderStatus
}

// NewProvidersHealthManager creates a new instance of ProvidersHealthManager with the given chain ID.
func NewProvidersHealthManager(chainID uint64) *ProvidersHealthManager {
agg := aggregator.NewAggregator(fmt.Sprintf("%d", chainID))

return &ProvidersHealthManager{
chainID: chainID,
aggregator: agg,
chainID: chainID,
aggregator: agg,
subscriptionManager: NewSubscriptionManager(),
}
}

Expand Down Expand Up @@ -61,25 +62,12 @@ func (p *ProvidersHealthManager) GetStatuses() map[string]rpcstatus.ProviderStat

// Subscribe allows providers to receive notifications about changes.
func (p *ProvidersHealthManager) Subscribe() chan struct{} {
ch := make(chan struct{}, 1)
p.subscribers.Store(ch, struct{}{})
return ch
return p.subscriptionManager.Subscribe()
}

// Unsubscribe removes a subscriber from receiving notifications.
func (p *ProvidersHealthManager) Unsubscribe(ch chan struct{}) {
p.subscribers.Delete(ch)
close(ch)
}

// UnsubscribeAll removes all subscriber channels.
func (p *ProvidersHealthManager) UnsubscribeAll() {
p.subscribers.Range(func(key, value interface{}) bool {
ch := key.(chan struct{})
close(ch)
p.subscribers.Delete(key)
return true
})
p.subscriptionManager.Unsubscribe(ch)
}

// Reset clears all provider statuses and resets the chain status to unknown.
Expand All @@ -89,7 +77,7 @@ func (p *ProvidersHealthManager) Reset() {
p.aggregator = aggregator.NewAggregator(fmt.Sprintf("%d", p.chainID))
}

// Status Returns the current aggregated status.
// Status returns the current aggregated status.
func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus {
p.mu.RLock()
defer p.mu.RUnlock()
Expand All @@ -103,15 +91,5 @@ func (p *ProvidersHealthManager) ChainID() uint64 {

// emitChainStatus sends a notification to all subscribers.
func (p *ProvidersHealthManager) emitChainStatus(ctx context.Context) {
p.subscribers.Range(func(key, value interface{}) bool {
subscriber := key.(chan struct{})
select {
case subscriber <- struct{}{}:
case <-ctx.Done():
return false // Stop sending if context is done
default:
// Non-blocking send; skip if the channel is full
}
return true
})
p.subscriptionManager.Emit(ctx)
}
78 changes: 5 additions & 73 deletions healthmanager/providers_health_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func (s *ProvidersHealthManagerSuite) TestInitialStatus() {
}

func (s *ProvidersHealthManagerSuite) TestUpdateProviderStatuses() {
s.updateAndWait(s.phm.Subscribe(), []rpcstatus.RpcProviderCallStatus{
ch := s.phm.Subscribe()
defer s.phm.Unsubscribe(ch)
s.updateAndWait(ch, []rpcstatus.RpcProviderCallStatus{
{Name: "Provider1", Timestamp: time.Now(), Err: nil},
{Name: "Provider2", Timestamp: time.Now(), Err: errors.New("connection error")},
}, rpcstatus.StatusUp, time.Second)
Expand All @@ -75,6 +77,7 @@ func (s *ProvidersHealthManagerSuite) TestUpdateProviderStatuses() {

func (s *ProvidersHealthManagerSuite) TestChainStatusUpdatesOnce() {
ch := s.phm.Subscribe()
defer s.phm.Unsubscribe(ch)
s.assertChainStatus(rpcstatus.StatusDown)

// Update providers to Down
Expand All @@ -88,6 +91,7 @@ func (s *ProvidersHealthManagerSuite) TestChainStatusUpdatesOnce() {

func (s *ProvidersHealthManagerSuite) TestSubscribeReceivesOnlyOnChange() {
ch := s.phm.Subscribe()
defer s.phm.Unsubscribe(ch)

// Update provider to Up and wait for notification
upStatuses := []rpcstatus.RpcProviderCallStatus{
Expand Down Expand Up @@ -136,78 +140,6 @@ func (s *ProvidersHealthManagerSuite) TestConcurrency() {
s.Equal(chainStatus, rpcstatus.StatusUp, "Expected chain status to be either Up or Down")
}

// TestInterleavedChainStatusChanges brings three chains up, then takes chains
// 1 and 3 down, and checks that the overall status stays Up while the
// per-chain statuses report the mixed state.
func (s *BlockchainHealthManagerSuite) TestInterleavedChainStatusChanges() {
	const propagationTimeout = 100 * time.Millisecond

	// report builds a single-provider call status stamped with the current time.
	report := func(provider string, callErr error) []rpcstatus.RpcProviderCallStatus {
		return []rpcstatus.RpcProviderCallStatus{{Name: provider, Timestamp: time.Now(), Err: callErr}}
	}

	// One providers health manager per chain, registered in chain order.
	chain1 := NewProvidersHealthManager(1)
	chain2 := NewProvidersHealthManager(2)
	chain3 := NewProvidersHealthManager(3)
	for _, phm := range []*ProvidersHealthManager{chain1, chain2, chain3} {
		registerErr := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
		s.Require().NoError(registerErr)
	}

	// Listen for blockchain-level notifications for the duration of the test.
	updates := s.manager.Subscribe()
	defer s.manager.Unsubscribe(updates)

	// Every chain starts out healthy.
	chain1.Update(s.ctx, report("provider_chain1", nil))
	chain2.Update(s.ctx, report("provider_chain2", nil))
	chain3.Update(s.ctx, report("provider_chain3", nil))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chains 1 and 3 fail back to back; chain 2 is left untouched.
	chain1.Update(s.ctx, report("provider_chain1", errors.New("connection error")))
	chain3.Update(s.ctx, report("provider_chain3", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// The aggregate is still Up, while each chain reports its own state.
	snapshot := s.manager.GetStatusPerChain()
	perChain := snapshot.StatusPerChain
	s.Equal(rpcstatus.StatusUp, snapshot.Status.Status)
	s.Equal(rpcstatus.StatusDown, perChain[1].Status) // chain 1 failed
	s.Equal(rpcstatus.StatusUp, perChain[2].Status)   // chain 2 never changed
	s.Equal(rpcstatus.StatusDown, perChain[3].Status) // chain 3 failed
}

// TestDelayedChainUpdate brings two chains up together, then takes them down
// one at a time, and checks that the overall status only becomes Down once
// both chains are down.
func (s *BlockchainHealthManagerSuite) TestDelayedChainUpdate() {
	const propagationTimeout = 100 * time.Millisecond

	// report builds a single-provider call status stamped with the current time.
	report := func(provider string, callErr error) []rpcstatus.RpcProviderCallStatus {
		return []rpcstatus.RpcProviderCallStatus{{Name: provider, Timestamp: time.Now(), Err: callErr}}
	}

	// One providers health manager per chain, registered in chain order.
	chain1 := NewProvidersHealthManager(1)
	chain2 := NewProvidersHealthManager(2)
	for _, phm := range []*ProvidersHealthManager{chain1, chain2} {
		registerErr := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
		s.Require().NoError(registerErr)
	}

	// Listen for blockchain-level notifications for the duration of the test.
	updates := s.manager.Subscribe()
	defer s.manager.Unsubscribe(updates)

	// Both chains come up before the first wait.
	chain1.Update(s.ctx, report("provider1_chain1", nil))
	chain2.Update(s.ctx, report("provider1_chain2", nil))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chain 2 fails first; the overall status is still expected to be Up.
	chain2.Update(s.ctx, report("provider1_chain2", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusUp, propagationTimeout)

	// Chain 1 fails afterwards; now the overall status is expected to be Down.
	chain1.Update(s.ctx, report("provider1_chain1", errors.New("connection error")))
	s.waitForUpdate(updates, rpcstatus.StatusDown, propagationTimeout)

	// Final state: everything is down.
	snapshot := s.manager.GetStatusPerChain()
	perChain := snapshot.StatusPerChain
	s.Equal(rpcstatus.StatusDown, snapshot.Status.Status)
	s.Equal(rpcstatus.StatusDown, perChain[1].Status) // chain 1 failed
	s.Equal(rpcstatus.StatusDown, perChain[2].Status) // chain 2 failed
}

func TestProvidersHealthManagerSuite(t *testing.T) {
suite.Run(t, new(ProvidersHealthManagerSuite))
}
Loading

0 comments on commit f3eed58

Please sign in to comment.