Skip to content

Commit

Permalink
Handshake clone to owned
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Dec 5, 2023
1 parent a59bd04 commit 1f7925a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 21 deletions.
13 changes: 11 additions & 2 deletions crates/librqbit/src/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {}
fn get_have_bytes(&self) -> u64;
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()>;
fn on_extended_handshake(
&self,
extended_handshake: &ExtendedHandshake<ByteBuf>,
Expand Down Expand Up @@ -120,7 +120,16 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
}
}

pub async fn manage_peer(
pub async fn manage_peer_incoming(
&self,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
handshake: Handshake<ByteString>,
socket: tokio::net::TcpSocket,
) -> anyhow::Result<()> {
todo!()
}

pub async fn manage_peer_outgoing(
&self,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
) -> anyhow::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/src/peer_info_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn read_metainfo_from_peer(
);

let result_reader = async move { result_rx.await? };
let connection_runner = async move { connection.manage_peer(writer_rx).await };
let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await };

tokio::select! {
result = result_reader => result,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl PeerConnectionHandler for Handler {
Ok(0)
}

fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
if !handshake.supports_extended() {
anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata")
}
Expand Down
6 changes: 3 additions & 3 deletions crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl TorrentStateLive {
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer(rx) => {r}
r = peer_connection.manage_peer_outgoing(rx) => {r}
};

handler.state.peer_semaphore.add_permits(1);
Expand Down Expand Up @@ -502,7 +502,7 @@ impl TorrentStateLive {
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
}

fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state
.connecting_to_live(Id20(h.peer_id), &self.peers.stats)
Expand Down Expand Up @@ -771,7 +771,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
Ok(len)
}

fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
self.state.set_peer_live(self.addr, handshake);
Ok(())
}
Expand Down
54 changes: 40 additions & 14 deletions crates/peer_binary_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
pub mod extended;

use bincode::Options;
use buffers::{ByteBuf, ByteString};
use buffers::{ByteBuf, ByteBufT, ByteString};
use byteorder::{ByteOrder, BE};
use clone_to_owned::CloneToOwned;
use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo};
Expand Down Expand Up @@ -472,8 +472,8 @@ where
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Handshake<'a> {
pub pstr: &'a str,
pub struct Handshake<ByteBuf> {
pub pstr: ByteBuf,
pub reserved: [u8; 8],
pub info_hash: [u8; 20],
pub peer_id: [u8; 20],
Expand All @@ -485,8 +485,8 @@ fn bopts() -> impl bincode::Options {
.with_big_endian()
}

impl<'a> Handshake<'a> {
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> {
impl Handshake<ByteBuf<'static>> {
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<ByteBuf<'static>> {
debug_assert_eq!(PSTR_BT1.len(), 19);

let mut reserved: u64 = 0;
Expand All @@ -496,19 +496,16 @@ impl<'a> Handshake<'a> {
BE::write_u64(&mut reserved_arr, reserved);

Handshake {
pstr: PSTR_BT1,
pstr: ByteBuf(PSTR_BT1.as_bytes()),
reserved: reserved_arr,
info_hash: info_hash.0,
peer_id: peer_id.0,
}
}
pub fn supports_extended(&self) -> bool {
self.reserved[5] & 0x10 > 0
}
fn bopts() -> impl bincode::Options {
bincode::DefaultOptions::new()
}
pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> {

pub fn deserialize(
b: &[u8],
) -> Result<(Handshake<ByteBuf<'_>>, usize), MessageDeserializeError> {
let pstr_len = *b
.first()
.ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?;
Expand All @@ -526,11 +523,40 @@ impl<'a> Handshake<'a> {
expected_len,
))
}
pub fn serialize(&self, buf: &mut Vec<u8>) {
}

impl<B> Handshake<B> {
pub fn supports_extended(&self) -> bool {
self.reserved[5] & 0x10 > 0
}
fn bopts() -> impl bincode::Options {
bincode::DefaultOptions::new()
}

pub fn serialize(&self, buf: &mut Vec<u8>)
where
B: Serialize,
{
Self::bopts().serialize_into(buf, &self).unwrap()
}
}

impl<B> CloneToOwned for Handshake<B>
where
B: CloneToOwned,
{
type Target = Handshake<<B as CloneToOwned>::Target>;

fn clone_to_owned(&self) -> Self::Target {
Handshake {
pstr: self.pstr.clone_to_owned(),
reserved: self.reserved,
info_hash: self.info_hash,
peer_id: self.peer_id,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct Request {
pub index: u32,
Expand Down

0 comments on commit 1f7925a

Please sign in to comment.