From 9a694c42045debb2fee5f8713154c198a17a5c17 Mon Sep 17 00:00:00 2001 From: rbpol Date: Wed, 25 Sep 2024 08:07:37 +0100 Subject: [PATCH] feat: Add e2e reorg tests to syncers (#56) * feat: Added e2e tests to the syncer * fix: UTs * fix: comments * fix: comments 2 * fix: rebase * fix: lint * fix: rebase remove old test * fix: another rebase fix * fix: stress test * fix: ut build * fix: rebuild tree after reorg * fix: comments --------- Co-authored-by: Goran Rojovic --- l1infotreesync/downloader.go | 3 +- l1infotreesync/e2e_test.go | 229 +++++++++++++++++++++++------ l1infotreesync/processor.go | 2 +- lastgersync/evmdownloader.go | 6 +- reorgdetector/reorgdetector.go | 18 ++- reorgdetector/reorgdetector_db.go | 5 +- reorgdetector/reorgdetector_sub.go | 19 ++- sync/evmdownloader.go | 99 ++++++++----- sync/evmdownloader_test.go | 22 +-- sync/evmdriver.go | 19 +-- sync/evmdriver_test.go | 21 +-- sync/mock_downloader_test.go | 16 +- tree/appendonlytree.go | 2 +- tree/tree_test.go | 83 +++++++++++ 14 files changed, 404 insertions(+), 140 deletions(-) diff --git a/l1infotreesync/downloader.go b/l1infotreesync/downloader.go index 2051f7b5..16ccb37a 100644 --- a/l1infotreesync/downloader.go +++ b/l1infotreesync/downloader.go @@ -86,7 +86,8 @@ func buildAppender(client EthClienter, globalExitRoot, rollupManager common.Addr l, err, ) } - log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", common.Bytes2Hex(l1InfoTreeUpdate.CurrentL1InfoRoot[:])) + log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", + common.BytesToHash(l1InfoTreeUpdate.CurrentL1InfoRoot[:])) return nil } diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index 146c1924..21820059 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -160,14 +160,124 @@ func TestE2E(t *testing.T) { } } +func TestWithReorgs(t *testing.T) { + ctx := context.Background() + dbPathSyncer := path.Join(t.TempDir(), "file::memory:?cache=shared") + dbPathReorg := t.TempDir() + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + auth, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(1337)) + require.NoError(t, err) + client, gerAddr, verifyAddr, gerSc, verifySC, err := newSimulatedClient(auth) + require.NoError(t, err) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}) + require.NoError(t, err) + require.NoError(t, rd.Start(ctx)) + syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25) + require.NoError(t, err) + go syncer.Start(ctx) + + // Commit block + header, err := client.Client().HeaderByHash(ctx, client.Commit()) // Block 3 + require.NoError(t, err) + reorgFrom := header.Hash() + fmt.Println("start from header:", header.Number) + + updateL1InfoTreeAndRollupExitTree := func(i int, rollupID uint32) { + // Update L1 Info Tree + _, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i))) + require.NoError(t, err) + + // Update L1 Info Tree + Rollup Exit Tree + newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(1)) + _, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true) + require.NoError(t, err) + + // Update Rollup Exit Tree + newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(2)) + _, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false) + require.NoError(t, err) + } + + // create some events and update the trees + updateL1InfoTreeAndRollupExitTree(1, 1) + + // Block 4 + commitBlocks(t, client, 1, time.Second*5) + + // Make sure syncer is up to date + waitForSyncerToCatchUp(ctx, t, syncer, client) + + // Assert rollup exit root + expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false}) + require.NoError(t, err) + actualRollupExitRoot, err := syncer.GetLastRollupExitRoot(ctx) + require.NoError(t, err) + t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) + require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) + + // Assert L1 Info tree root + expectedL1InfoRoot, err := gerSc.GetRoot(&bind.CallOpts{Pending: false}) + require.NoError(t, err) + expectedGER, err := gerSc.GetLastGlobalExitRoot(&bind.CallOpts{Pending: false}) + require.NoError(t, err) + actualL1InfoRoot, err := syncer.GetLastL1InfoTreeRoot(ctx) + require.NoError(t, err) + info, err := syncer.GetInfoByIndex(ctx, actualL1InfoRoot.Index) + require.NoError(t, err) + + require.Equal(t, common.Hash(expectedL1InfoRoot), actualL1InfoRoot.Hash) + require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info)) + + // Forking from block 3 + err = client.Fork(reorgFrom) + require.NoError(t, err) + + // Block 4, 5, 6 after the fork + commitBlocks(t, client, 3, time.Millisecond*500) + + // Make sure syncer is up to date + waitForSyncerToCatchUp(ctx, t, syncer, client) + + // Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork + expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false}) + require.NoError(t, err) + actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx) + require.ErrorContains(t, err, "not found") // rollup exit tree reorged, it does not have any exits in it + t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) + require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) + + // Forking from block 3 again + err = client.Fork(reorgFrom) + require.NoError(t, err) + time.Sleep(time.Millisecond * 500) + + // create some events and update the trees + updateL1InfoTreeAndRollupExitTree(2, 1) + + // Block 4, 5, 6, 7 after the fork + commitBlocks(t, client, 4, time.Millisecond*100) + + // Make sure syncer is up to date + waitForSyncerToCatchUp(ctx, t, syncer, client) + + // Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork + expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false}) + require.NoError(t, err) + actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx) + require.NoError(t, err) + t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) + require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash) +} + func TestStressAndReorgs(t *testing.T) { const ( - totalIterations = 200 // Have tested with much larger number (+10k) - enableReorgs = false // test fails when set to true - reorgEveryXIterations = 53 - maxReorgDepth = 5 - maxEventsPerBlock = 7 - maxRollups = 31 + totalIterations = 3 + blocksInIteration = 140 + reorgEveryXIterations = 70 + reorgSizeInBlocks = 2 + maxRollupID = 31 + extraBlocksToMine = 10 ) ctx := context.Background() @@ -182,58 +292,48 @@ func TestStressAndReorgs(t *testing.T) { rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) require.NoError(t, err) require.NoError(t, rd.Start(ctx)) - syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3) + syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100) require.NoError(t, err) go syncer.Start(ctx) - for i := 0; i < totalIterations; i++ { - for j := 0; j < i%maxEventsPerBlock; j++ { - switch j % 3 { - case 0: // Update L1 Info Tree - _, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i))) - require.NoError(t, err) - case 1: // Update L1 Info Tree + Rollup Exit Tree - newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j)) - _, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, true) - require.NoError(t, err) - case 2: // Update Rollup Exit Tree - newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j)) - _, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, false) + updateL1InfoTreeAndRollupExitTree := func(i, j int, rollupID uint32) { + // Update L1 Info Tree + _, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i))) + require.NoError(t, err) + + // Update L1 Info Tree + Rollup Exit Tree + newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j)) + _, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true) + require.NoError(t, err) + + // Update Rollup Exit Tree + newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "fffa" + strconv.Itoa(j)) + _, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false) + require.NoError(t, err) + } + + for i := 1; i <= totalIterations; i++ { + for j := 1; j <= blocksInIteration; j++ { + commitBlocks(t, client, 1, time.Millisecond*10) + + if j%reorgEveryXIterations == 0 { + currentBlockNum, err := client.Client().BlockNumber(ctx) require.NoError(t, err) - } - } - client.Commit() - time.Sleep(time.Microsecond * 30) // Sleep just enough for goroutine to switch - if enableReorgs && i%reorgEveryXIterations == 0 { - reorgDepth := i%maxReorgDepth + 1 - currentBlockNum, err := client.Client().BlockNumber(ctx) - require.NoError(t, err) - targetReorgBlockNum := currentBlockNum - uint64(reorgDepth) - if targetReorgBlockNum < currentBlockNum { // we are dealing with uints... - reorgBlock, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(targetReorgBlockNum))) + + block, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(currentBlockNum-reorgSizeInBlocks))) require.NoError(t, err) - err = client.Fork(reorgBlock.Hash()) + reorgFrom := block.Hash() + err = client.Fork(reorgFrom) require.NoError(t, err) + } else { + updateL1InfoTreeAndRollupExitTree(i, j, uint32(j%maxRollupID)+1) } } } - syncerUpToDate := false - var errMsg string - lb, err := client.Client().BlockNumber(ctx) - require.NoError(t, err) - for i := 0; i < 50; i++ { - lpb, err := syncer.GetLastProcessedBlock(ctx) - require.NoError(t, err) - if lpb == lb { - syncerUpToDate = true + commitBlocks(t, client, 1, time.Millisecond*10) - break - } - time.Sleep(time.Millisecond * 100) - errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb) - } - require.True(t, syncerUpToDate, errMsg) + waitForSyncerToCatchUp(ctx, t, syncer, client) // Assert rollup exit root expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false}) @@ -252,6 +352,39 @@ func TestStressAndReorgs(t *testing.T) { info, err := syncer.GetInfoByIndex(ctx, lastRoot.Index) require.NoError(t, err, fmt.Sprintf("index: %d", lastRoot.Index)) - require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash) + t.Logf("expectedL1InfoRoot: %s", common.Hash(expectedL1InfoRoot).String()) require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info)) + require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash) +} + +func waitForSyncerToCatchUp(ctx context.Context, t *testing.T, syncer *l1infotreesync.L1InfoTreeSync, client *simulated.Backend) { + t.Helper() + + syncerUpToDate := false + var errMsg string + + for i := 0; i < 200; i++ { + lpb, err := syncer.GetLastProcessedBlock(ctx) + require.NoError(t, err) + lb, err := client.Client().BlockNumber(ctx) + require.NoError(t, err) + if lpb == lb { + syncerUpToDate = true + break + } + time.Sleep(time.Second / 2) + errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb) + } + + require.True(t, syncerUpToDate, errMsg) +} + +// commitBlocks commits the specified number of blocks with the given client and waits for the specified duration after each block +func commitBlocks(t *testing.T, client *simulated.Backend, numBlocks int, waitDuration time.Duration) { + t.Helper() + + for i := 0; i < numBlocks; i++ { + client.Commit() + time.Sleep(waitDuration) + } } diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index a672c5ef..0bb31cc3 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -240,7 +240,7 @@ func (p *processor) ProcessBlock(ctx context.Context, b sync.Block) error { } }() - if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil { + if _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil { return fmt.Errorf("err: %w", err) } diff --git a/lastgersync/evmdownloader.go b/lastgersync/evmdownloader.go index 97235c28..e76bb578 100644 --- a/lastgersync/evmdownloader.go +++ b/lastgersync/evmdownloader.go @@ -105,7 +105,11 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC break } - blockHeader := d.GetBlockHeader(ctx, lastBlock) + blockHeader, isCanceled := d.GetBlockHeader(ctx, lastBlock) + if isCanceled { + return + } + block := &sync.EVMBlock{ EVMBlockHeader: sync.EVMBlockHeader{ Num: blockHeader.Num, diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 7a995bac..496a844c 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -120,12 +120,20 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { errGroup errgroup.Group ) - rd.trackedBlocksLock.Lock() - defer rd.trackedBlocksLock.Unlock() + subscriberIDs := rd.getSubscriberIDs() - for id, hdrs := range rd.trackedBlocks { + for _, id := range subscriberIDs { id := id - hdrs := hdrs + + // This is done like this because of a possible deadlock + // between AddBlocksToTrack and detectReorgInTrackedList + rd.trackedBlocksLock.RLock() + hdrs, ok := rd.trackedBlocks[id] + rd.trackedBlocksLock.RUnlock() + + if !ok { + continue + } errGroup.Go(func() error { headers := hdrs.getSorted() @@ -136,7 +144,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { if !ok || currentHeader == nil { if currentHeader, err = rd.client.HeaderByNumber(ctx, new(big.Int).SetUint64(hdr.Num)); err != nil { headersCacheLock.Unlock() - return fmt.Errorf("failed to get the header: %w", err) + return fmt.Errorf("failed to get the header %d: %w", hdr.Num, err) } headersCache[hdr.Num] = currentHeader } diff --git a/reorgdetector/reorgdetector_db.go b/reorgdetector/reorgdetector_db.go index 3174cbc0..79bd6cd4 100644 --- a/reorgdetector/reorgdetector_db.go +++ b/reorgdetector/reorgdetector_db.go @@ -53,6 +53,10 @@ func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*head // saveTrackedBlock saves the tracked block for a subscriber in db and in memory func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error { + rd.trackedBlocksLock.Lock() + + // this has to go after the lock, because of a possible deadlock + // between AddBlocksToTrack and detectReorgInTrackedList tx, err := rd.db.BeginRw(ctx) if err != nil { return err @@ -60,7 +64,6 @@ func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b head defer tx.Rollback() - rd.trackedBlocksLock.Lock() hdrs, ok := rd.trackedBlocks[id] if !ok || hdrs.isEmpty() { hdrs = newHeadersList(b) diff --git a/reorgdetector/reorgdetector_sub.go b/reorgdetector/reorgdetector_sub.go index 675a81c5..c5002a2b 100644 --- a/reorgdetector/reorgdetector_sub.go +++ b/reorgdetector/reorgdetector_sub.go @@ -34,9 +34,24 @@ func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) { func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) { // Notify subscriber about this particular reorg rd.subscriptionsLock.RLock() - if sub, ok := rd.subscriptions[id]; ok { + sub, ok := rd.subscriptions[id] + rd.subscriptionsLock.RUnlock() + + if ok { sub.ReorgedBlock <- startingBlock.Num <-sub.ReorgProcessed } - rd.subscriptionsLock.RUnlock() +} + +// getSubscriberIDs returns a list of subscriber IDs +func (rd *ReorgDetector) getSubscriberIDs() []string { + rd.subscriptionsLock.RLock() + defer rd.subscriptionsLock.RUnlock() + + ids := make([]string, 0, len(rd.subscriptions)) + for id := range rd.subscriptions { + ids = append(ids, id) + } + + return ids } diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index c9c4e661..13539f2f 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -2,6 +2,7 @@ package sync import ( "context" + "errors" "math/big" "time" @@ -24,7 +25,7 @@ type EVMDownloaderInterface interface { WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log - GetBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader + GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool) } type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error @@ -101,8 +102,13 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock { // Indicate the last downloaded block if there are not events on it d.log.Debugf("sending block %d to the driver (without events)", toBlock) + header, isCanceled := d.GetBlockHeader(ctx, toBlock) + if isCanceled { + return + } + downloadedCh <- EVMBlock{ - EVMBlockHeader: d.GetBlockHeader(ctx, toBlock), + EVMBlockHeader: header, } } fromBlock = toBlock + 1 @@ -170,44 +176,53 @@ func (d *EVMDownloaderImplementation) WaitForNewBlocks( } func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock { - blocks := []EVMBlock{} - logs := d.GetLogs(ctx, fromBlock, toBlock) - for _, l := range logs { - if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber { - b := d.GetBlockHeader(ctx, l.BlockNumber) - if b.Hash != l.BlockHash { - d.log.Infof( - "there has been a block hash change between the event query and the block query "+ - "for block %d: %s vs %s. Retrying.", - l.BlockNumber, b.Hash, l.BlockHash, - ) - return d.GetEventsByBlockRange(ctx, fromBlock, toBlock) + select { + case <-ctx.Done(): + return nil + default: + blocks := []EVMBlock{} + logs := d.GetLogs(ctx, fromBlock, toBlock) + for _, l := range logs { + if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber { + b, canceled := d.GetBlockHeader(ctx, l.BlockNumber) + if canceled { + return nil + } + + if b.Hash != l.BlockHash { + d.log.Infof( + "there has been a block hash change between the event query and the block query "+ + "for block %d: %s vs %s. Retrying.", + l.BlockNumber, b.Hash, l.BlockHash, + ) + return d.GetEventsByBlockRange(ctx, fromBlock, toBlock) + } + blocks = append(blocks, EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ + Num: l.BlockNumber, + Hash: l.BlockHash, + Timestamp: b.Timestamp, + ParentHash: b.ParentHash, + }, + Events: []interface{}{}, + }) } - blocks = append(blocks, EVMBlock{ - EVMBlockHeader: EVMBlockHeader{ - Num: l.BlockNumber, - Hash: l.BlockHash, - Timestamp: b.Timestamp, - ParentHash: b.ParentHash, - }, - Events: []interface{}{}, - }) - } - for { - attempts := 0 - err := d.appender[l.Topics[0]](&blocks[len(blocks)-1], l) - if err != nil { - attempts++ - d.log.Error("error trying to append log: ", err) - d.rh.Handle("getLogs", attempts) - continue + for { + attempts := 0 + err := d.appender[l.Topics[0]](&blocks[len(blocks)-1], l) + if err != nil { + attempts++ + d.log.Error("error trying to append log: ", err) + d.rh.Handle("getLogs", attempts) + continue + } + break } - break } - } - return blocks + return blocks + } } func (d *EVMDownloaderImplementation) GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log { @@ -224,6 +239,11 @@ func (d *EVMDownloaderImplementation) GetLogs(ctx context.Context, fromBlock, to for { unfilteredLogs, err = d.ethClient.FilterLogs(ctx, query) if err != nil { + if errors.Is(err, context.Canceled) { + // context is canceled, we don't want to fatal on max attempts in this case + return nil + } + attempts++ d.log.Error("error calling FilterLogs to eth client: ", err) d.rh.Handle("getLogs", attempts) @@ -243,11 +263,16 @@ func (d *EVMDownloaderImplementation) GetLogs(ctx context.Context, fromBlock, to return logs } -func (d *EVMDownloaderImplementation) GetBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader { +func (d *EVMDownloaderImplementation) GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool) { attempts := 0 for { header, err := d.ethClient.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { + if errors.Is(err, context.Canceled) { + // context is canceled, we don't want to fatal on max attempts in this case + return EVMBlockHeader{}, true + } + attempts++ d.log.Errorf("error getting block header for block %d, err: %v", blockNum, err) d.rh.Handle("getBlockHeader", attempts) @@ -258,6 +283,6 @@ func (d *EVMDownloaderImplementation) GetBlockHeader(ctx context.Context, blockN Hash: header.Hash(), ParentHash: header.ParentHash, Timestamp: header.Time, - } + }, false } } diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index 59c43b8f..04c92e72 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -222,9 +222,9 @@ func TestDownload(t *testing.T) { } expectedBlocks = append(expectedBlocks, b1) d.On("GetEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return([]EVMBlock{}) + Return([]EVMBlock{}, false) d.On("GetBlockHeader", mock.Anything, uint64(1)). - Return(b1.EVMBlockHeader) + Return(b1.EVMBlockHeader, false) // iteration 1: wait for next block to be created d.On("WaitForNewBlocks", mock.Anything, uint64(1)). @@ -240,7 +240,7 @@ func TestDownload(t *testing.T) { } expectedBlocks = append(expectedBlocks, b2) d.On("GetEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return([]EVMBlock{b2}) + Return([]EVMBlock{b2}, false) // iteration 3: wait for next block to be created (jump to block 8) d.On("WaitForNewBlocks", mock.Anything, uint64(2)). @@ -270,9 +270,9 @@ func TestDownload(t *testing.T) { } expectedBlocks = append(expectedBlocks, b6, b7, b8) d.On("GetEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return([]EVMBlock{b6, b7}) + Return([]EVMBlock{b6, b7}, false) d.On("GetBlockHeader", mock.Anything, uint64(8)). - Return(b8.EVMBlockHeader) + Return(b8.EVMBlockHeader, false) // iteration 5: wait for next block to be created (jump to block 30) d.On("WaitForNewBlocks", mock.Anything, uint64(8)). @@ -288,9 +288,9 @@ func TestDownload(t *testing.T) { } expectedBlocks = append(expectedBlocks, b19) d.On("GetEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return([]EVMBlock{}) + Return([]EVMBlock{}, false) d.On("GetBlockHeader", mock.Anything, uint64(19)). - Return(b19.EVMBlockHeader) + Return(b19.EVMBlockHeader, false) // iteration 7: from block 20 to 30, events on last block b30 := EVMBlock{ @@ -302,7 +302,7 @@ func TestDownload(t *testing.T) { } expectedBlocks = append(expectedBlocks, b30) d.On("GetEventsByBlockRange", mock.Anything, uint64(20), uint64(30)). - Return([]EVMBlock{b30}) + Return([]EVMBlock{b30}, false) // iteration 8: wait for next block to be created (jump to block 35) d.On("WaitForNewBlocks", mock.Anything, uint64(30)). @@ -369,14 +369,16 @@ func TestGetBlockHeader(t *testing.T) { // at first attempt clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(returnedBlock, nil).Once() - actualBlock := d.GetBlockHeader(ctx, blockNum) + actualBlock, isCanceled := d.GetBlockHeader(ctx, blockNum) assert.Equal(t, expectedBlock, actualBlock) + assert.False(t, isCanceled) // after error from client clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(nil, errors.New("foo")).Once() clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(returnedBlock, nil).Once() - actualBlock = d.GetBlockHeader(ctx, blockNum) + actualBlock, isCanceled = d.GetBlockHeader(ctx, blockNum) assert.Equal(t, expectedBlock, actualBlock) + assert.False(t, isCanceled) } func buildAppender() LogAppenderMap { diff --git a/sync/evmdriver.go b/sync/evmdriver.go index ae7388e0..7865f645 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -71,6 +71,7 @@ reset: attempts int err error ) + for { lastProcessedBlock, err = d.processor.GetLastProcessedBlock(ctx) if err != nil { @@ -84,18 +85,19 @@ reset: cancellableCtx, cancel := context.WithCancel(ctx) defer cancel() + log.Info("Starting sync...", " lastProcessedBlock", lastProcessedBlock) // start downloading downloadCh := make(chan EVMBlock, d.downloadBufferSize) - go d.downloader.Download(cancellableCtx, lastProcessedBlock, downloadCh) + go d.downloader.Download(cancellableCtx, lastProcessedBlock+1, downloadCh) for { select { case b := <-downloadCh: - d.log.Debug("handleNewBlock") + d.log.Debug("handleNewBlock", " blockNum: ", b.Num, " blockHash: ", b.Hash) d.handleNewBlock(ctx, b) case firstReorgedBlock := <-d.reorgSub.ReorgedBlock: - d.log.Debug("handleReorg") - d.handleReorg(ctx, cancel, downloadCh, firstReorgedBlock) + d.log.Debug("handleReorg from block: ", firstReorgedBlock) + d.handleReorg(ctx, cancel, firstReorgedBlock) goto reset } } @@ -130,15 +132,10 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, b EVMBlock) { } } -func (d *EVMDriver) handleReorg( - ctx context.Context, cancel context.CancelFunc, downloadCh chan EVMBlock, firstReorgedBlock uint64, -) { +func (d *EVMDriver) handleReorg(ctx context.Context, cancel context.CancelFunc, firstReorgedBlock uint64) { // stop downloader cancel() - _, ok := <-downloadCh - for ok { - _, ok = <-downloadCh - } + // handle reorg attempts := 0 for { diff --git a/sync/evmdriver_test.go b/sync/evmdriver_test.go index 907dac28..c17370e1 100644 --- a/sync/evmdriver_test.go +++ b/sync/evmdriver_test.go @@ -198,36 +198,19 @@ func TestHandleReorg(t *testing.T) { // happy path _, cancel := context.WithCancel(ctx) - downloadCh := make(chan EVMBlock) firstReorgedBlock := uint64(5) pm.On("Reorg", ctx, firstReorgedBlock).Return(nil) - go driver.handleReorg(ctx, cancel, downloadCh, firstReorgedBlock) - close(downloadCh) + go driver.handleReorg(ctx, cancel, firstReorgedBlock) done := <-reorgProcessed require.True(t, done) - // download ch sends some garbage - _, cancel = context.WithCancel(ctx) - downloadCh = make(chan EVMBlock) - firstReorgedBlock = uint64(6) - pm.On("Reorg", ctx, firstReorgedBlock).Return(nil) - go driver.handleReorg(ctx, cancel, downloadCh, firstReorgedBlock) - downloadCh <- EVMBlock{} - downloadCh <- EVMBlock{} - downloadCh <- EVMBlock{} - close(downloadCh) - done = <-reorgProcessed - require.True(t, done) - // processor fails 2 times _, cancel = context.WithCancel(ctx) - downloadCh = make(chan EVMBlock) firstReorgedBlock = uint64(7) pm.On("Reorg", ctx, firstReorgedBlock).Return(errors.New("foo")).Once() pm.On("Reorg", ctx, firstReorgedBlock).Return(errors.New("foo")).Once() pm.On("Reorg", ctx, firstReorgedBlock).Return(nil).Once() - go driver.handleReorg(ctx, cancel, downloadCh, firstReorgedBlock) - close(downloadCh) + go driver.handleReorg(ctx, cancel, firstReorgedBlock) done = <-reorgProcessed require.True(t, done) } diff --git a/sync/mock_downloader_test.go b/sync/mock_downloader_test.go index c965efb6..f28045b5 100644 --- a/sync/mock_downloader_test.go +++ b/sync/mock_downloader_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package sync @@ -20,7 +20,7 @@ func (_m *EVMDownloaderMock) Download(ctx context.Context, fromBlock uint64, dow } // GetBlockHeader provides a mock function with given fields: ctx, blockNum -func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader { +func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64) (EVMBlockHeader, bool) { ret := _m.Called(ctx, blockNum) if len(ret) == 0 { @@ -28,13 +28,23 @@ func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64 } var r0 EVMBlockHeader + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, uint64) (EVMBlockHeader, bool)); ok { + return rf(ctx, blockNum) + } if rf, ok := ret.Get(0).(func(context.Context, uint64) EVMBlockHeader); ok { r0 = rf(ctx, blockNum) } else { r0 = ret.Get(0).(EVMBlockHeader) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64) bool); ok { + r1 = rf(ctx, blockNum) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 } // GetEventsByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock diff --git a/tree/appendonlytree.go b/tree/appendonlytree.go index 418a576b..5b14b962 100644 --- a/tree/appendonlytree.go +++ b/tree/appendonlytree.go @@ -113,7 +113,7 @@ func (t *AppendOnlyTree) initCache(tx db.Txer) error { } // Reverse the siblings to go from leafs to root - for i, j := 0, len(siblings)-1; i < j; i, j = i+1, j-1 { + for i, j := 0, len(siblings)-1; i == j; i, j = i+1, j-1 { siblings[i], siblings[j] = siblings[j], siblings[i] } diff --git a/tree/tree_test.go b/tree/tree_test.go index b5278723..dc2cfc9e 100644 --- a/tree/tree_test.go +++ b/tree/tree_test.go @@ -2,6 +2,7 @@ package tree_test import ( "context" + "database/sql" "encoding/json" "fmt" "os" @@ -18,6 +19,88 @@ import ( "github.com/stretchr/testify/require" ) +func TestCheckExpectedRoot(t *testing.T) { + createTreeDB := func() *sql.DB { + dbPath := path.Join(t.TempDir(), "file::memory:?cache=shared") + log.Debug("DB created at: ", dbPath) + require.NoError(t, migrations.RunMigrations(dbPath)) + treeDB, err := db.NewSQLiteDB(dbPath) + require.NoError(t, err) + + return treeDB + } + + addLeaves := func(merkletree *tree.AppendOnlyTree, + treeDB *sql.DB, + numOfLeavesToAdd, from int) { + tx, err := db.NewTx(context.Background(), treeDB) + require.NoError(t, err) + + for i := from; i < from+numOfLeavesToAdd; i++ { + require.NoError(t, merkletree.AddLeaf(tx, uint64(i), 0, types.Leaf{ + Index: uint32(i), + Hash: common.HexToHash(fmt.Sprintf("%x", i)), + })) + } + + require.NoError(t, tx.Commit()) + } + + t.Run("Check when no reorg", func(t *testing.T) { + numOfLeavesToAdd := 10 + indexToCheck := uint32(numOfLeavesToAdd - 1) + + treeDB := createTreeDB() + merkleTree := tree.NewAppendOnlyTree(treeDB, "") + + addLeaves(merkleTree, treeDB, numOfLeavesToAdd, 0) + + expectedRoot, err := merkleTree.GetLastRoot(context.Background()) + require.NoError(t, err) + + addLeaves(merkleTree, treeDB, numOfLeavesToAdd, numOfLeavesToAdd) + + root2, err := merkleTree.GetRootByIndex(context.Background(), indexToCheck) + require.NoError(t, err) + require.Equal(t, expectedRoot.Hash, root2.Hash) + require.Equal(t, expectedRoot.Index, root2.Index) + }) + + t.Run("Check after rebuild tree when reorg", func(t *testing.T) { + numOfLeavesToAdd := 10 + indexToCheck := uint32(numOfLeavesToAdd - 1) + treeDB := createTreeDB() + merkleTree := tree.NewAppendOnlyTree(treeDB, "") + + addLeaves(merkleTree, treeDB, numOfLeavesToAdd, 0) + + expectedRoot, err := merkleTree.GetLastRoot(context.Background()) + require.NoError(t, err) + + addLeaves(merkleTree, treeDB, numOfLeavesToAdd, numOfLeavesToAdd) + + // reorg tree + tx, err := db.NewTx(context.Background(), treeDB) + require.NoError(t, err) + require.NoError(t, merkleTree.Reorg(tx, uint64(indexToCheck+1))) + require.NoError(t, tx.Commit()) + + // rebuild cache on adding new leaf + tx, err = db.NewTx(context.Background(), treeDB) + require.NoError(t, err) + require.NoError(t, merkleTree.AddLeaf(tx, uint64(indexToCheck+1), 0, types.Leaf{ + Index: indexToCheck + 1, + Hash: common.HexToHash(fmt.Sprintf("%x", indexToCheck+1)), + })) + require.NoError(t, tx.Commit()) + + root2, err := merkleTree.GetRootByIndex(context.Background(), indexToCheck) + require.NoError(t, err) + require.Equal(t, expectedRoot.Hash, root2.Hash) + require.Equal(t, expectedRoot.Index, root2.Index) + }) +} + func TestMTAddLeaf(t *testing.T) { data, err := os.ReadFile("testvectors/root-vectors.json") require.NoError(t, err)