Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve/geyser reconnect on error #270

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WS_ADDR=ws://0.0.0.0:8900
# PROMETHEUS_ADDR=your_prometheus_address_here

## Fanout size and retries configuration
FANOUT_SIZE=32
FANOUT_SIZE=18
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you reduce ?

MAX_RETRIES=40
RETRY_TIMEOUT=3

Expand Down
200 changes: 112 additions & 88 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::grpc_subscription::map_block_update;

Check warning on line 1 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
};
use futures::{Stream, StreamExt};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig, Message};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
Expand All @@ -13,8 +11,10 @@
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::time::Instant;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdate,
Expand Down Expand Up @@ -49,7 +49,9 @@
}
}

/// connect to multiple grpc sources to consume confirmed blocks and block status update
const STREAM_RECONNECT_COOLDOWN: Duration = Duration::from_secs(60);
const STREAM_RECONNECT_AFTER_TIME_WITHOUT_BLOCKS: Duration = Duration::from_secs(30);

pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
Expand All @@ -61,98 +63,75 @@
info!("- connection to {}", grpc_source);
}

// let confirmed_blocks_stream = create_confirmed_blocks_stream(&grpc_sources);

// let finalized_blockmeta_stream = create_finalized_blockmeta_stream(&grpc_sources);

// return value is the broadcast receiver
let (producedblock_sender, blocks_output_stream) =
tokio::sync::broadcast::channel::<ProducedBlock>(1000);

let jh_block_emitter_task = {
tokio::task::spawn(async move {

Check warning on line 75 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs
loop {
let confirmed_blocks_stream = {
let commitment_config = CommitmentConfig::confirmed();

let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
streams.push(stream);
}
// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blocks_stream = Box::pin(create_confirmed_blocks_stream(&grpc_sources));
let mut finalized_blockmeta_stream = Box::pin(create_finalized_blockmeta_stream(&grpc_sources));

create_multiplexed_stream(streams, BlockExtractor(commitment_config))
};

let finalized_blockmeta_stream = {
let commitment_config = CommitmentConfig::finalized();
let sender = producedblock_sender;
let mut delay_reconnect_until = Instant::now() + STREAM_RECONNECT_COOLDOWN;
let mut block_heartbeat_tick = tokio::time::interval(Duration::from_secs(5));
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
let mut last_finalized_slot: Slot = 0;
loop {
tokio::select! {
confirmed_block = confirmed_blocks_stream.next() => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the streams are not working well, the performance issues that I found was related to stream when I moved back to channel it is not working very well. May be thing that you can check on your side.

let confirmed_block = confirmed_block.expect("confirmed block from stream");
trace!("got confirmed block {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = sender.send(confirmed_block.clone()) {
warn!("Confirmed block channel has no receivers {e:?}");
continue
}
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
delay_reconnect_until = Instant::now() + STREAM_RECONNECT_AFTER_TIME_WITHOUT_BLOCKS;
},
meta_finalized = finalized_blockmeta_stream.next() => {
let blockhash = meta_finalized.expect("finalized block meta from stream");
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
let finalized_block = cached_confirmed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
delay_reconnect_until = Instant::now() + STREAM_RECONNECT_AFTER_TIME_WITHOUT_BLOCKS;
if let Err(e) = sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
}
} else {
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
let size_before = recent_confirmed_blocks.len();
recent_confirmed_blocks.retain(|_blockhash, block| {
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
});
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
if cnt_cleaned > 0 {
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
}
},
_ = block_heartbeat_tick.tick() => {
if Instant::now() > delay_reconnect_until {
// prevent crash loops
delay_reconnect_until = Instant::now() + STREAM_RECONNECT_COOLDOWN;

let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
};
// TODO add attempt counter
warn!("No block data received for {:.2} seconds, restarting streams", STREAM_RECONNECT_AFTER_TIME_WITHOUT_BLOCKS.as_secs_f32());

// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);

let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
let mut last_finalized_slot: Slot = 0;
let mut cleanup_without_recv_blocks: u8 = 0;
let mut cleanup_without_recv_blocks_meta: u8 = 0;
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
loop {
tokio::select! {
confirmed_block = confirmed_blocks_stream.next() => {
cleanup_without_recv_blocks = 0;

let confirmed_block = confirmed_block.expect("confirmed block from stream");
trace!("got confirmed block {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block.clone()) {
warn!("Confirmed block channel has no receivers {e:?}");
continue
}
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
},
meta_finalized = finalized_blockmeta_stream.next() => {
cleanup_without_recv_blocks_meta = 0;
let blockhash = meta_finalized.expect("finalized block meta from stream");
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
let finalized_block = cached_confirmed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
}
} else {
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
if cleanup_without_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
log::error!("block or block meta stream stopped restaring blocks");
break;
}
cleanup_without_recv_blocks += 1;
cleanup_without_recv_blocks_meta += 1;
let size_before = recent_confirmed_blocks.len();
recent_confirmed_blocks.retain(|_blockhash, block| {
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
});
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
if cnt_cleaned > 0 {
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
}
confirmed_blocks_stream = Box::pin(create_confirmed_blocks_stream(&grpc_sources));
finalized_blockmeta_stream = Box::pin(create_finalized_blockmeta_stream(&grpc_sources));
}
}
}
Expand All @@ -160,9 +139,38 @@
})
};

