diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index faeeb53e..f0a819f2 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -1,4 +1,3 @@ -use crate::grpc_stream_utils::channelize_stream; use crate::grpc_subscription::map_block_update; use futures::StreamExt; use geyser_grpc_connector::grpc_subscription_autoreconnect::{ @@ -62,84 +61,98 @@ pub fn create_grpc_multiplex_blocks_subscription( info!("- connection to {}", grpc_source); } - let confirmed_blocks_stream = { - let commitment_config = CommitmentConfig::confirmed(); - - let mut streams = Vec::new(); - for grpc_source in &grpc_sources { - let stream = create_geyser_reconnecting_stream( - grpc_source.clone(), - GeyserFilter(commitment_config).blocks_and_txs(), - ); - streams.push(stream); - } - - create_multiplexed_stream(streams, BlockExtractor(commitment_config)) - }; - - let finalized_blockmeta_stream = { - let commitment_config = CommitmentConfig::finalized(); - - let mut streams = Vec::new(); - for grpc_source in &grpc_sources { - let stream = create_geyser_reconnecting_stream( - grpc_source.clone(), - GeyserFilter(commitment_config).blocks_meta(), - ); - streams.push(stream); - } - create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config)) - }; - // return value is the broadcast receiver let (producedblock_sender, blocks_output_stream) = tokio::sync::broadcast::channel::(1000); let jh_block_emitter_task = { tokio::task::spawn(async move { - // by blockhash - let mut recent_confirmed_blocks = HashMap::::new(); - let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream); - let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream); - - let sender = producedblock_sender; - let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5)); - let mut last_finalized_slot: Slot = 0; loop { - tokio::select! { - confirmed_block = confirmed_blocks_stream.next() => { - let confirmed_block = confirmed_block.expect("confirmed block from stream"); - trace!("got confirmed block {} with blockhash {}", - confirmed_block.slot, confirmed_block.blockhash.clone()); - if let Err(e) = sender.send(confirmed_block.clone()) { - warn!("Confirmed block channel has no receivers {e:?}"); - continue - } - recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block); - }, - meta_finalized = finalized_blockmeta_stream.next() => { - let blockhash = meta_finalized.expect("finalized block meta from stream"); - if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) { - let finalized_block = cached_confirmed_block.to_finalized_block(); - last_finalized_slot = finalized_block.slot; - debug!("got finalized blockmeta {} with blockhash {}", - finalized_block.slot, finalized_block.blockhash.clone()); - if let Err(e) = sender.send(finalized_block) { - warn!("Finalized block channel has no receivers {e:?}"); - continue; + let confirmed_blocks_stream = { + let commitment_config = CommitmentConfig::confirmed(); + + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let stream = create_geyser_reconnecting_stream( + grpc_source.clone(), + GeyserFilter(commitment_config).blocks_and_txs(), + ); + streams.push(stream); + } + + create_multiplexed_stream(streams, BlockExtractor(commitment_config)) + }; + + let finalized_blockmeta_stream = { + let commitment_config = CommitmentConfig::finalized(); + + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let stream = create_geyser_reconnecting_stream( + grpc_source.clone(), + GeyserFilter(commitment_config).blocks_meta(), + ); + streams.push(stream); + } + create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config)) + }; + + // by blockhash + let mut recent_confirmed_blocks = HashMap::::new(); + let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream); + let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream); + + let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5)); + let mut last_finalized_slot: Slot = 0; + let mut cleanup_without_recv_blocks: u8 = 0; + let mut cleanup_without_recv_blocks_meta: u8 = 0; + const MAX_ALLOWED_CLEANUP_WITHOUT_RECV : u8 = 12; // 12*5 = 60s without recving data + loop { + tokio::select! { + confirmed_block = confirmed_blocks_stream.next() => { + cleanup_without_recv_blocks = 0; + + let confirmed_block = confirmed_block.expect("confirmed block from stream"); + trace!("got confirmed block {} with blockhash {}", + confirmed_block.slot, confirmed_block.blockhash.clone()); + if let Err(e) = producedblock_sender.send(confirmed_block.clone()) { + warn!("Confirmed block channel has no receivers {e:?}"); + continue + } + recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block); + }, + meta_finalized = finalized_blockmeta_stream.next() => { + cleanup_without_recv_blocks_meta = 0; + let blockhash = meta_finalized.expect("finalized block meta from stream"); + if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) { + let finalized_block = cached_confirmed_block.to_finalized_block(); + last_finalized_slot = finalized_block.slot; + debug!("got finalized blockmeta {} with blockhash {}", + finalized_block.slot, finalized_block.blockhash.clone()); + if let Err(e) = producedblock_sender.send(finalized_block) { + warn!("Finalized block channel has no receivers {e:?}"); + continue; + } + } else { + debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash); + } + }, + _ = cleanup_tick.tick() => { + if cleanup_without_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV || + cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV { + log::error!("block or block meta stream stopped restaring blocks"); + break; + } + cleanup_without_recv_blocks += 1; + cleanup_without_recv_blocks_meta += 1; + let size_before = recent_confirmed_blocks.len(); + recent_confirmed_blocks.retain(|_blockhash, block| { + last_finalized_slot == 0 || block.slot > last_finalized_slot - 100 + }); + let cnt_cleaned = size_before - recent_confirmed_blocks.len(); + if cnt_cleaned > 0 { + debug!("cleaned {} confirmed blocks from cache", cnt_cleaned); } - } else { - debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash); - } - }, - _ = cleanup_tick.tick() => { - let size_before = recent_confirmed_blocks.len(); - recent_confirmed_blocks.retain(|_blockhash, block| { - last_finalized_slot == 0 || block.slot > last_finalized_slot - 100 - }); - let cnt_cleaned = size_before - recent_confirmed_blocks.len(); - if cnt_cleaned > 0 { - debug!("cleaned {} confirmed blocks from cache", cnt_cleaned); } } } @@ -179,37 +192,63 @@ pub fn create_grpc_multiplex_slots_subscription( info!("- connection to {}", grpc_source); } - let multiplex_stream = { - let mut streams = Vec::new(); - for grpc_source in &grpc_sources { - let mut slots = HashMap::new(); - slots.insert( - "client".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - }, - ); - - let filter = SubscribeRequest { - slots, - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: HashMap::new(), - blocks_meta: HashMap::new(), - commitment: Some(yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32), - accounts_data_slice: Default::default(), - ping: None, + let (multiplexed_messages_sender, multiplexed_messages) = tokio::sync::broadcast::channel(1000); + + let jh = tokio::spawn(async move { + loop { + let multiplex_stream = { + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let mut slots = HashMap::new(); + slots.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); + + let filter = SubscribeRequest { + slots, + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: HashMap::new(), + blocks_meta: HashMap::new(), + commitment: Some(yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32), + accounts_data_slice: Default::default(), + ping: None, + }; + + let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter); + streams.push(stream); + } + + create_multiplexed_stream(streams, SlotExtractor {}) }; - let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter); - streams.push(stream); + let mut multiplex_stream = std::pin::pin!(multiplex_stream); + loop { + tokio::select! { + slot_data = multiplex_stream.next() => { + if let Some(slot_data) = slot_data { + match multiplexed_messages_sender.send(slot_data) { + Ok(receivers) => { + trace!("sent data to {} receivers", receivers); + } + Err(send_error) => log::error!("Get error while sending on slot channel {}", send_error), + }; + } else { + debug!("Slot stream send None type"); + } + }, + _ = tokio::time::sleep(Duration::from_secs(30)) => { + log::error!("Slots timedout restarting subscription"); + break; + } + } + } } + }); - create_multiplexed_stream(streams, SlotExtractor {}) - }; - - let (multiplexed_stream, jh_channelizer) = channelize_stream(multiplex_stream); - - (multiplexed_stream, jh_channelizer) + (multiplexed_messages, jh) }