Skip to content

Commit

Permalink
Implement WebRTC messages framing (#2896)
Browse files Browse the repository at this point in the history
cc libp2p/specs#412
cc #1712

This PR finishes implementing the WebRTC spec by adding the last
remaining item: the messages framing.

Implementing this messages framing while minimizing the amount of data
copies is rather challenging.
Instead of going for the complicated solution, I went for the more easy
solution of having an intermediate read buffer where data is first
copied.
Going for the simple solution decreases the chances of bugs and
increases the ease of debugging, so it's preferable at the moment.

In the future, once WebRTC is fully working, we can rewrite this
messages framing code in a more optimized way.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Oct 19, 2022
1 parent adaa131 commit b7d13d7
Show file tree
Hide file tree
Showing 11 changed files with 585 additions and 159 deletions.
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ impl<TPlat: Platform> Background<TPlat> {

async fn storage_query(
&self,
keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
hash: &[u8; 32],
total_attempts: u32,
timeout_per_request: Duration,
Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
self: Arc<Self>,
chain_index: usize,
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]>>>,
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
timeout: Duration,
) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
let rx = {
Expand Down
91 changes: 41 additions & 50 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,7 @@ async fn multi_stream_connection_task<TPlat: Platform>(
// from this slice the data to send. Consequently, the write buffer is held locally. This is
// suboptimal compared to writing to a write buffer provided by the platform, but it is easier
// to implement it this way.
let mut write_buffer = vec![0; 4096];

// When reading/writing substreams, the substream can ask to be woken up after a certain time.
// This variable stores the earliest time when we should be waking up.
// TODO: this is wrong; this code assumes that substreams will be found in `ready_substreams` while it is not the case now; however it seems more appropriate to modify `ready_substreams` rather than accomodate this limitation here
let mut wake_up_after = None;
let mut write_buffer = vec![0; 16384]; // TODO: the write buffer must not exceed 16kiB due to the libp2p WebRTC spec; this should ideally be enforced through the connection task API

loop {
// Start opening new outbound substreams, if needed.
Expand Down Expand Up @@ -424,56 +419,52 @@ async fn multi_stream_connection_task<TPlat: Platform>(

let now = TPlat::now();

// Clear `wake_up_after` if necessary, otherwise it will always stay at a constant value.
// TODO: nit: can use `Option::is_some_and` after it's stable; https://github.com/rust-lang/rust/issues/93050
if wake_up_after
.as_ref()
.map(|time| *time <= now)
.unwrap_or(false)
{
wake_up_after = None;
}
// When reading/writing substreams, the substream can ask to be woken up after a certain
// time. This variable stores the earliest time when we should be waking up.
let mut wake_up_after = None;

// Perform a read-write on all substreams.
// TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated
for substream_id in open_substreams.iter().map(|(id, _)| id).collect::<Vec<_>>() {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
match (&mut wake_up_after, &read_write.wake_up_after) {
(_, None) => {}
(val @ None, Some(t)) => *val = Some(t.clone()),
(Some(curr), Some(upd)) if *upd < *curr => *curr = upd.clone(),
(Some(_), Some(_)) => {}
}
drop(read_write);
loop {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
wake_up_after = read_write.wake_up_after.take();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);
// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
break;
}

// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
if read_bytes == 0 && written_bytes == 0 {
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl<TPlat: Platform> SyncService<TPlat> {
block_number: u64,
block_hash: &[u8; 32],
storage_trie_root: &[u8; 32],
requested_keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
requested_keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- The WebRTC protocol implementation is now up to date with the specification. While the specification hasn't been finalized yet and could still evolve, the current version is believed to be likely to be final. ([#2896](https://github.com/paritytech/smoldot/pull/2896))

### Fixed

- Fix timeout not being checked when opening a notifications substream. ([#2323](https://github.com/paritytech/smoldot/pull/2323))
Expand Down
14 changes: 8 additions & 6 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,15 @@ export function start(options?: ClientOptions): Client {
"v=0" + "\n" +
// Identifies the creator of the SDP document. We are allowed to use dummy values
// (`-` and `0.0.0.0`) to remain anonymous, which we do. Note that "IN" means
// "Internet". (RFC8866)
// "Internet" (and not "input"). (RFC8866)
"o=- 0 0 IN IP" + ipVersion + " " + targetIp + "\n" +
// Name for the session. We are allowed to pass a dummy `-`. (RFC8866)
"s=-" + "\n" +
// Start and end of the validity of the session. `0 0` means that the session never
// expires. (RFC8866)
"t=0 0" + "\n" +
// A lite implementation is only appropriate for devices that will
// *always* be connected to the public Internet and have a public
// always be connected to the public Internet and have a public
// IP address at which it can receive packets from any
// correspondent. ICE will not function when a lite implementation
// is placed behind a NAT (RFC8445).
Expand All @@ -273,12 +273,12 @@ export function start(options?: ClientOptions): Client {
// The protocol in this line (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be
// the same as the one in the offer. We know that this is true because we tweak the
// offer to match the protocol.
// The `<fmt>` component must always be `pc-datachannel` for WebRTC.
// The `<fmt>` component must always be `webrtc-datachannel` for WebRTC.
// The rest of the SDP payload adds attributes to this specific media stream.
// RFCs: 8839, 8866, 8841
"m=application " + targetPort + " " + "UDP/DTLS/SCTP webrtc-datachannel" + "\n" +
// Indicates the IP address of the remote.
// Note that "IN" means "Internet".
// Note that "IN" means "Internet" (and not "input").
"c=IN IP" + ipVersion + " " + targetIp + "\n" +
// Media ID - uniquely identifies this media stream (RFC9143).
"a=mid:0" + "\n" +
Expand All @@ -287,6 +287,7 @@ export function start(options?: ClientOptions): Client {
// ICE username and password, which are used for establishing and
// maintaining the ICE connection. (RFC8839)
// MUST match ones used by the answerer (server).
// These values are set according to the libp2p WebRTC specification.
"a=ice-ufrag:" + remoteCertMultibase + "\n" +
"a=ice-pwd:" + remoteCertMultibase + "\n" +
// Fingerprint of the certificate that the server will use during the TLS
Expand All @@ -303,8 +304,9 @@ export function start(options?: ClientOptions): Client {
// (UDP or TCP)
"a=sctp-port:5000" + "\n" +
// The maximum SCTP user message size (in bytes) (RFC8841)
"a=max-message-size:100000" + "\n" +
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
"a=max-message-size:16384" + "\n" + // TODO: should this be part of the spec?
// A transport address for a candidate that can be used for connectivity
// checks (RFC8839).
"a=candidate:1 1 UDP 1 " + targetIp + " " + targetPort + " typ host" + "\n";

await pc!.setRemoteDescription({ type: "answer", sdp: remoteSdp });
Expand Down
Loading

0 comments on commit b7d13d7

Please sign in to comment.