Skip to content

Commit

Permalink
Code fully compiles for processing incoming peers
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Dec 5, 2023
1 parent 1f7925a commit f68bcc3
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 32 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/dht/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit-dht"
version = "4.0.0"
version = "4.1.0"
edition = "2021"
description = "DHT implementation, used in rqbit torrent client."
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cmp::Reverse,
net::{SocketAddr, SocketAddrV4},
net::SocketAddr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "librqbit"
version = "4.0.0"
version = "4.1.0"
authors = ["Igor Katson <igor.katson@gmail.com>"]
edition = "2021"
description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it."
Expand All @@ -26,7 +26,7 @@ bencode = {path = "../bencode", default-features=false, package="librqbit-bencod
buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"}
librqbit-core = {path = "../librqbit_core", version = "3.2.1"}
clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.2.1"}
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.3.0"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.0.0"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"}
Expand Down
86 changes: 78 additions & 8 deletions crates/librqbit/src/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct PeerConnection<H> {
spawner: BlockingSpawner,
}

async fn with_timeout<T, E>(
pub(crate) async fn with_timeout<T, E>(
timeout_value: Duration,
fut: impl std::future::Future<Output = Result<T, E>>,
) -> anyhow::Result<T>
Expand Down Expand Up @@ -120,18 +120,57 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
}
}

// By the time this is called:
// read_buf should start with valuable data. The handshake should be removed from it.
pub async fn manage_peer_incoming(
&self,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
// How many bytes into read buffer have we read already.
read_so_far: usize,
read_buf: Vec<u8>,
handshake: Handshake<ByteString>,
socket: tokio::net::TcpSocket,
mut conn: tokio::net::TcpStream,
) -> anyhow::Result<()> {
todo!()
use tokio::io::AsyncWriteExt;

let rwtimeout = self
.options
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));

if handshake.info_hash != self.info_hash.0 {
anyhow::bail!("wrong info hash");
}

trace!(
"incoming connection: id={:?}",
try_decode_peer_id(Id20(handshake.peer_id))
);

let mut write_buf = Vec::<u8>::with_capacity(PIECE_MESSAGE_DEFAULT_LEN);
let handshake = Handshake::new(self.info_hash, self.peer_id);
handshake.serialize(&mut write_buf);
with_timeout(rwtimeout, conn.write_all(&write_buf))
.await
.context("error writing handshake")?;
write_buf.clear();

let h_supports_extended = handshake.supports_extended();

self.manage_peer(
h_supports_extended,
read_so_far,
read_buf,
write_buf,
conn,
outgoing_chan,
)
.await
}

pub async fn manage_peer_outgoing(
&self,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -170,20 +209,51 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;

let h_supports_extended = h.supports_extended();
trace!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
if h.info_hash != self.info_hash.0 {
anyhow::bail!("info hash does not match");
}

let mut extended_handshake: Option<ExtendedHandshake<ByteString>> = None;
let supports_extended = h.supports_extended();

self.handler.on_handshake(h)?;

if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;

self.manage_peer(
h_supports_extended,
read_so_far,
read_buf,
write_buf,
conn,
outgoing_chan,
)
.await
}

async fn manage_peer(
&self,
handshake_supports_extended: bool,
// How many bytes into read_buf is there of peer-sent-data.
mut read_so_far: usize,
mut read_buf: Vec<u8>,
mut write_buf: Vec<u8>,
mut conn: tokio::net::TcpStream,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

let rwtimeout = self
.options
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));

let mut extended_handshake: Option<ExtendedHandshake<ByteString>> = None;
let supports_extended = handshake_supports_extended;

if supports_extended {
let my_extended =
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
Expand Down
123 changes: 114 additions & 9 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,35 @@ use std::{
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBufT, ByteString};
use clone_to_owned::CloneToOwned;
use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
};
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
peer_id::generate_peer_id,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
};
use parking_lot::RwLock;
use peer_binary_protocol::{Handshake, PIECE_MESSAGE_DEFAULT_LEN};
use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tokio::net::TcpListener;
use tracing::{debug, error, error_span, info, warn};
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
peer_connection::PeerConnectionOptions,
peer_connection::{with_timeout, PeerConnectionOptions},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::{ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
};

pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
Expand Down Expand Up @@ -375,6 +383,14 @@ async fn get_public_announce_addr(port: u16) -> anyhow::Result<SocketAddr> {
Ok(addr)
}

pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr,
pub stream: tokio::net::TcpStream,
pub read_buf: Vec<u8>,
pub handshake: Handshake<ByteString>,
pub read_so_far: usize,
}

impl Session {
/// Create a new session. The passed in folder will be used as a default unless overriden per torrent.
pub async fn new(output_folder: PathBuf) -> anyhow::Result<Arc<Self>> {
Expand Down Expand Up @@ -509,14 +525,103 @@ impl Session {
Ok(())
}

async fn check_incoming_connection(
&self,
addr: SocketAddr,
mut stream: TcpStream,
) -> anyhow::Result<(Arc<TorrentStateLive>, CheckedIncomingConnection)> {
// TODO: move buffer handling to peer_connection

let rwtimeout = self
.peer_opts
.read_write_timeout
.unwrap_or_else(|| Duration::from_secs(10));

let mut read_buf = vec![0u8; PIECE_MESSAGE_DEFAULT_LEN * 2];
let mut read_so_far = with_timeout(rwtimeout, stream.read(&mut read_buf))
.await
.context("error reading handshake")?;
if read_so_far == 0 {
anyhow::bail!("bad handshake");
}
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;

trace!("received handshake from {addr}: {:?}", h);

for (id, torrent) in self.db.read().torrents.iter() {
if torrent.info_hash().0 != h.info_hash {
continue;
}

let live = match torrent.live() {
Some(live) => live,
None => {
bail!("torrent {id} is not live, ignoring connection");
}
};

let handshake = h.clone_to_owned();

if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;

return Ok((
live,
CheckedIncomingConnection {
addr,
stream,
handshake,
read_buf,
read_so_far,
},
));
}

bail!("didn't find a matching torrent for {:?}", h.info_hash)
}

fn handover_checked_connection(
&self,
live: Arc<TorrentStateLive>,
checked: CheckedIncomingConnection,
) -> anyhow::Result<()> {
live.add_incoming_peer(checked)
}

async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
let mut buf = vec![0u8; 4096];
let mut futs = FuturesUnordered::new();

loop {
let (stream, addr) = l.accept().await.context("error accepting")?;
info!("accepted connection from {addr}");
tokio::select! {
r = l.accept() => {
match r {
Ok((stream, addr)) => {
trace!("accepted connection from {addr}");
futs.push(
self.check_incoming_connection(addr, stream)
.map_err(|e| {
error!("error checking incoming connection: {e:#}");
e
})
.instrument(error_span!("incoming", addr=%addr))
);
}
Err(e) => {
error!("error accepting: {e:#}");
continue;
}
}
},
Some(Ok((live, checked))) = futs.next(), if !futs.is_empty() => {
if let Err(e) = self.handover_checked_connection(live, checked) {
warn!("error handing over incoming connection: {e:#}");
}
},
}
}
Ok(())
}

async fn task_upnp_port_forwarder(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
Expand Down Expand Up @@ -562,7 +667,7 @@ impl Session {
});
}

fn stop(&self) {
pub fn stop(&self) {
let _ = self.cancel_tx.send(());
}

Expand Down
Loading

0 comments on commit f68bcc3

Please sign in to comment.