diff --git a/client/network/src/protocol/notifications/handler.rs b/client/network/src/protocol/notifications/handler.rs index 337c39e5f94ff..660a752c07c84 100644 --- a/client/network/src/protocol/notifications/handler.rs +++ b/client/network/src/protocol/notifications/handler.rs @@ -824,18 +824,22 @@ impl ConnectionHandler for NotifsHandler { #[cfg(test)] pub mod tests { use super::*; - // use crate::protocol::notifications::upgrade::{ - // NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen, - // }; - // use asynchronous_codec::Framed; - // use libp2p::Multiaddr; - // use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}; + use crate::protocol::notifications::upgrade::{ + NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen, + }; + use asynchronous_codec::Framed; + use libp2p::{ + core::muxing::SubstreamBox, + swarm::{handler, StreamUpgradeError}, + Multiaddr, + }; + use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}; use std::{ collections::HashMap, io::{Error, IoSlice, IoSliceMut}, }; use tokio::sync::mpsc; - // use unsigned_varint::codec::UviBytes; + use unsigned_varint::codec::UviBytes; struct OpenSubstream { notifications: stream::Peekable< @@ -934,20 +938,20 @@ pub mod tests { ) } - // /// Create new negotiated substream pair. - // pub async fn negotiated() -> (Negotiated, Negotiated) { - // let (socket1, socket2) = Self::new(); - // let socket1 = SubstreamBox::new(socket1); - // let socket2 = SubstreamBox::new(socket2); + /// Create new negotiated substream pair. + pub async fn negotiated() -> (Negotiated, Negotiated) { + let (socket1, socket2) = Self::new(); + let socket1 = SubstreamBox::new(socket1); + let socket2 = SubstreamBox::new(socket2); - // let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; - // let (res1, res2) = tokio::join!( - // dialer_select_proto(socket1, protos.clone(), Version::V1), - // listener_select_proto(socket2, protos), - // ); + let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; + let (res1, res2) = tokio::join!( + dialer_select_proto(socket1, protos.clone(), Version::V1), + listener_select_proto(socket2, protos), + ); - // (res1.unwrap().1, res2.unwrap().1) - // } + (res1.unwrap().1, res2.unwrap().1) + } } impl AsyncWrite for MockSubstream { @@ -1012,596 +1016,596 @@ pub mod tests { } } - // /// Create new [`NotifsHandler`]. - // fn notifs_handler() -> NotifsHandler { - // let proto = Protocol { - // config: ProtocolConfig { - // name: "/foo".into(), - // fallback_names: vec![], - // handshake: Arc::new(RwLock::new(b"hello, world".to_vec())), - // max_notification_size: u64::MAX, - // }, - // in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX), - // state: State::Closed { pending_opening: false }, - // }; - - // NotifsHandler { - // protocols: vec![proto], - // when_connection_open: Instant::now(), - // endpoint: ConnectedPoint::Listener { - // local_addr: Multiaddr::empty(), - // send_back_addr: Multiaddr::empty(), - // }, - // peer_id: PeerId::random(), - // events_queue: VecDeque::new(), - // } - // } - - // // verify that if another substream is attempted to be opened by remote while an inbound - // // substream already exists, the new inbound stream is rejected and closed by the local node. - // #[tokio::test] - // async fn second_open_desired_by_remote_rejected() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // attempt to open another inbound substream and verify that it is rejected - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the new substream is rejected and closed - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - - // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); - // } - - // Poll::Ready(()) - // }) - // .await; - // } - - // #[tokio::test] - // async fn open_rejected_if_substream_is_opening() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // move the handler state to 'Opening' - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // // remote now tries to open another substream, verify that it is rejected and closed - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the new substream is rejected and closed but that the first substream is - // // still in correct state - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - - // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); - // } else { - // panic!("unexpected result"); - // } - - // Poll::Ready(()) - // }) - // .await; - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - // } - - // #[tokio::test] - // async fn open_rejected_if_substream_already_open() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // move the handler state to 'Opening' - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // // accept the substream and move its state to `Open` - // let (io, _io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_out = NotificationsOutOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - // )); - - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Open { in_substream: Some(_), .. } - // )); - - // // remote now tries to open another substream, verify that it is rejected and closed - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the new substream is rejected and closed but that the first substream is - // // still in correct state - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - - // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); - // } else { - // panic!("unexpected result"); - // } - - // Poll::Ready(()) - // }) - // .await; - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Open { in_substream: Some(_), .. } - // )); - // } - - // #[tokio::test] - // async fn fully_negotiated_resets_state_for_closed_substream() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // first instruct the handler to open a connection and then close it right after - // // so the handler is in state `Closed { pending_opening: true }` - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // // verify that if the the outbound substream is successfully negotiated, the state is not - // // changed as the substream was commanded to be closed by the handler. - // let (io, _io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_out = NotificationsOutOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - // )); - - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Closed { pending_opening: false } - // )); - // } - - // #[tokio::test] - // async fn fully_negotiated_resets_state_for_open_desired_substream() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // first instruct the handler to open a connection and then close it right after - // // so the handler is in state `Closed { pending_opening: true }` - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // // attempt to open another inbound substream and verify that it is rejected - // let (io, _io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // assert!(std::matches!( - // handler.protocols[0].state, - // State::OpenDesiredByRemote { pending_opening: true, .. } - // )); - - // // verify that if the the outbound substream is successfully negotiated, the state is not - // // changed as the substream was commanded to be closed by the handler. - // let (io, _io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_out = NotificationsOutOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - // }; - - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - // )); - - // assert!(std::matches!( - // handler.protocols[0].state, - // State::OpenDesiredByRemote { pending_opening: false, .. } - // )); - // } - - // #[tokio::test] - // async fn dial_upgrade_error_resets_closed_outbound_state() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // first instruct the handler to open a connection and then close it right after - // // so the handler is in state `Closed { pending_opening: true }` - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // // inject dial failure to an already closed substream and verify outbound state is reset - // handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( - // handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout }, - // )); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Closed { pending_opening: false } - // )); - // } - - // #[tokio::test] - // async fn dial_upgrade_error_resets_open_desired_state() { - // let mut handler = notifs_handler(); - // let (io, mut io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // // verify that the substream is in (partly) opened state - // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - // futures::future::poll_fn(|cx| { - // let mut buf = Vec::with_capacity(512); - // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - // Poll::Ready(()) - // }) - // .await; - - // // first instruct the handler to open a connection and then close it right after - // // so the handler is in state `Closed { pending_opening: true }` - // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::Opening { in_substream: Some(_) } - // )); - - // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // let (io, _io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::NotSent, - // ), - // }; - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - - // assert!(std::matches!( - // handler.protocols[0].state, - // State::OpenDesiredByRemote { pending_opening: true, .. } - // )); - - // // inject dial failure to an already closed substream and verify outbound state is reset - // handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( - // handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout }, - // )); - // assert!(std::matches!( - // handler.protocols[0].state, - // State::OpenDesiredByRemote { pending_opening: false, .. } - // )); - // } - - // #[tokio::test] - // async fn sync_notifications_clogged() { - // let mut handler = notifs_handler(); - // let (io, _) = MockSubstream::negotiated().await; - // let codec = UviBytes::default(); - - // let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); - // let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1); - // let notifications_sink = NotificationsSink { - // inner: Arc::new(NotificationsSinkInner { - // peer_id: PeerId::random(), - // async_channel: FuturesMutex::new(async_tx), - // sync_channel: Mutex::new(Some(sync_tx)), - // }), - // }; - - // handler.protocols[0].state = State::Open { - // notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), - // out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))), - // in_substream: None, - // }; - - // notifications_sink.send_sync_notification(vec![1, 3, 3, 7]); - // notifications_sink.send_sync_notification(vec![1, 3, 3, 8]); - // notifications_sink.send_sync_notification(vec![1, 3, 3, 9]); - // notifications_sink.send_sync_notification(vec![1, 3, 4, 0]); - - // futures::future::poll_fn(|cx| { - // assert!(std::matches!( - // handler.poll(cx), - // Poll::Ready(ConnectionHandlerEvent::Close( - // NotifsHandlerError::SyncNotificationsClogged, - // )) - // )); - // Poll::Ready(()) - // }) - // .await; - // } - - // #[tokio::test] - // async fn close_desired_by_remote() { - // let mut handler = notifs_handler(); - // let (io, io2) = MockSubstream::negotiated().await; - // let mut codec = UviBytes::default(); - // codec.set_max_len(usize::MAX); - - // let notif_in = NotificationsInOpen { - // handshake: b"hello, world".to_vec(), - // negotiated_fallback: None, - // substream: NotificationsInSubstream::new( - // Framed::new(io, codec), - // NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]), - // ), - // }; - - // // add new inbound substream but close it immediately and verify that correct events are - // // emitted - // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - // )); - // drop(io2); - - // futures::future::poll_fn(|cx| { - // assert!(std::matches!( - // handler.poll(cx), - // Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - // NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, - // )) - // )); - // assert!(std::matches!( - // handler.poll(cx), - // Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - // NotifsHandlerOut::CloseDesired { protocol_index: 0 }, - // )) - // )); - // Poll::Ready(()) - // }) - // .await; - // } + /// Create new [`NotifsHandler`]. + fn notifs_handler() -> NotifsHandler { + let proto = Protocol { + config: ProtocolConfig { + name: "/foo".into(), + fallback_names: vec![], + handshake: Arc::new(RwLock::new(b"hello, world".to_vec())), + max_notification_size: u64::MAX, + }, + in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX), + state: State::Closed { pending_opening: false }, + }; + + NotifsHandler { + protocols: vec![proto], + when_connection_open: Instant::now(), + endpoint: ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: Multiaddr::empty(), + }, + peer_id: PeerId::random(), + events_queue: VecDeque::new(), + } + } + + // verify that if another substream is attempted to be opened by remote while an inbound + // substream already exists, the new inbound stream is rejected and closed by the local node. + #[tokio::test] + async fn second_open_desired_by_remote_rejected() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // attempt to open another inbound substream and verify that it is rejected + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the new substream is rejected and closed + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + + if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); + } + + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn open_rejected_if_substream_is_opening() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // move the handler state to 'Opening' + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + // remote now tries to open another substream, verify that it is rejected and closed + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the new substream is rejected and closed but that the first substream is + // still in correct state + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + + if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); + } else { + panic!("unexpected result"); + } + + Poll::Ready(()) + }) + .await; + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + } + + #[tokio::test] + async fn open_rejected_if_substream_already_open() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // move the handler state to 'Opening' + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + // accept the substream and move its state to `Open` + let (io, _io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_out = NotificationsOutOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + )); + + assert!(std::matches!( + handler.protocols[0].state, + State::Open { in_substream: Some(_), .. } + )); + + // remote now tries to open another substream, verify that it is rejected and closed + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the new substream is rejected and closed but that the first substream is + // still in correct state + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + + if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); + } else { + panic!("unexpected result"); + } + + Poll::Ready(()) + }) + .await; + assert!(std::matches!( + handler.protocols[0].state, + State::Open { in_substream: Some(_), .. } + )); + } + + #[tokio::test] + async fn fully_negotiated_resets_state_for_closed_substream() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // first instruct the handler to open a connection and then close it right after + // so the handler is in state `Closed { pending_opening: true }` + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // verify that if the the outbound substream is successfully negotiated, the state is not + // changed as the substream was commanded to be closed by the handler. + let (io, _io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_out = NotificationsOutOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + )); + + assert!(std::matches!( + handler.protocols[0].state, + State::Closed { pending_opening: false } + )); + } + + #[tokio::test] + async fn fully_negotiated_resets_state_for_open_desired_substream() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // first instruct the handler to open a connection and then close it right after + // so the handler is in state `Closed { pending_opening: true }` + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // attempt to open another inbound substream and verify that it is rejected + let (io, _io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + assert!(std::matches!( + handler.protocols[0].state, + State::OpenDesiredByRemote { pending_opening: true, .. } + )); + + // verify that if the the outbound substream is successfully negotiated, the state is not + // changed as the substream was commanded to be closed by the handler. + let (io, _io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_out = NotificationsOutOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + }; + + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + )); + + assert!(std::matches!( + handler.protocols[0].state, + State::OpenDesiredByRemote { pending_opening: false, .. } + )); + } + + #[tokio::test] + async fn dial_upgrade_error_resets_closed_outbound_state() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // first instruct the handler to open a connection and then close it right after + // so the handler is in state `Closed { pending_opening: true }` + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // inject dial failure to an already closed substream and verify outbound state is reset + handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( + handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout }, + )); + assert!(std::matches!( + handler.protocols[0].state, + State::Closed { pending_opening: false } + )); + } + + #[tokio::test] + async fn dial_upgrade_error_resets_open_desired_state() { + let mut handler = notifs_handler(); + let (io, mut io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + // verify that the substream is in (partly) opened state + assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + futures::future::poll_fn(|cx| { + let mut buf = Vec::with_capacity(512); + assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + Poll::Ready(()) + }) + .await; + + // first instruct the handler to open a connection and then close it right after + // so the handler is in state `Closed { pending_opening: true }` + handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + assert!(std::matches!( + handler.protocols[0].state, + State::Opening { in_substream: Some(_) } + )); + + handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + let (io, _io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::NotSent, + ), + }; + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + + assert!(std::matches!( + handler.protocols[0].state, + State::OpenDesiredByRemote { pending_opening: true, .. } + )); + + // inject dial failure to an already closed substream and verify outbound state is reset + handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( + handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout }, + )); + assert!(std::matches!( + handler.protocols[0].state, + State::OpenDesiredByRemote { pending_opening: false, .. } + )); + } + + #[tokio::test] + async fn sync_notifications_clogged() { + let mut handler = notifs_handler(); + let (io, _) = MockSubstream::negotiated().await; + let codec = UviBytes::default(); + + let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1); + let notifications_sink = NotificationsSink { + inner: Arc::new(NotificationsSinkInner { + peer_id: PeerId::random(), + async_channel: FuturesMutex::new(async_tx), + sync_channel: Mutex::new(Some(sync_tx)), + }), + }; + + handler.protocols[0].state = State::Open { + notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), + out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))), + in_substream: None, + }; + + notifications_sink.send_sync_notification(vec![1, 3, 3, 7]); + notifications_sink.send_sync_notification(vec![1, 3, 3, 8]); + notifications_sink.send_sync_notification(vec![1, 3, 3, 9]); + notifications_sink.send_sync_notification(vec![1, 3, 4, 0]); + + futures::future::poll_fn(|cx| { + assert!(std::matches!( + handler.poll(cx), + Poll::Ready(ConnectionHandlerEvent::Close( + NotifsHandlerError::SyncNotificationsClogged, + )) + )); + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn close_desired_by_remote() { + let mut handler = notifs_handler(); + let (io, io2) = MockSubstream::negotiated().await; + let mut codec = UviBytes::default(); + codec.set_max_len(usize::MAX); + + let notif_in = NotificationsInOpen { + handshake: b"hello, world".to_vec(), + negotiated_fallback: None, + substream: NotificationsInSubstream::new( + Framed::new(io, codec), + NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]), + ), + }; + + // add new inbound substream but close it immediately and verify that correct events are + // emitted + handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + )); + drop(io2); + + futures::future::poll_fn(|cx| { + assert!(std::matches!( + handler.poll(cx), + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + )) + )); + assert!(std::matches!( + handler.poll(cx), + Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + NotifsHandlerOut::CloseDesired { protocol_index: 0 }, + )) + )); + Poll::Ready(()) + }) + .await; + } } diff --git a/client/network/src/protocol/notifications/upgrade/notifications.rs b/client/network/src/protocol/notifications/upgrade/notifications.rs index 0148fd22bd883..83228f33b8160 100644 --- a/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -114,6 +114,13 @@ pub struct NotificationsOutSubstream { socket: Framed>>>, } +#[cfg(test)] +impl NotificationsOutSubstream { + pub fn new(socket: Framed>>>) -> Self { + Self { socket } + } +} + impl NotificationsIn { /// Builds a new potential upgrade. pub fn new( @@ -196,13 +203,13 @@ impl NotificationsInSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { - // #[cfg(test)] - // pub fn new( - // socket: Framed>>>, - // handshake: NotificationsInSubstreamHandshake, - // ) -> Self { - // Self { socket, handshake } - // } + #[cfg(test)] + pub fn new( + socket: Framed>>>, + handshake: NotificationsInSubstreamHandshake, + ) -> Self { + Self { socket, handshake } + } /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) {