Skip to content

Commit

Permalink
Merge pull request #189 from ikatson/limit-init
Browse files Browse the repository at this point in the history
Limit concurrency of torrent initialization (fix #139)
  • Loading branch information
ikatson authored Aug 15, 2024
2 parents 37ee8b7 + 726a5e1 commit 45779da
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
20 changes: 18 additions & 2 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ pub struct Session {
reqwest_client: reqwest::Client,
connector: Arc<StreamConnector>,

concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,

// This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard,
}
Expand Down Expand Up @@ -375,6 +377,9 @@ pub struct SessionOptions {

// socks5://[username:password@]host:port
pub socks_proxy_url: Option<String>,

// how many concurrent torrent initializations can happen
pub concurrent_init_limit: Option<usize>,
}

async fn create_tcp_listener(
Expand Down Expand Up @@ -562,6 +567,7 @@ impl Session {
default_storage_factory: opts.default_storage_factory,
reqwest_client,
connector: stream_connector,
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
});

if let Some(mut disk_write_rx) = disk_write_rx {
Expand Down Expand Up @@ -1085,7 +1091,12 @@ impl Session {
let _ = span.enter();

managed_torrent
.start(peer_rx, opts.paused, self.cancellation_token.child_token())
.start(
peer_rx,
opts.paused,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
)
.context("error starting torrent")?;
}

Expand Down Expand Up @@ -1233,7 +1244,12 @@ impl Session {
self.tcp_listen_port,
handle.info().options.force_tracker_interval,
)?;
handle.start(peer_rx, false, self.cancellation_token.child_token())?;
handle.start(
peer_rx,
false,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
)?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}
Expand Down
14 changes: 13 additions & 1 deletion crates/librqbit/src/torrent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl ManagedTorrent {
peer_rx: Option<PeerStream>,
start_paused: bool,
live_cancellation_token: CancellationToken,
init_semaphore: Arc<tokio::sync::Semaphore>,
) -> anyhow::Result<()> {
let mut g = self.locked.write();

Expand Down Expand Up @@ -283,10 +284,16 @@ impl ManagedTorrent {
let t = self.clone();
let span = self.info().span.clone();
let token = live_cancellation_token.clone();

spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
async move {
let _permit = init_semaphore
.acquire()
.await
.context("bug: concurrent init semaphore was closed")?;

match init.check().await {
Ok(paused) => {
let mut g = t.locked.write();
Expand Down Expand Up @@ -344,7 +351,12 @@ impl ManagedTorrent {
drop(g);

// Recurse.
self.start(peer_rx, start_paused, live_cancellation_token)
self.start(
peer_rx,
start_paused,
live_cancellation_token,
init_semaphore,
)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
Expand Down
5 changes: 5 additions & 0 deletions crates/rqbit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ struct Opts {
/// Alternatively, set this as an environment variable RQBIT_SOCKS_PROXY_URL
#[arg(long)]
socks_url: Option<String>,

/// How many torrents can be initializing (rehashing) at the same time
#[arg(long, default_value = "5")]
concurrent_init_limit: usize,
}

#[derive(Parser)]
Expand Down Expand Up @@ -335,6 +339,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
}
}),
socks_proxy_url: socks_url,
concurrent_init_limit: Some(opts.concurrent_init_limit),
};

let stats_printer = |session: Arc<Session>| async move {
Expand Down

0 comments on commit 45779da

Please sign in to comment.