diff --git a/src/session/event.rs b/src/session/event.rs index 620ec11..964c5db 100644 --- a/src/session/event.rs +++ b/src/session/event.rs @@ -16,19 +16,36 @@ pub struct WatcherEvent<'a> { pub event_type: EventType, pub session_state: SessionState, pub path: &'a str, + pub zxid: i64, } impl<'a> Ref<'a> for WatcherEvent<'a> { type Value = WatchedEvent; fn to_value(&self) -> Self::Value { - WatchedEvent { event_type: self.event_type, session_state: self.session_state, path: self.path.to_owned() } + WatchedEvent { + event_type: self.event_type, + session_state: self.session_state, + path: self.path.to_owned(), + zxid: self.zxid, + } } } impl<'a> ToRef<'a, WatcherEvent<'a>> for WatchedEvent { fn to_ref(&'a self) -> WatcherEvent<'a> { - WatcherEvent { event_type: self.event_type, session_state: self.session_state, path: &self.path } + WatcherEvent { + event_type: self.event_type, + session_state: self.session_state, + path: &self.path, + zxid: self.zxid, + } + } +} + +impl WatcherEvent<'_> { + pub fn with_zxid(self, zxid: i64) -> Self { + Self { zxid, ..self } } } @@ -81,6 +98,6 @@ impl<'a> DeserializableRecord<'a> for WatcherEvent<'a> { let event_type = EventType::from_server(unsafe { buf.get_unchecked_i32() })?; let session_state = SessionState::from_server(unsafe { buf.get_unchecked_i32() })?; let path = record::unmarshal(buf)?; - Ok(WatcherEvent { event_type, session_state, path }) + Ok(WatcherEvent { event_type, session_state, path, zxid: WatchedEvent::NO_ZXID }) } } diff --git a/src/session/mod.rs b/src/session/mod.rs index e913575..d39e34c 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -210,9 +210,9 @@ impl Session { } } - fn handle_notification(&mut self, mut body: &[u8], depot: &mut Depot) -> Result<(), Error> { + fn handle_notification(&mut self, zxid: i64, mut body: &[u8], depot: &mut Depot) -> Result<(), Error> { let event = record::unmarshal_entity::(&"watch notification", &mut body)?; - self.watch_manager.dispatch_server_event(event, depot); + self.watch_manager.dispatch_server_event(event.with_zxid(zxid), depot); Ok(()) } @@ -281,7 +281,7 @@ impl Session { return Err(Error::AuthFailed); } if header.xid == PredefinedXid::Notification.into() { - self.handle_notification(body, depot)?; + self.handle_notification(header.zxid, body, depot)?; return Ok(()); } else if header.xid == PredefinedXid::Ping.into() { depot.pop_ping()?; diff --git a/src/session/types.rs b/src/session/types.rs index 77b4ebd..52f311c 100644 --- a/src/session/types.rs +++ b/src/session/types.rs @@ -68,6 +68,7 @@ impl SessionState { /// WatchedEvent represents update to watched node or session. #[derive(Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] pub struct WatchedEvent { pub event_type: EventType, @@ -79,9 +80,35 @@ pub struct WatchedEvent { /// Node path from chroot. Empty if this is a state event. pub path: String, + + /// `zxid` of the transaction that triggered this node event, [WatchedEvent::NO_ZXID] for session event. + /// + /// # Notable behaviors + /// * This feature was shipped in 3.9.0, `zxid` wil be [WatchedEvent::NO_ZXID] for earlier versions. See [ZOOKEEPER-4655]. + /// * It is possible to receive multiple events with same `zxid` and even same `path` due to [MultiWriter]. See [ZOOKEEPER-4695]. + /// + /// [ZOOKEEPER-4655]: https://issues.apache.org/jira/browse/ZOOKEEPER-4655 + /// [ZOOKEEPER-4695]: https://issues.apache.org/jira/browse/ZOOKEEPER-4695 + pub zxid: i64, } impl WatchedEvent { + pub const NO_ZXID: i64 = -1; + + pub fn new(event: EventType, path: impl Into) -> Self { + debug_assert_ne!(event, EventType::Session); + Self { event_type: event, session_state: SessionState::SyncConnected, path: path.into(), zxid: Self::NO_ZXID } + } + + pub fn with_zxid(self, zxid: i64) -> Self { + debug_assert_ne!(self.event_type, EventType::Session); + Self { zxid, ..self } + } + + pub fn new_session(state: SessionState) -> Self { + Self { event_type: EventType::Session, session_state: state, path: Default::default(), zxid: Self::NO_ZXID } + } + pub(crate) fn drain_root_path(&mut self, root: &str) { if self.event_type != EventType::Session && !root.is_empty() { util::drain_root_path(&mut self.path, root).unwrap(); diff --git a/src/session/watch.rs b/src/session/watch.rs index e933732..b872036 100644 --- a/src/session/watch.rs +++ b/src/session/watch.rs @@ -9,7 +9,7 @@ use super::request::{Operation, SessionOperation, StateReceiver, StateResponser} use super::types::{EventType, SessionState, WatchMode, WatchedEvent}; use crate::error::Error; use crate::proto::{ErrorCode, OpCode, SetWatchesRequest}; -use crate::util::Ref; +use crate::util::{Ref, ToRef}; const SET_WATCHES_MAX_BYTES: usize = 128 * 1024; @@ -320,7 +320,8 @@ impl WatchManager { } pub fn dispatch_session_state(&mut self, state: SessionState) { - let event = WatcherEvent { event_type: EventType::Session, session_state: state, path: Default::default() }; + let _event = WatchedEvent::new_session(state); + let event = _event.to_ref(); self.watches.values_mut().for_each(|watch| { watch.send(&event, &mut self.watching_paths); }); diff --git a/tests/zookeeper.rs b/tests/zookeeper.rs index 90849bb..b85861d 100644 --- a/tests/zookeeper.rs +++ b/tests/zookeeper.rs @@ -1224,56 +1224,53 @@ async fn test_watcher_coexist_on_same_path() { let mut persistent_watcher = client.watch("/a", zk::AddWatchMode::Persistent).await.unwrap(); let mut recursive_watcher = client.watch("/a", zk::AddWatchMode::PersistentRecursive).await.unwrap(); - client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap(); + let (stat, _) = client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap(); - let event = exist_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeCreated); - assert_that!(event.path).is_same_string_to("/a"); - - assert_that!(persistent_watcher.changed().await).is_equal_to(&event); - assert_that!(recursive_watcher.changed().await).is_equal_to(&event); + let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a".to_string()).with_zxid(stat.czxid); + assert_that!(exist_watcher.changed().await).is_equal_to(&expected); + assert_that!(persistent_watcher.changed().await).is_equal_to(&expected); + assert_that!(recursive_watcher.changed().await).is_equal_to(&expected); let (_, _, data_watcher) = client.get_and_watch_data("/a").await.unwrap(); let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap(); let (_, exist_watcher) = client.check_and_watch_stat("/a").await.unwrap(); - client.create("/a/b", &vec![], PERSISTENT_OPEN).await.unwrap(); - let event = child_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeChildrenChanged); - assert_that!(event.path).is_same_string_to("/a"); - assert_that!(persistent_watcher.changed().await).is_equal_to(&event); + let (stat, _) = client.create("/a/b", &vec![], PERSISTENT_OPEN).await.unwrap(); + let expected = zk::WatchedEvent::new(zk::EventType::NodeChildrenChanged, "/a".to_string()).with_zxid(stat.czxid); + assert_that!(child_watcher.changed().await).is_equal_to(&expected); + assert_that!(persistent_watcher.changed().await).is_equal_to(&expected); - let event = recursive_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeCreated); - assert_that!(event.path).is_same_string_to("/a/b"); + let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a/b".to_string()).with_zxid(stat.czxid); + assert_that!(recursive_watcher.changed().await).is_equal_to(&expected); let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap(); client.delete("/a/b", None).await.unwrap(); - let event = child_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeChildrenChanged); - assert_that!(event.path).is_same_string_to("/a"); - assert_that!(persistent_watcher.changed().await).is_equal_to(&event); + let stat = client.check_stat("/a").await.unwrap().unwrap(); + + let expected = zk::WatchedEvent::new(zk::EventType::NodeChildrenChanged, "/a".to_string()).with_zxid(stat.pzxid); + assert_that!(child_watcher.changed().await).is_equal_to(&expected); + assert_that!(persistent_watcher.changed().await).is_equal_to(&expected); - let event = recursive_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeDeleted); - assert_that!(event.path).is_same_string_to("/a/b"); + let expected = zk::WatchedEvent::new(zk::EventType::NodeDeleted, "/a/b".to_string()).with_zxid(stat.pzxid); + assert_that!(recursive_watcher.changed().await).is_equal_to(&expected); let (_, _, child_watcher) = client.get_and_watch_children("/a").await.unwrap(); client.delete("/a", None).await.unwrap(); - let event = child_watcher.changed().await; - assert_that!(event.event_type).is_equal_to(zk::EventType::NodeDeleted); - assert_that!(event.path).is_same_string_to("/a"); - assert_that!(data_watcher.changed().await).is_equal_to(&event); - assert_that!(exist_watcher.changed().await).is_equal_to(&event); - assert_that!(persistent_watcher.changed().await).is_equal_to(&event); - assert_that!(recursive_watcher.changed().await).is_equal_to(&event); + let stat = client.check_stat("/").await.unwrap().unwrap(); + let expected = zk::WatchedEvent::new(zk::EventType::NodeDeleted, "/a".to_string()).with_zxid(stat.pzxid); + assert_that!(child_watcher.changed().await).is_equal_to(&expected); + assert_that!(data_watcher.changed().await).is_equal_to(&expected); + assert_that!(exist_watcher.changed().await).is_equal_to(&expected); + assert_that!(persistent_watcher.changed().await).is_equal_to(&expected); + assert_that!(recursive_watcher.changed().await).is_equal_to(&expected); // persistent ones still exist - client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap(); - let event = persistent_watcher.changed().await; - assert_that!(recursive_watcher.changed().await).is_equal_to(&event); + let (stat, _) = client.create("/a", &vec![], PERSISTENT_OPEN).await.unwrap(); + let expected = zk::WatchedEvent::new(zk::EventType::NodeCreated, "/a".to_string()).with_zxid(stat.mzxid); + assert_that!(persistent_watcher.changed().await).is_equal_to(&expected); + assert_that!(recursive_watcher.changed().await).is_equal_to(&expected); } #[test_log::test(tokio::test)]