Skip to content

Commit

Permalink
Batch event fetching every 2 seconds to avoid: "Too Many Concurrent R…
Browse files Browse the repository at this point in the history
…equests" at relays
  • Loading branch information
mikedilger committed Sep 8, 2023
1 parent db0dd6c commit d50f250
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 16 deletions.
13 changes: 11 additions & 2 deletions src/overlord/minion/handle_websocket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::Minion;
use crate::comms::ToOverlordMessage;
use crate::error::Error;
use crate::globals::GLOBALS;
use futures_util::sink::SinkExt;
Expand Down Expand Up @@ -49,9 +50,17 @@ impl Minion {
}

// Remove from sought set
self.sought.remove(&event.id);
if let Some(ess) = self.sought_events.remove(&event.id) {
// and notify the overlord of the completed job
for job_id in ess.job_ids.iter() {
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
*job_id,
))?;
}
}

// Try processing everything immediately
// Process the event
crate::process::process_new_event(
&event,
Some(self.url.clone()),
Expand Down
64 changes: 50 additions & 14 deletions src/overlord/minion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use nostr_types::{
};
use reqwest::Response;
use std::borrow::Cow;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::time::Duration;
use subscription_map::SubscriptionMap;
Expand All @@ -31,6 +31,11 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};

pub struct EventSeekState {
pub job_ids: Vec<u64>,
pub asked: bool,
}

pub struct Minion {
url: RelayUrl,
to_overlord: UnboundedSender<ToOverlordMessage>,
Expand All @@ -42,7 +47,7 @@ pub struct Minion {
next_events_subscription_id: u32,
keepgoing: bool,
postings: HashSet<Id>,
sought: HashSet<Id>,
sought_events: HashMap<Id, EventSeekState>,
}

impl Minion {
Expand All @@ -69,7 +74,7 @@ impl Minion {
next_events_subscription_id: 0,
keepgoing: true,
postings: HashSet::new(),
sought: HashSet::new(),
sought_events: HashMap::new(),
})
}
}
Expand Down Expand Up @@ -269,18 +274,28 @@ impl Minion {
async fn loop_handler(&mut self) -> Result<(), Error> {
let ws_stream = self.stream.as_mut().unwrap();

let mut timer = tokio::time::interval(std::time::Duration::new(
// Ping timer
let mut ping_timer = tokio::time::interval(std::time::Duration::new(
GLOBALS.storage.read_setting_websocket_ping_frequency_sec(),
0,
));
timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
timer.tick().await; // use up the first immediate tick.
ping_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ping_timer.tick().await; // use up the first immediate tick.

// Periodic Task timer (2 sec)
let mut task_timer = tokio::time::interval(std::time::Duration::new(2, 0));
task_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
task_timer.tick().await; // use up the first immediate tick.

select! {
biased;
_ = timer.tick() => {
_ = ping_timer.tick() => {
ws_stream.send(WsMessage::Ping(vec![0x1])).await?;
},
_ = task_timer.tick() => {
// Update subscription for sought events
self.get_events().await?;
},
to_minion_message = self.from_overlord.recv() => {
let to_minion_message = match to_minion_message {
Ok(m) => m,
Expand Down Expand Up @@ -338,11 +353,13 @@ impl Minion {
pub async fn handle_overlord_message(&mut self, message: ToMinionPayload) -> Result<(), Error> {
match message.detail {
ToMinionPayloadDetail::FetchEvent(id) => {
if self.sought.contains(&id) {
return Ok(());
}
self.sought.insert(id);
self.get_event(message.job_id, id.into()).await?;
self.sought_events.entry(id)
.and_modify(|ess| ess.job_ids.push(message.job_id))
.or_insert(EventSeekState {
job_ids: vec![message.job_id],
asked: false,
});
// We don't ask the relay immediately. See task_timer.
}
ToMinionPayloadDetail::PostEvent(event) => {
let id = event.id;
Expand Down Expand Up @@ -741,10 +758,29 @@ impl Minion {
Ok(())
}

async fn get_event(&mut self, job_id: u64, id: IdHex) -> Result<(), Error> {
async fn get_events(&mut self) -> Result<(), Error> {

// Collect all the sought events we have not yet asked for, and
// presumptively mark them as having been asked for.
let mut ids: Vec<IdHex> = Vec::new();
for (id, ess) in self.sought_events.iter_mut() {
if !ess.asked {
ids.push((*id).into());
ess.asked = true;
}
}

// Bail if nothing is sought
if ids.is_empty() {
return Ok(());
}

// The subscription job_id wont be used.
let job_id: u64 = u64::MAX;

// create the filter
let mut filter = Filter::new();
filter.ids = vec![id.into()];
filter.ids = ids.drain(..).map(|idhex| idhex.into()).collect();

tracing::trace!("{}: Event Filter: {} events", &self.url, filter.ids.len());

Expand Down

0 comments on commit d50f250

Please sign in to comment.