Skip to content

Commit

Permalink
feat!: carry zxid of transaction that triggering WatchedEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhuw committed Aug 31, 2023
1 parent 3ec27ad commit b45d138
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 40 deletions.
23 changes: 20 additions & 3 deletions src/session/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,36 @@ pub struct WatcherEvent<'a> {
pub event_type: EventType,
pub session_state: SessionState,
pub path: &'a str,
pub zxid: i64,
}

impl<'a> Ref<'a> for WatcherEvent<'a> {
type Value = WatchedEvent;

fn to_value(&self) -> Self::Value {
WatchedEvent { event_type: self.event_type, session_state: self.session_state, path: self.path.to_owned() }
WatchedEvent {
event_type: self.event_type,
session_state: self.session_state,
path: self.path.to_owned(),
zxid: self.zxid,
}
}
}

impl<'a> ToRef<'a, WatcherEvent<'a>> for WatchedEvent {
fn to_ref(&'a self) -> WatcherEvent<'a> {
WatcherEvent { event_type: self.event_type, session_state: self.session_state, path: &self.path }
WatcherEvent {
event_type: self.event_type,
session_state: self.session_state,
path: &self.path,
zxid: self.zxid,
}
}
}

impl WatcherEvent<'_> {
pub fn with_zxid(self, zxid: i64) -> Self {
Self { zxid, ..self }
}
}

Expand Down Expand Up @@ -81,6 +98,6 @@ impl<'a> DeserializableRecord<'a> for WatcherEvent<'a> {
let event_type = EventType::from_server(unsafe { buf.get_unchecked_i32() })?;
let session_state = SessionState::from_server(unsafe { buf.get_unchecked_i32() })?;
let path = record::unmarshal(buf)?;
Ok(WatcherEvent { event_type, session_state, path })
Ok(WatcherEvent { event_type, session_state, path, zxid: WatchedEvent::NO_ZXID })
}
}
6 changes: 3 additions & 3 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ impl Session {
}
}

