Skip to content

Commit

Permalink
disable geyser stakes and votes for refactoring (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus authored Jan 11, 2024
1 parent f910587 commit d91e2e9
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 84 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion lite-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-lite-rpc-history = { workspace = true }
solana-lite-rpc-stakevote = { workspace = true }

[dev-dependencies]
bench = { path = "../bench" }
36 changes: 2 additions & 34 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;

Expand Down Expand Up @@ -39,7 +38,6 @@ use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_histo
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
use tokio::sync::oneshot;

lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
Expand All @@ -65,12 +63,6 @@ pub struct LiteBridge {
rpc_client: Arc<RpcClient>,
transaction_service: TransactionService,
history: History,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
}

impl LiteBridge {
Expand All @@ -79,19 +71,12 @@ impl LiteBridge {
data_cache: DataCache,
transaction_service: TransactionService,
history: History,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
) -> Self {
Self {
rpc_client,
data_cache,
transaction_service,
history,
state_vote_sendder,
}
}

Expand Down Expand Up @@ -507,25 +492,8 @@ impl LiteRpcServer for LiteBridge {

async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
_config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus> {
let config: GetVoteAccountsConfig =
GetVoteAccountsConfig::try_from(config.unwrap_or_default()).unwrap_or_default();
if let Some(state_vote_sendder) = &self.state_vote_sendder {
let (tx, rx) = oneshot::channel();
if let Err(err) = state_vote_sendder.send((config, tx)).await {
return Err(jsonrpsee::core::Error::Custom(format!(
"error during query processing:{err}",
)));
}
rx.await.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
} else {
self.rpc_client
.get_vote_accounts()
.await
.map_err(|err| (jsonrpsee::core::Error::Custom(err.to_string())))
}
todo!()
}
}
50 changes: 2 additions & 48 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use log::info;
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_leaders_getter::GrpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
GrpcConnectionTimeouts, GrpcSourceConfig,
Expand All @@ -31,7 +30,6 @@ use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::leaders_fetcher_interface::LeaderFetcherInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
Expand Down Expand Up @@ -100,8 +98,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
transaction_retry_after_secs,
quic_proxy_addr,
use_grpc,
calculate_leader_schedule_form_geyser,
grpc_addr,
..
} = args;

Expand Down Expand Up @@ -154,7 +150,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);

let (epoch_data, current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;

let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
Expand Down Expand Up @@ -208,48 +204,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
data_cache: data_cache.clone(),
};
// Init gRPC leader schedule and vote accounts if configured.
let (leader_schedule, rpc_stakes_send): (Arc<dyn LeaderFetcherInterface>, Option<_>) =
if use_grpc && calculate_leader_schedule_form_geyser {
//init leader schedule grpc process.

//1) get stored leader schedule and stakes (or via RPC if not present)
solana_lite_rpc_stakevote::bootstrat_literpc_leader_schedule(
rpc_client.url(),
&data_cache,
current_epoch_info.epoch,
)
.await;

//2) start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(
data_cache.clone(),
slot_notifier.resubscribe(),
rpc_stakes_recv,
Arc::clone(&rpc_client),
grpc_addr,
)
.await?;

//
tokio::spawn(async move {
let err = stake_vote_jh.await;
log::error!("Vote and stake Services exit with error: {err:?}");
});

(
Arc::new(GrpcLeaderGetter::new(
Arc::clone(&data_cache.leader_schedule),
data_cache.epoch_data.clone(),
)),
Some(rpc_stakes_send),
)
} else {
(
Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128)),
None,
)
};
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
Expand Down Expand Up @@ -284,7 +239,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
data_cache.clone(),
transaction_service,
history,
rpc_stakes_send,
)
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
);
Expand Down

0 comments on commit d91e2e9

Please sign in to comment.