Skip to content

Commit

Permalink
Breaking: Remove Disconnect Channel in ChannelReporter implementation
Browse files Browse the repository at this point in the history
Previously we used a oneshot channel to rendezvous between the reporter and the listener when disconnecting the event channel between them. Doing so was required for `ChannelReporter::disconnect`, to wait for already received events to be handled.

A disadvantage of how the listener was setup in conjunction with the reporter, made it difficult to have the thread used by the channel based implementation, return a value (at the end of the scope of the thread, i.e. without more channels).

In the new implementation, in Reporter::disconnect, we just use the fact that when all senders of the channel are dropped, the channel will be in a disconnected state.

Since already received events may still be processed, we split up the disconnecting and finishing up of the processing of events in two parts.

The user is now responsible to wait for the processing of events to finish, instead of the disconnect method.

This can be achieved by running FinishProcessing::finish_processing which will block until all events have been processed.
  • Loading branch information
foresterre committed Jun 16, 2022
1 parent 8c4d485 commit a70922f
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 286 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "storyteller"
version = "0.4.2"
version = "0.5.0"
edition = "2018"

authors = ["Martijn Gribnau <garm@ilumeo.com>"]
Expand All @@ -17,8 +17,6 @@ exclude = ["/.github", "docs/sketches/*.png"]
default = ["channel_reporter"]
channel_reporter = ["crossbeam-channel"]

experimental_handle_disconnect_ack = []

[dependencies.crossbeam-channel]
version = "0.5"
optional = true
Expand All @@ -28,6 +26,9 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.16.2"

# parameterized tests
yare = "1.0.1"

