-
Notifications
You must be signed in to change notification settings - Fork 985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
swarm: Add FromFn
ConnectionHandler
#2852
Conversation
I looked at using this implementation for the ping protocol and I think it would be doable apart from I thing: Overriding Thoughts? |
Agreed. See #2778. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to give this abstraction more thought. I find this abstraction to be very similar to libp2p-request-response
. I have to think about whether it makes sense to have this next to libp2p-request-response
, have this replace libp2p-request-response
or have libp2p-request-response
build on top of this.
My preference would be to first see if we can successfully use this abstraction within our monorepo, e.g. have In addition, I'd like to somehow survey people to see how they use it / whether they know about this abstraction. Perhaps more examples can also help with that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I like the general idea.
I tried using this locally for rendezvous and we would have to use this mechanism to push the registration data from the behaviour down to the connections.
Did you manage to implement the rendezvous or any other protocol completely with this handler?
Is the name any good?
No strong opinion. Maybe FromUpgrade
would describe it a bit better?
@mxinden @elenaf9 What do you think of adding
NotifyHandler::All
?
I don't see a reason not to add it, and updating the state of all handlers sounds like a valid use-case, so I'm in favor of it.
I don't have a working example yet:
|
FromFn
ConnectionHandler
FromFn
ConnectionHandler
Note that this would require |
Thanks for linking this. I guess we've come full circle. I was thinking that it will require To make this handler really useful, we'd need some kind of helper that sends an action to all connections. |
This allows us to capture the `remote_peer_id` and `connection_point` of the connection.
Another update on this: I've introduced a builder pattern to builder the initial handler. This makes it much more convenient to configure a handler that doesn't accept inbound streams for example or will never make outbound streams. A lot of this is now also enforced in the type system. I am gonna try to full convert At the moment, we can't convert |
This clone is not very expensive and offers a huge ergonomic improvement.
b70272d
to
c15433f
Compare
The latest patches introduce the notion of a I think this is getting close to fulfilling the requirements of #2657. |
swarm/src/handler/from_fn.rs
Outdated
pub fn register_connection(&mut self, peer_id: PeerId, id: ConnectionId) { | ||
self.connections.insert((peer_id, id)); | ||
} | ||
|
||
pub fn unregister_connection(&mut self, peer_id: PeerId, id: ConnectionId) { | ||
self.connections.remove(&(peer_id, id)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once #2832 is fixed, we can make this more ergonomic by just requiring a reference to the FromSwarm
event and we can pick out the data as we need.
Rendezvous is now also fully migrated and functional. |
protocols/rendezvous/src/server.rs
Outdated
from_fn::OutEvent::InboundEmitted(Ok(Some(InboundOutEvent::NewRegistration( | ||
new_registration, | ||
)))) => { | ||
let (registration, expiry) = self.registrations.add(new_registration); | ||
self.next_expiry.push(expiry); | ||
|
||
vec![NetworkBehaviourAction::GenerateEvent( | ||
Event::PeerRegistered { peer, registration }, | ||
)] | ||
} | ||
from_fn::OutEvent::InboundEmitted(Ok(Some(InboundOutEvent::RegistrationFailed { | ||
error, | ||
namespace, | ||
}))) => { | ||
vec![NetworkBehaviourAction::GenerateEvent( | ||
Event::PeerNotRegistered { | ||
peer, | ||
error, | ||
namespace, | ||
}, | ||
)] | ||
} | ||
from_fn::OutEvent::InboundEmitted(Ok(Some(InboundOutEvent::Discovered { | ||
cookie, | ||
registrations, | ||
previous_registrations, | ||
}))) => { | ||
self.registrations | ||
.cookies | ||
.insert(cookie, previous_registrations); | ||
|
||
vec![NetworkBehaviourAction::GenerateEvent( | ||
Event::DiscoverServed { | ||
enquirer: peer, | ||
registrations, | ||
}, | ||
)] | ||
} | ||
from_fn::OutEvent::InboundEmitted(Ok(Some(InboundOutEvent::DiscoverFailed { | ||
error, | ||
}))) => { | ||
vec![NetworkBehaviourAction::GenerateEvent( | ||
Event::DiscoverNotServed { | ||
enquirer: peer, | ||
error, | ||
}, | ||
)] | ||
} | ||
from_fn::OutEvent::InboundEmitted(Ok(Some(InboundOutEvent::Unregister(namespace)))) => { | ||
self.registrations.remove(namespace.clone(), peer); | ||
|
||
vec![NetworkBehaviourAction::GenerateEvent( | ||
Event::PeerUnregistered { peer, namespace }, | ||
)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of code here is only present to make things observable from the outside. We don't really need to emit all these events in order for the protocol to be functional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this does indeed look quite promising!
} | ||
}) | ||
} | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this looks pretty neat!
pub fn with_inbound_handler<TInbound, TInboundStream>( | ||
self, | ||
max_inbound: usize, | ||
handler: impl Fn(NegotiatedSubstream, PeerId, ConnectedPoint, Arc<TState>) -> TInboundStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please include the protocol name (also for outbound).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment, there is only one protocol possible so it will always be that one if that function is called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know, and I’d like to change that. All prefabricated upgrades today ignore the negotiated protocol name, which to my mind leaves the second most important value provided by libp2p on the table (the first one being the ability to dial a PeerId).
max_inbound: usize, | ||
inbound_handler: Option< | ||
Box< | ||
dyn Fn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICS this restricts the part of the handler that constructs the stream processor to shared access to its closure — is there a reason for this? If not I’d prefer FnMut.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No particular reason, I guess this could be useful if you want to track connection-local state across all inbound/outbound streams?
That would only be shared across one closure though, meaning you wouldn't be able to share state between the inbound and outbound factory function.
I think we might be able to change this interface to allow for a single factory function that gets called for both inbound and outbound streams which would allow you to share state. Is that what you would be after?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sharing state is not my concern because doing that always proceeds via shared references (of which Arc is a special case). Fn
does not allow captured values to be modified (more precisely: mutably borrowed), even if they are owned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fn
does not allow captured values to be modified (more precisely: mutably borrowed), even if they are owned.
If you want to mutate a captured value, you are effectively sharing the state within a connection across all stream handler factory invocations.
I think we are saying the same thing and I agree that this capability is useful :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you mean “sharing along the timeline”, not “sharing in different places”. Yes, now we both know that we are in agreement :-)
/// state changes. | ||
/// | ||
/// [`NetworkBehaviour`]: crate::NetworkBehaviour | ||
pub fn from_fn(protocol: &'static str) -> Builder<WantState> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please allow multiple protocols, like
pub fn from_fn<I>(protocols: I) -> Builder<I, WantState>
where
I: IntoIterator + Clone + Send + 'static
I::Item: ProtocolName
{ ... }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that is doable!
{ | ||
if self.keep_alive.is_yes() { | ||
// TODO: Make timeout configurable | ||
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels weird that this grace period is implemented on the level of ConnectionHandler — shouldn’t it be a NetworkBehaviour setting that applies whenever KeepAlive toggles from Yes to No?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not happy with this either but I couldn't think of a better way to handle this. keep_alive
is per connection. My initial implementation would have been a with_keep_alive_timeout
configuration parameter on the builder that defaults to 30s or something.
PeerId, | ||
ConnectedPoint, | ||
Arc<TState>, | ||
) -> BoxStream<'static, TInbound> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’d prefer type aliases for these function types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean exactly? As a user, you shouldn't need to come in contact this this BoxStream
per se. Where would you like to see a type alias?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only mean the readability of this source module by itself.
.connections | ||
.iter() | ||
.map(|(peer_id, conn_id)| (*peer_id, *conn_id, self.shared.clone())) | ||
.collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This erases previously undelivered wake-ups — which is fine unless it happens in a tight loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old events are useless though because they contain stale state. I am not sure if there is a better way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, old events are useless. Perhaps I am overthinking this, my worry is that there may be a situation where only the first (few) connections are ever notified because updates keep coming in — a fairness concern. This would be solved by randomising the notification order, or by always finishing one batch of notifications (with values updated during the process!) before starting the next. The second is deterministic but more work to implement.
Getting these things wrong usually has low chance of severe impact, but when the impact happens it is really difficult for downstream code to find the bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we currently drain events from the NetworkBehaviour
before we inject new ones into it so I don't think that this would happen but I'd have to double check that.
This pull request has merge conflicts. Could you please resolve them @thomaseizinger? 🙏 |
This pull request has merge conflicts. Could you please resolve them @thomaseizinger? 🙏 |
I think we can close this as won't-merge. It was an interesting exploration though! |
Description
The
from_fn
handler is a building block for describing protocols that have little or no connection-specific state. It encourages you to implement your protocol as a function of data without suspending the IO stream. This allows you to use regularasync
functions to express whatever message handshake you want.To allow sharing of data between connections,
from_fn
uses an abstraction calledShared
. It implements the observer pattern and pushes new versions of this data to all connections every time it gets updated. Each new stream (inbound or outbound) gets a copy of this state when created.The stream handlers can either complete with a single event or emit several events as they progress. Thus, this abstraction should also help with the much requested "streaming response" protocol.
With
from_fn
, you still need to implement your ownNetworkBehaviour
. This is considered to be relatively easy though. Not having to implement aConnectionHandler
is a major step towards better usability.Links to any relevant issues
Open Questions
async_fn_handler
?libp2p-request-response
?Open tasks
ReadyUpgrade
as separate PR: core/upgrade/: AddReadyUpgrade
#2855Change checklist