diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 8f560353..dd8506a0 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -142,7 +142,7 @@ impl DerivationDriver { } } - attributes = self.pipeline.next_attributes(); + attributes = self.pipeline.next(); } Ok(attributes.expect("Must be some")) diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 4852ef9a..3490e8cd 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -56,8 +56,13 @@ where S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send + Sync, P: L2ChainProvider + Send + Sync + Debug, { + /// Peeks at the next prepared [L2AttributesWithParent] from the pipeline. + fn peek(&self) -> Option<&L2AttributesWithParent> { + self.prepared.front() + } + /// Returns the next prepared [L2AttributesWithParent] from the pipeline. - fn next_attributes(&mut self) -> Option { + fn next(&mut self) -> Option { self.prepared.pop_front() } diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 7b7f3f49..1c25dd31 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -8,8 +8,11 @@ use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo}; /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider { + /// Peeks at the next [L2AttributesWithParent] from the pipeline. + fn peek(&self) -> Option<&L2AttributesWithParent>; + /// Returns the next [L2AttributesWithParent] from the pipeline. - fn next_attributes(&mut self) -> Option; + fn next(&mut self) -> Option; /// Resets the pipeline on the next [Pipeline::step] call. async fn reset(&mut self, origin: BlockInfo) -> anyhow::Result<()>; diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index 4edad3aa..b6d9a47a 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -99,17 +99,37 @@ async fn sync(cli: cli::Cli) -> Result<()> { } } - let attributes = if let Some(attributes) = pipeline.next_attributes() { - attributes + // Peek at the next prepared attributes and validate them. + if let Some(attributes) = pipeline.peek() { + match validator.validate(attributes).await { + Ok(true) => { + info!(target: LOG_TARGET, "Validated payload attributes"); + } + Ok(false) => { + error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash); + metrics::FAILED_PAYLOAD_DERIVATION.inc(); + let _ = pipeline.next(); // Take the attributes and continue + continue; + } + Err(e) => { + error!(target: LOG_TARGET, "Failed to validate payload attributes: {:?}", e); + // Don't take the next attributes, re-try the current one. + continue; + } + } } else { debug!(target: LOG_TARGET, "No attributes to validate"); continue; }; - if !validator.validate(&attributes).await { - error!(target: LOG_TARGET, "Failed payload validation: {}", attributes.parent.block_info.hash); - metrics::FAILED_PAYLOAD_DERIVATION.inc(); - } + // Take the next attributes from the pipeline since they're valid. + let attributes = if let Some(attributes) = pipeline.next() { + attributes + } else { + error!(target: LOG_TARGET, "Must have valid attributes"); + continue; + }; + // If we validated payload attributes, we should advance the cursor. advance_cursor_flag = true; metrics::DERIVED_ATTRIBUTES_COUNT.inc(); diff --git a/examples/trusted-sync/src/validation.rs b/examples/trusted-sync/src/validation.rs index d7148417..af0d706c 100644 --- a/examples/trusted-sync/src/validation.rs +++ b/examples/trusted-sync/src/validation.rs @@ -8,7 +8,7 @@ use kona_derive::types::{ L2AttributesWithParent, L2PayloadAttributes, RawTransaction, RollupConfig, }; use std::vec::Vec; -use tracing::warn; +use tracing::{error, warn}; /// OnlineValidator /// @@ -57,7 +57,7 @@ impl OnlineValidator { if let Ok(tx) = tx { txs.push(tx); } else { - warn!("Failed to fetch transaction: {:?}", tx); + warn!(target: "validation", "Failed to fetch transaction: {:?}", tx); } } Ok((block.header, txs)) @@ -80,10 +80,15 @@ impl OnlineValidator { } /// Validates the given [`L2AttributesWithParent`]. - pub async fn validate(&self, attributes: &L2AttributesWithParent) -> bool { + pub async fn validate(&self, attributes: &L2AttributesWithParent) -> Result { let expected = attributes.parent.block_info.number + 1; let tag = BlockNumberOrTag::from(expected); - let payload = self.get_payload(tag).await.unwrap(); - attributes.attributes == payload + match self.get_payload(tag).await { + Ok(payload) => Ok(attributes.attributes == payload), + Err(e) => { + error!(target: "validation", "Failed to fetch payload for block {}: {:?}", expected, e); + Err(e) + } + } } }