Skip to content

Commit

Permalink
Refactor uptime tracking (#1388)
Browse files Browse the repository at this point in the history
Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org>
Co-authored-by: Michael Kaplan <55204436+michaelkaplan13@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 15, 2024
1 parent cc414c8 commit fad48ec
Show file tree
Hide file tree
Showing 23 changed files with 665 additions and 625 deletions.
8 changes: 4 additions & 4 deletions plugin/evm/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ func (api *ValidatorsAPI) GetCurrentValidators(_ *http.Request, _ *struct{}, rep
api.vm.ctx.Lock.RLock()
defer api.vm.ctx.Lock.RUnlock()

vIDs := api.vm.validatorState.GetValidationIDs()
vIDs := api.vm.validatorsManager.GetValidationIDs()

reply.Validators = make([]CurrentValidator, 0, vIDs.Len())

for _, vID := range vIDs.List() {
validator, err := api.vm.validatorState.GetValidator(vID)
validator, err := api.vm.validatorsManager.GetValidator(vID)
if err != nil {
return err
}

isConnected := api.vm.uptimeManager.IsConnected(validator.NodeID)
isConnected := api.vm.validatorsManager.IsConnected(validator.NodeID)

uptime, _, err := api.vm.uptimeManager.CalculateUptime(validator.NodeID)
uptime, _, err := api.vm.validatorsManager.CalculateUptime(validator.NodeID)
if err != nil {
return err
}
Expand Down
30 changes: 30 additions & 0 deletions plugin/evm/validators/interfaces/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package interfaces

import (
"context"
"time"

"github.com/ava-labs/avalanchego/ids"
avalancheuptime "github.com/ava-labs/avalanchego/snow/uptime"
stateinterfaces "github.com/ava-labs/subnet-evm/plugin/evm/validators/state/interfaces"
)

type ValidatorReader interface {
// GetValidatorAndUptime returns the uptime of the validator specified by validationID
GetValidatorAndUptime(validationID ids.ID) (stateinterfaces.Validator, time.Duration, time.Time, error)
}

type Manager interface {
stateinterfaces.State
avalancheuptime.Manager
ValidatorReader

// Sync updates the validator set managed
// by the manager
Sync(ctx context.Context) error
// DispatchSync starts the sync process
DispatchSync(ctx context.Context)
}
161 changes: 161 additions & 0 deletions plugin/evm/validators/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package validators

import (
"context"
"fmt"
"time"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
avalancheuptime "github.com/ava-labs/avalanchego/snow/uptime"
avalanchevalidators "github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/subnet-evm/plugin/evm/validators/interfaces"
validators "github.com/ava-labs/subnet-evm/plugin/evm/validators/state"
stateinterfaces "github.com/ava-labs/subnet-evm/plugin/evm/validators/state/interfaces"
"github.com/ava-labs/subnet-evm/plugin/evm/validators/uptime"
uptimeinterfaces "github.com/ava-labs/subnet-evm/plugin/evm/validators/uptime/interfaces"

"github.com/ethereum/go-ethereum/log"
)

const (
SyncFrequency = 1 * time.Minute
)

type manager struct {
chainCtx *snow.Context
stateinterfaces.State
uptimeinterfaces.PausableManager
}

// NewManager returns a new validator manager
// that manages the validator state and the uptime manager.
func NewManager(
ctx *snow.Context,
db database.Database,
clock *mockable.Clock,
) (interfaces.Manager, error) {
validatorState, err := validators.NewState(db)
if err != nil {
return nil, fmt.Errorf("failed to initialize validator state: %w", err)
}

// Initialize uptime manager
uptimeManager := uptime.NewPausableManager(avalancheuptime.NewManager(validatorState, clock))
validatorState.RegisterListener(uptimeManager)

return &manager{
chainCtx: ctx,
State: validatorState,
PausableManager: uptimeManager,
}, nil
}

// GetValidatorAndUptime returns the calculated uptime of the validator specified by validationID
// and the last updated time.
// GetValidatorAndUptime holds the chain context lock while performing the operation and can be called concurrently.
func (m *manager) GetValidatorAndUptime(validationID ids.ID) (stateinterfaces.Validator, time.Duration, time.Time, error) {
// lock the state
m.chainCtx.Lock.RLock()
defer m.chainCtx.Lock.RUnlock()

// Get validator first
vdr, err := m.GetValidator(validationID)
if err != nil {
return stateinterfaces.Validator{}, 0, time.Time{}, fmt.Errorf("failed to get validator: %w", err)
}

uptime, lastUpdated, err := m.CalculateUptime(vdr.NodeID)
if err != nil {
return stateinterfaces.Validator{}, 0, time.Time{}, fmt.Errorf("failed to get uptime: %w", err)
}

return vdr, uptime, lastUpdated, nil
}

// DispatchSync starts the sync process
// DispatchSync holds the chain context lock while performing the sync.
func (m *manager) DispatchSync(ctx context.Context) {
ticker := time.NewTicker(SyncFrequency)
defer ticker.Stop()

for {
select {
case <-ticker.C:
m.chainCtx.Lock.Lock()
if err := m.Sync(ctx); err != nil {
log.Error("failed to sync validators", "error", err)
}
m.chainCtx.Lock.Unlock()
case <-ctx.Done():
return
}
}
}

// Sync synchronizes the validator state with the current validator set
// and writes the state to the database.
// Sync is not safe to call concurrently and should be called with the chain context locked.
func (m *manager) Sync(ctx context.Context) error {
now := time.Now()
log.Debug("performing validator sync")
// get current validator set
currentValidatorSet, _, err := m.chainCtx.ValidatorState.GetCurrentValidatorSet(ctx, m.chainCtx.SubnetID)
if err != nil {
return fmt.Errorf("failed to get current validator set: %w", err)
}

// load the current validator set into the validator state
if err := loadValidators(m.State, currentValidatorSet); err != nil {
return fmt.Errorf("failed to load current validators: %w", err)
}

// write validators to the database
if err := m.State.WriteState(); err != nil {
return fmt.Errorf("failed to write validator state: %w", err)
}

// TODO: add metrics
log.Debug("validator sync complete", "duration", time.Since(now))
return nil
}

// loadValidators loads the [validators] into the validator state [validatorState]
func loadValidators(validatorState stateinterfaces.State, newValidators map[ids.ID]*avalanchevalidators.GetCurrentValidatorOutput) error {
currentValidationIDs := validatorState.GetValidationIDs()
// first check if we need to delete any existing validators
for vID := range currentValidationIDs {
// if the validator is not in the new set of validators
// delete the validator
if _, exists := newValidators[vID]; !exists {
validatorState.DeleteValidator(vID)
}
}

// then load the new validators
for newVID, newVdr := range newValidators {
currentVdr := stateinterfaces.Validator{
ValidationID: newVID,
NodeID: newVdr.NodeID,
Weight: newVdr.Weight,
StartTimestamp: newVdr.StartTime,
IsActive: newVdr.IsActive,
IsSoV: newVdr.IsSoV,
}
if currentValidationIDs.Contains(newVID) {
if err := validatorState.UpdateValidator(currentVdr); err != nil {
return err
}
} else {
if err := validatorState.AddValidator(currentVdr); err != nil {
return err
}
}
}
return nil
}
Loading

0 comments on commit fad48ec

Please sign in to comment.