Skip to content

Commit

Permalink
Warn on header verified send errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Sep 21, 2023
1 parent 96d9ddf commit c5818b6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
31 changes: 18 additions & 13 deletions src/api/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
time::Instant,
};
use tokio::sync::broadcast;
use tracing::{error, info};
use tracing::{error, info, warn};
use warp::{Filter, Rejection, Reply};

mod handlers;
Expand Down Expand Up @@ -96,12 +96,25 @@ fn ws_route(
.and_then(handlers::ws)
}

async fn publish_message(clients: &WsClients, topic: Topic, message: PublishMessage) {
match clients.publish(&topic, message).await {
Ok(results) => {
let published_messages = results.iter().filter(|&result| result.is_ok()).count();
info!(?topic, published_messages, "Message published");
for error in results.into_iter().filter_map(Result::err) {
warn!(?topic, "Cannot publish message to client: {error}")
}
},
Err(error) => error!(?topic, "Cannot publish message: {error}"),
}
}

pub async fn publish_header_verified(
mut header_receiver: broadcast::Receiver<(Header, Instant)>,
clients: WsClients,
) {
loop {
let (header, received_at) = match header_receiver.recv().await {
let (header, _) = match header_receiver.recv().await {
Ok(value) => value,
Err(error) => {
error!("Cannot receive message: {error}");
Expand All @@ -117,17 +130,9 @@ pub async fn publish_header_verified(
},
};

if let Err(error) = clients
.publish(
Topic::HeaderVerified,
PublishMessage::HeaderVerified(message),
)
.await
{
error!("Cannot publish message: {error}");
} else {
info!(?received_at, "Header received");
}
let topic = Topic::HeaderVerified;
let message = PublishMessage::HeaderVerified(message);
publish_message(&clients, topic, message).await;
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/api/v2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Reply for Status {
}
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum Topic {
HeaderVerified,
Expand Down Expand Up @@ -404,19 +404,19 @@ impl WsClients {
clients.insert(subscription_id.clone(), WsClient::new(subscription));
}

pub async fn publish(&self, topic: Topic, message: PublishMessage) -> anyhow::Result<()> {
pub async fn publish(
&self,
topic: &Topic,
message: PublishMessage,
) -> anyhow::Result<Vec<anyhow::Result<()>>> {
let clients = self.0.read().await;
for (_, client) in clients.iter() {
if !client.is_subscribed(&topic) {
continue;
}
let message = message.clone().try_into()?;
if let Some(sender) = &client.sender {
let _ = sender.send(Ok(message));
// TODO: Aggregate errors
}
}
Ok(())
let message: warp::ws::Message = message.try_into()?;
Ok(clients
.iter()
.filter(|(_, client)| client.is_subscribed(topic))
.flat_map(|(_, client)| &client.sender)
.map(|sender| sender.send(Ok(message.clone())).context("Send failed"))
.collect::<Vec<_>>())
}
}

Expand Down

0 comments on commit c5818b6

Please sign in to comment.