feat: Add e2e reorg tests to syncers (#56)
* feat: Added e2e tests to the syncer

* fix: UTs

* fix: comments

* fix: comments 2

* fix: rebase

* fix: lint

* fix: rebase remove old test

* fix: another rebase fix

* fix: stress test

* fix: ut build

* fix: rebuild tree after reorg

* fix: comments

---------

Co-authored-by: Goran Rojovic <goran.rojovic@ethernal.tech>
rbpol and goran-ethernal authored Sep 25, 2024
1 parent 3e3f348 commit 9a694c4
Showing 14 changed files with 404 additions and 140 deletions.
l1infotreesync/downloader.go (3 changes: 2 additions & 1 deletion)
@@ -86,7 +86,8 @@ func buildAppender(client EthClienter, globalExitRoot, rollupManager common.Addr
l, err,
)
}
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", common.Bytes2Hex(l1InfoTreeUpdate.CurrentL1InfoRoot[:]))
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s",
common.BytesToHash(l1InfoTreeUpdate.CurrentL1InfoRoot[:]))

return nil
}
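The only functional difference in this hunk is how the root is rendered in the log line: common.Bytes2Hex prints bare hex digits, while common.BytesToHash yields a common.Hash that formats as a 0x-prefixed, fixed-width hash. A small standalone sketch of the two outputs (the placeholder root value is made up):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	var root [32]byte // stands in for l1InfoTreeUpdate.CurrentL1InfoRoot
	root[31] = 0xab

	// Old logging: bare hex digits, no 0x prefix.
	fmt.Printf("expected root: %s\n", common.Bytes2Hex(root[:]))

	// New logging: common.Hash formats as a 0x-prefixed hash.
	fmt.Printf("expected root: %s\n", common.BytesToHash(root[:]))
}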
l1infotreesync/e2e_test.go (229 changes: 181 additions & 48 deletions)
@@ -160,14 +160,124 @@ func TestE2E(t *testing.T) {
}
}

func TestWithReorgs(t *testing.T) {
ctx := context.Background()
dbPathSyncer := path.Join(t.TempDir(), "file::memory:?cache=shared")
dbPathReorg := t.TempDir()
privateKey, err := crypto.GenerateKey()
require.NoError(t, err)
auth, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(1337))
require.NoError(t, err)
client, gerAddr, verifyAddr, gerSc, verifySC, err := newSimulatedClient(auth)
require.NoError(t, err)
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25)
require.NoError(t, err)
go syncer.Start(ctx)

// Commit block
header, err := client.Client().HeaderByHash(ctx, client.Commit()) // Block 3
require.NoError(t, err)
reorgFrom := header.Hash()
fmt.Println("start from header:", header.Number)

updateL1InfoTreeAndRollupExitTree := func(i int, rollupID uint32) {
// Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)

// Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(1))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)

// Update Rollup Exit Tree
newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(2))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
}

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(1, 1)

// Block 4
commitBlocks(t, client, 1, time.Second*5)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root
expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err := syncer.GetLastRollupExitRoot(ctx)
require.NoError(t, err)
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)

// Assert L1 Info tree root
expectedL1InfoRoot, err := gerSc.GetRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
expectedGER, err := gerSc.GetLastGlobalExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualL1InfoRoot, err := syncer.GetLastL1InfoTreeRoot(ctx)
require.NoError(t, err)
info, err := syncer.GetInfoByIndex(ctx, actualL1InfoRoot.Index)
require.NoError(t, err)

require.Equal(t, common.Hash(expectedL1InfoRoot), actualL1InfoRoot.Hash)
require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info))

// Forking from block 3
err = client.Fork(reorgFrom)
require.NoError(t, err)

// Block 4, 5, 6 after the fork
commitBlocks(t, client, 3, time.Millisecond*500)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the fork - should be zero since there are no events in the blocks after the fork
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx)
require.ErrorContains(t, err, "not found") // rollup exit tree reorged, it does not have any exits in it
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)

// Forking from block 3 again
err = client.Fork(reorgFrom)
require.NoError(t, err)
time.Sleep(time.Millisecond * 500)

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(2, 1)

// Block 4, 5, 6, 7 after the fork
commitBlocks(t, client, 4, time.Millisecond*100)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the second fork - new events were emitted after the fork, so the syncer root should match the contract again
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx)
require.NoError(t, err)
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
}

func TestStressAndReorgs(t *testing.T) {
const (
totalIterations = 200 // Have tested with much larger number (+10k)
enableReorgs = false // test fails when set to true
reorgEveryXIterations = 53
maxReorgDepth = 5
maxEventsPerBlock = 7
maxRollups = 31
totalIterations = 3
blocksInIteration = 140
reorgEveryXIterations = 70
reorgSizeInBlocks = 2
maxRollupID = 31
extraBlocksToMine = 10
)

ctx := context.Background()
@@ -182,58 +292,48 @@ func TestStressAndReorgs(t *testing.T)
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100)
require.NoError(t, err)
go syncer.Start(ctx)

for i := 0; i < totalIterations; i++ {
for j := 0; j < i%maxEventsPerBlock; j++ {
switch j % 3 {
case 0: // Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)
case 1: // Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)
case 2: // Update Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, false)
updateL1InfoTreeAndRollupExitTree := func(i, j int, rollupID uint32) {
// Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)

// Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)

// Update Rollup Exit Tree
newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "fffa" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
}

for i := 1; i <= totalIterations; i++ {
for j := 1; j <= blocksInIteration; j++ {
commitBlocks(t, client, 1, time.Millisecond*10)

if j%reorgEveryXIterations == 0 {
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
}
}
client.Commit()
time.Sleep(time.Microsecond * 30) // Sleep just enough for goroutine to switch
if enableReorgs && i%reorgEveryXIterations == 0 {
reorgDepth := i%maxReorgDepth + 1
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
targetReorgBlockNum := currentBlockNum - uint64(reorgDepth)
if targetReorgBlockNum < currentBlockNum { // we are dealing with uints...
reorgBlock, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(targetReorgBlockNum)))

block, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(currentBlockNum-reorgSizeInBlocks)))
require.NoError(t, err)
err = client.Fork(reorgBlock.Hash())
reorgFrom := block.Hash()
err = client.Fork(reorgFrom)
require.NoError(t, err)
} else {
updateL1InfoTreeAndRollupExitTree(i, j, uint32(j%maxRollupID)+1)
}
}
}

syncerUpToDate := false
var errMsg string
lb, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
for i := 0; i < 50; i++ {
lpb, err := syncer.GetLastProcessedBlock(ctx)
require.NoError(t, err)
if lpb == lb {
syncerUpToDate = true
commitBlocks(t, client, 1, time.Millisecond*10)

break
}
time.Sleep(time.Millisecond * 100)
errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb)
}
require.True(t, syncerUpToDate, errMsg)
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root
expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
@@ -252,6 +352,39 @@ func TestStressAndReorgs(t *testing.T) {
info, err := syncer.GetInfoByIndex(ctx, lastRoot.Index)
require.NoError(t, err, fmt.Sprintf("index: %d", lastRoot.Index))

require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
t.Logf("expectedL1InfoRoot: %s", common.Hash(expectedL1InfoRoot).String())
require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info))
require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
}

func waitForSyncerToCatchUp(ctx context.Context, t *testing.T, syncer *l1infotreesync.L1InfoTreeSync, client *simulated.Backend) {
t.Helper()

syncerUpToDate := false
var errMsg string

for i := 0; i < 200; i++ {
lpb, err := syncer.GetLastProcessedBlock(ctx)
require.NoError(t, err)
lb, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
if lpb == lb {
syncerUpToDate = true
break
}
time.Sleep(time.Second / 2)
errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb)
}

require.True(t, syncerUpToDate, errMsg)
}

// commitBlocks commits the specified number of blocks with the given client and waits for the specified duration after each block
func commitBlocks(t *testing.T, client *simulated.Backend, numBlocks int, waitDuration time.Duration) {
t.Helper()

for i := 0; i < numBlocks; i++ {
client.Commit()
time.Sleep(waitDuration)
}
}
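Taken together, the two helpers support the reorg round-trip pattern used by the tests above. A hedged sketch (not part of the commit) of how a test might combine them; a real test would re-assert the syncer's roots against the contracts afterwards:

func exampleReorgRoundTrip(ctx context.Context, t *testing.T, client *simulated.Backend, syncer *l1infotreesync.L1InfoTreeSync) {
	t.Helper()

	// Remember a fork point, then advance the canonical chain.
	reorgFrom := client.Commit()
	commitBlocks(t, client, 3, time.Millisecond*100)
	waitForSyncerToCatchUp(ctx, t, syncer, client)

	// Fork back and build a longer chain so the reorg sticks.
	require.NoError(t, client.Fork(reorgFrom))
	commitBlocks(t, client, 4, time.Millisecond*100)
	waitForSyncerToCatchUp(ctx, t, syncer, client)
}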
l1infotreesync/processor.go (2 changes: 1 addition & 1 deletion)
@@ -240,7 +240,7 @@ func (p *processor) ProcessBlock(ctx context.Context, b sync.Block) error {
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
if _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
return fmt.Errorf("err: %w", err)
}
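For context on why ":=" became "=" here: with ":=" the error returned by tx.Exec is a new variable scoped to the if-statement, so a deferred cleanup that inspects the outer err never sees the failure. A minimal, self-contained sketch of the gotcha using database/sql (the table and the deferred rollback mirror the surrounding hunk but are simplified):

package example

import (
	"database/sql"
	"fmt"
)

func insertBlock(db *sql.DB, num uint64) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer func() {
		// Only fires when the OUTER err is non-nil when the function returns.
		if err != nil {
			_ = tx.Rollback()
		}
	}()

	// With ":=" the inner err would shadow the outer one: the caller still
	// gets the error, but the deferred rollback above never runs.
	// With plain "=" (as in the patch) the outer err is updated as intended.
	if _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, num); err != nil {
		return fmt.Errorf("err: %w", err)
	}

	return tx.Commit()
}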

lastgersync/evmdownloader.go (6 changes: 5 additions & 1 deletion)
@@ -105,7 +105,11 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC
break
}

blockHeader := d.GetBlockHeader(ctx, lastBlock)
blockHeader, isCanceled := d.GetBlockHeader(ctx, lastBlock)
if isCanceled {
return
}

block := &sync.EVMBlock{
EVMBlockHeader: sync.EVMBlockHeader{
Num: blockHeader.Num,
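The new isCanceled return lets the download loop stop cleanly when the context is canceled instead of forwarding a zero-value header. A small, self-contained sketch of that contract (the retry body and names are illustrative, not the project's actual implementation):

package main

import (
	"context"
	"fmt"
	"time"
)

type header struct{ Num uint64 }

// getBlockHeader mimics the contract above: it either produces a header or
// reports that the wait was aborted by context cancellation.
func getBlockHeader(ctx context.Context, num uint64) (header, bool) {
	select {
	case <-ctx.Done():
		return header{}, true // isCanceled
	case <-time.After(10 * time.Millisecond):
		return header{Num: num}, false
	}
}

func download(ctx context.Context, from, to uint64, out chan<- header) {
	defer close(out)
	for n := from; n <= to; n++ {
		h, isCanceled := getBlockHeader(ctx, n)
		if isCanceled {
			return // stop without emitting a bogus block
		}
		out <- h
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()

	out := make(chan header)
	go download(ctx, 1, 100, out)
	for h := range out {
		fmt.Println("downloaded block", h.Num)
	}
}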
reorgdetector/reorgdetector.go (18 changes: 13 additions & 5 deletions)
@@ -120,12 +120,20 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
errGroup errgroup.Group
)

rd.trackedBlocksLock.Lock()
defer rd.trackedBlocksLock.Unlock()
subscriberIDs := rd.getSubscriberIDs()

for id, hdrs := range rd.trackedBlocks {
for _, id := range subscriberIDs {
id := id
hdrs := hdrs

// This is done like this because of a possible deadlock
// between AddBlocksToTrack and detectReorgInTrackedList
rd.trackedBlocksLock.RLock()
hdrs, ok := rd.trackedBlocks[id]
rd.trackedBlocksLock.RUnlock()

if !ok {
continue
}

errGroup.Go(func() error {
headers := hdrs.getSorted()
@@ -136,7 +144,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
if !ok || currentHeader == nil {
if currentHeader, err = rd.client.HeaderByNumber(ctx, new(big.Int).SetUint64(hdr.Num)); err != nil {
headersCacheLock.Unlock()
return fmt.Errorf("failed to get the header: %w", err)
return fmt.Errorf("failed to get the header %d: %w", hdr.Num, err)
}
headersCache[hdr.Num] = currentHeader
}
reorgdetector/reorgdetector_db.go (5 changes: 4 additions & 1 deletion)
@@ -53,14 +53,17 @@ func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*head

// saveTrackedBlock saves the tracked block for a subscriber in db and in memory
func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error {
rd.trackedBlocksLock.Lock()

// the DB transaction has to be opened after taking the lock, because of a possible
// deadlock between AddBlocksToTrack and detectReorgInTrackedList
tx, err := rd.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()

rd.trackedBlocksLock.Lock()
hdrs, ok := rd.trackedBlocks[id]
if !ok || hdrs.isEmpty() {
hdrs = newHeadersList(b)
reorgdetector/reorgdetector_sub.go (19 changes: 17 additions & 2 deletions)
@@ -34,9 +34,24 @@ func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) {
func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
// Notify subscriber about this particular reorg
rd.subscriptionsLock.RLock()
if sub, ok := rd.subscriptions[id]; ok {
sub, ok := rd.subscriptions[id]
rd.subscriptionsLock.RUnlock()

if ok {
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
}
rd.subscriptionsLock.RUnlock()
}

// getSubscriberIDs returns a list of subscriber IDs
func (rd *ReorgDetector) getSubscriberIDs() []string {
rd.subscriptionsLock.RLock()
defer rd.subscriptionsLock.RUnlock()

ids := make([]string, 0, len(rd.subscriptions))
for id := range rd.subscriptions {
ids = append(ids, id)
}

return ids
}
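For completeness, a hedged sketch of the subscriber side that notifySubscriber talks to: it receives the first reorged block number, rewinds its own state, and acks on ReorgProcessed so the detector can continue. The channel element types and the local Subscription stand-in are assumptions based on the hunk above, not the exact exported API:

package example

import "context"

// Minimal stand-in for the detector's Subscription; field types are assumed.
type Subscription struct {
	ReorgedBlock   chan uint64
	ReorgProcessed chan bool
}

// handleReorgs is roughly what a syncer-side goroutine might look like. While it
// runs, notifySubscriber is blocked only on the ack channel, not on
// subscriptionsLock, which is why the lock is released before sending.
func handleReorgs(ctx context.Context, sub *Subscription, rollbackTo func(uint64) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case firstReorgedBlock := <-sub.ReorgedBlock:
			if err := rollbackTo(firstReorgedBlock); err != nil {
				// in the real syncers this would be logged and retried
			}
			sub.ReorgProcessed <- true
		}
	}
}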