Skip to content
This repository has been archived by the owner on Nov 9, 2019. It is now read-only.

Network and spawning actors #14

Merged
merged 22 commits into from
Jul 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ members = [
"aktoro-runtime",

"examples/hello_world",
"examples/net",
]
32 changes: 27 additions & 5 deletions aktoro-channel/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::task::Waker;
use std::sync::Arc;
use std::task;

use crossbeam_queue::SegQueue;
use crossbeam_utils::atomic::AtomicCell;

use crate::counters::Counters;
use crate::error::*;
use crate::message::Message;
use crate::queue::Queue;

type Waker = Arc<AtomicCell<(bool, Option<task::Waker>)>>;

/// A channel allowing senders to pass
/// messages over it, and receivers to
/// retrieve them.
pub(crate) struct Channel<T> {
/// The queue that is holding the
/// messages that have not been
/// received yet.
pub(crate) queue: Queue<Message<T>>,
/// Whether the channel is closed.
pub(crate) closed: AtomicBool,
/// The counters used to store the
/// current number of senders,
/// receivers, messages and the
/// channel's limits.
pub(crate) counters: Counters,
/// A list of the wakers that can
/// be used to wake up receivers.
pub(crate) wakers: SegQueue<Waker>,
}

Expand Down Expand Up @@ -71,9 +85,7 @@ impl<T> Channel<T> {
}
}

/// Registers a new waker to be
/// notified when a new message is
/// available.
/// Registers a new waker.
pub(crate) fn register(&self, waker: Waker) {
self.wakers.push(waker);
}
Expand All @@ -82,7 +94,17 @@ impl<T> Channel<T> {
/// available.
fn notify(&self) {
if let Ok(waker) = self.wakers.pop() {
waker.wake();
match waker.swap((true, None)) {
(true, Some(waker_)) => {
self.wakers.push(waker);
waker_.wake();
}
(true, None) => {
self.wakers.push(waker);
self.notify();
}
_ => self.notify(),
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions aktoro-channel/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::error::Error as StdError;
use std::error;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Display;
Expand Down Expand Up @@ -207,11 +207,11 @@ impl TryRecvError {
}
}

impl StdError for CloneError {}
impl error::Error for CloneError {}

impl<T> StdError for TrySendError<T> {}
impl<T> error::Error for TrySendError<T> {}

impl StdError for TryRecvError {}
impl error::Error for TryRecvError {}

impl Display for CloneError {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
Expand Down
4 changes: 4 additions & 0 deletions aktoro-channel/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use crossbeam_queue::ArrayQueue;
use crossbeam_queue::SegQueue;

/// The queue used by the channels to send
/// and receive data.
pub(crate) enum Queue<T> {
/// The bounded queue variant.
Bounded(ArrayQueue<T>),
/// The unbounded queue variant.
Unbounded(SegQueue<T>),
}

Expand Down
51 changes: 40 additions & 11 deletions aktoro-channel/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

use crossbeam_utils::atomic::AtomicCell;
use futures_core::FusedStream;
use futures_core::Stream;

Expand All @@ -15,7 +17,15 @@ use crate::error::*;
/// implementation.
///
/// [`try_recv`]: #method.try_recv
pub struct Receiver<T>(Option<Arc<Channel<T>>>);
pub struct Receiver<T> {
/// The channel the receiver will
/// receive data from.
channel: Option<Arc<Channel<T>>>,
/// A reference to the space the
/// receiver was assigned to store
/// its waker.
waker: Arc<AtomicCell<(bool, Option<Waker>)>>,
}

impl<T> Receiver<T> {
/// Creates a receiver from a pointer
Expand All @@ -26,13 +36,20 @@ impl<T> Receiver<T> {
// immediately after a channel's
// creation.
channel.counters.add_recver().expect("receivers limit == 0");
Receiver(Some(channel))

let waker = Arc::new(AtomicCell::new((true, None)));
channel.register(waker.clone());

Receiver {
waker,
channel: Some(channel),
}
}

/// Tries to receive a message from
/// the channel.
pub fn try_recv(&self) -> Result<Option<T>, TryRecvError> {
if let Some(channel) = &self.0 {
if let Some(channel) = &self.channel {
match channel.try_recv() {
Ok(Some(msg)) => Ok(Some(msg.unwrap())),
Ok(None) => Ok(None),
Expand All @@ -46,7 +63,7 @@ impl<T> Receiver<T> {
/// Whether the channel the receiver
/// is connected to is closed.
pub fn is_closed(&self) -> bool {
if let Some(channel) = &self.0 {
if let Some(channel) = &self.channel {
channel.check_is_closed()
} else {
true
Expand All @@ -56,15 +73,19 @@ impl<T> Receiver<T> {
/// Closes the channel the receiver
/// is connected to.
pub fn close_channel(&self) {
if let Some(channel) = &self.0 {
self.waker.store((false, None));

if let Some(channel) = &self.channel {
channel.close()
}
}

/// Disconnects the receiver from the
/// channel.
pub fn disconnect(&mut self) {
let channel = if let Some(channel) = self.0.take() {
self.waker.store((false, None));

let channel = if let Some(channel) = self.channel.take() {
channel
} else {
return;
Expand All @@ -79,9 +100,14 @@ impl<T> Receiver<T> {
/// returning a new receiver connected to
/// the same channel, or an error.
pub fn try_clone(&self) -> Result<Self, CloneError> {
if let Some(channel) = &self.0 {
if let Some(channel) = &self.channel {
if channel.counters.add_recver().is_ok() {
Ok(Receiver(Some(channel.clone())))
let waker = Arc::new(AtomicCell::new((true, None)));

Ok(Receiver {
waker,
channel: Some(channel.clone()),
})
} else {
Err(CloneError::limit())
}
Expand All @@ -95,7 +121,9 @@ impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<T>> {
if let Some(channel) = &self.0 {
self.waker.store((true, None));

if let Some(channel) = &self.channel {
// We try to receive a message...
match channel.try_recv() {
// ...and return it if one is
Expand All @@ -104,7 +132,8 @@ impl<T> Stream for Receiver<T> {
// ...or register the stream's
// waker if none is available...
Ok(None) => {
channel.register(ctx.waker().clone());
self.waker.store((true, Some(ctx.waker().clone())));

Poll::Pending
}
// ...or stop the stream if
Expand All @@ -119,7 +148,7 @@ impl<T> Stream for Receiver<T> {

impl<T> FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
if let Some(channel) = &self.0 {
if let Some(channel) = &self.channel {
channel.is_closed() && channel.is_empty()
} else {
true
Expand Down
Loading