Skip to content

Commit

Permalink
implement JoinSet (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
morphy2k authored Mar 23, 2022
1 parent 1f9d42b commit 71320b9
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 44 deletions.
5 changes: 5 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
rustflags = ["--cfg=tokio_unstable"]

[target.x86_64-unknown-linux-gnu]
rustflags = ["--cfg=tokio_unstable"]
31 changes: 2 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ tokio = { version = "1.17", features = [
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
futures = { version = "0.3", default-features = false, features = ["std"] }
async-trait = "0.1"
rss = { version = "2", default-features = false }
atom_syndication = { version = "0.11", default-features = false }
Expand Down
26 changes: 12 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use crate::{
use std::{collections::HashMap, env, path::PathBuf, process, time::Duration};

use error::Error;
use futures::future;
use pico_args::Arguments;
use reqwest::Client;
use tokio::{
signal::unix::{signal, SignalKind},
sync::broadcast,
task::JoinHandle,
task::JoinSet,
};
use tracing::{error, info};

Expand Down Expand Up @@ -62,9 +61,12 @@ async fn main() -> Result<()> {

let client = build_client()?;

let tasks = watch_feeds(config.feeds, client)?;

future::try_join_all(tasks).await?;
let mut tasks = watch_feeds(config.feeds, client)?;
while let Some(res) = tasks.join_one().await? {
if res.is_err() {
tasks.abort_all()
}
}

Ok(())
}
Expand Down Expand Up @@ -131,12 +133,10 @@ fn parse_args() -> Result<Args> {
Ok(args)
}

type Task<T> = JoinHandle<Result<T>>;
fn watch_feeds(feeds: HashMap<String, Feed>, client: Client) -> Result<JoinSet<Result<()>>> {
let mut tasks = JoinSet::new();

fn watch_feeds(feeds: HashMap<String, Feed>, client: Client) -> Result<Vec<Task<()>>> {
let mut tasks = Vec::with_capacity(feeds.len());

let (tx, _) = broadcast::channel(tasks.capacity());
let (tx, _) = broadcast::channel(feeds.len());

for (name, config) in feeds.into_iter() {
let sink = config.sink.sink(&client)?;
Expand All @@ -150,18 +150,16 @@ fn watch_feeds(feeds: HashMap<String, Feed>, client: Client) -> Result<Vec<Task<

let rx = tx.subscribe();

tasks.push(tokio::spawn(async move {
tasks.spawn(async move {
info!("Start watcher for \"{}\"", name);

if let Err(e) = watcher.watch(rx).await {
error!(feed =? name, error =? e, "Watcher stopped with an error");
return Err(e);
}

info!("Watcher for \"{}\" has stopped", name);

Ok(())
}));
});
}

tokio::spawn(async move {
Expand Down

0 comments on commit 71320b9

Please sign in to comment.