Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send duplicate shred proofs for conflicting shred scenarios #32965

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 33 additions & 17 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, ReedSolomonCache, Shred},
},
Expand Down Expand Up @@ -138,23 +138,37 @@ impl WindowServiceMetrics {
fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
shred_receiver: &Receiver<Shred>,
shred_receiver: &Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: &DuplicateSlotSender,
) -> Result<()> {
let check_duplicate = |shred: Shred| -> Result<()> {
let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
let (shred1, shred2) = match shred {
PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::Exists(shred) => {
// Unlike the other cases, we have to wait until here to decide whether to handle the duplicate and store it
// in blockstore. This is because the duplicate could have been part of the same insert batch,
// so we wait until the batch has been written.
if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
return Ok(()); // A duplicate is already recorded
}
let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
return Ok(()); // Not a duplicate
};
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
shred.into_payload(),
existing_shred_payload.clone(),
shred.clone().into_payload(),
)?;

duplicate_slots_sender.send(shred_slot)?;
(shred, existing_shred_payload)
}
}
};

// Propagate duplicate proof through gossip
cluster_info.push_duplicate_shred(&shred1, &shred2)?;
// Notify duplicate consensus state machine
duplicate_slots_sender.send(shred_slot)?;

Ok(())
};
Expand Down Expand Up @@ -226,7 +240,7 @@ fn run_insert<F>(
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
F: Fn(PossibleDuplicateShred),
{
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
Expand Down Expand Up @@ -370,7 +384,7 @@ impl WindowService {
cluster_info: Arc<ClusterInfo>,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
duplicate_receiver: Receiver<Shred>,
duplicate_receiver: Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: DuplicateSlotSender,
) -> JoinHandle<()> {
let handle_error = || {
Expand Down Expand Up @@ -400,7 +414,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_receiver: Receiver<Vec<PacketBatch>>,
check_duplicate_sender: Sender<Shred>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
Expand All @@ -417,8 +431,8 @@ impl WindowService {
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
let handle_duplicate = |shred| {
let _ = check_duplicate_sender.send(shred);
let handle_duplicate = |possible_duplicate_shred| {
let _ = check_duplicate_sender.send(possible_duplicate_shred);
};
let mut metrics = BlockstoreInsertionMetrics::default();
let mut ws_metrics = WindowServiceMetrics::default();
Expand Down Expand Up @@ -551,7 +565,9 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
sender.send(duplicate_shred.clone()).unwrap();
sender
.send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
Expand Down
1 change: 1 addition & 0 deletions gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ thiserror = { workspace = true }
[dev-dependencies]
num_cpus = { workspace = true }
serial_test = { workspace = true }
test-case = { workspace = true }

[build-dependencies]
rustc_version = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")]
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl CrdsGossip {
let now = timestamp();
for entry in entries {
if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred faild: {:?}", err);
error!("push_duplicate_shred failed: {:?}", err);
}
}
Ok(())
Expand Down
Loading