
feat(events): Scaffold EventDispatcher #132

Draft

wants to merge 2 commits into main
Conversation

felipecsl
Member

@felipecsl felipecsl commented Dec 21, 2024

Description

This is a very simple and crude EventDispatcher implementation for async event delivery. It doesn't yet handle retries or online/offline modes, nor does it actually perform a delivery (it just fakes it for now).

It does start a timer on the provided schedule and stops it as needed, depending on whether there are pending events to deliver.
Note: This was more or less ported/adapted from the JS implementation: https://github.com/Eppo-exp/js-sdk-common/blob/main/src/events/default-event-dispatcher.ts

For now, mainly looking for feedback on the high level approach taken, especially around the async handling with tokio::spawn, interval_at, etc. - wink wink @rasendubi

Testing

Just wrote one test for now, will write more before merging.

@@ -36,7 +36,7 @@ serde-bool = "0.1.3"
serde_json = "1.0.116"
serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] }
thiserror = "2.0.3"
tokio = { version = "1.34.0", features = ["rt", "time"] }
tokio = { version = "1.34.0", features = ["full"] }
Member Author

needed for #[tokio::test]

Collaborator

`tokio::test` only needs the `macros` feature. The full feature set pulls in more dependencies, and libraries are not supposed to use it.

`macros` is also a dev-only dependency, so we should add

tokio = { features = ["macros"] }

to dev dependencies
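Concretely, the suggestion amounts to something like this in `Cargo.toml` (a sketch; the exact version pinning is up to the project):

```toml
[dependencies]
# Runtime features only; keep the library's feature surface minimal.
tokio = { version = "1.34.0", features = ["rt", "time"] }

[dev-dependencies]
# `macros` is only needed for #[tokio::test] in tests.
tokio = { version = "1.34.0", features = ["macros"] }
```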

Member

@leoromanovsky leoromanovsky left a comment

Nice start! Looking forward to learning from Oleksii.

#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
    pub uuid: String,
    pub timestamp: i64,
Member

We have a shared Timestamp type in timestamp.rs

@@ -47,7 +47,7 @@ impl Configuration {
/// Return bandit configuration for the given key.
///
/// Returns `None` if bandits are missing or the bandit does not exist.
pub(crate) fn get_bandit<'a>(&'a self, bandit_key: &str) -> Option<&'a BanditConfiguration> {
pub(crate) fn get_bandit(&self, bandit_key: &str) -> Option<&BanditConfiguration> {
Member

✔️ I have been using RustRover for work in this repository and it also gave me this feedback.

[Screenshot 2024-12-20 at 4:45:53 PM]


#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
    pub uuid: String,
Member

Consider adding a UUID type that serializes to a string and that we can re-use across the product.

Suggested change
pub uuid: String,
pub uuid: MyUuid,

elsewhere:

/// A wrapper around `uuid::Uuid` to avoid direct user dependency on the `uuid` crate.
/// (Assumes the `uuid` crate's `serde` feature is enabled, so `Uuid` implements
/// `Serialize`/`Deserialize` directly and no `#[serde(with = ...)]` attribute is needed.)
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct MyUuid(uuid::Uuid);

impl MyUuid {
    /// Generate a new random UUID (v4).
    pub fn new_v4() -> Self {
        MyUuid(uuid::Uuid::new_v4())
    }

    /// Parse a UUID from a string.
    pub fn parse_str(s: &str) -> Result<Self, uuid::Error> {
        uuid::Uuid::parse_str(s).map(MyUuid)
    }

    /// Get the inner UUID if necessary.
    pub fn as_inner(&self) -> &uuid::Uuid {
        &self.0
    }
}

Collaborator

Depending on what library we use to generate UUIDs, it likely already has serde support directly, so we could use their Uuid type directly:

Suggested change
pub uuid: String,
pub uuid: uuid::Uuid,

pub struct Event {
    pub uuid: String,
    pub timestamp: i64,
    pub event_type: String,
Member

Is an enum possible here or is this going to be open ended?
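If the set of event types is known up front (an assumption; they may turn out to be user-defined and open-ended), an enum with an escape-hatch variant avoids a fully stringly-typed field. A std-only sketch with invented variant names:

```rust
// Hypothetical sketch: known event types plus a fallback for ones the
// SDK doesn't know about. Variant names are invented for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventType {
    AssignmentLogged,
    BanditActionLogged,
    /// Escape hatch for open-ended, user-defined event types.
    Custom(String),
}

impl EventType {
    pub fn as_str(&self) -> &str {
        match self {
            EventType::AssignmentLogged => "assignment_logged",
            EventType::BanditActionLogged => "bandit_action_logged",
            EventType::Custom(s) => s,
        }
    }
}

fn main() {
    assert_eq!(EventType::AssignmentLogged.as_str(), "assignment_logged");
    assert_eq!(EventType::Custom("my_event".to_string()).as_str(), "my_event");
}
```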

}

pub fn push(&self, event: Event) {
    let mut queue = self.event_queue.lock().unwrap();
Member

When I see unwrap in code, that hints to me that something could go wrong in an exceptional condition. I'm not yet familiar with the VecDeque type, so I asked around:

[Screenshot 2024-12-20 at 4:58:00 PM]

pub struct EventDispatcher {
    config: EventDispatcherConfig,
    batch_processor: BatchEventProcessor,
    delivery_task_active: Arc<Mutex<bool>>,
Member

I believe locking a Mutex<bool> blocks the calling thread.

Depending on our need consider Arc<AtomicBool> or tokio::sync::Mutex - I will do some more reading to further understand the difference but the former is used in the library already.

Comment on lines +37 to +40
// Start the delivery loop if it's not already active
if !self.is_delivery_timer_active() {
    self.start_delivery_loop();
}
Member

Multiple callers could enter here.

Consider performing an atomic operation to change the flag from false to true exactly once. I tried this locally:

[Screenshot 2024-12-20 at 5:17:15 PM]

Collaborator

@rasendubi rasendubi left a comment

Took me a bit of time to figure out what's going on and what we're trying to achieve. My understanding is that we're developing a service for event ingestion, so users wouldn't need to implement their own assignment loggers.

General questions:

  • Why doesn't EventDispatcher plug into the assignment logger interface instead of creating a new abstraction for the user? The assignment logger interface seems like a perfect fit.
  • Shutting down the SDK may lead to losing events. We need a way to flush the queue.

On rust implementation:

The main concern is that it doesn't fit our runtime model. We don't have tokio runtime freely available, especially on the main thread which is called from other languages. We have a poller thread running a tokio runtime, but it doesn't run when polling is disabled.

So our options are:

  • Spawn a new native thread for event dispatcher, likely running a new tokio runtime.
  • Multiplex event dispatcher with the poller thread, making it a generic "eppo background" thread.

The former is simpler and has better isolation but is somewhat wasteful. The latter has better performance and centralizes "thread management" in one place (there are some caveats with spawning threads in forking web servers, so having fewer threads is better). So I'm leaning towards having a single background thread.

Regardless of the approach we choose, the good first step is to rewrite the sending part of event dispatcher as an async task (that is always running). After we do that, we can decide whether to spawn it on a shared background thread or a dedicated thread.

For how to structure that task, I would center it around a command channel (so that dispatch is simpler and doesn't need to know how to spawn tasks):

// pseudo-code
enum EventDispatcherCommand {
    Event(Event),
    Flush,
}

pub fn dispatch(&self, event: Event) {
    // Send command to sender end of an unbounded channel
    self.tx.send(EventDispatcherCommand::Event(event))
        // TODO: handle/log error instead of panicking
        .expect("receiver should not be closed before all senders are closed")
}

// task:
async fn event_dispatcher(mut rx: UnboundedReceiver<EventDispatcherCommand>) {
    loop {
        let mut batch = Vec::new();

        // Wait for the first event in the batch.
        //
        // Optimization: moved outside of the loop below, so we're not woken up on regular intervals
        // unless we have something to send. (This achieves a similar effect as starting/stopping
        // the delivery loop.)
        match rx.recv().await {
            None => {
                // Channel closed, no more messages. Exit the main loop.
                return;
            }
            Some(EventDispatcherCommand::Event(event)) => batch.push(event),
            Some(EventDispatcherCommand::Flush) => {
                // No buffered events yet, nothing to flush.
                continue;
            }
        }

        let deadline = Instant::now() + delivery_interval;
        // Loop until we have enough events to send or have reached the deadline.
        loop {
            tokio::select! {
                _ = tokio::time::sleep_until(deadline) => {
                    // Reached deadline -> send everything we have.
                    break;
                },
                command = rx.recv() => {
                    match command {
                        None => {
                            // Channel closed.
                            break;
                        },
                        Some(EventDispatcherCommand::Event(event)) => {
                            batch.push(event);
                            if batch.len() >= batch_size {
                                // Reached max batch size -> send events immediately.
                                break;
                            } // else loop to get more events
                        },
                        Some(EventDispatcherCommand::Flush) => {
                            break;
                        }
                    }
                }
            }
        }

        // Send `batch` events.
        tokio::spawn(async move {
            // Spawning a new task, so the main task can continue batching events and
            // responding to commands.
        });
    }
}


Comment on lines +12 to +14
pub delivery_interval_ms: u64,
pub retry_interval_ms: u64,
pub max_retry_delay_ms: u64,
Collaborator

Prefer Duration type over numbers



    pub uuid: String,
    pub timestamp: i64,
    pub event_type: String,
    pub payload: HashMap<String, serde_json::Value>,
Collaborator

What's this? Can we have a more specific type?

Using serde_json::Value usually means that there are going to be two serializations: some type -> serde_json::Value -> JSON string. If we have a proper type, we can go from that type to a JSON string directly.

//!
//! # Overview
//!
//! `eppo_core` is organized as a set of building blocks that help to build Eppo SDKs. Different
//! languages have different constraints. Some languages might use all building blocks and others
//! might reimplement some pieces in the host language.
//!
//! [`Configuration`] is the heart of an SDK. It is an immutable structure that encapsulates all
//! [`Configuration`] is the heart of the SDK. It is an immutable structure that encapsulates all
Collaborator

I'm not a native speaker, but my understanding is that "an" was correct because I don't have a specific SDK in mind; eppo_core is a library for building SDKs.

const MIN_BATCH_SIZE: usize = 100;
const MAX_BATCH_SIZE: usize = 10_000;

impl BatchEventProcessor {
Collaborator

nit: it doesn't look like BatchEventProcessor is doing any "processing." It's just a batching queue

#[derive(Debug, Clone)]
pub struct BatchEventProcessor {
    batch_size: usize,
    event_queue: Arc<Mutex<VecDeque<Event>>>,
Collaborator

minor: Mutex is blocking, so it may slow down the sending process. Consider using channels in Rust.

Comment on lines +29 to +37
let mut queue = self.event_queue.lock().unwrap();
let mut batch = vec![];
while let Some(event) = queue.pop_front() {
    batch.push(event);
    if batch.len() >= self.batch_size {
        break;
    }
}
batch
Collaborator

minor: this approach is somewhat slow, as it will cause the batch vector to re-allocate memory as it grows.

One way to fix that is to specify capacity when creating the vector, so it reserves the required memory in one go: https://doc.rust-lang.org/std/vec/struct.Vec.html#method.with_capacity

Another option is to use VecDeque::split_off.

(Both are probably irrelevant if we use channels.)

pub fn new(batch_size: usize) -> Self {
    // clamp batch size between min and max
    BatchEventProcessor {
        batch_size: batch_size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE),
Collaborator

Is there any reason to enforce this limit in core (i.e., internal library)?

Comment on lines +58 to +67
interval.tick().await;
let events_to_process = batch_processor.next_batch();
if !events_to_process.is_empty() {
    EventDispatcher::deliver(&config.ingestion_url, &events_to_process).await;
} else {
    // If no more events to deliver, break the loop
    let mut is_active = active_flag.lock().unwrap();
    *is_active = false;
    break;
}
Collaborator

major: what happens if events arrive faster than batch_size/interval_duration?
