Skip to content

Commit

Permalink
Fixed panic with dsync. (#4562)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frozen authored Nov 16, 2023
1 parent 8f774ea commit 582a4cf
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 110 deletions.
2 changes: 1 addition & 1 deletion cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey

// This needs to be executed after consensus setup
if err := currentNode.InitConsensusWithValidators(); err != nil {
if err := currentConsensus.InitConsensusWithValidators(); err != nil {
utils.Logger().Warn().
Int("shardID", hc.General.ShardID).
Err(err).
Expand Down
85 changes: 79 additions & 6 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ import (
"sync/atomic"
"time"

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"

"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/multibls"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
"github.com/harmony-one/harmony/staking/slash"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -121,7 +122,9 @@ type Consensus struct {
// finalityCounter keep tracks of the finality time
finalityCounter atomic.Value //int64

dHelper *downloadHelper
dHelper interface {
DownloadAsync()
}

// Both flags only for initialization state.
start bool
Expand Down Expand Up @@ -177,6 +180,10 @@ func (consensus *Consensus) verifyBlock(block *types.Block) error {
// BlocksSynchronized lets the main loop know that block synchronization finished
// thus the blockchain is likely to be up to date.
func (consensus *Consensus) BlocksSynchronized() {
err := consensus.AddConsensusLastMile()
if err != nil {
consensus.GetLogger().Error().Err(err).Msg("add last mile failed")
}
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.syncReadyChan()
Expand Down Expand Up @@ -274,6 +281,7 @@ func New(
msgSender: NewMessageSender(host),
// FBFT timeout
consensusTimeout: createTimeout(),
dHelper: downloadAsync{},
}

if multiBLSPriKey != nil {
Expand Down Expand Up @@ -311,3 +319,68 @@ func (consensus *Consensus) GetHost() p2p.Host {
// Registry returns the registry held by this consensus instance.
func (consensus *Consensus) Registry() *registry.Registry {
return consensus.registry
}

// InitConsensusWithValidators requests Listening mode, computes the shard
// state for the epoch of the current block, and looks up the committee of
// this consensus' shard. If any of the committee's BLS public keys is
// contained in consensus.GetPublicKeys(), the committee keys are installed via
// UpdatePublicKeys and the mode is switched to Normal.
//
// It returns an error when the shard state cannot be computed, when the
// committee for consensus.ShardID cannot be found, or when the committee's BLS
// public keys cannot be obtained. If no committee key matches, it returns nil
// without installing keys or switching to Normal.
func (consensus *Consensus) InitConsensusWithValidators() (err error) {
	shardID := consensus.ShardID
	currentBlock := consensus.Blockchain().CurrentBlock()
	blockNum := currentBlock.NumberU64()
	consensus.SetMode(Listening)
	epoch := currentBlock.Epoch()
	utils.Logger().Info().
		Uint64("blockNum", blockNum).
		Uint32("shardID", shardID).
		Uint64("epoch", epoch.Uint64()).
		Msg("[InitConsensusWithValidators] Try To Get PublicKeys")

	shardState, err := committee.WithStakingEnabled.Compute(
		epoch, consensus.Blockchain(),
	)
	if err != nil {
		utils.Logger().Err(err).
			Uint64("blockNum", blockNum).
			Uint32("shardID", shardID).
			Uint64("epoch", epoch.Uint64()).
			Msg("[InitConsensusWithValidators] Failed getting shard state")
		return err
	}

	subComm, err := shardState.FindCommitteeByID(shardID)
	if err != nil {
		utils.Logger().Err(err).
			Interface("shardState", shardState).
			Msg("[InitConsensusWithValidators] Find CommitteeByID")
		return err
	}

	pubKeys, err := subComm.BLSPublicKeys()
	if err != nil {
		// Attach the underlying error to the log entry, as the other failure
		// branches in this function do.
		utils.Logger().Err(err).
			Uint32("shardID", shardID).
			Uint64("blockNum", blockNum).
			Msg("[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys")
		// The message has no format verbs, so Wrap is used instead of Wrapf.
		return errors.Wrap(
			err,
			"[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys",
		)
	}

	for _, key := range pubKeys {
		if !consensus.GetPublicKeys().Contains(key.Object) {
			continue
		}
		// This entry is written before the update below, so "mode" records the
		// mode in effect before SetMode(Normal) is called.
		utils.Logger().Info().
			Uint64("blockNum", blockNum).
			Int("numPubKeys", len(pubKeys)).
			Str("mode", consensus.Mode().String()).
			Msg("[InitConsensusWithValidators] Successfully updated public keys")
		consensus.UpdatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(epoch).ExternalAllowlist())
		consensus.SetMode(Normal)
		return nil
	}
	// No committee key matched: nothing is installed and Normal mode is not set.
	return nil
}

// downloadAsync is a placeholder download helper whose DownloadAsync does
// nothing. New assigns a value of this type to the dHelper field.
type downloadAsync struct{}

// DownloadAsync is a no-op.
func (downloadAsync) DownloadAsync() {}
38 changes: 18 additions & 20 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
return nil, nil
}
lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum)
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog().GetMessagesByTypeSeq(
Expand All @@ -300,30 +302,26 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
func (consensus *Consensus) Start(
stopChan chan struct{},
) {
consensus.GetLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
go func() {
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
go func() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ticker.C:
consensus.Tick()
}
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ticker.C:
consensus.Tick()
}
}()

consensus.mutex.Lock()
consensus.consensusTimeout[timeoutBootstrap].Start()
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)")
// Set up next block due time.
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
consensus.mutex.Unlock()
}
}()

consensus.dHelper.start()
consensus.mutex.Lock()
consensus.consensusTimeout[timeoutBootstrap].Start()
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)")
// Set up next block due time.
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
consensus.mutex.Unlock()
}

func (consensus *Consensus) StartChannel() {
Expand Down
29 changes: 13 additions & 16 deletions consensus/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type downloader interface {
// SetDownloader builds a download helper around d and stores it as the
// consensus' dHelper. The assignment is made while holding consensus.mutex.
// TODO: It will be better to move this to consensus.New and register consensus as a service
func (consensus *Consensus) SetDownloader(d downloader) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.dHelper = newDownloadHelper(consensus, d)
}

type downloadHelper struct {
d downloader
c *Consensus

startedCh chan struct{}
finishedCh chan struct{}
Expand All @@ -41,46 +42,42 @@ func newDownloadHelper(c *Consensus, d downloader) *downloadHelper {
finishedSub := d.SubscribeDownloadFinished(finishedCh)

out := &downloadHelper{
c: c,
d: d,
startedCh: startedCh,
finishedCh: finishedCh,
startedSub: startedSub,
finishedSub: finishedSub,
}
go out.downloadStartedLoop()
go out.downloadFinishedLoop()
go out.downloadStartedLoop(c)
go out.downloadFinishedLoop(c)
return out
}

func (dh *downloadHelper) start() {
func (dh *downloadHelper) DownloadAsync() {
dh.d.DownloadAsync()
}

func (dh *downloadHelper) downloadStartedLoop() {
func (dh *downloadHelper) downloadStartedLoop(c *Consensus) {
for {
select {
case <-dh.startedCh:
dh.c.BlocksNotSynchronized()
c.BlocksNotSynchronized()

case err := <-dh.startedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
c.GetLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
}

func (dh *downloadHelper) downloadFinishedLoop() {
func (dh *downloadHelper) downloadFinishedLoop(c *Consensus) {
for {
select {
case <-dh.finishedCh:
err := dh.c.AddConsensusLastMile()
if err != nil {
dh.c.getLogger().Error().Err(err).Msg("add last mile failed")
}
dh.c.BlocksSynchronized()
c.BlocksSynchronized()

case err := <-dh.finishedSub.Err():
dh.c.getLogger().Info().Err(err).Msg("consensus download finished loop closed")
c.GetLogger().Info().Err(err).Msg("consensus download finished loop closed")
return
}
}
Expand All @@ -104,7 +101,7 @@ func (consensus *Consensus) AddConsensusLastMile() error {
}

func (consensus *Consensus) spinUpStateSync() {
consensus.dHelper.d.DownloadAsync()
consensus.dHelper.DownloadAsync()
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
Expand Down
67 changes: 0 additions & 67 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
"github.com/harmony-one/harmony/staking/reward"
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
Expand Down Expand Up @@ -1196,72 +1195,6 @@ func (node *Node) updateInitialRewardValues() {
reward.SetTotalInitialTokens(initTotal)
}

// InitConsensusWithValidators requests Listening mode on the node's consensus,
// computes the shard state for the epoch of the node's current block, and
// looks up the committee for the consensus' shard. If any committee BLS public
// key is contained in the consensus' own public keys, the committee keys are
// installed and the consensus is switched to Normal mode.
//
// An error is returned when node.Consensus is nil, when the shard state cannot
// be computed, when the committee is not found, or when the committee's BLS
// public keys cannot be obtained. If no key matches, nil is returned and
// neither the key update nor the switch to Normal happens.
func (node *Node) InitConsensusWithValidators() (err error) {
	const nilConsensusMsg = "[InitConsensusWithValidators] consenus is nil; Cannot figure out shardID"
	if node.Consensus == nil {
		utils.Logger().Error().Msg(nilConsensusMsg)
		return errors.New(nilConsensusMsg)
	}

	cons := node.Consensus
	shardID := cons.ShardID
	head := node.Blockchain().CurrentBlock()
	cons.SetMode(consensus.Listening)
	height, epoch := head.NumberU64(), head.Epoch()

	utils.Logger().Info().
		Uint64("blockNum", height).
		Uint32("shardID", shardID).
		Uint64("epoch", epoch.Uint64()).
		Msg("[InitConsensusWithValidators] Try To Get PublicKeys")

	shardState, err := committee.WithStakingEnabled.Compute(epoch, cons.Blockchain())
	if err != nil {
		utils.Logger().Err(err).
			Uint64("blockNum", height).
			Uint32("shardID", shardID).
			Uint64("epoch", epoch.Uint64()).
			Msg("[InitConsensusWithValidators] Failed getting shard state")
		return err
	}

	subComm, err := shardState.FindCommitteeByID(shardID)
	if err != nil {
		utils.Logger().Err(err).
			Interface("shardState", shardState).
			Msg("[InitConsensusWithValidators] Find CommitteeByID")
		return err
	}

	pubKeys, err := subComm.BLSPublicKeys()
	if err != nil {
		utils.Logger().Error().
			Uint32("shardID", shardID).
			Uint64("blockNum", height).
			Msg("[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys")
		return errors.Wrapf(
			err,
			"[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys",
		)
	}

	for _, key := range pubKeys {
		if !cons.GetPublicKeys().Contains(key.Object) {
			continue
		}
		// First matching key: log, install the committee keys, go to Normal
		// mode, and stop scanning.
		utils.Logger().Info().
			Uint64("blockNum", height).
			Int("numPubKeys", len(pubKeys)).
			Str("mode", cons.Mode().String()).
			Msg("[InitConsensusWithValidators] Successfully updated public keys")
		allowlist := shard.Schedule.InstanceForEpoch(epoch).ExternalAllowlist()
		cons.UpdatePublicKeys(pubKeys, allowlist)
		cons.SetMode(consensus.Normal)
		break
	}
	return nil
}

func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer, error) {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
Expand Down

0 comments on commit 582a4cf

Please sign in to comment.