Skip to content

Commit

Permalink
feat: prevent potential logs being missed during initial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
t00ts committed Sep 26, 2024
1 parent 55963dc commit 1205c11
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 41 deletions.
65 changes: 61 additions & 4 deletions crates/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ pub trait EthereumApi {
from_block: L1BlockNumber,
to_block: L1BlockNumber,
) -> anyhow::Result<Vec<EthereumStateUpdate>>;
async fn listen<F, Fut>(
async fn sync_and_listen<F, Fut>(
&mut self,
address: &H160,
from_block: L1BlockNumber,
poll_interval: Duration,
callback: F,
) -> anyhow::Result<()>
Expand Down Expand Up @@ -155,9 +156,10 @@ impl EthereumApi for EthereumClient {
/// Listens for Ethereum events and notifies the caller using the provided
/// callback. State updates will only be emitted once they belong to a
/// finalized block.
async fn listen<F, Fut>(
async fn sync_and_listen<F, Fut>(
&mut self,
address: &H160,
from_block: L1BlockNumber,
poll_interval: Duration,
callback: F,
) -> anyhow::Result<()>
Expand All @@ -169,9 +171,40 @@ impl EthereumApi for EthereumClient {
let ws = WsConnect::new(self.url.clone());
let provider = ProviderBuilder::new().on_ws(ws).await?;

// Fetch the current Starknet state from Ethereum
let state_update = self.get_starknet_state(address).await?;
let _ = callback(EthereumEvent::StateUpdate(state_update)).await;

// Sync logs from the last known L1 handler block up to the latest finalized
// block
let logs = self
.get_message_logs(
address,
from_block,
state_update
.l1_block_number
.expect("missing l1 block number"),
)
.await?;

tracing::trace!(
number_of_logs=%logs.len(),
from_block=%from_block,
to_block=?state_update.l1_block_number.unwrap(),
"Fetched L1 to L2 message logs",
);

for log in logs {
let _ = callback(EthereumEvent::MessageLog(log)).await;
}

// Prevent a gap between the latest confirmed L1 block and the tip of the chain
// when syncing logs.
let mut logs_in_sync = false;

// Create the StarknetCoreContract instance
let address = Address::new((*address).into());
let core_contract = StarknetCoreContract::new(address, provider.clone());
let core_address = Address::new((*address).into());
let core_contract = StarknetCoreContract::new(core_address, provider.clone());

// Listen for L1 to L2 message events
let mut logs = provider
Expand Down Expand Up @@ -227,6 +260,30 @@ impl EthereumApi for EthereumClient {
}
}
Some(log) = logs.next() => {
// Fetch potentially unsynced logs from the last finalized block number
// up to the block number of the current log.
if !logs_in_sync {
let log_l1_block_number = log.block_number.map(L1BlockNumber::new_or_panic);
let unsynced_logs = self.get_message_logs(
address,
state_update
.l1_block_number
.expect("missing l1 block number"),
log_l1_block_number
.expect("missing l1 block number"),
)
.await?;
tracing::trace!(
number_of_logs=%unsynced_logs.len(),
from_block=?state_update.l1_block_number,
to_block=?log_l1_block_number,
"Fetched unsynced L1 to L2 message logs",
);
for log in unsynced_logs {
callback(EthereumEvent::MessageLog(log)).await;
}
logs_in_sync = true;
}
// Decode the message
let log: Log<StarknetCoreContract::LogMessageToL2> = log.log_decode()?;
// Create L1ToL2MessageHash from the log data
Expand Down
53 changes: 16 additions & 37 deletions crates/pathfinder/src/state/sync/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,47 +41,26 @@ where

let tx_event = std::sync::Arc::new(tx_event);

// Fetch the current Starknet state from Ethereum
let state_update = ethereum.get_starknet_state(&core_address).await?;
let _ = tx_event.send(SyncEvent::L1Update(state_update)).await;

// Sync logs from the last known L1 handler block
let logs = ethereum
.get_message_logs(
&core_address,
last_synced_l1_handler_block,
state_update
.l1_block_number
.expect("missing l1 block number"),
)
.await?;

tracing::trace!(
number_of_logs=%logs.len(),
from_block=%last_synced_l1_handler_block,
to_block=?state_update.l1_block_number.unwrap(),
"Fetched L1 to L2 message logs",
);

for log in logs {
let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await;
}

// Subscribe to subsequent state updates and message logs
ethereum
.listen(&core_address, poll_interval, move |event| {
let tx_event = tx_event.clone();
async move {
match event {
EthereumEvent::StateUpdate(state_update) => {
let _ = tx_event.send(SyncEvent::L1Update(state_update)).await;
}
EthereumEvent::MessageLog(log) => {
let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await;
.sync_and_listen(
&core_address,
last_synced_l1_handler_block,
poll_interval,
move |event| {
let tx_event = tx_event.clone();
async move {
match event {
EthereumEvent::StateUpdate(state_update) => {
let _ = tx_event.send(SyncEvent::L1Update(state_update)).await;
}
EthereumEvent::MessageLog(log) => {
let _ = tx_event.send(SyncEvent::L1ToL2Message(log)).await;
}
}
}
}
})
},
)
.await?;

Ok(())
Expand Down

0 comments on commit 1205c11

Please sign in to comment.