Skip to content

Commit

Permalink
feat(exex): commit notifications to WAL before sending to ExExes (#11354
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shekhirin authored Oct 2, 2024
1 parent 3bbbade commit 16be8b9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
34 changes: 24 additions & 10 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl ExExHandle {
// I.e., the ExEx has already processed the notification.
if finished_height.number >= new.tip().number {
debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
?finished_height,
Expand All @@ -135,6 +136,7 @@ impl ExExHandle {
}

debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Reserving slot for notification"
Expand All @@ -145,6 +147,7 @@ impl ExExHandle {
}

debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Sending notification"
Expand Down Expand Up @@ -327,7 +330,7 @@ where
/// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
/// necessary.
fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
debug!(header = ?finalized_header.num_hash(), "Received finalized header");
debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");

// Check if all ExExes are on the canonical chain
let exex_finished_heights = self
Expand Down Expand Up @@ -368,9 +371,13 @@ where
is_canonical.not().then_some((exex_id, num_hash))
})
.format_with(", ", |(exex_id, num_hash), f| {
f(&format_args!("{exex_id:?} = {num_hash:?}"))
});
f(&format_args!("{exex_id} = {num_hash:?}"))
})
// We need this because `debug!` uses the argument twice when formatting the final
// log message, but the result of `format_with` can only be used once
.to_string();
debug!(
target: "exex::manager",
%unfinalized_exexes,
"Not all ExExes are on the canonical chain, can't finalize the WAL"
);
Expand Down Expand Up @@ -403,7 +410,7 @@ where
// Handle incoming ExEx events
for exex in &mut this.exex_handles {
while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
debug!(exex_id = %exex.id, ?event, "Received event from ExEx");
debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
exex.metrics.events_sent_total.increment(1);
match event {
ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
Expand All @@ -424,10 +431,12 @@ where
while this.buffer.len() < this.max_capacity {
if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
target: "exex::manager",
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
this.wal.commit(&notification)?;
this.push_notification(notification);
continue
}
Expand Down Expand Up @@ -459,7 +468,7 @@ where
}

// Remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
this.buffer.retain(|&(id, _)| id >= min_id);
this.min_id = min_id;

Expand Down Expand Up @@ -602,7 +611,7 @@ mod tests {
use super::*;
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use rand::Rng;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
Expand Down Expand Up @@ -1121,7 +1130,7 @@ mod tests {
}

#[tokio::test]
async fn test_exex_wal_finalize() -> eyre::Result<()> {
async fn test_exex_wal() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let mut rng = generators::rng();
Expand All @@ -1141,12 +1150,11 @@ mod tests {
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
wal.commit(&notification)?;

let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);

let (exex_handle, events_tx, _) =
let (exex_handle, events_tx, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let mut exex_manager = std::pin::pin!(ExExManager::new(
Expand All @@ -1159,7 +1167,13 @@ mod tests {

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
exex_manager.handle().send(notification.clone())?;

assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.next().poll_unpin(&mut cx),
Poll::Ready(Some(notification.clone()))
);
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
Expand Down
6 changes: 2 additions & 4 deletions crates/exex/exex/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,19 @@ where
/// - ExEx is at the same block number as the node head (`node_head.number ==
/// exex_head.number`). Nothing to do.
fn check_backfill(&mut self) -> eyre::Result<()> {
debug!(target: "exex::manager", "Synchronizing ExEx head");

let backfill_job_factory =
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
match self.exex_head.block.number.cmp(&self.node_head.number) {
std::cmp::Ordering::Less => {
// ExEx is behind the node head, start backfill
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill");
debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
let backfill = backfill_job_factory
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
.into_stream();
self.backfill_job = Some(backfill);
}
std::cmp::Ordering::Equal => {
debug!(target: "exex::manager", "ExEx is at the node head");
debug!(target: "exex::notifications", "ExEx is at the node head");
}
std::cmp::Ordering::Greater => {
return Err(eyre::eyre!("ExEx is ahead of the node head"))
Expand Down
12 changes: 6 additions & 6 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl WalInner {
}

/// Fills the block cache with the notifications from the storage.
#[instrument(target = "exex::wal", skip(self))]
#[instrument(skip(self))]
fn fill_block_cache(&mut self) -> eyre::Result<()> {
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
Expand Down Expand Up @@ -128,7 +128,7 @@ impl WalInner {
Ok(())
}

#[instrument(target = "exex::wal", skip_all, fields(
#[instrument(skip_all, fields(
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))]
Expand All @@ -138,27 +138,27 @@ impl WalInner {
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
let size = self.storage.write_notification(file_id, notification)?;

debug!(?file_id, "Inserting notification blocks into the block cache");
debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
block_cache.insert_notification_blocks_with_file_id(file_id, notification);

self.update_metrics(&block_cache, size as i64);

Ok(())
}

#[instrument(target = "exex::wal", skip(self))]
#[instrument(skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);

// Remove notifications from the storage.
if file_ids.is_empty() {
debug!("No notifications were finalized from the storage");
debug!(target: "exex::wal", "No notifications were finalized from the storage");
return Ok(())
}

let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, ?removed_size, "Storage was finalized");
debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");

self.update_metrics(&block_cache, -(removed_size as i64));

Expand Down
22 changes: 12 additions & 10 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ impl Storage {
/// # Returns
///
/// The size of the file that was removed in bytes, if any.
#[instrument(target = "exex::wal::storage", skip(self))]
#[instrument(skip(self))]
fn remove_notification(&self, file_id: u32) -> Option<u64> {
let path = self.file_path(file_id);
let size = path.metadata().ok()?.len();

match reth_fs_util::remove_file(self.file_path(file_id)) {
Ok(()) => {
debug!("Notification was removed from the storage");
debug!(target: "exex::wal::storage", "Notification was removed from the storage");
Some(size)
}
Err(err) => {
debug!(?err, "Failed to remove notification from the storage");
debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage");
None
}
}
Expand Down Expand Up @@ -108,31 +108,33 @@ impl Storage {
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
range.map(move |id| {
let (notification, size) =
self.read_notification(id)?.ok_or_eyre("notification not found")?;
self.read_notification(id)?.ok_or_eyre("notification {id} not found")?;

Ok((id, size, notification))
})
}

/// Reads the notification from the file with the given ID.
#[instrument(target = "exex::wal::storage", skip(self))]
#[instrument(skip(self))]
pub(super) fn read_notification(
&self,
file_id: u32,
) -> eyre::Result<Option<(ExExNotification, u64)>> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");
debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL");

let mut file = match File::open(&file_path) {
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()),
};
let size = file.metadata()?.len();

// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
rmp_serde::decode::from_read(&mut file)?;
rmp_serde::decode::from_read(&mut file).map_err(|err| {
eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}")
})?;

Ok(Some((notification.into(), size)))
}
Expand All @@ -142,14 +144,14 @@ impl Storage {
/// # Returns
///
/// The size of the file that was written in bytes.
#[instrument(target = "exex::wal::storage", skip(self, notification))]
#[instrument(skip(self, notification))]
pub(super) fn write_notification(
&self,
file_id: u32,
notification: &ExExNotification,
) -> eyre::Result<u64> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Writing notification to WAL");
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");

// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
Expand Down

0 comments on commit 16be8b9

Please sign in to comment.