Skip to content

Commit

Permalink
fix(state/grandpa): track changes across forks (#2519)
Browse files Browse the repository at this point in the history
* feat: tracking grandpa messages across forks


Co-authored-by: Quentin McGaw <quentin.mcgaw@gmail.com>
Co-authored-by: jimboj <jamesdjohnson218@gmail.com>
  • Loading branch information
3 people authored Jul 5, 2022
1 parent b09eb07 commit 3ab76bc
Show file tree
Hide file tree
Showing 28 changed files with 2,412 additions and 590 deletions.
4 changes: 2 additions & 2 deletions dot/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/ChainSafe/gossamer/lib/runtime/wasmer"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ChainSafe/gossamer/lib/transaction"
cscale "github.com/centrifuge/go-substrate-rpc-client/v3/scale"
ctypes "github.com/centrifuge/go-substrate-rpc-client/v3/types"
cscale "github.com/centrifuge/go-substrate-rpc-client/v4/scale"
ctypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
)

var (
Expand Down
302 changes: 79 additions & 223 deletions dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ var (
_ services.Service = &Handler{}
)

var (
ErrUnknownConsensusEngineID = errors.New("unknown consensus engine ID")
)

// Handler is used to handle consensus messages and relevant authority updates to BABE and GRANDPA
type Handler struct {
ctx context.Context
Expand All @@ -32,28 +36,9 @@ type Handler struct {
imported chan *types.Block
finalised chan *types.FinalisationInfo

// GRANDPA changes
grandpaScheduledChange *grandpaChange
grandpaForcedChange *grandpaChange
grandpaPause *pause
grandpaResume *resume

logger log.LeveledLogger
}

type grandpaChange struct {
auths []types.Authority
atBlock uint
}

type pause struct {
atBlock uint
}

type resume struct {
atBlock uint
}

// NewHandler returns a new Handler
func NewHandler(lvl log.Level, blockState BlockState, epochState EpochState,
grandpaState GrandpaState) (*Handler, error) {
Expand Down Expand Up @@ -91,44 +76,80 @@ func (h *Handler) Stop() error {
return nil
}

// NextGrandpaAuthorityChange returns the block number of the next upcoming grandpa authorities change.
// It returns 0 if no change is scheduled.
func (h *Handler) NextGrandpaAuthorityChange() (next uint) {
next = ^uint(0)

if h.grandpaScheduledChange != nil {
next = h.grandpaScheduledChange.atBlock
// HandleDigests handles consensus digests for an imported block
func (h *Handler) HandleDigests(header *types.Header) error {
consensusDigests := h.toConsensusDigests(header.Digest.Types)
consensusDigests, err := checkForGRANDPAForcedChanges(consensusDigests)
if err != nil {
return fmt.Errorf("failed while checking GRANDPA digests: %w", err)
}

if h.grandpaForcedChange != nil && h.grandpaForcedChange.atBlock < next {
next = h.grandpaForcedChange.atBlock
for i := range consensusDigests {
// avoiding implicit memory aliasing in for loop, since:
// for _, digest := range consensusDigests { &digest }
// is using the address of a loop variable
digest := consensusDigests[i]
err := h.handleConsensusDigest(&digest, header)
if err != nil {
h.logger.Errorf("cannot handle consensus digest: %w", err)
}
}

if h.grandpaPause != nil && h.grandpaPause.atBlock < next {
next = h.grandpaPause.atBlock
}
return nil
}

// toConsensusDigests converts a slice of scale.VaryingDataType to a slice of types.ConsensusDigest.
func (h *Handler) toConsensusDigests(scaleVaryingTypes []scale.VaryingDataType) []types.ConsensusDigest {
consensusDigests := make([]types.ConsensusDigest, 0, len(scaleVaryingTypes))

for _, d := range scaleVaryingTypes {
digest, ok := d.Value().(types.ConsensusDigest)
if !ok {
h.logger.Debugf("digest type not supported: %T", d.Value())
continue
}

if h.grandpaResume != nil && h.grandpaResume.atBlock < next {
next = h.grandpaResume.atBlock
switch digest.ConsensusEngineID {
case types.GrandpaEngineID, types.BabeEngineID:
consensusDigests = append(consensusDigests, digest)
}
}

return next
return consensusDigests
}

// HandleDigests handles consensus digests for an imported block
func (h *Handler) HandleDigests(header *types.Header) {
for i, d := range header.Digest.Types {
val, ok := d.Value().(types.ConsensusDigest)
if !ok {
// checkForGRANDPAForcedChanges removes any GrandpaScheduledChange in the presence of a
// GrandpaForcedChange in the same block digest, returning a new slice of types.ConsensusDigest
func checkForGRANDPAForcedChanges(digests []types.ConsensusDigest) ([]types.ConsensusDigest, error) {
var hasForcedChange bool
digestsWithoutScheduled := make([]types.ConsensusDigest, 0, len(digests))
for _, digest := range digests {
if digest.ConsensusEngineID != types.GrandpaEngineID {
digestsWithoutScheduled = append(digestsWithoutScheduled, digest)
continue
}

err := h.handleConsensusDigest(&val, header)
data := types.NewGrandpaConsensusDigest()
err := scale.Unmarshal(digest.Data, &data)
if err != nil {
h.logger.Errorf("cannot handle digest for block number %d, index %d, digest %s: %s",
header.Number, i, d.Value(), err)
return nil, fmt.Errorf("cannot unmarshal GRANDPA consensus digest: %w", err)
}

switch data.Value().(type) {
case types.GrandpaScheduledChange:
case types.GrandpaForcedChange:
hasForcedChange = true
digestsWithoutScheduled = append(digestsWithoutScheduled, digest)
default:
digestsWithoutScheduled = append(digestsWithoutScheduled, digest)
}
}

if hasForcedChange {
return digestsWithoutScheduled, nil
}

return digests, nil
}

func (h *Handler) handleConsensusDigest(d *types.ConsensusDigest, header *types.Header) error {
Expand All @@ -139,42 +160,19 @@ func (h *Handler) handleConsensusDigest(d *types.ConsensusDigest, header *types.
if err != nil {
return err
}
err = h.handleGrandpaConsensusDigest(data, header)
if err != nil {
return err
}
return nil

return h.grandpaState.HandleGRANDPADigest(header, data)
case types.BabeEngineID:
data := types.NewBabeConsensusDigest()
err := scale.Unmarshal(d.Data, &data)
if err != nil {
return err
}
err = h.handleBabeConsensusDigest(data, header)
if err != nil {
return err
}
return nil
}

return errors.New("unknown consensus engine ID")
}

func (h *Handler) handleGrandpaConsensusDigest(digest scale.VaryingDataType, header *types.Header) error {
switch val := digest.Value().(type) {
case types.GrandpaScheduledChange:
return h.handleScheduledChange(val, header)
case types.GrandpaForcedChange:
return h.handleForcedChange(val, header)
case types.GrandpaOnDisabled:
return nil // do nothing, as this is not implemented in substrate
case types.GrandpaPause:
return h.handlePause(val)
case types.GrandpaResume:
return h.handleResume(val)
return h.handleBabeConsensusDigest(data, header)
default:
return fmt.Errorf("%w: 0x%x", ErrUnknownConsensusEngineID, d.ConsensusEngineID.ToBytes())
}

return errors.New("invalid consensus digest data")
}

func (h *Handler) handleBabeConsensusDigest(digest scale.VaryingDataType, header *types.Header) error {
Expand All @@ -194,7 +192,7 @@ func (h *Handler) handleBabeConsensusDigest(digest scale.VaryingDataType, header
return nil

case types.BABEOnDisabled:
return h.handleBABEOnDisabled(val, header)
return nil

case types.NextConfigData:
currEpoch, err := h.epochState.GetEpochForBlock(header)
Expand All @@ -220,10 +218,14 @@ func (h *Handler) handleBlockImport(ctx context.Context) {
continue
}

h.HandleDigests(&block.Header)
err := h.handleGrandpaChangesOnImport(block.Header.Number)
err := h.HandleDigests(&block.Header)
if err != nil {
h.logger.Errorf("failed to handle grandpa changes on block import: %s", err)
h.logger.Errorf("failed to handle digests: %s", err)
}

err = h.grandpaState.ApplyForcedChanges(&block.Header)
if err != nil {
h.logger.Errorf("failed to apply forced changes: %s", err)
}
case <-ctx.Done():
return
Expand All @@ -249,159 +251,13 @@ func (h *Handler) handleBlockFinalisation(ctx context.Context) {
h.logger.Errorf("failed to persist babe next epoch config: %s", err)
}

err = h.handleGrandpaChangesOnFinalization(info.Header.Number)
err = h.grandpaState.ApplyScheduledChanges(&info.Header)
if err != nil {
h.logger.Errorf("failed to handle grandpa changes on block finalisation: %s", err)
h.logger.Errorf("failed to apply scheduled change: %s", err)
}

case <-ctx.Done():
return
}
}
}

func (h *Handler) handleGrandpaChangesOnImport(num uint) error {
resume := h.grandpaResume
if resume != nil && num >= resume.atBlock {
h.grandpaResume = nil
}

fc := h.grandpaForcedChange
if fc != nil && num >= fc.atBlock {
curr, err := h.grandpaState.IncrementSetID()
if err != nil {
return err
}

h.grandpaForcedChange = nil
h.logger.Debugf("incremented grandpa set id %d", curr)
}

return nil
}

func (h *Handler) handleGrandpaChangesOnFinalization(num uint) error {
pause := h.grandpaPause
if pause != nil && num >= pause.atBlock {
h.grandpaPause = nil
}

sc := h.grandpaScheduledChange
if sc != nil && num >= sc.atBlock {
curr, err := h.grandpaState.IncrementSetID()
if err != nil {
return err
}

h.grandpaScheduledChange = nil
h.logger.Debugf("incremented grandpa set id %d", curr)
}

// if blocks get finalised before forced change takes place, disregard it
h.grandpaForcedChange = nil
return nil
}

func (h *Handler) handleScheduledChange(sc types.GrandpaScheduledChange, header *types.Header) error {
curr, err := h.blockState.BestBlockHeader()
if err != nil {
return err
}

if h.grandpaScheduledChange != nil {
return nil
}

h.logger.Debugf("handling GrandpaScheduledChange data: %v", sc)

c, err := newGrandpaChange(sc.Auths, sc.Delay, curr.Number)
if err != nil {
return err
}

h.grandpaScheduledChange = c

auths, err := types.GrandpaAuthoritiesRawToAuthorities(sc.Auths)
if err != nil {
return err
}
h.logger.Debugf("setting GrandpaScheduledChange at block %d",
header.Number+uint(sc.Delay))
return h.grandpaState.SetNextChange(
types.NewGrandpaVotersFromAuthorities(auths),
header.Number+uint(sc.Delay),
)
}

func (h *Handler) handleForcedChange(fc types.GrandpaForcedChange, header *types.Header) error {
if header == nil {
return errors.New("header is nil")
}

if h.grandpaForcedChange != nil {
return errors.New("already have forced change scheduled")
}

h.logger.Debugf("handling GrandpaForcedChange with data %v", fc)

c, err := newGrandpaChange(fc.Auths, fc.Delay, header.Number)
if err != nil {
return err
}

h.grandpaForcedChange = c

auths, err := types.GrandpaAuthoritiesRawToAuthorities(fc.Auths)
if err != nil {
return err
}

h.logger.Debugf("setting GrandpaForcedChange at block %d",
header.Number+uint(fc.Delay))
return h.grandpaState.SetNextChange(
types.NewGrandpaVotersFromAuthorities(auths),
header.Number+uint(fc.Delay),
)
}

func (h *Handler) handlePause(p types.GrandpaPause) error {
curr, err := h.blockState.BestBlockHeader()
if err != nil {
return err
}

h.grandpaPause = &pause{
atBlock: curr.Number + uint(p.Delay),
}

return h.grandpaState.SetNextPause(h.grandpaPause.atBlock)
}

func (h *Handler) handleResume(r types.GrandpaResume) error {
curr, err := h.blockState.BestBlockHeader()
if err != nil {
return err
}

h.grandpaResume = &resume{
atBlock: curr.Number + uint(r.Delay),
}

return h.grandpaState.SetNextResume(h.grandpaResume.atBlock)
}

func newGrandpaChange(raw []types.GrandpaAuthoritiesRaw, delay uint32, currBlock uint) (*grandpaChange, error) {
auths, err := types.GrandpaAuthoritiesRawToAuthorities(raw)
if err != nil {
return nil, err
}

return &grandpaChange{
auths: auths,
atBlock: currBlock + uint(delay),
}, nil
}

func (h *Handler) handleBABEOnDisabled(_ types.BABEOnDisabled, _ *types.Header) error {
h.logger.Debug("handling BABEOnDisabled")
return nil
}
Loading

0 comments on commit 3ab76bc

Please sign in to comment.