Skip to content

Commit

Permalink
[stateless_validation] fix: Pending chunk endorsement cache (#10568)
Browse files Browse the repository at this point in the history
Issue: #10560


https://near.zulipchat.com/#narrow/stream/407237-pagoda.2Fcore.2Fstateless-validation/topic/StatelessNet.20planning/near/419827117
> * add pending_chunk_endorsements cache to ChunkEndorsementTracker
> * if we have an error on get_chunk, add endorsements to pending cache
> * on on_chunk_completed, process all pending endorsements for chunk
hash
We discussed previously that full chunk may not be needed. But this
solution aligns well with the existing design
  • Loading branch information
staffik committed Feb 6, 2024
1 parent 2eec0c5 commit c198d8b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 8 deletions.
4 changes: 4 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1373,8 +1373,12 @@ impl Client {
self.chain.blocks_delay_tracker.mark_chunk_completed(&chunk_header, StaticClock::utc());
self.block_production_info
.record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id());

// TODO(#10569) We would like a proper error handling here instead of `expect`.
persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store())
.expect("Could not persist chunk");

self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header);
// We're marking chunk as accepted.
self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash());
// If this was the last chunk that was missing for a block, it will be processed now.
Expand Down
75 changes: 67 additions & 8 deletions chain/client/src/stateless_validation/chunk_endorsement_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,28 @@ pub struct ChunkEndorsementTracker {
/// This is keyed on chunk_hash and account_id of validator to avoid duplicates.
/// Chunk endorsements would later be used as a part of block production.
chunk_endorsements: SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
/// We store chunk endorsements to be processed later because we did not have
/// chunks ready at the time we received that endorsements from validators.
/// This is keyed on chunk_hash and account_id of validator to avoid duplicates.
pending_chunk_endorsements: SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
}

impl Client {
pub fn process_chunk_endorsement(
&mut self,
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
let chunk_header = self.chain.get_chunk(endorsement.chunk_hash())?.cloned_header();
self.chunk_endorsement_tracker.process_chunk_endorsement(&chunk_header, endorsement)
// We should not need whole chunk ready here, we only need chunk header.
match self.chain.get_chunk(endorsement.chunk_hash()) {
Ok(chunk) => self
.chunk_endorsement_tracker
.process_chunk_endorsement(&chunk.cloned_header(), endorsement),
Err(Error::ChunkMissing(_)) => {
tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk.");
self.chunk_endorsement_tracker.add_chunk_endorsement_to_pending_cache(endorsement)
}
Err(error) => return Err(error),
}
}
}

Expand All @@ -40,33 +53,79 @@ impl ChunkEndorsementTracker {
Self {
epoch_manager,
chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE),
// We can use a different cache size if needed, it does not have to be the same as for `chunk_endorsements`.
pending_chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE),
}
}

/// Process pending endorsements for the given chunk header.
/// It removes these endorsements from the `pending_chunk_endorsements` cache.
pub fn process_pending_endorsements(&self, chunk_header: &ShardChunkHeader) {
let chunk_hash = &chunk_header.chunk_hash();
let chunk_endorsements = {
let mut guard = self.pending_chunk_endorsements.lock();
guard.pop(chunk_hash)
};
let Some(chunk_endorsements) = chunk_endorsements else {
return;
};
tracing::debug!(target: "stateless_validation", ?chunk_hash, "Processing pending chunk endorsements.");
for endorsement in chunk_endorsements.values() {
if let Err(error) = self.process_chunk_endorsement(chunk_header, endorsement.clone()) {
tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error);
}
}
}

/// Add the chunk endorsement to a cache of pending chunk endorsements (if not yet there).
pub(crate) fn add_chunk_endorsement_to_pending_cache(
&self,
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
self.process_chunk_endorsement_impl(endorsement, None)
}

/// Function to process an incoming chunk endorsement from chunk validators.
/// We first verify the chunk endorsement and then store it in a cache.
/// We would later include the endorsements in the block production.
pub(crate) fn process_chunk_endorsement(
&self,
chunk_header: &ShardChunkHeader,
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
self.process_chunk_endorsement_impl(endorsement, Some(chunk_header))
}

/// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache.
/// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready.
fn process_chunk_endorsement_impl(
&self,
endorsement: ChunkEndorsement,
chunk_header: Option<&ShardChunkHeader>,
) -> Result<(), Error> {
let chunk_hash = endorsement.chunk_hash();
let account_id = &endorsement.account_id;

let endorsement_cache = if chunk_header.is_some() {
&self.chunk_endorsements
} else {
&self.pending_chunk_endorsements
};

// If we have already processed this chunk endorsement, return early.
if self
.chunk_endorsements
if endorsement_cache
.get(chunk_hash)
.is_some_and(|existing_endorsements| existing_endorsements.get(account_id).is_some())
{
tracing::debug!(target: "stateless_validation", ?endorsement, "Already received chunk endorsement.");
return Ok(());
}

if !self.epoch_manager.verify_chunk_endorsement(chunk_header, &endorsement)? {
tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement.");
return Err(Error::InvalidChunkEndorsement);
if let Some(chunk_header) = chunk_header {
if !self.epoch_manager.verify_chunk_endorsement(&chunk_header, &endorsement)? {
tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement.");
return Err(Error::InvalidChunkEndorsement);
}
}

// If we are the current block producer, we store the chunk endorsement for each chunk which
Expand All @@ -76,7 +135,7 @@ impl ChunkEndorsementTracker {
// Maybe add check to ensure we don't accept endorsements from chunks already included in some block?
// Maybe add check to ensure we don't accept endorsements from chunks that have too old height_created?
tracing::debug!(target: "stateless_validation", ?endorsement, "Received and saved chunk endorsement.");
let mut guard = self.chunk_endorsements.lock();
let mut guard = endorsement_cache.lock();
guard.get_or_insert(chunk_hash.clone(), || HashMap::new());
let chunk_endorsements = guard.get_mut(chunk_hash).unwrap();
chunk_endorsements.insert(account_id.clone(), endorsement);
Expand Down

0 comments on commit c198d8b

Please sign in to comment.