diff --git a/crates/ethereum/src/lib.rs b/crates/ethereum/src/lib.rs index 2939cc10b..9a1249f23 100644 --- a/crates/ethereum/src/lib.rs +++ b/crates/ethereum/src/lib.rs @@ -101,9 +101,10 @@ pub trait EthereumApi { from_block: L1BlockNumber, to_block: L1BlockNumber, ) -> anyhow::Result>; - async fn listen( + async fn sync_and_listen( &mut self, address: &H160, + from_block: L1BlockNumber, poll_interval: Duration, callback: F, ) -> anyhow::Result<()> @@ -155,9 +156,10 @@ impl EthereumApi for EthereumClient { /// Listens for Ethereum events and notifies the caller using the provided /// callback. State updates will only be emitted once they belong to a /// finalized block. - async fn listen( + async fn sync_and_listen( &mut self, address: &H160, + from_block: L1BlockNumber, poll_interval: Duration, callback: F, ) -> anyhow::Result<()> @@ -169,9 +171,40 @@ impl EthereumApi for EthereumClient { let ws = WsConnect::new(self.url.clone()); let provider = ProviderBuilder::new().on_ws(ws).await?; + // Fetch the current Starknet state from Ethereum + let state_update = self.get_starknet_state(address).await?; + let _ = callback(EthereumEvent::StateUpdate(state_update)).await; + + // Sync logs from the last known L1 handler block up to the latest finalized + // block + let logs = self + .get_message_logs( + address, + from_block, + state_update + .l1_block_number + .expect("missing l1 block number"), + ) + .await?; + + tracing::trace!( + number_of_logs=%logs.len(), + from_block=%from_block, + to_block=?state_update.l1_block_number.unwrap(), + "Fetched L1 to L2 message logs", + ); + + for log in logs { + let _ = callback(EthereumEvent::MessageLog(log)).await; + } + + // Prevent a gap between the latest confirmed L1 block and the tip of the chain + // when syncing logs. + let mut logs_in_sync = false; + // Create the StarknetCoreContract instance - let address = Address::new((*address).into()); - let core_contract = StarknetCoreContract::new(address, provider.clone()); + let core_address = Address::new((*address).into()); + let core_contract = StarknetCoreContract::new(core_address, provider.clone()); // Listen for L1 to L2 message events let mut logs = provider @@ -227,6 +260,30 @@ impl EthereumApi for EthereumClient { } } Some(log) = logs.next() => { + // Fetch potentially unsynced logs from the last finalized block number + // up to the block number of the current log. + if !logs_in_sync { + let log_l1_block_number = log.block_number.map(L1BlockNumber::new_or_panic); + let unsynced_logs = self.get_message_logs( + address, + state_update + .l1_block_number + .expect("missing l1 block number"), + log_l1_block_number + .expect("missing l1 block number"), + ) + .await?; + tracing::trace!( + number_of_logs=%unsynced_logs.len(), + from_block=?state_update.l1_block_number, + to_block=?log_l1_block_number, + "Fetched unsynced L1 to L2 message logs", + ); + for log in unsynced_logs { + callback(EthereumEvent::MessageLog(log)).await; + } + logs_in_sync = true; + } // Decode the message let log: Log = log.log_decode()?; // Create L1ToL2MessageHash from the log data diff --git a/crates/pathfinder/src/state/sync/l1.rs b/crates/pathfinder/src/state/sync/l1.rs index 481bac7a5..d8444b803 100644 --- a/crates/pathfinder/src/state/sync/l1.rs +++ b/crates/pathfinder/src/state/sync/l1.rs @@ -41,47 +41,26 @@ where let tx_event = std::sync::Arc::new(tx_event); - // Fetch the current Starknet state from Ethereum - let state_update = ethereum.get_starknet_state(&core_address).await?; - let _ = tx_event.send(SyncEvent::L1Update(state_update)).await; - - // Sync logs from the last known L1 handler block - let logs = ethereum - .get_message_logs( - &core_address, - last_synced_l1_handler_block, - state_update - .l1_block_number - .expect("missing l1 block number"), - ) - .await?; - - tracing::trace!( - number_of_logs=%logs.len(), - from_block=%last_synced_l1_handler_block, - to_block=?state_update.l1_block_number.unwrap(), - "Fetched L1 to L2 message logs", - ); - - for log in logs { - let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await; - } - // Subscribe to subsequent state updates and message logs ethereum - .listen(&core_address, poll_interval, move |event| { - let tx_event = tx_event.clone(); - async move { - match event { - EthereumEvent::StateUpdate(state_update) => { - let _ = tx_event.send(SyncEvent::L1Update(state_update)).await; - } - EthereumEvent::MessageLog(log) => { - let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await; + .sync_and_listen( + &core_address, + last_synced_l1_handler_block, + poll_interval, + move |event| { + let tx_event = tx_event.clone(); + async move { + match event { + EthereumEvent::StateUpdate(state_update) => { + let _ = tx_event.send(SyncEvent::L1Update(state_update)).await; + } + EthereumEvent::MessageLog(log) => { + let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await; + } } } - } - }) + }, + ) .await?; Ok(())