cumulus/client: added external rpc connection retry logic
Signed-off-by: Iulian Barbu <iulian.barbu@parity.io>
iulianbarbu committed Aug 29, 2024
1 parent a67d623 commit 51cd9b4
Showing 1 changed file with 36 additions and 3 deletions.
@@ -34,7 +34,7 @@ use jsonrpsee::{
use sc_rpc_api::chain::ChainApiClient;
use schnellru::{ByLength, LruMap};
use sp_runtime::generic::SignedBlock;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{
channel as tokio_channel, Receiver as TokioReceiver, Sender as TokioSender,
};
@@ -43,6 +43,9 @@ use url::Url;
use crate::rpc_client::{distribute_header, RpcDispatcherMessage};

const LOG_TARGET: &str = "reconnecting-websocket-client";
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000;
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;

/// Worker that should be used in combination with [`RelayChainRpcClient`].
///
@@ -93,16 +96,45 @@ struct RelayChainSubscriptions {
	best_subscription: Subscription<RelayHeader>,
}

/// Try to find a new RPC server to connect to.
/// Try to find a new RPC server to connect to. Uses naive retry logic
/// with exponential backoff between passes over the full URL list. A
/// constant bounds how many passes of connection attempts over all
/// URLs are allowed. Returns early once a connection is established.
async fn connect_next_available_rpc_server(
	urls: &Vec<String>,
	starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> {
	tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
	for (counter, url) in urls.iter().cycle().skip(starting_position).take(urls.len()).enumerate() {

	let mut prev_iteration: u32 = 0;
	for (counter, url) in urls
		.iter()
		.cycle()
		.skip(starting_position)
		.take(urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES)
		.enumerate()
	{
		// If we reached the end of the urls list, back off before retrying
		// connections to the entire list once more.
		let Ok(current_iteration) = (counter / urls.len()).try_into() else {
			tracing::error!(target: LOG_TARGET, "Too many connection attempts to the RPC servers, aborting...");
			break;
		};
		if current_iteration > prev_iteration {
			// Safe conversion given we convert positive i32s which are lower than u64::MAX.
			tokio::time::sleep(Duration::from_millis(
				DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
					DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(prev_iteration) as u64,
			))
			.await;
			prev_iteration = current_iteration;
		}

		let index = (starting_position + counter) % urls.len();
		tracing::info!(
			target: LOG_TARGET,
			current_iteration,
			index,
			url,
			"Trying to connect to next external relaychain node.",
@@ -112,6 +144,7 @@ async fn connect_next_available_rpc_server(
			Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
		};
	}

	Err(())
}

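For reference, a standalone sketch (not part of the commit) of the retry schedule these constants produce: with 5 passes over the URL list and a 1000 ms base delay that doubles between passes, the worker sleeps 1 s, 2 s, 4 s, and 8 s before passes 2 through 5. The `urls`, `starting_position`, and `main` below are illustrative assumptions, not code from the repository.

// Illustration only: reproduces the retry/backoff schedule implied by the
// constants introduced in this commit. The URL list and starting position
// are made-up example values.
const DEFAULT_EXTERNAL_RPC_CONN_RETRIES: usize = 5;
const DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES: u64 = 1000;
const DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES: i32 = 2;

fn main() {
	let urls = vec!["ws://a:9944".to_string(), "ws://b:9944".to_string(), "ws://c:9944".to_string()];
	let starting_position = 1;

	for counter in 0..urls.len() * DEFAULT_EXTERNAL_RPC_CONN_RETRIES {
		// Pass over the whole URL list (0-based), mirroring `counter / urls.len()`.
		let pass = (counter / urls.len()) as u32;
		// The committed code sleeps once at the start of every pass after the
		// first, for 1000 ms * 2^(previous pass).
		let sleep_ms = if pass > 0 && counter % urls.len() == 0 {
			DEFAULT_SLEEP_TIME_MS_BETWEEN_RETRIES *
				DEFAULT_SLEEP_EXP_BACKOFF_BETWEEN_RETRIES.pow(pass - 1) as u64
		} else {
			0
		};
		// Same wrap-around indexing as the committed code.
		let index = (starting_position + counter) % urls.len();
		println!("attempt {counter}: pass {pass}, sleep {sleep_ms} ms, connect to {}", urls[index]);
	}
}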
