Skip to content

Commit

Permalink
Remove block contents from mempool (#485)
Browse files Browse the repository at this point in the history
* Add Hash type param to Attestation and Certificate

* remove block contents from mempool
  • Loading branch information
zeegomo authored Oct 30, 2023
1 parent 7fcfe89 commit e505618
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
4 changes: 3 additions & 1 deletion nomos-core/src/da/attestation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;

pub trait Attestation {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
}
4 changes: 3 additions & 1 deletion nomos-core/src/da/certificate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ pub mod select;

use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;

pub trait Certificate {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
}

Expand Down
3 changes: 3 additions & 0 deletions nomos-da/full-replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct Attestation {

impl attestation::Attestation for Attestation {
type Blob = Blob;
type Hash = [u8; 32];

fn blob(&self) -> [u8; 32] {
self.blob
}
Expand Down Expand Up @@ -145,6 +147,7 @@ impl Hash for Certificate {

impl certificate::Certificate for Certificate {
type Blob = Blob;
type Hash = [u8; 32];

fn blob(&self) -> <Self::Blob as blob::Blob>::Hash {
self.attestations[0].blob
Expand Down
57 changes: 51 additions & 6 deletions nomos-services/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
+ Clone
+ Eq
Expand All @@ -176,7 +176,7 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate
DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug
+ Clone
+ Eq
Expand Down Expand Up @@ -365,7 +365,7 @@ where
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
+ Clone
+ Eq
Expand All @@ -375,7 +375,7 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate
DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug
+ Clone
+ Eq
Expand Down Expand Up @@ -455,6 +455,8 @@ where
task_manager,
adapter.clone(),
storage_relay,
cl_mempool_relay,
da_mempool_relay,
)
.await;
}
Expand Down Expand Up @@ -530,15 +532,28 @@ where
carnot
}

#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))]
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[instrument(
level = "debug",
skip(
carnot,
adapter,
task_manager,
stream,
storage_relay,
cl_mempool_relay,
da_mempool_relay
)
)]
async fn process_block(
mut carnot: Carnot<O>,
block: Block<ClPool::Item, DaPool::Item>,
mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
Expand Down Expand Up @@ -569,6 +584,22 @@ where
if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}");
}

// remove included content from mempool
mark_in_block(
cl_mempool_relay,
original_block.transactions().map(Transaction::hash),
block.id,
)
.await;

mark_in_block(
da_mempool_relay,
original_block.blobs().map(Certificate::hash),
block.id,
)
.await;

if new_view != carnot.current_view() {
task_manager.push(
block.view,
Expand Down Expand Up @@ -1076,6 +1107,20 @@ async fn get_mempool_contents<Item, Key>(
rx.await
}

async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
ids: impl Iterator<Item = Key>,
block: BlockId,
) {
mempool
.send(MempoolMsg::MarkInBlock {
ids: ids.collect(),
block,
})
.await
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
}

#[cfg(test)]
mod tests {
use consensus_engine::Block;
Expand Down

0 comments on commit e505618

Please sign in to comment.