Skip to content

Commit

Permalink
wait for confirmation block in epoch start block syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Sep 25, 2024
1 parent 85dca04 commit 712e0da
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 23 deletions.
168 changes: 145 additions & 23 deletions epochStart/bootstrap/epochStartMetaBlockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bootstrap

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand All @@ -26,14 +27,21 @@ const minNumConnectedPeers = 1
var _ process.InterceptorProcessor = (*epochStartMetaBlockProcessor)(nil)

type epochStartMetaBlockProcessor struct {
messenger Messenger
requestHandler RequestHandler
marshalizer marshal.Marshalizer
hasher hashing.Hasher
mutReceivedMetaBlocks sync.RWMutex
mapReceivedMetaBlocks map[string]data.MetaHeaderHandler
mapMetaBlocksFromPeers map[string][]core.PeerID
messenger Messenger
requestHandler RequestHandler
marshalizer marshal.Marshalizer
hasher hashing.Hasher
mutReceivedMetaBlocks sync.RWMutex
mapReceivedMetaBlocks map[string]data.MetaHeaderHandler
mapMetaBlocksFromPeers map[string][]core.PeerID

mutReceivedConfMetaBlocks sync.RWMutex
mapReceivedConfMetaBlocks map[string]data.MetaHeaderHandler
mapConfMetaBlocksFromPeers map[string][]core.PeerID

chanConsensusReached chan bool
chanMetaBlockReached chan bool
chanConfMetaBlockReached chan bool

Check failure on line 44 in epochStart/bootstrap/epochStartMetaBlockProcessor.go

View workflow job for this annotation

GitHub Actions / golangci linter

field `chanConfMetaBlockReached` is unused (unused)
metaBlock data.MetaHeaderHandler
peerCountTarget int
minNumConnectedPeers int
Expand Down Expand Up @@ -82,6 +90,8 @@ func NewEpochStartMetaBlockProcessor(
mutReceivedMetaBlocks: sync.RWMutex{},
mapReceivedMetaBlocks: make(map[string]data.MetaHeaderHandler),
mapMetaBlocksFromPeers: make(map[string][]core.PeerID),
mapReceivedConfMetaBlocks: make(map[string]data.MetaHeaderHandler),
mapConfMetaBlocksFromPeers: make(map[string][]core.PeerID),
chanConsensusReached: make(chan bool, 1),
}

Expand Down Expand Up @@ -136,22 +146,45 @@ func (e *epochStartMetaBlockProcessor) Save(data process.InterceptedData, fromCo
return nil
}

if !metaBlock.IsStartOfEpochBlock() {
log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock)
mbHash := interceptedHdr.Hash()

if metaBlock.IsStartOfEpochBlock() {
log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedMetaBlocks.Lock()
e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock
e.addToPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedMetaBlocks.Unlock()

return nil
}

mbHash := interceptedHdr.Hash()
if e.isEpochStartConfirmationBlock(metaBlock) {
log.Debug("received epoch start confirmation meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedConfMetaBlocks.Lock()
e.mapReceivedConfMetaBlocks[string(mbHash)] = metaBlock
e.addToConfPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedConfMetaBlocks.Unlock()

log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedMetaBlocks.Lock()
e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock
e.addToPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedMetaBlocks.Unlock()
return nil
}

log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock)
return nil
}

func (e *epochStartMetaBlockProcessor) isEpochStartConfirmationBlock(metaBlock data.HeaderHandler) bool {
startOfEpochMetaBlock, err := e.getMostReceivedMetaBlock()
if err != nil {
return false
}

if startOfEpochMetaBlock.GetNonce() != metaBlock.GetNonce()-1 {
return false
}

return true
}

// this func should be called under mutex protection
func (e *epochStartMetaBlockProcessor) addToPeerList(hash string, peer core.PeerID) {
peersListForHash := e.mapMetaBlocksFromPeers[hash]
Expand All @@ -163,6 +196,16 @@ func (e *epochStartMetaBlockProcessor) addToPeerList(hash string, peer core.Peer
e.mapMetaBlocksFromPeers[hash] = append(e.mapMetaBlocksFromPeers[hash], peer)
}

func (e *epochStartMetaBlockProcessor) addToConfPeerList(hash string, peer core.PeerID) {
peersListForHash := e.mapConfMetaBlocksFromPeers[hash]
for _, pid := range peersListForHash {
if pid == peer {
return
}
}
e.mapConfMetaBlocksFromPeers[hash] = append(e.mapConfMetaBlocksFromPeers[hash], peer)
}

// GetEpochStartMetaBlock will return the metablock after it is confirmed or an error if the number of tries was exceeded
// This is a blocking method which will end after the consensus for the meta block is obtained or the context is done
func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Context) (data.MetaHeaderHandler, error) {
Expand All @@ -180,7 +223,16 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex
}
}()

err = e.requestMetaBlock()
err = e.waitForMetaBlock(ctx)
if err != nil {
return nil, err
}

if check.IfNil(e.metaBlock) {
return nil, epochStart.ErrNilMetaBlock
}

