Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
revert change
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp committed Apr 1, 2024
1 parent f1ac519 commit f18952e
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 48 deletions.
92 changes: 60 additions & 32 deletions driver/chain_syncer/beaconsync/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package beaconsync

import (
"context"
"sync/atomic"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -24,16 +25,19 @@ type SyncProgressTracker struct {
client *rpc.EthClient

// Meta data
triggered atomic.Bool
lastSyncedVerifiedBlockID atomic.Uint64
lastSyncedVerifiedBlockHash atomic.Value
triggered bool
lastSyncedVerifiedBlockID *big.Int
lastSyncedVerifiedBlockHash common.Hash

// Out-of-sync check related
lastSyncProgress *ethereum.SyncProgress
lastProgressedTime time.Time
timeout time.Duration
outOfSync atomic.Bool
outOfSync bool
ticker *time.Ticker

// Read-write mutex
mutex sync.RWMutex
}

// NewSyncProgressTracker creates a new SyncProgressTracker instance.
Expand All @@ -56,12 +60,15 @@ func (t *SyncProgressTracker) Track(ctx context.Context) {
// track is the internal implementation of MonitorSyncProgress, tries to
// track the L2 execution engine's beacon sync progress.
func (t *SyncProgressTracker) track(ctx context.Context) {
if !t.triggered.Load() {
t.mutex.Lock()
defer t.mutex.Unlock()

if !t.triggered {
log.Debug("Beacon sync not triggered")
return
}

if t.outOfSync.Load() {
if t.outOfSync {
return
}

Expand All @@ -87,12 +94,12 @@ func (t *SyncProgressTracker) track(ctx context.Context) {
return
}

if headHeight >= t.lastSyncedVerifiedBlockID.Load() {
if new(big.Int).SetUint64(headHeight).Cmp(t.lastSyncedVerifiedBlockID) >= 0 {
t.lastProgressedTime = time.Now()
log.Info(
"L2 execution engine has finished the P2P sync work, all verified blocks synced, "+
"will switch to insert pending blocks one by one",
"lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID.Load(),
"lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID,
"lastSyncedVerifiedBlockHash", t.lastSyncedVerifiedBlockHash,
)
return
Expand All @@ -105,15 +112,15 @@ func (t *SyncProgressTracker) track(ctx context.Context) {

// Check whether the L2 execution engine has synced any new block through P2P since last event loop.
if syncProgressed(t.lastSyncProgress, progress) {
t.outOfSync.Store(false)
t.outOfSync = false
t.lastProgressedTime = time.Now()
return
}

// Has not synced any new block since last loop, check whether reaching the timeout.
if time.Since(t.lastProgressedTime) > t.timeout {
// Mark the L2 execution engine out of sync.
t.outOfSync.Store(true)
t.outOfSync = true

log.Warn(
"L2 execution engine is not able to sync through P2P",
Expand All @@ -124,59 +131,80 @@ func (t *SyncProgressTracker) track(ctx context.Context) {
}

// UpdateMeta updates the inner beacon sync metadata.
func (t *SyncProgressTracker) UpdateMeta(id uint64, blockHash common.Hash) {
func (t *SyncProgressTracker) UpdateMeta(id *big.Int, blockHash common.Hash) {
t.mutex.Lock()
defer t.mutex.Unlock()

log.Debug("Update sync progress tracker meta", "id", id, "hash", blockHash)

if !t.triggered.Load() {
if !t.triggered {
t.lastProgressedTime = time.Now()
}

t.triggered.Store(true)
t.lastSyncedVerifiedBlockID.Store(id)
t.lastSyncedVerifiedBlockHash.Store(blockHash)
t.triggered = true
t.lastSyncedVerifiedBlockID = id
t.lastSyncedVerifiedBlockHash = blockHash
}

// ClearMeta cleans the inner beacon sync metadata.
func (t *SyncProgressTracker) ClearMeta() {
t.mutex.Lock()
defer t.mutex.Unlock()

log.Debug("Clear sync progress tracker meta")

t.triggered.Store(false)
t.lastSyncedVerifiedBlockID.Store(0)
t.lastSyncedVerifiedBlockHash.Store(common.Hash{})
t.outOfSync.Store(false)
t.triggered = false
t.lastSyncedVerifiedBlockID = nil
t.lastSyncedVerifiedBlockHash = common.Hash{}
t.outOfSync = false
}

// HeadChanged checks if a new beacon sync request will be needed.
func (t *SyncProgressTracker) HeadChanged(newID uint64) bool {
if !t.triggered.Load() {
func (t *SyncProgressTracker) HeadChanged(newID *big.Int) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()

if !t.triggered {
return true
}

return t.lastSyncedVerifiedBlockID.Load() != newID
return t.lastSyncedVerifiedBlockID != nil && t.lastSyncedVerifiedBlockID != newID
}

// OutOfSync tells whether the L2 execution engine is marked as out of sync.
func (t *SyncProgressTracker) OutOfSync() bool {
return t.outOfSync.Load()
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.outOfSync
}

// Triggered returns tracker.triggered.
func (t *SyncProgressTracker) Triggered() bool {
return t.triggered.Load()
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.triggered
}

// LastSyncedVerifiedBlockID returns tracker.lastSyncedVerifiedBlockID.
func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() uint64 {
return t.lastSyncedVerifiedBlockID.Load()
func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() *big.Int {
t.mutex.RLock()
defer t.mutex.RUnlock()

if t.lastSyncedVerifiedBlockID == nil {
return nil
}

return new(big.Int).Set(t.lastSyncedVerifiedBlockID)
}

// LastSyncedVerifiedBlockHash returns tracker.lastSyncedVerifiedBlockHash.
func (t *SyncProgressTracker) LastSyncedVerifiedBlockHash() common.Hash {
val := t.lastSyncedVerifiedBlockHash.Load()
if val == nil {
return common.Hash{}
}
return val.(common.Hash)
t.mutex.RLock()
defer t.mutex.RUnlock()

return t.lastSyncedVerifiedBlockHash
}

// syncProgressed checks whether there is any new progress since last sync progress check.
Expand Down
20 changes: 10 additions & 10 deletions driver/chain_syncer/beaconsync/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTrack() {

// Triggered
ctx, cancel = context.WithCancel(context.Background())
s.t.UpdateMeta(common.Big256.Uint64(), testutils.RandomHash())
s.t.UpdateMeta(common.Big256, testutils.RandomHash())
go s.t.Track(ctx)
time.Sleep(syncProgressCheckInterval + 5*time.Second)
cancel()
}

func (s *BeaconSyncProgressTrackerTestSuite) TestClearMeta() {
s.t.triggered.Store(true)
s.t.triggered = true
s.t.ClearMeta()
s.False(s.t.triggered.Load())
s.False(s.t.triggered)
}

func (s *BeaconSyncProgressTrackerTestSuite) TestHeadChanged() {
s.True(s.t.HeadChanged(256))
s.t.triggered.Store(true)
s.False(s.t.HeadChanged(s.t.LastSyncedVerifiedBlockID()))
s.True(s.t.HeadChanged(common.Big256))
s.t.triggered = true
s.False(s.t.HeadChanged(common.Big256))
}

func (s *BeaconSyncProgressTrackerTestSuite) TestOutOfSync() {
Expand All @@ -86,9 +86,9 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTriggered() {
}

func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockID() {
s.Equal(uint64(0), s.t.LastSyncedVerifiedBlockID())
s.t.lastSyncedVerifiedBlockID.Store(1)
s.Equal(uint64(1), s.t.LastSyncedVerifiedBlockID())
s.Nil(s.t.LastSyncedVerifiedBlockID())
s.t.lastSyncedVerifiedBlockID = common.Big1
s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID().Uint64())
}

func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() {
Expand All @@ -97,7 +97,7 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() {
s.t.LastSyncedVerifiedBlockHash(),
)
randomHash := testutils.RandomHash()
s.t.lastSyncedVerifiedBlockHash.Store(randomHash)
s.t.lastSyncedVerifiedBlockHash = randomHash
s.Equal(randomHash, s.t.LastSyncedVerifiedBlockHash())
}

Expand Down
4 changes: 2 additions & 2 deletions driver/chain_syncer/beaconsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
return err
}

if !s.progressTracker.HeadChanged(blockID) {
if !s.progressTracker.HeadChanged(new(big.Int).SetUint64(blockID)) {
log.Debug("Verified head has not changed", "blockID", blockID, "hash", latestVerifiedHeadPayload.BlockHash)
return nil
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
}

// Update sync status.
s.progressTracker.UpdateMeta(blockID, latestVerifiedHeadPayload.BlockHash)
s.progressTracker.UpdateMeta(new(big.Int).SetUint64(blockID), latestVerifiedHeadPayload.BlockHash)

log.Info(
"⛓️ Beacon sync triggered",
Expand Down
2 changes: 1 addition & 1 deletion driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Syncer) onBlockProposed(
)
if s.progressTracker.Triggered() {
// Already synced through beacon sync, just skip this event.
if event.BlockId.Uint64() <= s.progressTracker.LastSyncedVerifiedBlockID() {
if event.BlockId.Cmp(s.progressTracker.LastSyncedVerifiedBlockID()) <= 0 {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *L2ChainSyncer) Sync() error {
}

// Reset to the latest L2 execution engine's chain status.
s.progressTracker.UpdateMeta(l2Head.Number.Uint64(), l2Head.Hash())
s.progressTracker.UpdateMeta(l2Head.Number, l2Head.Hash())
}

// Insert the proposed block one by one.
Expand Down Expand Up @@ -136,8 +136,8 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint
return false
}

if s.progressTracker.LastSyncedVerifiedBlockID() != 0 {
return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID()
if s.progressTracker.LastSyncedVerifiedBlockID() != nil {
return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID().Uint64()
}

return true
Expand Down

0 comments on commit f18952e

Please sign in to comment.