Skip to content

Commit

Permalink
Merge pull request #4546 from harmony-one/dev
Browse files Browse the repository at this point in the history
Release Candidate v2024.0.0 - HIP-32 code (no mainnet HF) + Improvements
  • Loading branch information
sophoah authored Mar 5, 2024
2 parents 2dee6c6 + 49bba17 commit b3aebb6
Show file tree
Hide file tree
Showing 118 changed files with 10,086 additions and 1,556 deletions.
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
This [github document](https://help.github.com/articles/creating-a-pull-request/) provides some guidance on how to create a pull request in github.

## PR requirement
To pursue engineering excellence, we have insisted on the highest stardard on the quality of each PR.
To pursue engineering excellence, we have insisted on the highest standard for the quality of each PR.

* For each PR, please run [golint](https://github.com/golang/lint), [goimports](https://godoc.org/golang.org/x/tools/cmd/goimports), to fix the basic issues/warnings.
* Make sure you understand [How to Write a Git Commit Message](https://chris.beams.io/posts/git-commit/).
Expand All @@ -21,7 +21,7 @@ To pursue engineering excellence, we have insisted on the highest stardard on th
The best practice is to reorder and squash your local commits before the PR submission to create an atomic and self-contained PR.
This [book chapter](https://git-scm.com/book/en/v2/Git-Tools-Rewriting-History) provides detailed explanation and guidance on how to rewrite the local git history.

For exampple, a typical workflow is like the following.
For example, a typical workflow is like the following.
```bash
# assuming you are working on a fix of bug1, and use a local branch called "fixes_of_bug1".

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,4 @@ debug_external: clean
bash test/debug-external.sh

build_localnet_validator:
bash test/build-localnet-validator.sh
bash test/build-localnet-validator.sh
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ make debug-kill
To keep things consistent, we have a docker image to run all tests. **These are the same tests ran on the pull request checks**.
Note that all testing docker container binds a couple of ports to the host machine for your convince. The ports are:
Note that all test Docker containers bind several ports to the host machine for your convenience. The ports are:
* `9500` - Shard 0 RPC for a validator
* `9501` - Shard 1 RPC for a validator
* `9599` - Shard 0 RPC for an explorer
Expand All @@ -174,7 +174,7 @@ To run this test, do:
make test-rpc
```
This test starts a localnet (within the Docker container), **ensures it reaches a consensus**, and runs a series of tests to ensure correct RPC behavior.
This test also acts as a preliminary integration test (more through tests are done on the testnets).
This test also acts as a preliminary integration test (more thorough tests are done on the testnets).
> The tests ran by this command can be found [here](https://github.com/harmony-one/harmony-test/tree/master/localnet).
If you wish to debug further with the localnet after the tests are done, open a new shell and run:
Expand All @@ -194,7 +194,7 @@ To run this test, do:
make test-rosetta
```
This test starts a localnet (within the Docker container), **ensures it reaches a consensus**, and runs the Construction & Data API checks using the [rosetta-cli](https://github.com/coinbase/rosetta-cli).
This test also acts as a preliminary integration test (more through tests are done on the testnets).
This test also acts as a preliminary integration test (more thorough tests are done on the testnets).
> The config for this test can be found [here](https://github.com/harmony-one/harmony-test/blob/master/localnet/configs/localnet_rosetta_test_s0.json) & [here](https://github.com/harmony-one/harmony-test/blob/master/localnet/configs/localnet_rosetta_test_s1.json)
Similar to the RPC tests, if you wish to debug further with the localnet after the tests are done, open a new shell and run:
Expand Down
17 changes: 15 additions & 2 deletions api/service/legacysync/epoch_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {

err := ProcessStateSync(syncConfig, heights, bc)
if err != nil {
if errors.Is(err, core.ErrKnownBlock) {
return 10
}
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch)
Expand Down Expand Up @@ -199,8 +202,18 @@ func processWithPayload(payload [][]byte, bc core.BlockChain) error {
decoded = append(decoded, block)
}

_, err := bc.InsertChain(decoded, true)
return err
for _, block := range decoded {
_, err := bc.InsertChain([]*types.Block{block}, true)
switch {
case errors.Is(err, core.ErrKnownBlock):
continue
case err != nil:
return err
default:
}
}

return nil
}

// CreateSyncConfig creates SyncConfig for StateSync object.
Expand Down
33 changes: 26 additions & 7 deletions api/service/legacysync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,11 +860,12 @@ func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Ha
}

// UpdateBlockAndStatus ...
func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error {
func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain) error {
if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 {
utils.Logger().Debug().Uint64("curBlockNum", bc.CurrentBlock().NumberU64()).Uint64("receivedBlockNum", block.NumberU64()).Msg("[SYNC] Inappropriate block number, ignore!")
return nil
}
verifyAllSig := true

haveCurrentSig := len(block.GetCurrentCommitSig()) != 0
// Verify block signatures
Expand Down Expand Up @@ -904,7 +905,17 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
}

_, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */)
if err != nil {
switch {
case errors.Is(err, core.ErrKnownBlock):
utils.Logger().Info().
Uint64("blockHeight", block.NumberU64()).
Uint64("blockEpoch", block.Epoch().Uint64()).
Str("blockHex", block.Hash().Hex()).
Uint32("ShardID", block.ShardID()).
Err(err).
Msg("[SYNC] UpdateBlockAndStatus: Block exists")
return nil
case err != nil:
utils.Logger().Error().
Err(err).
Msgf(
Expand All @@ -913,6 +924,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
block.ShardID(),
)
return err
default:
}
utils.Logger().Info().
Uint64("blockHeight", block.NumberU64()).
Expand Down Expand Up @@ -944,8 +956,8 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
break
}
// Enforce sig check for the last block in a batch
enforceSigCheck := !commonIter.HasNext()
err = ss.UpdateBlockAndStatus(block, bc, enforceSigCheck)
_ = !commonIter.HasNext()
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
Expand All @@ -962,7 +974,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, true)
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
Expand All @@ -983,7 +995,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain) error {
if block == nil {
break
}
err = ss.UpdateBlockAndStatus(block, bc, false)
err = ss.UpdateBlockAndStatus(block, bc)
if err != nil {
break
}
Expand Down Expand Up @@ -1111,6 +1123,9 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *cons
}
err := ss.ProcessStateSync(startHash[:], size, bc)
if err != nil {
if errors.Is(err, core.ErrKnownBlock) {
continue
}
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
Expand Down Expand Up @@ -1148,7 +1163,11 @@ func (ss *StateSync) addConsensusLastMile(bc core.BlockChain, consensus *consens
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
_, err := bc.InsertChain(types.Blocks{block}, true)
switch {
case errors.Is(err, core.ErrKnownBlock):
case errors.Is(err, core.ErrNotLastBlockInEpoch):
case err != nil:
return errors.Wrap(err, "failed to InsertChain")
}
}
Expand Down
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

Expand All @@ -20,6 +21,10 @@ type syncProtocol interface {
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)
GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error)
GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error)
GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error)
GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error)

RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
Expand Down
5 changes: 4 additions & 1 deletion api/service/stagedstreamsync/beacon_helper.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package stagedstreamsync

import (
"errors"
"time"

"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -126,7 +128,8 @@ func (bh *beaconHelper) insertLastMileBlocks() (inserted int, bn uint64, err err
}
// TODO: Instruct the beacon helper to verify signatures. This may require some forks
// in pub-sub message (add commit sigs in node.block.sync messages)
if _, err = bh.bc.InsertChain(types.Blocks{b}, true); err != nil {
_, err = bh.bc.InsertChain(types.Blocks{b}, true)
if err != nil && !errors.Is(err, core.ErrKnownBlock) {
bn--
return
}
Expand Down
56 changes: 38 additions & 18 deletions api/service/stagedstreamsync/block_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package stagedstreamsync

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
Expand All @@ -11,6 +13,7 @@ import (
type BlockDownloadDetails struct {
loopID int
streamID sttypes.StreamID
rootHash common.Hash
}

// blockDownloadManager is the helper structure for get blocks request management
Expand All @@ -19,11 +22,11 @@ type blockDownloadManager struct {
tx kv.RwTx

targetBN uint64
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]BlockDownloadDetails // details about how this block was downloaded
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]*BlockDownloadDetails // details about how this block was downloaded

logger zerolog.Logger
lock sync.Mutex
Expand All @@ -38,26 +41,26 @@ func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logg
processing: make(map[uint64]struct{}),
retries: newPrioritizedNumbers(),
rq: newResultQueue(),
bdd: make(map[uint64]BlockDownloadDetails),
bdd: make(map[uint64]*BlockDownloadDetails),
logger: logger,
}
}

// GetNextBatch get the next block numbers batch
func (gbm *blockDownloadManager) GetNextBatch() []uint64 {
func (gbm *blockDownloadManager) GetNextBatch(curHeight uint64) []uint64 {
gbm.lock.Lock()
defer gbm.lock.Unlock()

cap := BlocksPerRequest

bns := gbm.getBatchFromRetries(cap)
bns := gbm.getBatchFromRetries(cap, curHeight)
if len(bns) > 0 {
cap -= len(bns)
gbm.addBatchToRequesting(bns)
}

if gbm.availableForMoreTasks() {
addBNs := gbm.getBatchFromUnprocessed(cap)
addBNs := gbm.getBatchFromUnprocessed(cap, curHeight)
gbm.addBatchToRequesting(addBNs)
bns = append(bns, addBNs...)
}
Expand Down Expand Up @@ -88,7 +91,7 @@ func (gbm *blockDownloadManager) HandleRequestResult(bns []uint64, blockBytes []
gbm.retries.push(bn)
} else {
gbm.processing[bn] = struct{}{}
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -107,7 +110,7 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
defer gbm.lock.Unlock()

for _, bn := range bns {
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -116,25 +119,43 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
}

// GetDownloadDetails returns the download details for a block
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID) {
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID, err error) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].loopID, gbm.bdd[blockNumber].streamID
if dm, exist := gbm.bdd[blockNumber]; exist {
return dm.loopID, dm.streamID, nil
}
return 0, sttypes.StreamID(fmt.Sprint(0)), fmt.Errorf("there is no download details for the block number: %d", blockNumber)
}

// SetRootHash sets the root hash for a specific block
func (gbm *blockDownloadManager) SetRootHash(blockNumber uint64, root common.Hash) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

gbm.bdd[blockNumber].rootHash = root
}

// GetRootHash returns the root hash for a specific block
func (gbm *blockDownloadManager) GetRootHash(blockNumber uint64) common.Hash {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].rootHash
}

// getBatchFromRetries get the block number batch to be requested from retries.
func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromRetries(cap int, fromBlockNumber uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
for cnt := 0; cnt < cap; cnt++ {
bn := gbm.retries.pop()
if bn == 0 {
break // no more retries
}
if bn <= curHeight {
if bn <= fromBlockNumber {
continue
}
requestBNs = append(requestBNs, bn)
Expand All @@ -143,10 +164,9 @@ func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
}

// getBatchFromUnprocessed returns a batch of block numbers to be requested from unprocessed.
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int, curHeight uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
bn := curHeight + 1
// TODO: this algorithm can be potentially optimized.
Expand Down
Loading

0 comments on commit b3aebb6

Please sign in to comment.