fn handle_notification(&mut self, mut body: &[u8], depot: &mut Depot) -> Result<(), Error> {
fn handle_notification(&mut self, zxid: i64, mut body: &[u8], depot: &mut Depot) -> Result<(), Error> {
let event = record::unmarshal_entity::<WatcherEvent>(&"watch notification", &mut body)?;
self.watch_manager.dispatch_server_event(event, depot);
self.watch_manager.dispatch_server_event(event.with_zxid(zxid), depot);
Ok(())
}

Expand Down Expand Up @@ -281,7 +281,7 @@ impl Session {
return Err(Error::AuthFailed);
}
if header.xid == PredefinedXid::Notification.into() {
self.handle_notification(body, depot)?;
self.handle_notification(header.zxid, body, depot)?;
return Ok(());
} else if header.xid == PredefinedXid::Ping.into() {
depot.pop_ping()?;
Expand Down
27 changes: 27 additions & 0 deletions src/session/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl SessionState {

/// WatchedEvent represents update to watched node or session.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct WatchedEvent {
pub event_type: EventType,

Expand All @@ -79,9 +80,35 @@ pub struct WatchedEvent {

/// Node path from chroot. Empty if this is a state event.
pub path: String,

/// `zxid` of the transaction that triggered this node event, [WatchedEvent::NO_ZXID] for session event.
///
/// # Notable behaviors
/// * This feature was shipped in 3.9.0, `zxid` wil be [WatchedEvent::NO_ZXID] for earlier versions. See [ZOOKEEPER-4655].
/// * It is possible to receive multiple events with same `zxid` and even same `path` due to [MultiWriter]. See [ZOOKEEPER-4695].
///
/// [ZOOKEEPER-4655]: https://issues.apache.org/jira/browse/ZOOKEEPER-4655
/// [ZOOKEEPER-4695]: https://issues.apache.org/jira/browse/ZOOKEEPER-4695
pub zxid: i64,
}

impl WatchedEvent {
pub const NO_ZXID: i64 = -1;

pub fn new(event: EventType, path: impl Into<String>) -> Self {
debug_assert_ne!(event, EventType::Session);
Self { event_type: event, session_state: SessionState::SyncConnected, path: path.into(), zxid: Self::NO_ZXID }
}

pub fn with_zxid(self, zxid: i64) -> Self {
debug_assert_ne!(self.event_type, EventType::Session);
Self { zxid, ..self }
}

pub fn new_session(state: SessionState) -> Self {
Self { event_type: EventType::Session, session_state: state, path: Default::default(), zxid: Self::NO_ZXID }
}

pub(crate) fn drain_root_path(&mut self, root: &str) {
if self.event_type != EventType::Session && !root.is_empty() {
util::drain_root_path(&mut self.path, root).unwrap();
Expand Down
5 changes: 3 additions & 2 deletions src/session/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::request::{Operation, SessionOperation, StateReceiver, StateResponser}
use super::types::{EventType, SessionState, WatchMode, WatchedEvent};
use crate::error::Error;
use crate::proto::{ErrorCode, OpCode, SetWatchesRequest};
use crate::util::Ref;
use crate::util::{Ref, ToRef};

const SET_WATCHES_MAX_BYTES: usize = 128 * 1024;

Expand Down Expand Up @@ -320,7 +320,8 @@ impl WatchManager {
}

pub fn dispatch_session_state(&mut self, state: SessionState) {
let event = WatcherEvent { event_type: EventType::Session, session_state: state, path: Default::default() };
let _event = WatchedEvent::new_session(state);
let event = _event.to_ref();
self.watches.values_mut().for_each(|watch| {
watch.send(&event, &mut self.watching_paths);
});
Expand Down
61 changes: 29 additions & 32 deletions tests/zookeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,56 +1224,53 @@ async fn test_watcher_coexist_on_same_path() {
let mut persistent_watcher = client.watch("/a", zk::AddWatchMode::Persistent).await.unwrap();
let mut recursive_watcher = client.watch("/a", zk::AddWatchMode::PersistentRecursive).await.unwrap();

client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap();
let (stat, _) = client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap();

let event = exist_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeCreated);
assert_that!(event.path).is_same_string_to("/a");

assert_that!(persistent_watcher.changed().await).is_equal_to(&event);
assert_that!(recursive_watcher.changed().await).is_equal_to(&event);
let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a".to_string()).with_zxid(stat.czxid);
assert_that!(exist_watcher.changed().await).is_equal_to(&expected);
assert_that!(persistent_watcher.changed().await).is_equal_to(&expected);
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);

let (_, _, data_watcher) = client.get_and_watch_data("/a").await.unwrap();
let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap();
let (_, exist_watcher) = client.check_and_watch_stat("/a").await.unwrap();

client.create("/a/b", &vec![], PERSISTENT_OPEN).await.unwrap();
let event = child_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeChildrenChanged);
assert_that!(event.path).is_same_string_to("/a");
assert_that!(persistent_watcher.changed().await).is_equal_to(&event);
let (stat, _) = client.create("/a/b", &vec![], PERSISTENT_OPEN).await.unwrap();
let expected = zk::WatchedEvent::new(zk::EventType::NodeChildrenChanged, "/a".to_string()).with_zxid(stat.czxid);
assert_that!(child_watcher.changed().await).is_equal_to(&expected);
assert_that!(persistent_watcher.changed().await).is_equal_to(&expected);

let event = recursive_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeCreated);
assert_that!(event.path).is_same_string_to("/a/b");
let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a/b".to_string()).with_zxid(stat.czxid);
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);

let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap();

client.delete("/a/b", None).await.unwrap();
let event = child_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeChildrenChanged);
assert_that!(event.path).is_same_string_to("/a");
assert_that!(persistent_watcher.changed().await).is_equal_to(&event);
let stat = client.check_stat("/a").await.unwrap().unwrap();

let expected = zk::WatchedEvent::new(zk::EventType::NodeChildrenChanged, "/a".to_string()).with_zxid(stat.pzxid);
assert_that!(child_watcher.changed().await).is_equal_to(&expected);
assert_that!(persistent_watcher.changed().await).is_equal_to(&expected);

let event = recursive_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeDeleted);
assert_that!(event.path).is_same_string_to("/a/b");
let expected = zk::WatchedEvent::new(zk::EventType::NodeDeleted, "/a/b".to_string()).with_zxid(stat.pzxid);
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);

let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap();

client.delete("/a", None).await.unwrap();
let event = child_watcher.changed().await;
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeDeleted);
assert_that!(event.path).is_same_string_to("/a");
assert_that!(data_watcher.changed().await).is_equal_to(&event);
assert_that!(exist_watcher.changed().await).is_equal_to(&event);
assert_that!(persistent_watcher.changed().await).is_equal_to(&event);
assert_that!(recursive_watcher.changed().await).is_equal_to(&event);
let stat = client.check_stat("/").await.unwrap().unwrap();
let expected = zk::WatchedEvent::new(zk::EventType::NodeDeleted, "/a".to_string()).with_zxid(stat.pzxid);
assert_that!(child_watcher.changed().await).is_equal_to(&expected);
assert_that!(data_watcher.changed().await).is_equal_to(&expected);
assert_that!(exist_watcher.changed().await).is_equal_to(&expected);
assert_that!(persistent_watcher.changed().await).is_equal_to(&expected);
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);

// persistent ones still exist
client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap();
let event = persistent_watcher.changed().await;
assert_that!(recursive_watcher.changed().await).is_equal_to(&event);
let (stat, _) = client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap();
let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a".to_string()).with_zxid(stat.mzxid);
assert_that!(persistent_watcher.changed().await).is_equal_to(&expected);
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);
}

#[test_log::test(tokio::test)]
Expand Down

0 comments on commit b45d138

Please sign in to comment.