Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit and shrink policy for recycler #15320

Merged
merged 16 commits into from
Feb 24, 2021
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn main() -> Result<()> {

let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("bench-streamer-recycler-shrink-stats");
for _ in 0..num_sockets {
let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
Expand Down
12 changes: 6 additions & 6 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,7 @@ impl ClusterInfo {
let mut last_contact_info_trace = timestamp();
let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats");
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
Expand Down Expand Up @@ -2104,7 +2104,7 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
let check_pull_request =
Expand Down Expand Up @@ -2389,8 +2389,7 @@ impl ClusterInfo {
if packets.is_empty() {
None
} else {
let packets =
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
Some(packets)
}
}
Expand Down Expand Up @@ -3019,7 +3018,8 @@ impl ClusterInfo {
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketsRecycler::default();
let recycler =
PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
Builder::new()
.name("solana-listen".to_string())
.spawn(move || {
Expand Down Expand Up @@ -3464,7 +3464,7 @@ mod tests {
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.collect();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("");
let packets = cluster_info
.handle_ping_messages(
remote_nodes
Expand Down
15 changes: 13 additions & 2 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ impl FetchStage {
) -> (Self, PacketReceiver) {
let (sender, receiver) = channel();
(
Self::new_with_sender(sockets, tpu_forwards_sockets, exit, &sender, &poh_recorder),
Self::new_with_sender(
sockets,
tpu_forwards_sockets,
exit,
&sender,
&poh_recorder,
None,
),
receiver,
)
}
Expand All @@ -39,6 +46,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
allocated_packet_limit: Option<u32>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
Expand All @@ -48,6 +56,7 @@ impl FetchStage {
exit,
&sender,
&poh_recorder,
allocated_packet_limit,
)
}

Expand Down Expand Up @@ -92,8 +101,10 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);
let recycler: PacketsRecycler =
Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");

let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(
Expand Down
2 changes: 1 addition & 1 deletion core/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl GossipService {
gossip_socket.clone(),
&exit,
request_sender,
Recycler::default(),
Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"),
"gossip_receiver",
);
let (response_sender, response_receiver) = channel();
Expand Down
24 changes: 8 additions & 16 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl ServeRepair {
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("serve-repair-recycler-shrink-stats");
Builder::new()
.name("solana-repair-listen".to_string())
.spawn(move || {
Expand Down Expand Up @@ -498,11 +498,7 @@ impl ServeRepair {

if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
));
return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap();
}
}

Expand Down Expand Up @@ -538,11 +534,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Some(Packets::new_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
));
return Packets::new_with_recycler_data(recycler, vec![packet]);
}
None
}
Expand All @@ -555,7 +547,7 @@ impl ServeRepair {
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
Expand Down Expand Up @@ -609,7 +601,7 @@ mod tests {

/// test run_window_request responds with the right shred, and do not overrun
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("");
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
Expand Down Expand Up @@ -677,7 +669,7 @@ mod tests {

/// test window requests respond with the right shred, and do not overrun
fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("");
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
Expand Down Expand Up @@ -845,7 +837,7 @@ mod tests {

fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("");
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
Expand Down Expand Up @@ -916,7 +908,7 @@ mod tests {
#[test]
fn run_orphan_corrupted_shred_size() {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let recycler = PacketsRecycler::new_without_limit("");
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
Expand Down
2 changes: 1 addition & 1 deletion core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl ServeRepairService {
serve_repair_socket.clone(),
&exit,
request_sender,
Recycler::default(),
Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"),
"serve_repair_receiver",
);
let (response_sender, response_receiver) = channel();
Expand Down
4 changes: 3 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ impl ShredFetchStage {
sender: &PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>,
limit: Option<u32>,
) -> Self {
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
let recycler: PacketsRecycler =
Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");

let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
sockets,
Expand Down
4 changes: 2 additions & 2 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
fn default() -> Self {
init();
Self {
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
recycler: Recycler::warmed(50, 4096, None, ""),
recycler_out: Recycler::warmed(50, 4096, None, ""),
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ impl ShredSigVerifier {
Self {
bank_forks,
leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(),
recycler_cache: RecyclerCache::warmed(
"shred-sig-verifier-offsets-recycler-shrink-stats",
"shred-sig-verifier-buffer-recycler-shrink-stats",
),
}
}
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
Expand Down
3 changes: 3 additions & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ impl Tpu {
&exit,
&packet_sender,
&poh_recorder,
// At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly
// 20GB
Some(20_000),
);
let (verified_sender, verified_receiver) = unbounded();

Expand Down
1 change: 1 addition & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl Tvu {
&fetch_sender,
Some(bank_forks.clone()),
&exit,
None,
);

let (verified_sender, verified_receiver) = unbounded();
Expand Down
2 changes: 1 addition & 1 deletion ledger/benches/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
const NUM_BATCHES: usize = 1;
#[bench]
fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut packets = Packets::default();
packets.packets.set_pinnable();
Expand Down
17 changes: 12 additions & 5 deletions ledger/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,21 @@ pub struct EntryVerificationState {
device_verification_data: DeviceVerificationData,
}

#[derive(Default, Clone)]
#[derive(Clone)]
pub struct VerifyRecyclers {
hash_recycler: Recycler<PinnedVec<Hash>>,
tick_count_recycler: Recycler<PinnedVec<u64>>,
}

impl Default for VerifyRecyclers {
fn default() -> Self {
Self {
hash_recycler: Recycler::new_without_limit("hash_recycler_shrink_stats"),
tick_count_recycler: Recycler::new_without_limit("tick_count_recycler_shrink_stats"),
}
}
}

#[derive(PartialEq, Clone, Copy, Debug)]
pub enum EntryVerificationStatus {
Failure,
Expand Down Expand Up @@ -554,14 +563,12 @@ impl EntrySlice for [Entry] {
.take(self.len())
.collect();

let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash");
let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
hashes_pinned.set_pinnable();
hashes_pinned.resize(hashes.len(), Hash::default());
hashes_pinned.copy_from_slice(&hashes);

let mut num_hashes_vec = recyclers
.tick_count_recycler
.allocate("poh_verify_num_hashes");
let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
for entry in self {
num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
Expand Down
24 changes: 12 additions & 12 deletions ledger/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn slot_key_data_for_gpu<
.push(*slot);
}
}
let mut keyvec = recycler_cache.buffer().allocate("shred_gpu_pubkeys");
let mut keyvec = recycler_cache.buffer().allocate().unwrap();
keyvec.set_pinnable();
let mut slot_to_key_ix = HashMap::new();

Expand All @@ -152,7 +152,7 @@ fn slot_key_data_for_gpu<
slot_to_key_ix.insert(s, i);
}
}
let mut offsets = recycler_cache.offsets().allocate("shred_offsets");
let mut offsets = recycler_cache.offsets().allocate().unwrap();
offsets.set_pinnable();
slots.iter().for_each(|packet_slots| {
packet_slots.iter().for_each(|slot| {
Expand Down Expand Up @@ -185,11 +185,11 @@ fn shred_gpu_offsets(
batches: &[Packets],
recycler_cache: &RecyclerCache,
) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures");
let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
signature_offsets.set_pinnable();
let mut msg_start_offsets = recycler_cache.offsets().allocate("shred_msg_starts");
let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
msg_start_offsets.set_pinnable();
let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes");
let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
msg_sizes.set_pinnable();
let mut v_sig_lens = vec![];
for batch in batches.iter() {
Expand Down Expand Up @@ -242,7 +242,7 @@ pub fn verify_shreds_gpu(
trace!("pubkeys_len: {}", pubkeys_len);
let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
let mut out = recycler_cache.buffer().allocate("out_buffer");
let mut out = recycler_cache.buffer().allocate().unwrap();
out.set_pinnable();
elems.push(
perf_libs::Elems {
Expand Down Expand Up @@ -332,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
}

pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
let mut vec = cache.buffer().allocate("pinned_keypair");
let mut vec = cache.buffer().allocate().unwrap();
let pubkey = keypair.pubkey().to_bytes();
let secret = keypair.secret().to_bytes();
let mut hasher = Sha512::default();
Expand Down Expand Up @@ -370,17 +370,17 @@ pub fn sign_shreds_gpu(
let mut num_packets = num_keypair_packets;

//should be zero
let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets");
let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
pubkey_offsets.resize(count, 0);

let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets");
let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
secret_offsets.resize(count, pubkey_size as u32);

trace!("offset: {}", offset);
let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
shred_gpu_offsets(offset, batches, recycler_cache);
let total_sigs = signature_offsets.len();
let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures");
let mut signatures_out = recycler_cache.buffer().allocate().unwrap();
signatures_out.set_pinnable();
signatures_out.resize(total_sigs * sig_size, 0);
elems.push(
Expand Down Expand Up @@ -560,7 +560,7 @@ pub mod tests {

fn run_test_sigverify_shreds_gpu(slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut batch = [Packets::default()];
let mut shred = Shred::new_from_data(
Expand Down Expand Up @@ -624,7 +624,7 @@ pub mod tests {

fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
solana_logger::setup();
let recycler_cache = RecyclerCache::default();
let recycler_cache = RecyclerCache::new("", "");

let mut packets = Packets::default();
let num_packets = 32;
Expand Down
Loading