Skip to content

Commit

Permalink
refactor block remote validation code
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Apr 19, 2022
1 parent f8c6a4a commit b3a701d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
7 changes: 7 additions & 0 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
}
return nil
},
func() error {
if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(block.Header()) {
return fmt.Errorf("%w, number: %s, hash: %s", ErrAncestorHasNotBeenVerified, block.Number(), block.Hash())
}

return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
Expand Down
9 changes: 0 additions & 9 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,15 +2092,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}()

for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
if bc.validator.RemoteVerifyManager() != nil {
for !bc.Validator().RemoteVerifyManager().AncestorVerified(block.Header()) {
if bc.insertStopped() {
break
}
log.Info("block ancestor has not been verified", "number", block.Number(), "hash", block.Hash())
time.Sleep(100 * time.Millisecond)
}
}
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
Expand Down
3 changes: 3 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ var (
// ErrDiffLayerNotFound is returned when diff layer not found.
ErrDiffLayerNotFound = errors.New("diff layer not found")

// ErrDiffLayerNotFound is returned when block - 11 has not been verified by the remote verifier.
ErrAncestorHasNotBeenVerified = errors.New("block ancestor has not been verified")

// ErrCurrentBlockNotFound is returned when current block not found.
ErrCurrentBlockNotFound = errors.New("current block not found")

Expand Down
28 changes: 26 additions & 2 deletions core/remote_state_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/big"
"math/rand"
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -40,6 +41,7 @@ var (

type remoteVerifyManager struct {
bc *BlockChain
taskLock sync.RWMutex
tasks map[common.Hash]*verifyTask
peers verifyPeers
verifiedCache *lru.Cache
Expand Down Expand Up @@ -109,14 +111,17 @@ func (vm *remoteVerifyManager) mainLoop() {
vm.NewBlockVerifyTask(h.Block.Header())
case hash := <-vm.verifyCh:
vm.cacheBlockVerified(hash)
vm.taskLock.Lock()
if task, ok := vm.tasks[hash]; ok {
delete(vm.tasks, hash)
verifyTaskCounter.Dec(1)
verifyTaskSucceedMeter.Mark(1)
verifyTaskExecutionTimer.Update(time.Since(task.startAt))
close(task.terminalCh)
}
vm.taskLock.Unlock()
case <-pruneTicker.C:
vm.taskLock.Lock()
for hash, task := range vm.tasks {
if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 &&
vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff {
Expand All @@ -126,16 +131,21 @@ func (vm *remoteVerifyManager) mainLoop() {
close(task.terminalCh)
}
}
vm.taskLock.Unlock()
case message := <-vm.messageCh:
vm.taskLock.RLock()
if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok {
vt.messageCh <- message
}
vm.taskLock.RUnlock()

// System stopped
case <-vm.bc.quit:
vm.taskLock.RLock()
for _, task := range vm.tasks {
close(task.terminalCh)
}
vm.taskLock.RUnlock()
return
case <-vm.chainHeadSub.Err():
return
Expand All @@ -156,7 +166,10 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
return
}
// if there already has a verify task for this block, skip.
if _, ok := vm.tasks[hash]; ok {
vm.taskLock.RLock()
_, ok := vm.tasks[hash]
vm.taskLock.RUnlock()
if ok {
return
}

Expand Down Expand Up @@ -184,7 +197,9 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
return
}
verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure)
vm.taskLock.Lock()
vm.tasks[hash] = verifyTask
vm.taskLock.Unlock()
verifyTaskCounter.Inc(1)
}(header.Hash())
header = vm.bc.GetHeaderByHash(header.ParentHash)
Expand All @@ -208,7 +223,16 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool {
}

hash := header.Hash()
_, exist := vm.verifiedCache.Get(hash)

// Check if the task is complete
vm.taskLock.RLock()
task, exist := vm.tasks[hash]
vm.taskLock.RUnlock()
if exist {
<-task.terminalCh
}

_, exist = vm.verifiedCache.Get(hash)
return exist
}

Expand Down
4 changes: 4 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -1812,6 +1813,9 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// of the blocks delivered from the downloader, and the indexing will be off.
log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
}
if errors.Is(err, core.ErrAncestorHasNotBeenVerified) {
return err
}
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
Expand Down

0 comments on commit b3a701d

Please sign in to comment.