err = e.requestConfirmationMetaBlock(e.metaBlock.GetNonce())
if err != nil {
return nil, err
}
Expand All @@ -194,7 +246,7 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex
case <-ctx.Done():
return e.getMostReceivedMetaBlock()
case <-chanRequests:
err = e.requestMetaBlock()
err = e.requestConfirmationMetaBlock(e.metaBlock.GetNonce())
if err != nil {
return nil, err
}
Expand All @@ -206,6 +258,34 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex
}
}

func (e *epochStartMetaBlockProcessor) waitForMetaBlock(ctx context.Context) error {
err := e.requestMetaBlock()
if err != nil {
return err
}

chanRequests := time.After(durationBetweenReRequests)
chanCheckMaps := time.After(durationBetweenChecks)

for {
select {
case <-e.chanMetaBlockReached:
return nil
case <-ctx.Done():
return fmt.Errorf("did not received epoch start meta block")
case <-chanRequests:
err = e.requestMetaBlock()
if err != nil {
return err
}
chanRequests = time.After(durationBetweenReRequests)
case <-chanCheckMaps:
e.checkMetaBlockMaps()
chanCheckMaps = time.After(durationBetweenChecks)
}
}
}

func (e *epochStartMetaBlockProcessor) getMostReceivedMetaBlock() (data.MetaHeaderHandler, error) {
e.mutReceivedMetaBlocks.RLock()
defer e.mutReceivedMetaBlocks.RUnlock()
Expand Down Expand Up @@ -238,27 +318,69 @@ func (e *epochStartMetaBlockProcessor) requestMetaBlock() error {
return nil
}

func (e *epochStartMetaBlockProcessor) checkMaps() {
func (e *epochStartMetaBlockProcessor) requestConfirmationMetaBlock(nonce uint64) error {
numConnectedPeers := len(e.messenger.ConnectedPeers())
err := e.requestHandler.SetNumPeersToQuery(factory.MetachainBlocksTopic, numConnectedPeers, numConnectedPeers)
if err != nil {
return err
}

e.requestHandler.RequestMetaHeaderByNonce(nonce)

return nil
}

func (e *epochStartMetaBlockProcessor) checkMetaBlockMaps() {
e.mutReceivedMetaBlocks.RLock()
defer e.mutReceivedMetaBlocks.RUnlock()

for hash, peersList := range e.mapMetaBlocksFromPeers {
log.Debug("metablock from peers", "num peers", len(peersList), "target", e.peerCountTarget, "hash", []byte(hash))
found := e.processEntry(peersList, hash)
if found {
metaBlockFound := e.processMetaBlockEntry(peersList, hash)
if metaBlockFound {
e.metaBlock = e.mapReceivedMetaBlocks[hash]
e.chanMetaBlockReached <- true
break
}
}
}

func (e *epochStartMetaBlockProcessor) processEntry(
func (e *epochStartMetaBlockProcessor) checkMaps() {
e.mutReceivedMetaBlocks.RLock()
defer e.mutReceivedMetaBlocks.RUnlock()

metaBlockFound := e.checkReceivedMetaBlock(e.mapMetaBlocksFromPeers)
confMetaBlockFound := e.checkReceivedMetaBlock(e.mapConfMetaBlocksFromPeers)

// no need to check proof here since it is check in interceptor
if metaBlockFound && confMetaBlockFound {
e.chanConsensusReached <- true
}
}

func (e *epochStartMetaBlockProcessor) checkReceivedMetaBlock(blocksFromPeers map[string][]core.PeerID) bool {
for hash, peersList := range blocksFromPeers {
log.Debug("metablock from peers", "num peers", len(peersList), "target", e.peerCountTarget, "hash", []byte(hash))

metaBlockFound := e.processMetaBlockEntry(peersList, hash)
if metaBlockFound {
return true
}
}

return false
}

func (e *epochStartMetaBlockProcessor) checkMetaBlockProof() bool {

Check failure on line 374 in epochStart/bootstrap/epochStartMetaBlockProcessor.go

View workflow job for this annotation

GitHub Actions / golangci linter

func `(*epochStartMetaBlockProcessor).checkMetaBlockProof` is unused (unused)
return true
}

func (e *epochStartMetaBlockProcessor) processMetaBlockEntry(
peersList []core.PeerID,
hash string,
) bool {
if len(peersList) >= e.peerCountTarget {
log.Info("got consensus for epoch start metablock", "len", len(peersList))
e.metaBlock = e.mapReceivedMetaBlocks[hash]
e.chanConsensusReached <- true
return true
}

Expand Down
1 change: 1 addition & 0 deletions epochStart/bootstrap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Messenger interface {
// RequestHandler defines which methods a request handler should implement
type RequestHandler interface {
RequestStartOfEpochMetaBlock(epoch uint32)
RequestMetaHeaderByNonce(nonce uint64)
SetNumPeersToQuery(topic string, intra int, cross int) error
GetNumPeersToQuery(topic string) (int, int, error)
IsInterfaceNil() bool
Expand Down

0 comments on commit 712e0da

Please sign in to comment.