(blocks_output_stream, jh_block_emitter_task)

Check warning on line 142 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs
}

fn create_finalized_blockmeta_stream(grpc_sources: &Vec<GrpcSourceConfig>) -> impl Stream<Item=String> + Sized {
let commitment_config = CommitmentConfig::finalized();

let mut streams = Vec::new();
for grpc_source in grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))

Check warning on line 156 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs
}

fn create_confirmed_blocks_stream(grpc_sources: &Vec<GrpcSourceConfig>) -> impl Stream<Item=ProducedBlock> + Sized {
let commitment_config = CommitmentConfig::confirmed();

let mut streams = Vec::new();
for grpc_source in grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
streams.push(stream);
}

create_multiplexed_stream(streams, BlockExtractor(commitment_config))
}

struct SlotExtractor {}

impl FromYellowstoneExtractor for crate::grpc_multiplex::SlotExtractor {
Expand Down Expand Up @@ -253,4 +261,20 @@
});

(multiplexed_messages, jh)
}

Check warning on line 264 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs

// detect if no blocks arrive for N seconds and restart the streams
fn create_multiplexed_stream_recover<E>(grpc_source_streams: Vec<impl Stream<Item = Message>>,
extractor: E) -> impl Stream<Item = E::Target>
where
E: FromYellowstoneExtractor
{

create_multiplexed_stream(grpc_source_streams, extractor)
}


Check warning on line 276 in cluster-endpoints/src/grpc_multiplex.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_multiplex.rs
#[tokio::test]
async fn test_multiplexed_slots() {

}
4 changes: 2 additions & 2 deletions core/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
}
}
Err(_) => {
log::debug!("timeout while writing transaction for {}", identity);
warn!("timeout while writing transaction for {}", identity);

Check failure on line 161 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 161 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 161 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 161 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

cannot find macro `warn` in this scope
return Err(QuicConnectionError::TimeOut);
}
}
Expand All @@ -177,7 +177,7 @@
}
}
Err(_) => {
log::debug!("timeout while finishing transaction for {}", identity);
warn!("timeout while finishing transaction for {}", identity);

Check failure on line 180 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 180 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 180 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

cannot find macro `warn` in this scope

Check failure on line 180 in core/src/quic_connection_utils.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

cannot find macro `warn` in this scope
return Err(QuicConnectionError::TimeOut);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use solana_lite_rpc_services::{

use anyhow::Context;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
Expand Down Expand Up @@ -206,7 +207,7 @@ impl LiteRpcServer for LiteBridge {
.get_latest_block(commitment_config)
.await;

log::trace!("glb {blockhash} {slot} {block_height}");
info!("glb {blockhash} {slot} {block_height}");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why info no need.


Ok(RpcResponse {
context: RpcResponseContext {
Expand Down
Loading