Skip to content

Commit

Permalink
thats right... all at once
Browse files Browse the repository at this point in the history
  • Loading branch information
onlycs committed Aug 30, 2023
1 parent 049512d commit 7e00669
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 143 deletions.
26 changes: 18 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ chrono = "0.4.26"
dotenv = "0.15.0"
dotenv_codegen = "0.15.0"
futures = "0.3.28"
itertools = "0.11.0"
log = "0.4.19"
poise = "0.5.5"
serde = { version = "1.0.171", features = ["derive"] }
Expand Down
135 changes: 26 additions & 109 deletions src/events/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,18 @@ type Output = Result<()>;
type AsyncOutput = BoxFuture<'static, Output>;

pub struct Listener {
callback: Arc<dyn Fn(Vec<u8>, serenity::Context) -> Output + Send + Sync>,
filter: Option<Arc<dyn Fn(Vec<u8>) -> bool + Send + Sync + 'static>>,
}

pub struct AsyncListener {
callback: Arc<dyn Fn(Vec<u8>, serenity::Context) -> AsyncOutput + Send + Sync>,
filter: Option<Arc<dyn Fn(Vec<u8>) -> bool + Send + Sync + 'static>>,
}

pub struct EventEmitter {
listeners: HashMap<TypeId, Vec<Listener>>,
async_listeners: HashMap<TypeId, Vec<AsyncListener>>,
}