[[example]]
name = "json_lines"
required-features = ["channel_reporter"]
19 changes: 10 additions & 9 deletions examples/json_lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ use std::io::{Stderr, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{io, thread};
use storyteller::EventHandler;
use storyteller::{EventHandler, FinishProcessing};

use storyteller::{
disconnect_channel, event_channel, ChannelEventListener, ChannelReporter, EventListener,
Reporter,
};
use storyteller::{event_channel, ChannelEventListener, ChannelReporter, EventListener, Reporter};

// See the test function `bar` in src/tests.rs for an example where the handler is a progress bar.
fn main() {
let (sender, receiver) = event_channel::<ExampleEvent>();
let (disconnect_sender, disconnect_receiver) = disconnect_channel();

// Handlers are implemented by you. Here you find one which writes jsonlines messages to stderr.
// This can be anything, for example a progress bar (see src/tests.rs for an example of this),
Expand All @@ -21,17 +17,17 @@ fn main() {
let handler = JsonHandler::default();

// This one is included with the library. It just needs to be hooked up with a channel.
let reporter = ChannelReporter::new(sender, disconnect_receiver);
let reporter = ChannelReporter::new(sender);

// This one is also included with the library. It also needs to be hooked up with a channel.
let listener = ChannelEventListener::new(receiver, disconnect_sender);
let listener = ChannelEventListener::new(receiver);

// Here we use the jsonlines handler we defined above, in combination with the default `EventListener`
// implementation on the `ChannelEventListener` we used above.
//
// If we don't run the handler, we'll end up in an infinite loop, because our `reporter.disconnect()`
// below will block until it receives a Disconnect message.
listener.run_handler(handler);
let fin = listener.run_handler(handler);

#[allow(unused_must_use)]
// sending events can fail, but we'll assume they won't for this example
Expand All @@ -55,7 +51,12 @@ fn main() {
reporter.report_event(ExampleEvent::text("[status]\t\tFour"));
}

// Within the ChannelReporter, the sender is dropped, thereby disconnecting the channel
// Already sent events can still be processed.
let _ = reporter.disconnect();

// To keep the processing of already sent events alive, we block the handler
let _ = fin.finish_processing();
}

// ------- Events + Disconnect
Expand Down
48 changes: 12 additions & 36 deletions src/channel_reporter/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Channels which can be used by the `ChannelReporter` and `ChannelEventListener`.

use crate::Disconnect;
use std::fmt::Formatter;
use std::{any, fmt};

Expand All @@ -14,12 +13,18 @@ pub fn event_channel<Event>() -> (EventSender<Event>, EventReceiver<Event>) {
}

/// A sender, used by `ChannelReporter` and `ChannelEventListener`.
#[derive(Clone)]
pub struct EventSender<T>(crossbeam_channel::Sender<T>);

impl<T> EventSender<T> {
pub fn send(&self, message: T) -> Result<(), EventSendError<T>> {
self.0.send(message).map_err(|err| EventSendError(err.0))
}

/// When all senders are disconnected, the channel is disconnected
pub fn disconnect(self) {
drop(self.0)
}
}

/// A receiver, used by `ChannelReporter` and `ChannelEventListener`.
Expand All @@ -31,6 +36,12 @@ impl<T> EventReceiver<T> {
}
}

impl<T> Clone for EventReceiver<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

#[derive(PartialEq, Eq, Clone, Copy)]
pub struct EventSendError<T>(pub T);

Expand All @@ -42,38 +53,3 @@ impl<T> fmt::Debug for EventSendError<T> {

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct EventRecvError;

// --- Disconnect channel variants

/// A sender, used to communicate Disconnect's between the `ChannelReporter` and `ChannelEventListener`.
pub struct DisconnectSender(crossbeam_channel::Sender<Disconnect>);

impl DisconnectSender {
pub fn acknowledge_disconnection(&self) -> Result<(), DisconnectSendError> {
self.0.send(Disconnect).map_err(|_| DisconnectSendError)
}
}

/// A receiver, used to communicate Disconnect's between the `ChannelReporter` and `ChannelEventListener`.
pub struct DisconnectReceiver(crossbeam_channel::Receiver<Disconnect>);

impl DisconnectReceiver {
pub(crate) fn recv(&self) -> Result<Disconnect, DisconnectRecvError> {
self.0.recv().map_err(|_| DisconnectRecvError)
}
}

/// A channel used to by the `ChannelEventListener` to acknowledge the disconnection of the `ChannelReporter`.
///
/// Allows us to wait
pub fn disconnect_channel() -> (DisconnectSender, DisconnectReceiver) {
let (sender, receiver) = crossbeam_channel::bounded::<Disconnect>(0);

(DisconnectSender(sender), DisconnectReceiver(receiver))
}

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct DisconnectSendError;

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct DisconnectRecvError;
184 changes: 70 additions & 114 deletions src/channel_reporter/listener.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,33 @@
use crate::{DisconnectSender, EventHandler, EventListener, EventReceiver};
use crate::listener::FinishProcessing;
use crate::{EventHandler, EventListener, EventReceiver};
use std::thread;
use std::thread::JoinHandle;

/// A listener which uses a channel to receive messages of type `Event`, and uses
/// a thread to run the event handler (in [`crate::ChannelEventListener::run_handler`].
/// a thread to run the event handler (in [`ChannelEventListener::run_handler`]).
///
/// The channels required to create an instance can be created by calling the [`crate::event_channel`]
/// and [`crate::disconnect_channel`] functions.
/// The channel based receiver required to create an instance can be created by calling the
/// [`event_channel()`] function.
///
/// The [`crate::Reporter`] associated with this event listener is the [`crate::ChannelReporter`].
/// The [`Reporter`] associated with this event listener is the [`ChannelReporter`].
///
/// [`ChannelEventListener::run_handler`]: crate::ChannelEventListener::run_handler
/// [`event_channel()`]: crate::event_channel
/// [`Reporter`]: crate::Reporter
/// [`ChannelReporter`]: crate::ChannelReporter
pub struct ChannelEventListener<Event> {
message_receiver: EventReceiver<Event>,
disconnect_sender: DisconnectSender,
event_receiver: EventReceiver<Event>,
}

impl<Event> ChannelEventListener<Event> {
/// Create a new channel based event listener.
///
/// The channels required to create an instance can be created by calling the [`crate::event_channel`]
/// and [`crate::disconnect_channel`] functions.
pub fn new(
message_receiver: EventReceiver<Event>,
disconnect_sender: DisconnectSender,
) -> Self {
Self {
message_receiver,
disconnect_sender,
}
}

/// If you use `ChannelEventListener` by wrapping it, instead of using it directly,
/// for example if you want to write your own `EventListener` implementation,
/// you will need this `&EventReceiver` to receive events.
///
/// ### Example
///
/// **NB:** This example should **not** be used on its own! It does not contain a fully working listener!
/// See [`crate::EventListener`] on how to implement your own listener instead.
///
/// ```no_run
/// // NB: This example is incomplete!
/// // It does not contain a fully working listener!
///
/// use storyteller::{ChannelEventListener, EventHandler, EventListener};
///
/// struct MyEvent;
///
/// struct WrappingListener {
/// listener: ChannelEventListener<MyEvent>,
/// }
///
/// impl EventListener for WrappingListener {
/// type Event = MyEvent;
///
/// fn run_handler<H>(self, handler: H) where H: EventHandler<Event=Self::Event> {
///
/// let disconnect_sender = self.listener.disconnect_sender();
/// let message_receiver = self.listener.message_receiver(); // <---
///
/// loop {
/// if let Err(_) = message_receiver.recv() {
/// disconnect_sender.acknowledge_disconnection().unwrap();
/// }
/// }
/// }
/// }
/// ```
pub fn message_receiver(&self) -> &EventReceiver<Event> {
&self.message_receiver
}

/// If you use `ChannelEventListener` by wrapping it, instead of using it directly,
/// for example if you want to write your own `EventListener` implementation,
/// you will need this `&DisconnectSender` to acknowledge when a reporter disconnects.
///
/// ### Example
///
/// **NB:** This example should **not*** be used on its own! It does not contain a fully working listener!
/// See [`crate::EventListener`] on how to implement your own listener instead.
///
/// ```no_run
/// // NB: This example is incomplete!
/// // It does not contain a fully working listener!
///
/// use storyteller::{ChannelEventListener, EventHandler, EventListener};
///
/// struct MyEvent;
///
/// struct WrappingListener {
/// listener: ChannelEventListener<MyEvent>,
/// }
/// The channel based receiver required to create an instance can be created by calling the
/// [`event_channel()`] function.
///
/// impl EventListener for WrappingListener {
/// type Event = MyEvent;
///
/// fn run_handler<H>(self, handler: H) where H: EventHandler<Event=Self::Event> {
///
/// let disconnect_sender = self.listener.disconnect_sender(); // <---
/// let message_receiver = self.listener.message_receiver();
///
/// loop {
/// if let Err(_) = message_receiver.recv() {
/// disconnect_sender.acknowledge_disconnection().unwrap();
/// }
/// }
/// }
/// }
/// ```
pub fn disconnect_sender(&self) -> &DisconnectSender {
&self.disconnect_sender
/// [`event_channel()`]: crate::event_channel
pub fn new(event_receiver: EventReceiver<Event>) -> Self {
Self { event_receiver }
}
}

Expand All @@ -116,32 +36,68 @@ where
Event: Send + 'static,
{
type Event = Event;
type FinishProcessingHandle = ChannelFinalizeHandler;

fn run_handler<H>(self, handler: H)
fn run_handler<H>(&self, handler: H) -> Self::FinishProcessingHandle
where
H: EventHandler<Event = Self::Event>,
H: EventHandler<Event = Self::Event> + 'static,
{
thread::spawn(move || {
let disconnect_sender = self.disconnect_sender();
let message_receiver = self.message_receiver();
let event_receiver = self.event_receiver.clone();

loop {
match message_receiver.recv() {
let handle = thread::spawn(move || {
//
'evl: loop {
match event_receiver.recv() {
Ok(message) => handler.handle(message),
Err(_disconnect) => {
handler.finish();

let _ack = disconnect_sender.acknowledge_disconnection();

#[cfg(not(feature = "experimental_handle_disconnect_ack"))]
{
_ack.expect("Failed to send disconnect acknowledgement!");
}

break;
break 'evl;
}
}
}
});

ChannelFinalizeHandler::new(handle)
}
}

/// A [`FinishProcessing`] implementation for the [`ChannelEventListener`].
/// Used to wait for the [`EventHandler`] ran by the `listener` to finish processing
/// events.
///
/// ### Caution: Infinite looping
///
/// Calling [`FinishProcessing::finish_processing`] without first disconnecting
/// the sender channel of the reporter will cause the program to be stuck in an infinite
/// loop.
///
/// The reason for this is that disconnecting the channel causes the loop to process
/// a disconnect event, where we break out of the loop. If this disconnect does not
/// happen, the thread processing events will not be finished, and
/// [`FinishProcessing::finish_processing`] will block, since it waits for the thread
/// to be finished.
///
/// To disconnect the sender channel of the reporter, call [`ChannelReporter::disconnect`].
///
/// [`FinishProcessing`]: crate::FinishProcessing
/// [`EventHandler`]: crate::EventHandler
/// [`ChannelEventListener`]: crate::ChannelEventListener
/// [`ChannelReporter::disconnect`]: crate::ChannelReporter::disconnect
#[must_use]
pub struct ChannelFinalizeHandler {
handle: JoinHandle<()>,
}

impl ChannelFinalizeHandler {
fn new(handle: JoinHandle<()>) -> Self {
Self { handle }
}
}

impl FinishProcessing for ChannelFinalizeHandler {
type Err = ();

fn finish_processing(self) -> Result<(), Self::Err> {
self.handle.join().map_err(|_| ())
}
}
Loading

0 comments on commit a70922f

Please sign in to comment.