Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release Candidate 2023.2.7 (dev -> main) #4487

Merged
merged 11 commits into from
Aug 12, 2023
59 changes: 59 additions & 0 deletions .mergify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
pull_request_rules:
- name: Squash merge rule to dev
conditions:
# applied for merge to the dev branch
- base=dev
# no unresolved threads
- "#review-threads-unresolved=0"
# Approved by two reviewers
- "#approved-reviews-by>=2"
# no unverified commit
- "#commits-unverified=0"
# Travis ci succeeded
- "check-success=Travis CI - Pull Request"
# git guardian succeeded
- "check-success=GitGuardian Security Checks"
# PR is not a draft
- -draft
# PR is not conflicting with the base branch
- -conflict
# conditions to avoid auto merge mistakes
# PR title doesn't have wip (not case sensitive)
- -title~=(?i)wip
# PR doesn't have WIP label (not case sensitive)
- label!=(?i)wip
# ready-to-merge is required to trigger the merge
- label=ready-to-merge
actions:
merge:
method: squash
- name: merge rule to main
conditions:
# from the dev branch : no direct PR to main
- head=dev
# applied for merge to the dev branch
- base=main
# no unresolved threads
- "#review-threads-unresolved=0"
# Approved by two reviewers
- "#approved-reviews-by>=2"
# no unverified commit
- "#commits-unverified=0"
# Travis ci succeeded
- "check-success=Travis CI - Pull Request"
# git guardian succeeded
- "check-success=GitGuardian Security Checks"
# PR is not a draft
- -draft
# PR is not conflicting with the base branch
- -conflict
# conditions to avoid auto merge mistakes
# PR title doesn't have wip (not case sensitive)
- -title~=(?i)wip
# PR doesn't have WIP label (not case sensitive)
- label!=(?i)wip
# ready-to-merge is required to trigger the merge
- label=ready-to-merge
actions:
merge:
method: merge
2 changes: 1 addition & 1 deletion api/service/legacysync/syncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
peer "github.com/libp2p/go-libp2p-core/peer"
peer "github.com/libp2p/go-libp2p/core/peer"

"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
Expand Down
12 changes: 11 additions & 1 deletion api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ const (
BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap
BlockByHashesLowerCap int = 3 // number of get blocks by hashes lower cap

LastMileBlocksThreshold int = 10
LastMileBlocksThreshold int = 10
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
VerifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
LastMileBlocksSize = 50

// SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit,
// no more request will be assigned to workers to wait for InsertChain to finish.
Expand Down Expand Up @@ -50,8 +53,15 @@ type (
// config for beacon config
BHConfig *BeaconHelperConfig

// use memory db
UseMemDB bool

// log the stage progress
LogProgress bool

// logs every single process and error to help debugging stream sync
// DebugMode is not accessible to the end user and is only an aid for development
DebugMode bool
}

// BeaconHelperConfig is the extra config used for beaconHelper which uses
Expand Down
10 changes: 10 additions & 0 deletions api/service/stagedstreamsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ var DefaultForwardOrder = ForwardOrder{
BlockBodies,
// Stages below don't use Internet
States,
LastMile,
Finish,
}

var DefaultRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
Expand All @@ -29,6 +31,7 @@ var DefaultRevertOrder = RevertOrder{

var DefaultCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
Expand All @@ -42,6 +45,7 @@ func DefaultStages(ctx context.Context,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
statesCfg StageStatesCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {

Expand All @@ -50,6 +54,7 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)

return []*Stage{
Expand Down Expand Up @@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Handler: handlerStageLastMile,
},
{
ID: Finish,
Description: "Finalize Changes",
Expand Down
39 changes: 27 additions & 12 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog"

"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
Expand Down Expand Up @@ -37,7 +38,7 @@ type (
)

// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()

sp := sync.NewProtocol(sync.Config{
Expand Down Expand Up @@ -67,8 +68,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode

ctx, cancel := context.WithCancel(context.Background())

//TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress)
// create an instance of staged sync for the downloader
stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger)
if err != nil {
cancel()
return nil
Expand Down Expand Up @@ -189,6 +190,7 @@ func (d *Downloader) waitForBootFinish() {
func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
Expand All @@ -208,21 +210,23 @@ func (d *Downloader) loop() {
go trigger()

case <-d.downloadC:
addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
numTriedStreams := len(d.stagedSyncInstance.invalidBlock.StreamID)
// if many streams couldn't solve it, then that's an unresolvable bad block
if numTriedStreams >= d.config.InitStreams {
if !d.stagedSyncInstance.invalidBlock.IsLogged {
fmt.Println("unresolvable bad block:", d.stagedSyncInstance.invalidBlock.Number)
d.logger.Error().
Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number).
Msg(WrapStagedSyncMsg("unresolvable bad block"))
d.stagedSyncInstance.invalidBlock.IsLogged = true
}
//TODO: if we don't have any new or untried stream in the list, sleep or panic
}
}

// If any error happens, sleep 5 seconds and retry
d.logger.Error().
Err(err).
Expand All @@ -242,16 +246,27 @@ func (d *Downloader) loop() {
Uint32("shard", d.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync finished"))
}

// If block number has been changed, trigger another sync
if addedBN != 0 {
// If block number has been changed, trigger another sync
go trigger()
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
}
}
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
// if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and
// the node is fully synced now, then switch to short range
// the reason why we need to check distanceBeforeSync is because, if it was long distance,
// very likely, there are a couple of new blocks have been added to the other nodes which
// we should still stay in long range and check them.
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
if estimatedHeight > 0 && addedBN > 0 &&
distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
}
initSync = false

case <-d.closeC:
return
Expand Down
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/downloaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stagedstreamsync

import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
Expand All @@ -15,7 +16,7 @@ type Downloaders struct {
}

// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
Expand All @@ -25,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,
Expand Down
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stagedstreamsync

import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
Expand All @@ -11,9 +12,9 @@ type StagedStreamSyncService struct {
}

// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, dbDir, config),
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
}
}

Expand Down
6 changes: 6 additions & 0 deletions api/service/stagedstreamsync/short_range_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func (sh *srHelper) removeStreams(sts []sttypes.StreamID) {
}
}

func (sh *srHelper) streamsFailed(sts []sttypes.StreamID, reason string) {
for _, st := range sts {
sh.syncProtocol.StreamFailed(st, reason)
}
}

// blameAllStreams only not to blame all whitelisted streams when the it's not the last block signature verification failed.
func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool {
if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 {
Expand Down
10 changes: 8 additions & 2 deletions api/service/stagedstreamsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -60,6 +61,11 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return nil
}

// shouldn't execute for epoch chain
if b.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}

maxHeight := s.state.status.targetBN
currentHead := b.configs.bc.CurrentBlock().NumberU64()
if currentHead >= maxHeight {
Expand All @@ -77,7 +83,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return errV
}

if currProgress == 0 {
if currProgress <= currentHead {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -209,7 +215,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
isOneOfTheBadStreams := false
for _, id := range s.state.invalidBlock.StreamID {
if id == stid {
b.configs.protocol.RemoveStream(stid)
b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed")
isOneOfTheBadStreams = true
break
}
Expand Down
Loading