Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/quic: Adapt QuicMuxer to upstream StreamMuxer changes #6

Closed
wants to merge 97 commits into from

Conversation

elenaf9
Copy link
Owner

@elenaf9 elenaf9 commented Aug 3, 2022

Description

Merge latest upstream master and adapt QuixMuxer to StreamMuxer trait changes from libp2p#2724. Relevant commit is 57840a3.

Discussed in libp2p#2722 (comment) and kpp#19 (comment).

//CC @kpp @thomaseizinger - unfortunately can not explicitly request a review from you, but I would appreciate one.

mxinden and others added 24 commits July 14, 2022 06:15
…ibp2p#2752)

Document that the `ConnectionHandler` implementation has to enforce a limit on
the number of inbound substreams.
…ibp2p#2734)

* misc/metrics: Explicitly delegate event recording to each recorder

This allows delegating a single event to multiple `Recorder`s. That enables e.g. the
`identify::Metrics` `Recorder` to act both on `IdentifyEvent` and `SwarmEvent`. The latter enables
it to garbage collect per peer data on disconnects.

* protocols/dcutr: Expose PROTOCOL_NAME

* protocols/identify: Expose PROTOCOL_NAME and PUSH_PROTOCOL_NAME

* protocols/ping: Expose PROTOCOL_NAME

* protocols/relay: Expose HOP_PROTOCOL_NAME and STOP_PROTOCOL_NAME

* misc/metrics: Track # connected nodes supporting specific protocol

An example metric exposed with this patch:

```
libp2p_identify_protocols{protocol="/ipfs/ping/1.0.0"} 10
```

