Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Always pass port to jsonrpsee WebSocket client (#2339)
Browse files Browse the repository at this point in the history
* Always pass port to jsonrpsee

* Remove useless host check

* Do not silently drop
  • Loading branch information
skunert authored Mar 20, 2023
1 parent 782ac16 commit 2a9066c
Showing 1 changed file with 62 additions and 7 deletions.
69 changes: 62 additions & 7 deletions client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ pub struct ReconnectingWsClient {
to_worker_channel: TokioSender<RpcDispatcherMessage>,
}

/// Format url and force addition of a port
fn url_to_string_with_port(url: Url) -> Option<String> {
// This is already validated on CLI side, just defensive here
if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
return None
}

// Either we have a user-supplied port or use the default for 'ws' or 'wss' here
Some(format!(
"{}://{}:{}{}{}",
url.scheme(),
url.host_str()?,
url.port_or_known_default()?,
url.path(),
url.query().map(|query| format!("?{}", query)).unwrap_or_default()
))
}

impl ReconnectingWsClient {
/// Create a new websocket client frontend.
pub async fn new(urls: Vec<Url>, task_manager: &mut TaskManager) -> RelayChainResult<Self> {
Expand Down Expand Up @@ -144,7 +163,7 @@ impl ReconnectingWsClient {

/// Worker that should be used in combination with [`RelayChainRpcClient`]. Must be polled to distribute header notifications to listeners.
struct ReconnectingWebsocketWorker {
ws_urls: Vec<Url>,
ws_urls: Vec<String>,
/// Communication channel with the RPC client
client_receiver: TokioReceiver<RpcDispatcherMessage>,

Expand Down Expand Up @@ -176,7 +195,7 @@ fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>
/// and reconnections.
#[derive(Debug)]
struct ClientManager {
urls: Vec<Url>,
urls: Vec<String>,
active_client: Arc<JsonRpcClient>,
active_index: usize,
}
Expand All @@ -189,7 +208,7 @@ struct RelayChainSubscriptions {

/// Try to find a new RPC server to connect to.
async fn connect_next_available_rpc_server(
urls: &Vec<Url>,
urls: &Vec<String>,
starting_position: usize,
) -> Result<(usize, Arc<JsonRpcClient>), ()> {
tracing::debug!(target: LOG_TARGET, starting_position, "Connecting to RPC server.");
Expand All @@ -198,18 +217,19 @@ async fn connect_next_available_rpc_server(
tracing::info!(
target: LOG_TARGET,
index,
?url,
url,
"Trying to connect to next external relaychain node.",
);
if let Ok(ws_client) = WsClientBuilder::default().build(url).await {
return Ok((index, Arc::new(ws_client)))
match WsClientBuilder::default().build(&url).await {
Ok(ws_client) => return Ok((index, Arc::new(ws_client))),
Err(err) => tracing::debug!(target: LOG_TARGET, url, ?err, "Unable to connect."),
};
}
Err(())
}

impl ClientManager {
pub async fn new(urls: Vec<Url>) -> Result<Self, ()> {
pub async fn new(urls: Vec<String>) -> Result<Self, ()> {
if urls.is_empty() {
return Err(())
}
Expand Down Expand Up @@ -325,6 +345,8 @@ impl ReconnectingWebsocketWorker {
async fn new(
urls: Vec<Url>,
) -> (ReconnectingWebsocketWorker, TokioSender<RpcDispatcherMessage>) {
let urls = urls.into_iter().filter_map(url_to_string_with_port).collect();

let (tx, rx) = tokio_channel(100);
let worker = ReconnectingWebsocketWorker {
ws_urls: urls,
Expand Down Expand Up @@ -518,3 +540,36 @@ impl ReconnectingWebsocketWorker {
}
}
}

#[cfg(test)]
mod test {
use super::url_to_string_with_port;
use url::Url;

#[test]
fn url_to_string_works() {
let url = Url::parse("wss://something/path").unwrap();
assert_eq!(Some("wss://something:443/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("ws://something/path").unwrap();
assert_eq!(Some("ws://something:80/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something:100/path").unwrap();
assert_eq!(Some("wss://something:100/path".to_string()), url_to_string_with_port(url));

let url = Url::parse("wss://something/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:443/path?query=yes".to_string()),
url_to_string_with_port(url)
);

let url = Url::parse("wss://something:9090/path?query=yes").unwrap();
assert_eq!(
Some("wss://something:9090/path?query=yes".to_string()),
url_to_string_with_port(url)
);
}
}

0 comments on commit 2a9066c

Please sign in to comment.