From c198d8b8989d7db09cf089e89118c25b9066346e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Tue, 6 Feb 2024 14:54:25 +0100 Subject: [PATCH] [stateless_validation] fix: Pending chunk endorsement cache (#10568) Issue: https://github.com/near/nearcore/issues/10560 https://near.zulipchat.com/#narrow/stream/407237-pagoda.2Fcore.2Fstateless-validation/topic/StatelessNet.20planning/near/419827117 > * add pending_chunk_endorsements cache to ChunkEndorsementTracker > * if we have an error on get_chunk, add endorsements to pending cache > * on on_chunk_completed, process all pending endorsements for chunk hash We discussed previously that full chunk may not be needed. But this solution aligns well with the existing design --- chain/client/src/client.rs | 4 + .../chunk_endorsement_tracker.rs | 75 +++++++++++++++++-- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index bdfa922bfd3..a30419741db 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1373,8 +1373,12 @@ impl Client { self.chain.blocks_delay_tracker.mark_chunk_completed(&chunk_header, StaticClock::utc()); self.block_production_info .record_chunk_collected(partial_chunk.height_created(), partial_chunk.shard_id()); + + // TODO(#10569) We would like a proper error handling here instead of `expect`. persist_chunk(partial_chunk, shard_chunk, self.chain.mut_chain_store()) .expect("Could not persist chunk"); + + self.chunk_endorsement_tracker.process_pending_endorsements(&chunk_header); // We're marking chunk as accepted. self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash()); // If this was the last chunk that was missing for a block, it will be processed now. diff --git a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs index e28dcbe5e72..47472560177 100644 --- a/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs +++ b/chain/client/src/stateless_validation/chunk_endorsement_tracker.rs @@ -23,6 +23,10 @@ pub struct ChunkEndorsementTracker { /// This is keyed on chunk_hash and account_id of validator to avoid duplicates. /// Chunk endorsements would later be used as a part of block production. chunk_endorsements: SyncLruCache>, + /// We store chunk endorsements to be processed later because we did not have + /// chunks ready at the time we received that endorsements from validators. + /// This is keyed on chunk_hash and account_id of validator to avoid duplicates. + pending_chunk_endorsements: SyncLruCache>, } impl Client { @@ -30,8 +34,17 @@ impl Client { &mut self, endorsement: ChunkEndorsement, ) -> Result<(), Error> { - let chunk_header = self.chain.get_chunk(endorsement.chunk_hash())?.cloned_header(); - self.chunk_endorsement_tracker.process_chunk_endorsement(&chunk_header, endorsement) + // We should not need whole chunk ready here, we only need chunk header. + match self.chain.get_chunk(endorsement.chunk_hash()) { + Ok(chunk) => self + .chunk_endorsement_tracker + .process_chunk_endorsement(&chunk.cloned_header(), endorsement), + Err(Error::ChunkMissing(_)) => { + tracing::debug!(target: "stateless_validation", ?endorsement, "Endorsement arrived before chunk."); + self.chunk_endorsement_tracker.add_chunk_endorsement_to_pending_cache(endorsement) + } + Err(error) => return Err(error), + } } } @@ -40,9 +53,38 @@ impl ChunkEndorsementTracker { Self { epoch_manager, chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE), + // We can use a different cache size if needed, it does not have to be the same as for `chunk_endorsements`. + pending_chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE), + } + } + + /// Process pending endorsements for the given chunk header. + /// It removes these endorsements from the `pending_chunk_endorsements` cache. + pub fn process_pending_endorsements(&self, chunk_header: &ShardChunkHeader) { + let chunk_hash = &chunk_header.chunk_hash(); + let chunk_endorsements = { + let mut guard = self.pending_chunk_endorsements.lock(); + guard.pop(chunk_hash) + }; + let Some(chunk_endorsements) = chunk_endorsements else { + return; + }; + tracing::debug!(target: "stateless_validation", ?chunk_hash, "Processing pending chunk endorsements."); + for endorsement in chunk_endorsements.values() { + if let Err(error) = self.process_chunk_endorsement(chunk_header, endorsement.clone()) { + tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error); + } } } + /// Add the chunk endorsement to a cache of pending chunk endorsements (if not yet there). + pub(crate) fn add_chunk_endorsement_to_pending_cache( + &self, + endorsement: ChunkEndorsement, + ) -> Result<(), Error> { + self.process_chunk_endorsement_impl(endorsement, None) + } + /// Function to process an incoming chunk endorsement from chunk validators. /// We first verify the chunk endorsement and then store it in a cache. /// We would later include the endorsements in the block production. @@ -50,13 +92,28 @@ impl ChunkEndorsementTracker { &self, chunk_header: &ShardChunkHeader, endorsement: ChunkEndorsement, + ) -> Result<(), Error> { + self.process_chunk_endorsement_impl(endorsement, Some(chunk_header)) + } + + /// If the chunk header is available, we will verify the chunk endorsement and then store it in a cache. + /// Otherwise, we store the endorsement in a separate cache of endorsements to be processed when the chunk is ready. + fn process_chunk_endorsement_impl( + &self, + endorsement: ChunkEndorsement, + chunk_header: Option<&ShardChunkHeader>, ) -> Result<(), Error> { let chunk_hash = endorsement.chunk_hash(); let account_id = &endorsement.account_id; + let endorsement_cache = if chunk_header.is_some() { + &self.chunk_endorsements + } else { + &self.pending_chunk_endorsements + }; + // If we have already processed this chunk endorsement, return early. - if self - .chunk_endorsements + if endorsement_cache .get(chunk_hash) .is_some_and(|existing_endorsements| existing_endorsements.get(account_id).is_some()) { @@ -64,9 +121,11 @@ impl ChunkEndorsementTracker { return Ok(()); } - if !self.epoch_manager.verify_chunk_endorsement(chunk_header, &endorsement)? { - tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement."); - return Err(Error::InvalidChunkEndorsement); + if let Some(chunk_header) = chunk_header { + if !self.epoch_manager.verify_chunk_endorsement(&chunk_header, &endorsement)? { + tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement."); + return Err(Error::InvalidChunkEndorsement); + } } // If we are the current block producer, we store the chunk endorsement for each chunk which @@ -76,7 +135,7 @@ impl ChunkEndorsementTracker { // Maybe add check to ensure we don't accept endorsements from chunks already included in some block? // Maybe add check to ensure we don't accept endorsements from chunks that have too old height_created? tracing::debug!(target: "stateless_validation", ?endorsement, "Received and saved chunk endorsement."); - let mut guard = self.chunk_endorsements.lock(); + let mut guard = endorsement_cache.lock(); guard.get_or_insert(chunk_hash.clone(), || HashMap::new()); let chunk_endorsements = guard.get_mut(chunk_hash).unwrap(); chunk_endorsements.insert(account_id.clone(), endorsement);