This implies that 10 of the currently connected nodes support the ping protocol.
…nd,address_change,close}` (libp2p#2724)

Instead of having a mix of `poll_event`, `poll_outbound` and `poll_close`, we
flatten the entire interface of `StreamMuxer` into 4 individual functions:

- `poll_inbound`
- `poll_outbound`
- `poll_address_change`
- `poll_close`

This design is closer to the design of other async traits like `AsyncRead` and
`AsyncWrite`. It also allows us to delete the `StreamMuxerEvent`.
* build(deps): Bump Swatinem/rust-cache from 1.4.0 to 2.0.0

Bumps [Swatinem/rust-cache](https://github.com/Swatinem/rust-cache) from 1.4.0 to 2.0.0.
- [Release notes](https://github.com/Swatinem/rust-cache/releases)
- [Changelog](https://github.com/Swatinem/rust-cache/blob/master/CHANGELOG.md)
- [Commits](Swatinem/rust-cache@cb2cf0c...6720f05)

---
updated-dependencies:
- dependency-name: Swatinem/rust-cache
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Follow up on libp2p#2724. Given that
libp2p-core is bumped to v0.35.0, libp2p-tcp needs to be bumped as well.
….0 (libp2p#2761)

* build(deps): Update prometheus-client requirement from 0.16.0 to 0.17.0

Updates the requirements on [prometheus-client](https://github.com/prometheus/client_rust) to permit the latest version.
- [Release notes](https://github.com/prometheus/client_rust/releases)
- [Changelog](https://github.com/prometheus/client_rust/blob/master/CHANGELOG.md)
- [Commits](prometheus/client_rust@v0.16.0...v0.17.0)

---
updated-dependencies:
- dependency-name: prometheus-client
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
We are already boxing the given object so we might as well pin to
to avoid the `Unpin` trait bound.
…rBox` (libp2p#2775)

`StreamMuxerBox` is never shared across threads but owned by a single
connection. Restricting it to be `Sync` unnecessarily limits the design
space around the `StreamMuxer` trait and its implementations.
…ibp2p#2765)

Co-authored-by: Elena Frank <elena.frank@protonmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
Copy link

@thomaseizinger thomaseizinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

I've left a few comments :)

while let Poll::Ready(event) = self.connection.poll_event(cx) {
match event {
ConnectionEvent::Connected => {
tracing::error!("Unexpected Connected event on established QUIC connection");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error seems a bit much here. I wouldn't won't be woken at 3am because my production app is reporting _error_s and then seeing it is this one which is practically harmless :)

Why is this unexpected in the first place?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectionEvent::Connected should only be returned a single time, which is when we finished all the crypto and established a connection.
In quic::upgrade::Update we poll a pending new connection until it returns ConnectionEvent::Connected. Only then we create the QuicMuxer for this connection. Hence within QuicMuxer the event should not happen again.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it really should not happen, then perhaps put a debug assert here?

transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
transports/quic/src/muxer.rs Show resolved Hide resolved
transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
Comment on lines 208 to 212
while let Poll::Ready(event) = inner.connection.poll_event(cx) {
if let ConnectionEvent::ConnectionLost(_) = event {
return Poll::Ready(Ok(()));
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assume this returns Poll::Pending at some point. Then we will break out of this loop and register a waker further down. Once we are woken, poll_close gets called again and we go straight into inner.poll_connection which will likely yield the ConnectionEvent::ConnectionLost. I don't think we will be observing it here then, right?

I think we may not want to call poll_connection at the top here or maybe return an error from poll_connection in case it has been closed?

transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
transports/quic/src/muxer.rs Outdated Show resolved Hide resolved
@thomaseizinger
Copy link

If we end up merging libp2p#2797, we could only poll the inner connection within poll_event actually and not on every poll function.

Generate `NetworkBehaviour::OutEvent` if not provided through
`#[behaviour(out_event = "MyOutEvent")]` and event processing is
disabled (default).
@elenaf9
Copy link
Owner Author

elenaf9 commented Sep 19, 2022

Sorry but your code stuck in libp2p-perf:
log

How to reproduce: kpp@libp2p-perf:quic-reimpl#57b46925c4714:

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 493da63..65fbe13 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -7,7 +7,8 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-libp2p = { git = "https://github.com/elenaf9/rust-libp2p/", branch = "quic/multiple-endpoints-2", version = "0.47.0", default-features = false, features = ["dns-async-std", "noise", "plaintext", "tcp-async-io", "yamux", "quic"] }
+#libp2p = { git = "https://github.com/elenaf9/rust-libp2p/", branch = "quic/multiple-endpoints-2", version = "0.47.0", default-features = false, features = ["dns-async-std", "noise", "plaintext", "tcp-async-io", "yamux", "quic"] }
+libp2p = { git = "https://github.com/elenaf9/rust-libp2p/", branch = "quic/muxer", version = "0.47.0", default-features = false, features = ["dns-async-std", "noise", "plaintext", "tcp-async-io", "yamux", "quic"] }

build and ./run.sh.

Should be fixed with 4e027b1 and bdba780. @kpp Would you mind testing again?

The existing implementation was based on an old API of the quinn_proto
Endpoint which by now has changed. In particular we can not explicitly
`accept` new connections on the endpoint anymore.
Instead if there is a new connections and our channel for new
connections is full because the endpoint is too busy, we now simply
drop the connection to backpressure the remote.
@elenaf9
Copy link
Owner Author

elenaf9 commented Sep 20, 2022

Thank you for the continuous reviews @thomaseizinger. I think I addressed everything.

There are still two major TODOs in the code, namely

  • Handling of ECN information. Right I think the best way to solve this is use the quinn_udp implementation, which support reading / writing ECN bits for packets. But I have not looked into it in detail yet.
  • Set the local address for connections correctly if the socket address is a wildcard address.

They are unrelated to the change of this PR.
However, while this PR was originally meant to only target the QuicMuxer, I have already touched quite a lot of code also outside of the muxer module. Due to this, I am leaning towards resolving the above issues also within this PR. @kpp wdyt?

Copy link

@thomaseizinger thomaseizinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!

I'd be in favor of merging this into the main QUIC branch rather sooner than later. Reviewing this is unnecessarily hard because there are so many unrelated changes in here.

@@ -183,16 +186,14 @@ impl Connection {
/// On success, a [`quinn_proto::StreamEvent::Finished`] event will later be produced when the
/// substream has been effectively closed. A [`ConnectionEvent::StreamStopped`] event can also
/// be emitted.
pub fn shutdown_substream(
pub fn finish_substream(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the docs, this only closes the write-side. If that is the case, perhaps close or close_write would be a better name?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer to stick with finish_substream because it is consistent if the name of the event it produces quinn_proto::StreamEvent::Finished / ConnectionEvent::StreamFinished.
But not a strong opinion, happy to change it to close_write if you insist :)

transports/quic/src/muxer.rs Show resolved Hide resolved
transports/quic/src/transport.rs Show resolved Hide resolved
transports/quic/src/endpoint.rs Show resolved Hide resolved
swarm-derive/src/lib.rs Show resolved Hide resolved
transports/quic/src/endpoint.rs Show resolved Hide resolved
transports/quic/src/endpoint.rs Show resolved Hide resolved
@kpp
Copy link

kpp commented Sep 21, 2022

Due to this, I am leaning towards resolving the above issues also within this PR. @kpp wdyt?

@elenaf9 I don't think this is a good idea. Let's merge this one ASAP.

Should be fixed with 4e027b1 and bdba780. @kpp Would you mind testing again?

I tested b7103aa only, it works! Here are perf logs:

b7103aa
# Start Rust and Golang servers.
Local peer id: PeerId("Qmcqq9TFaYbb94uwdER1BXyGfCFY4Bb1gKozxNyVvLvTSw")
about to listen on "/ip4/127.0.0.1/udp/9992/quic"
Listening on "/ip4/127.0.0.1/udp/9992/quic".

# Rust -> Rust

Local peer id: PeerId("12D3KooWPGMKCBwUb4ZdxTsAJkkZQq2Topzcazjtt77hS6GYEKUP")
IncomingConnection { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/45161/quic" }
ConnectionEstablished { peer_id: PeerId("12D3KooWPGMKCBwUb4ZdxTsAJkkZQq2Topzcazjtt77hS6GYEKUP"), endpoint: Listener { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/45161/quic" }, num_established: 1, concurrent_dial_errors: None }
Behaviour(PerfRunDone(10.004522187s, 982206277))
Interval	Transfer	Bandwidth
0 s - 10.01 s	982 MBytes	785.16 MBit/s
ConnectionClosed { peer_id: PeerId("12D3KooWPGMKCBwUb4ZdxTsAJkkZQq2Topzcazjtt77hS6GYEKUP"), endpoint: Listener { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/45161/quic" }, num_established: 0, cause: Some(IO(Custom { kind: Other, error: ConnectionLost(Quinn(ApplicationClosed(ApplicationClose { error_code: 0, reason: b"" }))) })) }
ConnectionClosed { peer_id: PeerId("Qmcqq9TFaYbb94uwdER1BXyGfCFY4Bb1gKozxNyVvLvTSw"), endpoint: Dialer { address: "/ip4/127.0.0.1/udp/9992/quic", role_override: Dialer }, num_established: 0, cause: None }

# Rust -> Golang

Local peer id: PeerId("12D3KooWSeYNmCJguvQGfVvDJQbTnaDWvi4DiTVeSqEBFZJgPAr7")
Interval	Transfer	Bandwidth
0 s - 10.00 s	1094 MBytes	875.04 MBit/s
ConnectionClosed { peer_id: PeerId("12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"), endpoint: Dialer { address: "/ip4/127.0.0.1/udp/9993/quic", role_override: Dialer }, num_established: 0, cause: None }


# Golang -> Rust

2022/09/21 21:01:31 failed to sufficiently increase receive buffer size (was: 208 kiB, wanted: 2048 kiB, got: 416 kiB). See https://github.com/lucas-clemente/quic-go/wiki/UDP-Receive-Buffer-Size for details.
IncomingConnection { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/39035/quic" }
ConnectionEstablished { peer_id: PeerId("12D3KooWSrnuqq4MQNqdyW5KPNdV7Ynx6bCwNBnEF9ZzSdV5LEU2"), endpoint: Listener { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/39035/quic" }, num_established: 1, concurrent_dial_errors: None }
Interval 	Transfer	Bandwidth
0s - 10.00 s 	931 MBytes	744.76 MBit/s
Behaviour(PerfRunDone(10.000510422s, 931200000))
ConnectionClosed { peer_id: PeerId("12D3KooWSrnuqq4MQNqdyW5KPNdV7Ynx6bCwNBnEF9ZzSdV5LEU2"), endpoint: Listener { local_addr: "/ip4/127.0.0.1/udp/9992/quic", send_back_addr: "/ip4/127.0.0.1/udp/39035/quic" }, num_established: 0, cause: Some(IO(Custom { kind: Other, error: ConnectionLost(Quinn(ApplicationClosed(ApplicationClose { error_code: 0, reason: b"" }))) })) }

# Golang -> Golang

2022/09/21 21:01:41 failed to sufficiently increase receive buffer size (was: 208 kiB, wanted: 2048 kiB, got: 416 kiB). See https://github.com/lucas-clemente/quic-go/wiki/UDP-Receive-Buffer-Size for details.
Interval 	Transfer	Bandwidth
0s - 10.00 s 	1077 MBytes	861.50 MBit/s

@elenaf9
Copy link
Owner Author

elenaf9 commented Sep 21, 2022

Due to this, I am leaning towards resolving the above issues also within this PR. @kpp wdyt?

@elenaf9 I don't think this is a good idea. Let's merge this one ASAP.

Okay. From my side this is ready for merge then. Will create the PR tomorrow.

Should be fixed with 4e027b1 and bdba780. @kpp Would you mind testing again?

I tested b7103aa only, it works! Here are perf logs:

Thanks for testing.
Concerning the concrete numbers: I don't think we should rely on them too much. E.g. compared to the run in libp2p#2289 (comment) the numbers have significantly decreased, however they also decreased for the go -> go test (1273.55 MBit/s vs 861.50 MBit/s), where no code at all has changed between the two runs. Also, when I run them on my local machine on the same commit the numbers differ again, even between individual runs.

@elenaf9
Copy link
Owner Author

elenaf9 commented Sep 22, 2022

Merged via kpp#23.

@elenaf9 elenaf9 closed this Sep 22, 2022
elenaf9 added a commit to kpp/rust-libp2p that referenced this pull request Sep 22, 2022
@elenaf9 elenaf9 deleted the quic/muxer branch September 22, 2022 09:19
@elenaf9 elenaf9 restored the quic/muxer branch October 16, 2022 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.