diff --git a/driver/chain_syncer/beaconsync/progress_tracker.go b/driver/chain_syncer/beaconsync/progress_tracker.go index e11feeb79..1948b8ddf 100644 --- a/driver/chain_syncer/beaconsync/progress_tracker.go +++ b/driver/chain_syncer/beaconsync/progress_tracker.go @@ -2,7 +2,8 @@ package beaconsync import ( "context" - "sync/atomic" + "math/big" + "sync" "time" "github.com/ethereum/go-ethereum" @@ -24,16 +25,19 @@ type SyncProgressTracker struct { client *rpc.EthClient // Meta data - triggered atomic.Bool - lastSyncedVerifiedBlockID atomic.Uint64 - lastSyncedVerifiedBlockHash atomic.Value + triggered bool + lastSyncedVerifiedBlockID *big.Int + lastSyncedVerifiedBlockHash common.Hash // Out-of-sync check related lastSyncProgress *ethereum.SyncProgress lastProgressedTime time.Time timeout time.Duration - outOfSync atomic.Bool + outOfSync bool ticker *time.Ticker + + // Read-write mutex + mutex sync.RWMutex } // NewSyncProgressTracker creates a new SyncProgressTracker instance. @@ -56,12 +60,15 @@ func (t *SyncProgressTracker) Track(ctx context.Context) { // track is the internal implementation of MonitorSyncProgress, tries to // track the L2 execution engine's beacon sync progress. func (t *SyncProgressTracker) track(ctx context.Context) { - if !t.triggered.Load() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if !t.triggered { log.Debug("Beacon sync not triggered") return } - if t.outOfSync.Load() { + if t.outOfSync { return } @@ -87,12 +94,12 @@ func (t *SyncProgressTracker) track(ctx context.Context) { return } - if headHeight >= t.lastSyncedVerifiedBlockID.Load() { + if new(big.Int).SetUint64(headHeight).Cmp(t.lastSyncedVerifiedBlockID) >= 0 { t.lastProgressedTime = time.Now() log.Info( "L2 execution engine has finished the P2P sync work, all verified blocks synced, "+ "will switch to insert pending blocks one by one", - "lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID.Load(), + "lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID, "lastSyncedVerifiedBlockHash", t.lastSyncedVerifiedBlockHash, ) return @@ -105,7 +112,7 @@ func (t *SyncProgressTracker) track(ctx context.Context) { // Check whether the L2 execution engine has synced any new block through P2P since last event loop. if syncProgressed(t.lastSyncProgress, progress) { - t.outOfSync.Store(false) + t.outOfSync = false t.lastProgressedTime = time.Now() return } @@ -113,7 +120,7 @@ func (t *SyncProgressTracker) track(ctx context.Context) { // Has not synced any new block since last loop, check whether reaching the timeout. if time.Since(t.lastProgressedTime) > t.timeout { // Mark the L2 execution engine out of sync. - t.outOfSync.Store(true) + t.outOfSync = true log.Warn( "L2 execution engine is not able to sync through P2P", @@ -124,59 +131,80 @@ func (t *SyncProgressTracker) track(ctx context.Context) { } // UpdateMeta updates the inner beacon sync metadata. -func (t *SyncProgressTracker) UpdateMeta(id uint64, blockHash common.Hash) { +func (t *SyncProgressTracker) UpdateMeta(id *big.Int, blockHash common.Hash) { + t.mutex.Lock() + defer t.mutex.Unlock() + log.Debug("Update sync progress tracker meta", "id", id, "hash", blockHash) - if !t.triggered.Load() { + if !t.triggered { t.lastProgressedTime = time.Now() } - t.triggered.Store(true) - t.lastSyncedVerifiedBlockID.Store(id) - t.lastSyncedVerifiedBlockHash.Store(blockHash) + t.triggered = true + t.lastSyncedVerifiedBlockID = id + t.lastSyncedVerifiedBlockHash = blockHash } // ClearMeta cleans the inner beacon sync metadata. func (t *SyncProgressTracker) ClearMeta() { + t.mutex.Lock() + defer t.mutex.Unlock() + log.Debug("Clear sync progress tracker meta") - t.triggered.Store(false) - t.lastSyncedVerifiedBlockID.Store(0) - t.lastSyncedVerifiedBlockHash.Store(common.Hash{}) - t.outOfSync.Store(false) + t.triggered = false + t.lastSyncedVerifiedBlockID = nil + t.lastSyncedVerifiedBlockHash = common.Hash{} + t.outOfSync = false } // HeadChanged checks if a new beacon sync request will be needed. -func (t *SyncProgressTracker) HeadChanged(newID uint64) bool { - if !t.triggered.Load() { +func (t *SyncProgressTracker) HeadChanged(newID *big.Int) bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + + if !t.triggered { return true } - return t.lastSyncedVerifiedBlockID.Load() != newID + return t.lastSyncedVerifiedBlockID != nil && t.lastSyncedVerifiedBlockID != newID } // OutOfSync tells whether the L2 execution engine is marked as out of sync. func (t *SyncProgressTracker) OutOfSync() bool { - return t.outOfSync.Load() + t.mutex.RLock() + defer t.mutex.RUnlock() + + return t.outOfSync } // Triggered returns tracker.triggered. func (t *SyncProgressTracker) Triggered() bool { - return t.triggered.Load() + t.mutex.RLock() + defer t.mutex.RUnlock() + + return t.triggered } // LastSyncedVerifiedBlockID returns tracker.lastSyncedVerifiedBlockID. -func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() uint64 { - return t.lastSyncedVerifiedBlockID.Load() +func (t *SyncProgressTracker) LastSyncedVerifiedBlockID() *big.Int { + t.mutex.RLock() + defer t.mutex.RUnlock() + + if t.lastSyncedVerifiedBlockID == nil { + return nil + } + + return new(big.Int).Set(t.lastSyncedVerifiedBlockID) } // LastSyncedVerifiedBlockHash returns tracker.lastSyncedVerifiedBlockHash. func (t *SyncProgressTracker) LastSyncedVerifiedBlockHash() common.Hash { - val := t.lastSyncedVerifiedBlockHash.Load() - if val == nil { - return common.Hash{} - } - return val.(common.Hash) + t.mutex.RLock() + defer t.mutex.RUnlock() + + return t.lastSyncedVerifiedBlockHash } // syncProgressed checks whether there is any new progress since last sync progress check. diff --git a/driver/chain_syncer/beaconsync/progress_tracker_test.go b/driver/chain_syncer/beaconsync/progress_tracker_test.go index 3420419e3..37d0a03a0 100644 --- a/driver/chain_syncer/beaconsync/progress_tracker_test.go +++ b/driver/chain_syncer/beaconsync/progress_tracker_test.go @@ -59,22 +59,22 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTrack() { // Triggered ctx, cancel = context.WithCancel(context.Background()) - s.t.UpdateMeta(common.Big256.Uint64(), testutils.RandomHash()) + s.t.UpdateMeta(common.Big256, testutils.RandomHash()) go s.t.Track(ctx) time.Sleep(syncProgressCheckInterval + 5*time.Second) cancel() } func (s *BeaconSyncProgressTrackerTestSuite) TestClearMeta() { - s.t.triggered.Store(true) + s.t.triggered = true s.t.ClearMeta() - s.False(s.t.triggered.Load()) + s.False(s.t.triggered) } func (s *BeaconSyncProgressTrackerTestSuite) TestHeadChanged() { - s.True(s.t.HeadChanged(256)) - s.t.triggered.Store(true) - s.False(s.t.HeadChanged(s.t.LastSyncedVerifiedBlockID())) + s.True(s.t.HeadChanged(common.Big256)) + s.t.triggered = true + s.False(s.t.HeadChanged(common.Big256)) } func (s *BeaconSyncProgressTrackerTestSuite) TestOutOfSync() { @@ -86,9 +86,9 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestTriggered() { } func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockID() { - s.Equal(uint64(0), s.t.LastSyncedVerifiedBlockID()) - s.t.lastSyncedVerifiedBlockID.Store(1) - s.Equal(uint64(1), s.t.LastSyncedVerifiedBlockID()) + s.Nil(s.t.LastSyncedVerifiedBlockID()) + s.t.lastSyncedVerifiedBlockID = common.Big1 + s.Equal(common.Big1.Uint64(), s.t.LastSyncedVerifiedBlockID().Uint64()) } func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() { @@ -97,7 +97,7 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestLastSyncedVerifiedBlockHash() { s.t.LastSyncedVerifiedBlockHash(), ) randomHash := testutils.RandomHash() - s.t.lastSyncedVerifiedBlockHash.Store(randomHash) + s.t.lastSyncedVerifiedBlockHash = randomHash s.Equal(randomHash, s.t.LastSyncedVerifiedBlockHash()) } diff --git a/driver/chain_syncer/beaconsync/syncer.go b/driver/chain_syncer/beaconsync/syncer.go index da3b9aa39..6b5027d89 100644 --- a/driver/chain_syncer/beaconsync/syncer.go +++ b/driver/chain_syncer/beaconsync/syncer.go @@ -41,7 +41,7 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error { return err } - if !s.progressTracker.HeadChanged(blockID) { + if !s.progressTracker.HeadChanged(new(big.Int).SetUint64(blockID)) { log.Debug("Verified head has not changed", "blockID", blockID, "hash", latestVerifiedHeadPayload.BlockHash) return nil } @@ -78,7 +78,7 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error { } // Update sync status. - s.progressTracker.UpdateMeta(blockID, latestVerifiedHeadPayload.BlockHash) + s.progressTracker.UpdateMeta(new(big.Int).SetUint64(blockID), latestVerifiedHeadPayload.BlockHash) log.Info( "⛓️ Beacon sync triggered", diff --git a/driver/chain_syncer/calldata/syncer.go b/driver/chain_syncer/calldata/syncer.go index 258062f8d..36480e9a6 100644 --- a/driver/chain_syncer/calldata/syncer.go +++ b/driver/chain_syncer/calldata/syncer.go @@ -212,7 +212,7 @@ func (s *Syncer) onBlockProposed( ) if s.progressTracker.Triggered() { // Already synced through beacon sync, just skip this event. - if event.BlockId.Uint64() <= s.progressTracker.LastSyncedVerifiedBlockID() { + if event.BlockId.Cmp(s.progressTracker.LastSyncedVerifiedBlockID()) <= 0 { return nil } diff --git a/driver/chain_syncer/chain_syncer.go b/driver/chain_syncer/chain_syncer.go index 1f3113404..39e114752 100644 --- a/driver/chain_syncer/chain_syncer.go +++ b/driver/chain_syncer/chain_syncer.go @@ -108,7 +108,7 @@ func (s *L2ChainSyncer) Sync() error { } // Reset to the latest L2 execution engine's chain status. - s.progressTracker.UpdateMeta(l2Head.Number.Uint64(), l2Head.Hash()) + s.progressTracker.UpdateMeta(l2Head.Number, l2Head.Hash()) } // Insert the proposed block one by one. @@ -136,8 +136,8 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint return false } - if s.progressTracker.LastSyncedVerifiedBlockID() != 0 { - return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID() + if s.progressTracker.LastSyncedVerifiedBlockID() != nil { + return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedVerifiedBlockID().Uint64() } return true