Skip to content

Commit

Permalink
Fixing tcp_shutdown functionality (#7)
Browse files Browse the repository at this point in the history
* Fixing tcp_shutdown functionality

* Bumping up main version
  • Loading branch information
aalykiot authored Jun 25, 2024
1 parent 0ab67d1 commit 1047453
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "dune_event_loop"
description = "Dune's event-loop library used for timers, async tasks and TCP connections"
description = "A multi-platform event loop library 🎡"
authors = ["Alex Alikiotis <alexalikiotis5@gmail.com>"]
version = "0.1.0"
version = "0.1.1"
repository = "https://gitlab.com/aalykiot/ev_loop"
license = "MIT"
readme = "README.md"
Expand Down
41 changes: 21 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use rayon::ThreadPool;
use rayon::ThreadPoolBuilder;
pub use signal_hook::consts::signal as Signal;
use signal_hook::low_level::emulate_default_handler;
use std::any::type_name;
use std::borrow::Cow;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::HashMap;
Expand Down Expand Up @@ -49,10 +47,6 @@ pub type Index = u32;

/// All objects that are tracked by the event-loop should implement the `Resource` trait.
trait Resource: Downcast + 'static {
/// Returns a string representation of the resource.
fn name(&self) -> Cow<str> {
type_name::<Self>().into()
}
/// Custom way to close any resources.
fn close(&mut self) {}
}
Expand All @@ -75,6 +69,8 @@ struct TaskWrap {

impl Resource for TaskWrap {}

type OneShotCallback = dyn FnOnce(LoopHandle) + 'static;

// Wrapper types for the task resource.
type Task = Box<dyn FnOnce() -> TaskResult + Send>;
type TaskOnFinish = Box<dyn FnOnce(LoopHandle, TaskResult) + 'static>;
Expand All @@ -86,12 +82,6 @@ type TcpListenerOnConnection = Box<dyn FnMut(LoopHandle, Index, Result<TcpSocket
type TcpOnWrite = Box<dyn FnOnce(LoopHandle, Index, Result<usize>) + 'static>;
type TcpOnRead = Box<dyn FnMut(LoopHandle, Index, Result<Vec<u8>>) + 'static>;

// Wrapper around check callbacks.
type OnCheck = Box<dyn FnOnce(LoopHandle) + 'static>;

// Wrapper around close callbacks.
type OnClose = Box<dyn FnOnce(LoopHandle) + 'static>;

// Wrapper around fs events callbacks.
type FsWatchOnEvent = Box<dyn FnMut(LoopHandle, FsEvent) + 'static>;

Expand Down Expand Up @@ -131,7 +121,7 @@ pub struct TcpSocketInfo {

/// Describes a callback that will run once after the Poll phase.
struct CheckWrap {
cb: Option<OnCheck>,
cb: Option<Box<OneShotCallback>>,
}

impl Resource for CheckWrap {}
Expand Down Expand Up @@ -287,8 +277,8 @@ enum Action {
TcpListenReq(Index, TcpListenerWrap),
TcpWriteReq(Index, Vec<u8>, TcpOnWrite),
TcpReadStartReq(Index, TcpOnRead),
TcpCloseReq(Index, OnClose),
TcpShutdownReq(Index),
TcpCloseReq(Index, Box<OneShotCallback>),
TcpShutdownReq(Index, Box<OneShotCallback>),
CheckReq(Index, CheckWrap),
CheckRemoveReq(Index),
FsEventStartReq(Index, FsWatcherWrap),
Expand Down Expand Up @@ -343,7 +333,7 @@ pub struct EventLoop {
action_queue_empty: Rc<Cell<bool>>,
action_dispatcher: Rc<mpsc::Sender<Action>>,
check_queue: Vec<Index>,
close_queue: Vec<(Index, Option<OnClose>)>,
close_queue: Vec<(Index, Option<Box<OneShotCallback>>)>,
thread_pool: ThreadPool,
thread_pool_tasks: usize,
event_dispatcher: Arc<Mutex<mpsc::Sender<Event>>>,
Expand Down Expand Up @@ -456,7 +446,7 @@ impl EventLoop {
Action::TcpWriteReq(index, data, cb) => self.tcp_write_req(index, data, cb),
Action::TcpReadStartReq(index, cb) => self.tcp_read_start_req(index, cb),
Action::TcpCloseReq(index, cb) => self.tcp_close_req(index, cb),
Action::TcpShutdownReq(index) => self.tcp_shutdown_req(index),
Action::TcpShutdownReq(index, cb) => self.tcp_shutdown_req(index, cb),
Action::CheckReq(index, cb) => self.check_req(index, cb),
Action::CheckRemoveReq(index) => self.check_remove_req(index),
Action::FsEventStartReq(index, w_wrap) => self.fs_event_start_req(index, w_wrap),
Expand Down Expand Up @@ -1034,7 +1024,11 @@ impl EventLoop {
}

/// Closes the write side of the stream.
fn tcp_shutdown_req(&mut self, index: Index) {
fn tcp_shutdown_req(
&mut self,
index: Index,
on_shutdown: Box<dyn FnOnce(LoopHandle) + 'static>,
) {
// Get resource by it's ID.
let resource = match self.resources.get_mut(&index) {
Some(resource) => resource,
Expand All @@ -1043,6 +1037,7 @@ impl EventLoop {

// Cast resource to TcpStreamWrap.
resource.downcast_mut::<TcpStreamWrap>().unwrap().close();
on_shutdown(self.handle())
}

/// Schedules a new check callback.
Expand Down Expand Up @@ -1284,8 +1279,14 @@ impl LoopHandle {
}

/// Closes the write side of the TCP stream.
pub fn tcp_shutdown(&self, index: Index) {
self.actions.send(Action::TcpShutdownReq(index)).unwrap();
pub fn tcp_shutdown<F>(&self, index: Index, on_shutdown: F)
where
F: FnOnce(LoopHandle) + 'static,
{
self.actions
.send(Action::TcpShutdownReq(index, Box::new(on_shutdown)))
.unwrap();

self.actions_queue_empty.set(false);
}

Expand Down

0 comments on commit 1047453

Please sign in to comment.