Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove block contents from mempool #485

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nomos-core/src/da/attestation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;

pub trait Attestation {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
}
Comment on lines 5 to 11
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why adding the Hash associated type? Reasoning for using the blob one was to mark that the return hash is not the attestation/certificate/whatever but the related blob one. Which now that I think of it it makes sense at type level but maybe not so much as it cannot be enforced (neither I added documentation, completely wrong on my side).

Copy link
Contributor Author

@zeegomo zeegomo Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasoning for using the blob one was to mark that the return hash is not the attestation/certificate/whatever but the related blob one.

For that we have the blob method. Attestations and certificates could be for the same blob but still be different (e.g. produced by different nodes) and we need a way to differentiate those.

Why adding the Hash associated type?

In theory we could reuse the blob type to enforce that all the items for DA use the same hash type but I could not find a way to write a generic constraint like Attestation<Blob::Hash = T> (rust-lang/rust#52662), so I had to add this as a separate trait instead

4 changes: 3 additions & 1 deletion nomos-core/src/da/certificate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ pub mod select;

use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;

pub trait Certificate {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
}

Expand Down
3 changes: 3 additions & 0 deletions nomos-da/full-replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct Attestation {

impl attestation::Attestation for Attestation {
type Blob = Blob;
type Hash = [u8; 32];

fn blob(&self) -> [u8; 32] {
self.blob
}
Expand Down Expand Up @@ -145,6 +147,7 @@ impl Hash for Certificate {

impl certificate::Certificate for Certificate {
type Blob = Blob;
type Hash = [u8; 32];

fn blob(&self) -> <Self::Blob as blob::Blob>::Hash {
self.attestations[0].blob
Expand Down
57 changes: 51 additions & 6 deletions nomos-services/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
+ Clone
+ Eq
Expand All @@ -176,7 +176,7 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate
DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug
+ Clone
+ Eq
Expand Down Expand Up @@ -365,7 +365,7 @@ where
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
+ Clone
+ Eq
Expand All @@ -375,7 +375,7 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate
DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug
+ Clone
+ Eq
Expand Down Expand Up @@ -455,6 +455,8 @@ where
task_manager,
adapter.clone(),
storage_relay,
cl_mempool_relay,
da_mempool_relay,
)
.await;
}
Expand Down Expand Up @@ -530,15 +532,28 @@ where
carnot
}

#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))]
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[instrument(
level = "debug",
skip(
carnot,
adapter,
task_manager,
stream,
storage_relay,
cl_mempool_relay,
da_mempool_relay
)
)]
async fn process_block(
mut carnot: Carnot<O>,
block: Block<ClPool::Item, DaPool::Item>,
mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
Expand Down Expand Up @@ -569,6 +584,22 @@ where
if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}");
}

// remove included content from mempool
mark_in_block(
cl_mempool_relay,
original_block.transactions().map(Transaction::hash),
block.id,
)
.await;

mark_in_block(
da_mempool_relay,
original_block.blobs().map(Certificate::hash),
block.id,
)
.await;

if new_view != carnot.current_view() {
task_manager.push(
block.view,
Expand Down Expand Up @@ -1076,6 +1107,20 @@ async fn get_mempool_contents<Item, Key>(
rx.await
}

async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
ids: impl Iterator<Item = Key>,
block: BlockId,
) {
mempool
.send(MempoolMsg::MarkInBlock {
ids: ids.collect(),
block,
})
.await
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
}

#[cfg(test)]
mod tests {
use consensus_engine::Block;
Expand Down
Loading