From b32ccd4f0d402729d2867cc689f38c61f02f81f9 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Tue, 16 Jan 2024 16:54:14 -0500 Subject: [PATCH] Ensure seen_at blocks exist before inserting virtual batches --- state/state.go | 1 + synchronizer/synchronizer.go | 80 +++++++++++++++++++++--------------- 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/state/state.go b/state/state.go index d5ede3c..ede083a 100644 --- a/state/state.go +++ b/state/state.go @@ -1348,6 +1348,7 @@ func (s *State) SetGenesis(ctx context.Context, block Block, genesis Genesis, db TxHash: ZeroHash, Coinbase: ZeroAddress, BlockNumber: block.BlockNumber, + SeenAt: block.BlockNumber, } err = s.AddVirtualBatch(ctx, virtualBatch, dbTx) if err != nil { diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index cc70188..9ef0c91 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -641,47 +641,18 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. return nil } + if err := s.ensureBlockExists(dbTx, batchesSeenAtBlock); err != nil { + return err + } + for _, sbatch := range sequencedBatches { // Ensure the L1 origin for this batch is in the database. Since the L1 origin assigned by // HotShot is not necessarily the same as an L1 block which we added to the database as a // result of receiving a contract event, it might not be. blockNumber := sbatch.BlockNumber - exists, err := s.state.ContainsBlock(s.ctx, blockNumber, dbTx) - if err != nil { - log.Errorf("error fetching L1 block %d from db: %v", blockNumber, err) - rollbackErr := dbTx.Rollback(s.ctx) - if rollbackErr != nil { - log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err) - } + if err := s.ensureBlockExists(dbTx, blockNumber); err != nil { return err } - if !exists { - header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(int64(blockNumber))) - if err != nil { - log.Errorf("error fetching L1 block %d: %v", blockNumber, err) - rollbackErr := dbTx.Rollback(s.ctx) - if rollbackErr != nil { - log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err) - } - return err - } - b := state.Block{ - BlockNumber: blockNumber, - BlockHash: header.Hash(), - ParentHash: header.ParentHash, - ReceivedAt: time.Unix(int64(header.Time), 0), - } - log.Infof("L1 block %d does not already exist, adding to database", blockNumber) - err = s.state.AddBlock(s.ctx, &b, dbTx) - if err != nil { - log.Errorf("error storing block. BlockNumber: %d, error: %w", blockNumber, err) - rollbackErr := dbTx.Rollback(s.ctx) - if rollbackErr != nil { - log.Fatalf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %w", blockNumber, rollbackErr.Error(), err) - } - return err - } - } virtualBatch := state.VirtualBatch{ BatchNumber: sbatch.BatchNumber, @@ -796,6 +767,47 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. return nil } +func (s *ClientSynchronizer) ensureBlockExists(dbTx pgx.Tx, blockNumber uint64) error { + exists, err := s.state.ContainsBlock(s.ctx, blockNumber, dbTx) + if err != nil { + log.Errorf("error fetching L1 block %d from db: %v", blockNumber, err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err) + } + return err + } + if !exists { + header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(int64(blockNumber))) + if err != nil { + log.Errorf("error fetching L1 block %d: %v", blockNumber, err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err) + } + return err + } + b := state.Block{ + BlockNumber: blockNumber, + BlockHash: header.Hash(), + ParentHash: header.ParentHash, + ReceivedAt: time.Unix(int64(header.Time), 0), + } + log.Infof("L1 block %d does not already exist, adding to database", blockNumber) + err = s.state.AddBlock(s.ctx, &b, dbTx) + if err != nil { + log.Errorf("error storing block. BlockNumber: %d, error: %w", blockNumber, err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Fatalf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %w", blockNumber, rollbackErr.Error(), err) + } + return err + } + } + + return nil +} + func (s *ClientSynchronizer) processForcedBatch(forcedBatch etherman.ForcedBatch, dbTx pgx.Tx) error { // Store forced batch into the db forcedB := state.ForcedBatch{