Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for revert command. #4520

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions api/service/stagedsync/sync_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ const (
downloadBlocksRetryLimit = 3 // downloadBlocks service retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
LastMileBlocksSize = 50

// after cutting off a number of connected peers, the result number of peers
Expand Down Expand Up @@ -53,14 +52,6 @@ type SyncPeerConfig struct {
failedTimes uint64
}

// CreateTestSyncPeerConfig used for testing.
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
return &SyncPeerConfig{
client: client,
blockHashes: blockHashes,
}
}

// GetClient returns client pointer of downloader.Client
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
return peerConfig.client
Expand Down Expand Up @@ -303,21 +294,21 @@ func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
// Caller shall ensure mtx is locked for reading.
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
func getHowManyMaxConsensus(peers []*SyncPeerConfig) (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
if len(sc.peers) == 0 {
if len(peers) == 0 {
return -1, 0
} else if len(sc.peers) == 1 {
} else if len(peers) == 1 {
return 0, 1
}
maxFirstID := len(sc.peers) - 1
maxFirstID := len(peers) - 1
for i := maxFirstID - 1; i >= 0; i-- {
if CompareSyncPeerConfigByblockHashes(sc.peers[maxFirstID], sc.peers[i]) != 0 {
if CompareSyncPeerConfigByblockHashes(peers[maxFirstID], peers[i]) != 0 {
break
}
maxFirstID = i
}
maxCount := len(sc.peers) - maxFirstID
maxCount := len(peers) - maxFirstID
return maxFirstID, maxCount
}

Expand Down Expand Up @@ -386,7 +377,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp(bgMode bool) error {
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
maxFirstID, maxCount := getHowManyMaxConsensus(sc.peers)
if maxFirstID == -1 {
return errors.New("invalid peer index -1 for block hashes query")
}
Expand Down
109 changes: 61 additions & 48 deletions cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,29 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) {
}
}

func revert(chain core.BlockChain, hc harmonyconfig.HarmonyConfig) {
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
}

func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
var err error

Expand Down Expand Up @@ -353,26 +376,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
if hc.Revert.RevertBeacon {
chain = currentNode.Beaconchain()
}
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
revert(chain, hc)
}

startMsg := "==== New Harmony Node ===="
Expand Down Expand Up @@ -671,31 +675,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
return nodeConfig, nil
}

func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}

blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}

localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}
func setupChain(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *registry.Registry {

// Current node.
var chainDBFactory shardchain.DBFactory
Expand All @@ -714,6 +694,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
}

engine := chain.NewEngine()
registry.SetEngine(engine)

chainConfig := nodeConfig.GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
Expand All @@ -724,6 +705,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
collection.DisableCache(shardID)
}
}
registry.SetShardChainCollection(collection)

var blockchain core.BlockChain

Expand All @@ -737,17 +719,48 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
registry.SetBeaconchain(beacon)
}

blockchain, err = collection.ShardChain(nodeConfig.ShardID)
blockchain, err := collection.ShardChain(nodeConfig.ShardID)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
registry.SetBlockchain(blockchain)
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
if registry.GetBeaconchain() == nil {
registry.SetBeaconchain(registry.GetBlockchain())
}
return registry
}

func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}

blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}
localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}

registry = setupChain(hc, nodeConfig, registry)
if registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil1111111")
}
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
cxPool := core.NewCxPool(core.CxPoolSize)
registry.SetCxPool(cxPool)

Expand All @@ -762,7 +775,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
os.Exit(1)
}

currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc, registry)
currentNode := node.New(myHost, currentConsensus, blacklist, allowedTxs, localAccounts, &hc, registry)

if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil {
currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn
Expand Down
7 changes: 5 additions & 2 deletions core_test/shardchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func TestAddNewBlock(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
Expand All @@ -57,7 +60,7 @@ func TestAddNewBlock(t *testing.T) {
}
nodeconfig.SetNetworkType(nodeconfig.Testnet)
var block *types.Block
node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
node := node.New(host, consensus, nil, nil, nil, nil, reg)
commitSigs := make(chan []byte, 1)
commitSigs <- []byte{}
block, err = node.Worker.FinalizeNewBlock(
Expand Down
38 changes: 38 additions & 0 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package registry
import (
"sync"

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/webhooks"
)

Expand All @@ -16,6 +18,8 @@ type Registry struct {
txPool *core.TxPool
cxPool *core.CxPool
isBackup bool
engine engine.Engine
collection *shardchain.CollectionImpl
}

// New creates a new registry.
Expand Down Expand Up @@ -122,3 +126,37 @@ func (r *Registry) GetCxPool() *core.CxPool {

return r.cxPool
}

// SetEngine sets the engine to registry.
func (r *Registry) SetEngine(engine engine.Engine) *Registry {
r.mu.Lock()
defer r.mu.Unlock()

r.engine = engine
return r
}

// GetEngine gets the engine from registry.
func (r *Registry) GetEngine() engine.Engine {
r.mu.Lock()
defer r.mu.Unlock()

return r.engine
}

// SetShardChainCollection sets the shard chain collection to registry.
func (r *Registry) SetShardChainCollection(collection *shardchain.CollectionImpl) *Registry {
r.mu.Lock()
defer r.mu.Unlock()

r.collection = collection
return r
}

// GetShardChainCollection gets the shard chain collection from registry.
func (r *Registry) GetShardChainCollection() *shardchain.CollectionImpl {
r.mu.Lock()
defer r.mu.Unlock()

return r.collection
}
18 changes: 7 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
Expand Down Expand Up @@ -50,7 +49,6 @@ import (
common2 "github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
Expand Down Expand Up @@ -105,8 +103,7 @@ type Node struct {
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex
crosslinks *crosslinks.Crosslinks // Memory storage for crosslink processing.
// Shard databases
shardChains shardchain.Collection

SelfPeer p2p.Peer
stateMutex sync.Mutex // mutex for change node state
TxPool *core.TxPool
Expand Down Expand Up @@ -194,7 +191,10 @@ func (node *Node) Beaconchain() core.BlockChain {
}

func (node *Node) chain(shardID uint32, options core.Options) core.BlockChain {
bc, err := node.shardChains.ShardChain(shardID, options)
if node.registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil")
}
bc, err := node.registry.GetShardChainCollection().ShardChain(shardID, options)
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
}
Expand Down Expand Up @@ -1009,12 +1009,9 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
func New(
host p2p.Host,
consensusObj *consensus.Consensus,
engine engine.Engine,
collection *shardchain.CollectionImpl,
blacklist map[common.Address]struct{},
allowedTxs map[common.Address]core.AllowedTxData,
localAccounts []common.Address,
isArchival map[uint32]bool,
harmonyconfig *harmonyconfig.HarmonyConfig,
registry *registry.Registry,
) *Node {
Expand All @@ -1041,7 +1038,6 @@ func New(
networkType := node.NodeConfig.GetNetworkType()
chainConfig := networkType.ChainConfig()
node.chainConfig = chainConfig
node.shardChains = collection
node.IsSynchronized = abool.NewBool(false)

if host != nil {
Expand All @@ -1064,9 +1060,9 @@ func New(
if b2 {
shardID := node.NodeConfig.ShardID
// HACK get the real error reason
_, err = node.shardChains.ShardChain(shardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shardID)
} else {
_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shard.BeaconChainShardID)
}
fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
os.Exit(-1)
Expand Down
Loading