Skip to content

Commit

Permalink
add retry on error
Browse files Browse the repository at this point in the history
  • Loading branch information
morphy2k committed Sep 8, 2021
1 parent 6f75425 commit 6872ac0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ pub struct Feed {
pub sink: SinkOptions,
#[serde(default, with = "humantime_serde")]
pub interval: Option<Duration>,
#[serde(default = "retry_limit_default")]
pub retry_limit: usize,
}

const fn retry_limit_default() -> usize {
10
}
8 changes: 7 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,13 @@ fn watch_feeds(feeds: HashMap<String, Feed>, client: Client) -> Result<Vec<Task<

for (name, config) in feeds.into_iter() {
let sink = config.sink.sink(&client)?;
let watcher = Watcher::new(config.url, sink, config.interval, client.clone())?;
let watcher = Watcher::new(
config.url,
sink,
config.interval,
client.clone(),
config.retry_limit,
)?;

let rx = tx.subscribe();

Expand Down
29 changes: 22 additions & 7 deletions src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct Watcher<T: Sink> {
sink: T,
interval: Duration,
client: Client,
retry_limit: usize,
retries_left: usize,
last_date: Option<DateTime<FixedOffset>>,
}

Expand All @@ -29,12 +31,15 @@ impl<'a, T: Sink> Watcher<T> {
sink: T,
interval: Option<Duration>,
client: Client,
retry_limit: usize,
) -> Result<Self> {
Ok(Self {
url: url.into_url()?,
sink,
interval: interval.unwrap_or(DEFAULT_INTERVAL),
client,
retry_limit,
retries_left: retry_limit,
last_date: None,
})
}
Expand All @@ -52,8 +57,9 @@ impl<'a, T: Sink> Watcher<T> {
let feed = match self.fetch().await {
Ok(c) => c,
Err(e) => {
if is_timeout(&e) {
error!("Timeout while getting items: {}", e);
if is_retriable(&e) && self.retries_left > 0 {
error!("error while getting items: {}", e);
self.retries_left -= 1;
continue;
} else {
return Err(e);
Expand All @@ -78,18 +84,27 @@ impl<'a, T: Sink> Watcher<T> {
None => continue,
};

debug!("pushing {} items from \"{}\"", news.len(), feed.title());
debug!(
"pushing {} items from \"{}\" feed",
news.len(),
feed.title()
);

if let Err(err) = self.sink.push(news).await {
if is_timeout(&err) {
error!("Timeout while pushing items: {}", err);
if is_retriable(&err) && self.retries_left > 0 {
error!("error while pushing items: {}", err);
self.retries_left -= 1;
continue;
} else {
return Err(err);
}
}

self.last_date = last.date.into();

if self.retries_left != self.retry_limit {
self.retries_left = self.retry_limit;
}
}

self.sink.shutdown().await?;
Expand Down Expand Up @@ -124,9 +139,9 @@ impl<'a, T: Sink> Watcher<T> {
}
}

fn is_timeout(err: &Error) -> bool {
fn is_retriable(err: &Error) -> bool {
match err {
Error::Request(e) => e.is_timeout(),
Error::Request(e) => e.is_timeout() || e.is_connect() || e.is_status(),
_ => false,
}
}

0 comments on commit 6872ac0

Please sign in to comment.