impl EventEmitter {
pub fn new() -> Self {
Self {
listeners: HashMap::new(),
async_listeners: HashMap::new(),
}
}

Expand All @@ -41,106 +34,32 @@ impl EventEmitter {
if let Some(listeners) = self.listeners.get_mut(&TypeId::of::<Event>()) {
let bytes: Vec<u8> = serde_json::to_vec(&argument)?;

for listener in listeners.iter_mut() {
let bytes = bytes.clone();
let callback = Arc::clone(&listener.callback);
let context = context.clone();

if let Some(filter) = &listener.filter {
if !filter(bytes.clone()) {
continue;
let to_run = listeners
.iter()
.map(|listener| {
let callback = Arc::clone(&listener.callback);
let filter = listener.filter.as_ref().map(Arc::clone);
let bytes = bytes.clone();

async move {
if let Some(filter) = filter {
if !filter(bytes.clone()) {
return;
}
}

callback(bytes, context.clone()).await.unwrap();
}
}
})
.collect_vec();

thread::spawn(move || {
callback(bytes, context.clone()).unwrap();
});
}
}

if let Some(listeners) = self.async_listeners.get_mut(&TypeId::of::<Event>()) {
let bytes: Vec<u8> = serde_json::to_vec(&argument)?;

for listener in listeners.iter_mut() {
let bytes = bytes.clone();
let callback = Arc::clone(&listener.callback);
let context = context.clone();

if let Some(filter) = &listener.filter {
if !filter(bytes.clone()) {
continue;
}
}

tokio::spawn(async move {
callback(bytes, context.clone()).await.unwrap();
});
}
futures::future::join_all(to_run).await;
}

Ok(())
}

pub fn on<Event, Callback>(
&mut self,
_event: Event, /* making the user specify generic argument for this looks ugly af */
callback: Callback,
) where
Event: EmitterEvent,
Callback:
Fn(Event::Argument, serenity::Context) -> Result<()> + Send + Sync + 'static + 'static,
{
let parsed_callback = move |bytes: Vec<u8>, ctx: serenity::Context| {
callback(serde_json::from_slice(&bytes)?, ctx)
};

let listener = Listener {
callback: Arc::new(parsed_callback),
filter: None,
};

match self.listeners.get_mut(&TypeId::of::<Event>()) {
Some(callbacks) => {
callbacks.push(listener);
}
None => {
self.listeners.insert(TypeId::of::<Event>(), vec![listener]);
}
}
}

pub fn on_filter<Event, Callback, Filter>(
&mut self,
_event: Event, /* making the user specify generic argument for this looks ugly af */
callback: Callback,
filter: Filter,
) where
Event: EmitterEvent,
Callback: Fn(Event::Argument, serenity::Context) -> Result<()> + Send + Sync + 'static,
Filter: Fn(Event::Argument) -> bool + Send + Sync + 'static,
{
let parsed_callback = move |bytes: Vec<u8>, ctx: serenity::Context| {
callback(serde_json::from_slice(&bytes)?, ctx)
};

let parsed_filter = move |bytes: Vec<u8>| filter(serde_json::from_slice(&bytes).unwrap());

let listener = Listener {
callback: Arc::new(parsed_callback),
filter: Some(Arc::new(parsed_filter)),
};

match self.listeners.get_mut(&TypeId::of::<Event>()) {
Some(callbacks) => {
callbacks.push(listener);
}
None => {
self.listeners.insert(TypeId::of::<Event>(), vec![listener]);
}
}
}

pub fn on_async<Event, Callback, Fut>(
pub fn on<Event, Callback, Fut>(
&mut self,
_event: Event, /* making the user specify generic argument for this looks ugly af */
callback: Callback,
Expand All @@ -153,23 +72,22 @@ impl EventEmitter {
callback(serde_json::from_slice(&bytes).unwrap(), ctx).boxed()
};

let listener = AsyncListener {
let listener = Listener {
callback: Arc::new(parsed_callback),
filter: None,
};

match self.async_listeners.get_mut(&TypeId::of::<Event>()) {
match self.listeners.get_mut(&TypeId::of::<Event>()) {
Some(async_callbacks) => {
async_callbacks.push(listener);
}
None => {
self.async_listeners
.insert(TypeId::of::<Event>(), vec![listener]);
self.listeners.insert(TypeId::of::<Event>(), vec![listener]);
}
}
}

pub fn on_async_filter<Event, Callback, Fut, Filter>(
pub fn on_filter<Event, Callback, Fut, Filter>(
&mut self,
_event: Event, /* making the user specify generic argument for this looks ugly af */
callback: Callback,
Expand All @@ -186,18 +104,17 @@ impl EventEmitter {

let parsed_filter = move |bytes: Vec<u8>| filter(serde_json::from_slice(&bytes).unwrap());

let listener = AsyncListener {
let listener = Listener {
callback: Arc::new(parsed_callback),
filter: Some(Arc::new(parsed_filter)),
};

match self.async_listeners.get_mut(&TypeId::of::<Event>()) {
match self.listeners.get_mut(&TypeId::of::<Event>()) {
Some(async_callbacks) => {
async_callbacks.push(listener);
}
None => {
self.async_listeners
.insert(TypeId::of::<Event>(), vec![listener]);
self.listeners.insert(TypeId::of::<Event>(), vec![listener]);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/features/starboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub async fn register(ctx: &serenity::Context) -> Result<()> {

let mut emitter = emitter_mutex.lock().await;

emitter.on_async_filter(
emitter.on_filter(
events::MessageReactionAdd,
|payload, ctx| async move {
star_no_interaction(&ctx, &payload.message).await?;
Expand All @@ -205,7 +205,7 @@ pub async fn register(ctx: &serenity::Context) -> Result<()> {
},
);

emitter.on_async_filter(
emitter.on_filter(
events::ComponentInteractionEvent,
|interaction, ctx| async move {
let author = &interaction.user;
Expand Down Expand Up @@ -251,7 +251,7 @@ pub async fn register(ctx: &serenity::Context) -> Result<()> {
|interaction| interaction.data.custom_id == "oreo_starboard_delete",
);

emitter.on_async_filter(
emitter.on_filter(
events::ModalInteractionEvent,
|interaction, ctx| async move {
let prisma = prisma::create().await?;
Expand Down
Loading

0 comments on commit 7e00669

Please sign in to comment.