Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Indy2222 committed Jul 7, 2023
1 parent 843ba5d commit 900bb7e
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 29 deletions.
1 change: 1 addition & 0 deletions crates/net/src/connection/book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use ahash::AHashMap;

// TODO ensure this is more than max retries
/// Connection info should be tossed away after this time.
const MAX_CONN_AGE: Duration = Duration::from_secs(600);

Expand Down
115 changes: 98 additions & 17 deletions crates/net/src/connection/confirms.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::{
cmp::Ordering,
net::SocketAddr,
time::{Duration, Instant},
};

use ahash::AHashSet;
use async_std::{
channel::{SendError, Sender},
sync::{Arc, Mutex},
};

use super::book::{Connection, ConnectionBook};
use crate::{
header::{DatagramHeader, PackageId},
header::{DatagramHeader, PackageId, PackageIdIter},
protocol::MAX_PACKAGE_SIZE,
tasks::OutDatagram,
};
Expand All @@ -23,7 +25,7 @@ const MAX_BUFF_AGE: Duration = Duration::from_millis(100);

#[derive(Clone)]
pub(crate) struct Confirmations {
book: Arc<Mutex<ConnectionBook<Buffer>>>,
book: Arc<Mutex<ConnectionBook<IdReceiver>>>,
}

impl Confirmations {
Expand All @@ -33,16 +35,23 @@ impl Confirmations {
}
}

