diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 9c7b83a8..efef6ab5 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -23,7 +23,7 @@ pub trait PeerConnectionHandler { fn on_connected(&self, _connection_time: Duration) {} fn get_have_bytes(&self) -> u64; fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result; - fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; fn on_extended_handshake( &self, extended_handshake: &ExtendedHandshake, @@ -120,7 +120,16 @@ impl PeerConnection { } } - pub async fn manage_peer( + pub async fn manage_peer_incoming( + &self, + mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, + handshake: Handshake, + socket: tokio::net::TcpSocket, + ) -> anyhow::Result<()> { + todo!() + } + + pub async fn manage_peer_outgoing( &self, mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, ) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 6955aab1..dd69a0a1 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -51,7 +51,7 @@ pub async fn read_metainfo_from_peer( ); let result_reader = async move { result_rx.await? }; - let connection_runner = async move { connection.manage_peer(writer_rx).await }; + let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await }; tokio::select! { result = result_reader => result, @@ -145,7 +145,7 @@ impl PeerConnectionHandler for Handler { Ok(0) } - fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { if !handshake.supports_extended() { anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata") } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 47eb680a..0276a081 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -404,7 +404,7 @@ impl TorrentStateLive { .fetch_add(1, Ordering::Relaxed); let res = tokio::select! { r = requester => {r} - r = peer_connection.manage_peer(rx) => {r} + r = peer_connection.manage_peer_outgoing(rx) => {r} }; handler.state.peer_semaphore.add_permits(1); @@ -502,7 +502,7 @@ impl TorrentStateLive { matches!(self.get_next_needed_piece(handle), Ok(Some(_))) } - fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { + fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state .connecting_to_live(Id20(h.peer_id), &self.peers.stats) @@ -771,7 +771,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { Ok(len) } - fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { self.state.set_peer_live(self.addr, handshake); Ok(()) } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 2535ac7b..f448b87e 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -5,7 +5,7 @@ pub mod extended; use bincode::Options; -use buffers::{ByteBuf, ByteString}; +use buffers::{ByteBuf, ByteBufT, ByteString}; use byteorder::{ByteOrder, BE}; use clone_to_owned::CloneToOwned; use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo}; @@ -472,8 +472,8 @@ where } #[derive(Serialize, Deserialize, Debug)] -pub struct Handshake<'a> { - pub pstr: &'a str, +pub struct Handshake { + pub pstr: ByteBuf, pub reserved: [u8; 8], pub info_hash: [u8; 20], pub peer_id: [u8; 20], @@ -485,8 +485,8 @@ fn bopts() -> impl bincode::Options { .with_big_endian() } -impl<'a> Handshake<'a> { - pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> { +impl Handshake> { + pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake> { debug_assert_eq!(PSTR_BT1.len(), 19); let mut reserved: u64 = 0; @@ -496,19 +496,16 @@ impl<'a> Handshake<'a> { BE::write_u64(&mut reserved_arr, reserved); Handshake { - pstr: PSTR_BT1, + pstr: ByteBuf(PSTR_BT1.as_bytes()), reserved: reserved_arr, info_hash: info_hash.0, peer_id: peer_id.0, } } - pub fn supports_extended(&self) -> bool { - self.reserved[5] & 0x10 > 0 - } - fn bopts() -> impl bincode::Options { - bincode::DefaultOptions::new() - } - pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> { + + pub fn deserialize( + b: &[u8], + ) -> Result<(Handshake>, usize), MessageDeserializeError> { let pstr_len = *b .first() .ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?; @@ -526,11 +523,40 @@ impl<'a> Handshake<'a> { expected_len, )) } - pub fn serialize(&self, buf: &mut Vec) { +} + +impl Handshake { + pub fn supports_extended(&self) -> bool { + self.reserved[5] & 0x10 > 0 + } + fn bopts() -> impl bincode::Options { + bincode::DefaultOptions::new() + } + + pub fn serialize(&self, buf: &mut Vec) + where + B: Serialize, + { Self::bopts().serialize_into(buf, &self).unwrap() } } +impl CloneToOwned for Handshake +where + B: CloneToOwned, +{ + type Target = Handshake<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + Handshake { + pstr: self.pstr.clone_to_owned(), + reserved: self.reserved, + info_hash: self.info_hash, + peer_id: self.peer_id, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub struct Request { pub index: u32,