/// This method marks a package with `id` from `addr` as received.
/// This method checks whether a package with `id` from `addr` was already
/// marked as received in the past. If so it returns true. Otherwise, it
/// marks the package as received and returns false.
///
/// This method should be called exactly once after each reliable package
/// is delivered.
pub(crate) async fn received(&mut self, time: Instant, addr: SocketAddr, id: PackageId) {
/// is delivered and in order.
pub(crate) async fn received(
&mut self,
time: Instant,
addr: SocketAddr,
id: PackageId,
) -> bool {
self.book
.lock()
.await
.update(time, addr, Buffer::new)
.push(time, id);
.update(time, addr, IdReceiver::new)
.push(time, id)
}

/// Send package confirmation datagrams.
Expand Down Expand Up @@ -71,10 +80,10 @@ impl Confirmations {
let mut next = Instant::now() + MAX_BUFF_AGE;
let mut book = self.book.lock().await;

while let Some((addr, buffer)) = book.next() {
if let Some(expiration) = buffer.expiration() {
if force || expiration <= time || buffer.full() {
while let Some(data) = buffer.flush(MAX_PACKAGE_SIZE) {
while let Some((addr, id_receiver)) = book.next() {
if let Some(expiration) = id_receiver.buffer.expiration() {
if force || expiration <= time || id_receiver.buffer.full() {
while let Some(data) = id_receiver.buffer.flush(MAX_PACKAGE_SIZE) {
datagrams
.send(OutDatagram::new(
DatagramHeader::Confirmation,
Expand All @@ -97,6 +106,80 @@ impl Confirmations {
}
}

struct IdReceiver {
// TODO is it ok if this is discarded after 5 minutes?
duplicates: Duplicates,
buffer: Buffer,
}

impl IdReceiver {
fn new() -> Self {
Self {
duplicates: Duplicates::new(),
buffer: Buffer::new(),
}
}

// TODO docs
fn push(&mut self, time: Instant, id: PackageId) -> bool {
if self.duplicates.process(id) {
true
} else {
self.buffer.push(time, id);
false
}
}
}

impl Connection for IdReceiver {
fn pending(&self) -> bool {
!self.buffer.is_empty()
}
}

// TODO tests
struct Duplicates {
highest_id: Option<PackageId>,
holes: AHashSet<PackageId>,
}

impl Duplicates {
fn new() -> Self {
Self {
highest_id: None,
holes: AHashSet::new(),
}
}

/// Registers package as delivered and returns true if it was already
/// delivered in the past.
fn process(&mut self, id: PackageId) -> bool {
match self.highest_id {
Some(highest) => match highest.ordering(id) {
Ordering::Less => {
// TODO invert this?
for hole in PackageIdIter::range(highest.incremented(), id) {
self.holes.insert(hole);
}

self.highest_id = Some(id);
false
}
Ordering::Greater => !self.holes.remove(&id),
Ordering::Equal => true,
},
None => {
for hole in PackageIdIter::range(PackageId::zero(), id) {
self.holes.insert(hole);
}

self.highest_id = Some(id);
false
}
}
}
}

/// Buffer with datagram confirmations.
struct Buffer {
oldest: Instant,
Expand All @@ -113,6 +196,10 @@ impl Buffer {
}
}

fn is_empty(&self) -> bool {
self.buffer.is_empty()
}

/// Pushes another datagram ID to the buffer.
fn push(&mut self, time: Instant, id: PackageId) {
if self.buffer.is_empty() {
Expand Down Expand Up @@ -154,12 +241,6 @@ impl Buffer {
}
}

impl Connection for Buffer {
fn pending(&self) -> bool {
!self.buffer.is_empty()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
112 changes: 109 additions & 3 deletions crates/net/src/header.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{cmp::Ordering, fmt};

use thiserror::Error;

Expand Down Expand Up @@ -150,20 +150,46 @@ pub(crate) enum HeaderError {
pub(crate) struct PackageId(u32);

impl PackageId {
const MAX: u32 = 0xffffff;

pub(crate) const fn zero() -> Self {
Self(0)
}

/// Increments the counter by one. It wraps around to zero after reaching
/// maximum value.
pub(crate) fn incremented(self) -> Self {
if self.0 >= 0xffffff {
if self.0 >= Self::MAX {
Self(0)
} else {
Self(self.0 + 1)
}
}

/// Returns probable relative ordering of two package IDs.
///
/// Note that the implementation is circular due to wrapping around maximum
/// value and thus the ordering is not transitive.
pub(crate) fn ordering(self, other: PackageId) -> Ordering {
match self.0.cmp(&other.0) {
Ordering::Greater => {
if self.0.abs_diff(other.0) < Self::MAX / 2 {
Ordering::Greater
} else {
Ordering::Less
}
}
Ordering::Less => {
if self.0.abs_diff(other.0) < Self::MAX / 2 {
Ordering::Less
} else {
Ordering::Greater
}
}
Ordering::Equal => Ordering::Equal,
}
}

/// # Panics
///
/// If not exactly 3 bytes are passed.
Expand All @@ -184,6 +210,42 @@ impl PackageId {
}
}

// TODO test
pub(crate) struct PackageIdIter {
current: PackageId,
stop: Option<PackageId>,
}

impl PackageIdIter {
pub(crate) fn counter() -> Self {
Self {
current: PackageId::zero(),
stop: None,
}
}

pub(crate) fn range(start: PackageId, stop: PackageId) -> Self {
Self {
current: start,
stop: Some(stop),
}
}
}

impl Iterator for PackageIdIter {
type Item = PackageId;

fn next(&mut self) -> Option<Self::Item> {
if Some(self.current) == self.stop {
return None;
}

let current = self.current;
self.current = current.incremented();
Some(current)
}
}

impl fmt::Display for PackageId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
Expand Down Expand Up @@ -246,11 +308,55 @@ mod tests {
}

#[test]
fn test_id() {
fn test_incremented() {
let id = PackageId::from_bytes(&[0, 1, 0]);
assert_eq!(id.incremented().to_bytes(), [0, 1, 1]);

let id: PackageId = 0xffffff.try_into().unwrap();
assert_eq!(id.incremented(), 0.try_into().unwrap());
}

#[test]
fn test_ordering() {
assert_eq!(
PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 2])),
Ordering::Less
);
assert_eq!(
PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 0])),
Ordering::Greater
);
assert_eq!(
PackageId::from_bytes(&[0, 1, 1]).ordering(PackageId::from_bytes(&[0, 1, 1])),
Ordering::Equal
);

assert_eq!(
PackageId::from_bytes(&[0, 1, 2]).ordering(PackageId::from_bytes(&[255, 255, 1])),
Ordering::Greater
);
assert_eq!(
PackageId::from_bytes(&[255, 255, 1]).ordering(PackageId::from_bytes(&[0, 1, 2])),
Ordering::Less
);
}

#[test]
fn test_iter() {
let mut counter = PackageIdIter::counter();
assert_eq!(counter.next().unwrap(), PackageId::zero());
assert_eq!(counter.next().unwrap(), PackageId::zero().incremented());
assert_eq!(
counter.next().unwrap(),
PackageId::zero().incremented().incremented()
);

let mut counter = PackageIdIter::range(
PackageId::from_bytes(&[0, 1, 2]),
PackageId::from_bytes(&[0, 1, 4]),
);
assert_eq!(counter.next().unwrap(), PackageId::from_bytes(&[0, 1, 2]));
assert_eq!(counter.next().unwrap(), PackageId::from_bytes(&[0, 1, 3]));
assert!(counter.next().is_none());
}
}
16 changes: 15 additions & 1 deletion crates/net/src/tasks/ureceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,35 @@ pub(super) async fn run(
loop {
let Ok(result) = timeout(Duration::from_millis(500), datagrams.recv()).await else {
if packages.is_closed() {
// This must be here in case of no incoming packages to ensure
// that the check is done at least once every 500ms.
break;
} else {
continue;
}
};

// This must be here in case of both a) packages are incoming
// frequently (so no timeouts above), b) packages are skipped because
// they are duplicates (so no packages.send() is called).
if packages.is_closed() {
break;
}

let Ok(datagram) = result else {
error!("Datagram receiver channel is unexpectedly closed.");
break;
};

if datagram.header.reliable() {
confirms
let duplicate = confirms
.received(Instant::now(), datagram.source, datagram.header.id())
.await;

if duplicate {
// TODO trace log?
continue;
}
}

let result = packages
Expand Down
Loading

0 comments on commit 900bb7e

Please sign in to comment.