diff --git a/bin/light-base/src/network_service/tasks.rs b/bin/light-base/src/network_service/tasks.rs index b8fc190c70..756af6c90c 100644 --- a/bin/light-base/src/network_service/tasks.rs +++ b/bin/light-base/src/network_service/tasks.rs @@ -18,7 +18,7 @@ use super::Shared; use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection}; -use alloc::{string::ToString as _, sync::Arc, vec}; +use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec}; use core::{iter, pin::Pin}; use futures::{channel::mpsc, prelude::*}; use smoldot::{libp2p::read_write::ReadWrite, network::service}; @@ -372,6 +372,8 @@ async fn multi_stream_connection_task( let mut pending_opening_out_substreams = 0; // Newly-open substream that has just been yielded by the connection. let mut newly_open_substream = None; + // `true` if the remote has force-closed our connection. + let mut has_reset = false; // List of all currently open substreams. The index (as a `usize`) corresponds to the id // of this substream within the `connection_task` state machine. let mut open_substreams = slab::Slab::::with_capacity(16); @@ -433,12 +435,12 @@ async fn multi_stream_connection_task( } // Perform a read-write on all substreams that are ready. - loop { - let substream_id = match connection_task.ready_substreams().next() { - Some(s) => *s, - None => break, - }; - + // TODO: what is a ready substream is a bit of a clusterfuck, figure out + for substream_id in connection_task + .ready_substreams() + .copied() + .collect::>() + { let substream = &mut open_substreams[substream_id]; let mut read_write = ReadWrite { @@ -576,8 +578,15 @@ async fn multi_stream_connection_task( debug_assert!(newly_open_substream.is_none()); futures::select! { _ = message_from_coordinator => {} - substream = TPlat::next_substream(&mut connection).fuse() => { - newly_open_substream = substream; + substream = if has_reset { either::Right(future::pending()) } else { either::Left(TPlat::next_substream(&mut connection)) }.fuse() => { + match substream { + Some(s) => newly_open_substream = Some(s), + None => { + // `None` is returned if the remote has force-closed the connection. + connection_task.reset(); + has_reset = true; + } + } } _ = poll_after => {} _ = data_ready.fuse() => {} diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 9f77d62ffa..9667b90d29 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -12,6 +12,7 @@ ### Added +- Add experimental support for WebRTC according to the in-progress specification for libp2p-webrtc. For now this feature must explicitly be enabled by passing `enableExperimentalWebRTC: true` as part of the ̀`ClientConfig`. The multiaddress format for WebRTC is `/ip4/.../udp/.../webrtc/certhash/...` (or `/ip6/...`), where the payload behind `/certhash` is a multibase-encoded multihash-encoded SHA256 of the DTLS certificate used by the remote. ([#2579](https://github.com/paritytech/smoldot/pull/2579)) - Add support for the `chainHead_unstable_finalizedDatabase` JSON-RPC method. This JSON-RPC method aims to be a replacement for the `databaseContent` method of the `Chain` and is expected to remain a permanently unstable smoldot-specific function. ([#2749](https://github.com/paritytech/smoldot/pull/2749)) ## 0.6.33 - 2022-09-13 diff --git a/bin/wasm-node/javascript/package-lock.json b/bin/wasm-node/javascript/package-lock.json index 2345ce7683..4849b9ce1c 100644 --- a/bin/wasm-node/javascript/package-lock.json +++ b/bin/wasm-node/javascript/package-lock.json @@ -445,29 +445,6 @@ "node": ">=8" } }, - "node_modules/ava/node_modules/debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, - "dependencies": { - "ms": "2.1.2" - }, - "engines": { - "node": ">=6.0" - }, - "peerDependenciesMeta": { - "supports-color": { - "optional": true - } - } - }, - "node_modules/ava/node_modules/debug/node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true - }, "node_modules/ava/node_modules/globby": { "version": "13.1.1", "resolved": "https://registry.npmjs.org/globby/-/globby-13.1.1.tgz", @@ -1336,6 +1313,23 @@ "node": ">=6" } }, + "node_modules/debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, + "dependencies": { + "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, "node_modules/decamelize": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-1.2.0.tgz", @@ -2445,6 +2439,12 @@ "mkdirp": "bin/cmd.js" } }, + "node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + }, "node_modules/node-gyp-build": { "version": "4.2.3", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz", @@ -4213,23 +4213,6 @@ } } }, - "debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, - "requires": { - "ms": "2.1.2" - }, - "dependencies": { - "ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true - } - } - }, "globby": { "version": "13.1.1", "resolved": "https://registry.npmjs.org/globby/-/globby-13.1.1.tgz", @@ -4916,6 +4899,15 @@ "time-zone": "^1.0.0" } }, + "debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, + "requires": { + "ms": "2.1.2" + } + }, "decamelize": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-1.2.0.tgz", @@ -5771,6 +5763,12 @@ "minimist": "^1.2.5" } }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + }, "node-gyp-build": { "version": "4.2.3", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz", diff --git a/bin/wasm-node/javascript/src/base64.ts b/bin/wasm-node/javascript/src/base64.ts new file mode 100644 index 0000000000..57df72ab00 --- /dev/null +++ b/bin/wasm-node/javascript/src/base64.ts @@ -0,0 +1,115 @@ +// Smoldot +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +let rfc4648Alphabet: Map = new Map(); +const rfc4648AlphabetAsStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; +for (let i = 0; i < rfc4648AlphabetAsStr.length; ++i) { + rfc4648Alphabet.set(rfc4648AlphabetAsStr[i]!, i) +} + +let urlSafeAlphabet: Map = new Map(); +const urlSafeAlphabetAsStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; +for (let i = 0; i < urlSafeAlphabetAsStr.length; ++i) { + urlSafeAlphabet.set(urlSafeAlphabetAsStr[i]!, i) +} + +/** + * Decodes a multibase-encoded string. + * + * Throws an exception if the encoding isn't base64 or one of its variants. + */ +export function multibaseBase64Decode(input: string): Uint8Array { + if (input.length === 0) + throw new Error("Invalid multibase"); + + switch (input[0]) { + case 'm': + case 'M': + return classicDecode(input.slice(1)) + case 'u': + case 'U': + return urlSafeDecode(input.slice(1)) + default: + throw new Error('Unknown multibase prefix: ' + input[0]); + } +} + +/** + * Decodes a base64-encoded string into bytes using the original alphabet from RFC4648. + * + * See . + */ +export function classicDecode(input: string): Uint8Array { + return base64Decode(input, rfc4648Alphabet); +} + +/** + * Decodes a base64-encoded string into bytes using the URL-safe alphabet. + * + * See . + */ +export function urlSafeDecode(input: string): Uint8Array { + return base64Decode(input, urlSafeAlphabet); +} + +/** + * Decodes a base64-encoded string into bytes using the given alphabet. + */ +export function base64Decode(input: string, alphabet: Map): Uint8Array { + // Remove the padding bytes at the end of the string. We don't check whether the padding is + // accurate. + while (input.length !== 0 && input[input.length - 1] === '=') + input = input.slice(0, -1) + + // Contains the output data. + const out = new Uint8Array(Math.floor(input.length * 6 / 8)); + // Position within `out` of the next byte to write. + let outPos = 0; + + // The bits decoded from the input are added to the right of this value. + let currentByte = 0; + // The left-most `validBitsInCurrentByte` bits of `currentByte` must be written out. + let validBitsInCurrentByte = 0; + + for (let i = 0; i < input.length; ++i) { + const inputChr = input[i]!; + + const bitsToAppend = alphabet.get(inputChr); + if (bitsToAppend === undefined) + throw new Error('Invalid base64 character: ' + inputChr); + console.assert(bitsToAppend < (1 << 6)); + + currentByte = (currentByte << 6) | bitsToAppend; + validBitsInCurrentByte += 6; + + if (validBitsInCurrentByte >= 8) { + let outByte = currentByte >> (validBitsInCurrentByte - 8); + out[outPos] = outByte; + outPos += 1; + validBitsInCurrentByte -= 8; + } + console.assert(validBitsInCurrentByte < 8); + currentByte &= 0xff; + } + + if ((currentByte & ((1 << validBitsInCurrentByte) - 1)) !== 0) + throw new Error("Unexpected EOF"); + if (validBitsInCurrentByte >= 6) + throw new Error("Unexpected EOF"); + + return out; +} diff --git a/bin/wasm-node/javascript/src/client.ts b/bin/wasm-node/javascript/src/client.ts index ee856ac9ad..5b5acb0e15 100644 --- a/bin/wasm-node/javascript/src/client.ts +++ b/bin/wasm-node/javascript/src/client.ts @@ -253,6 +253,16 @@ export interface ClientOptions { * connections. */ forbidWss?: boolean; + + /** + * Enable experimental support for WebRTC connections. + * + * Support for WebRTC connections is currently in progress and might have significant issues. + * + * This flag currently defaults to `false`. In a later version, it will be removed and WebRTC + * connections will be enabled by default. + */ + enableExperimentalWebRTC?: boolean; } /** diff --git a/bin/wasm-node/javascript/src/index-browser.ts b/bin/wasm-node/javascript/src/index-browser.ts index fc0d431161..ec03dcd124 100644 --- a/bin/wasm-node/javascript/src/index-browser.ts +++ b/bin/wasm-node/javascript/src/index-browser.ts @@ -19,6 +19,7 @@ import { Client, ClientOptions, start as innerStart } from './client.js' import { Connection, ConnectionError, ConnectionConfig } from './instance/instance.js'; +import { classicDecode, multibaseBase64Decode } from './base64.js' import { inflate } from 'pako'; export { @@ -46,7 +47,7 @@ export function start(options?: ClientOptions): Client { return innerStart(options, { trustedBase64DecodeAndZlibInflate: (input) => { - return Promise.resolve(inflate(trustedBase64Decode(input))) + return Promise.resolve(inflate(classicDecode(input))) }, performanceNow: () => { return performance.now() @@ -58,42 +59,33 @@ export function start(options?: ClientOptions): Client { crypto.getRandomValues(buffer); }, connect: (config) => { - return connect(config, options?.forbidWs || false, options?.forbidNonLocalWs || false, options?.forbidWss || false) + return connect( + config, + options?.forbidWs || false, + options?.forbidNonLocalWs || false, + options?.forbidWss || false, + !(options?.enableExperimentalWebRTC || false) + ) } }) } -/** - * Decodes a base64 string. - * - * The input is assumed to be correct. - */ -function trustedBase64Decode(base64: string): Uint8Array { - // This code is a bit sketchy due to the fact that we decode into a string, but it seems to - // work. - const binaryString = atob(base64); - const size = binaryString.length; - const bytes = new Uint8Array(size); - for (let i = 0; i < size; i++) { - bytes[i] = binaryString.charCodeAt(i); - } - return bytes; -} - /** * Tries to open a new connection using the given configuration. * * @see Connection * @throws ConnectionError If the multiaddress couldn't be parsed or contains an invalid protocol. */ - function connect(config: ConnectionConfig, forbidWs: boolean, forbidNonLocalWs: boolean, forbidWss: boolean): Connection { - let connection: WebSocket; - + function connect(config: ConnectionConfig, forbidWs: boolean, forbidNonLocalWs: boolean, forbidWss: boolean, forbidWebRTC: boolean): Connection { // Attempt to parse the multiaddress. // TODO: remove support for `/wss` in a long time (https://github.com/paritytech/smoldot/issues/1940) const wsParsed = config.address.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/(.*?)\/(ws|wss|tls\/ws)$/); + const webRTCParsed = config.address.match(/^\/(ip4|ip6)\/(.*?)\/udp\/(.*?)\/webrtc\/certhash\/(.*?)$/); + if (wsParsed != null) { + let connection: WebSocket; + const proto = (wsParsed[4] == 'ws') ? 'ws' : 'wss'; if ( (proto == 'ws' && forbidWs) || @@ -121,23 +113,285 @@ function trustedBase64Decode(base64: string): Uint8Array { config.onMessage(new Uint8Array(msg.data as ArrayBuffer)); }; + return { + close: (): void => { + connection.onopen = null; + connection.onclose = null; + connection.onmessage = null; + connection.onerror = null; + connection.close(); + }, + + send: (data: Uint8Array): void => { + connection.send(data); + }, + + openOutSubstream: () => { throw new Error('Wrong connection type') } + }; + } else if (webRTCParsed != null) { + const targetPort = webRTCParsed[3]; + if (forbidWebRTC || targetPort === '0') { + throw new ConnectionError('Connection type not allowed'); + } + + const ipVersion = webRTCParsed[1] == 'ip4'? '4' : '6'; + const targetIp = webRTCParsed[2]; + const remoteCertMultibase = webRTCParsed[4]!; + + // The payload of `/certhash` is the hash of the self-generated certificate that the + // server presents. + // This function throws an exception if the certhash isn't correct. For this reason, this call + // is performed as part of the parsing of the multiaddr. + const remoteCertMultihash = multibaseBase64Decode(remoteCertMultibase); + const remoteCertSha256Hash = multihashToSha256(remoteCertMultihash); + + let pc: RTCPeerConnection | null = null; + const dataChannels = new Map(); + // TODO: this system is a complete hack + let isFirstSubstream = true; + // The opening of the connection is asynchronous. If smoldot calls `close` in the meanwhile, + // this variable is set to `true`, and we interrupt the opening. + let cancelOpening = false; + + // Function that configures a newly-opened channel and adds it to the map. Used for both + // inbound and outbound substreams. + const addChannel = (dataChannel: RTCDataChannel, direction: 'inbound' | 'outbound') => { + const dataChannelId = dataChannel.id!; + + dataChannel.onopen = () => { + config.onStreamOpened(dataChannelId, direction); + }; + + dataChannel.onerror = (_error) => { + config.onStreamClose(dataChannelId); + }; + + dataChannel.onclose = () => { + config.onStreamClose(dataChannelId); + }; + + dataChannel.onmessage = (m) => { + // The `data` field is an `ArrayBuffer`. + config.onMessage(new Uint8Array(m.data), dataChannelId); + } + + dataChannels.set(dataChannelId, dataChannel); + } + + // It is possible for the browser to use multiple different certificates. + // In order for our local certificate to be deterministic, we need to generate it manually and + // set it explicitly as part of the configuration. + // According to , + // browsers are guaranteed to support `{ name: "ECDSA", namedCurve: "P-256" }`. + RTCPeerConnection.generateCertificate({ name: "ECDSA", namedCurve: "P-256", hash: "SHA-256" } as EcKeyGenParams).then((localCertificate) => { + if (cancelOpening) + return; + + // We need to build the multihash corresponding to the local certificate. + let localTlsCertificateMultihash: Uint8Array | null = null; + for (const { algorithm, value } of localCertificate.getFingerprints()) { + if (algorithm === 'sha-256') { + localTlsCertificateMultihash = new Uint8Array(34); + localTlsCertificateMultihash.set([0x12, 32], 0); + localTlsCertificateMultihash.set(value!.split(':').map((s) => parseInt(s, 16)), 2); + break; + } + } + if (localTlsCertificateMultihash === null) { + // Because we've already returned from the `connect` function at this point, we pretend + // that the connection has failed to open. + config.onConnectionClose('Failed to obtain the browser certificate fingerprint'); + return; + } + + // Create a new WebRTC connection. + pc = new RTCPeerConnection({ certificates: [localCertificate] }); + + // `onconnectionstatechange` is used to detect when the connection has closed or has failed + // to open. + // Note that smoldot will think that the connection is open even when it is still opening. + // Therefore we don't care about events concerning the fact that the connection is now fully + // open. + pc.onconnectionstatechange = (_event) => { + if (pc!.connectionState == "closed" || pc!.connectionState == "disconnected" || pc!.connectionState == "failed") { + config.onConnectionClose("WebRTC state transitioned to " + pc!.connectionState); + + pc!.onconnectionstatechange = null; + pc!.onnegotiationneeded = null; + pc!.ondatachannel = null; + + for (const channel of Array.from(dataChannels.values())) { + channel.onopen = null; + channel.onerror = null; + channel.onclose = null; + channel.onmessage = null; + } + + pc!.close(); // Not necessarily necessary, but it doesn't hurt to do so. + dataChannels.clear(); + } + }; + + pc.onnegotiationneeded = async (_event) => { + // Create a new offer and set it as local description. + let sdpOffer = (await pc!.createOffer()).sdp!; + // According to the libp2p WebRTC spec, the ufrag and pwd are the same + // randomly-generated string. We modify the local description to ensure that. + const pwd = sdpOffer.match(/^a=ice-pwd:(.+)$/m); + if (pwd != null) { + sdpOffer = sdpOffer.replace(/^a=ice-ufrag.*$/m, 'a=ice-ufrag:' + pwd[1]); + } else { + console.error("Failed to set ufrag to pwd. WebRTC connections will likely fail. Please report this issues."); + } + await pc!.setLocalDescription({ type: 'offer', sdp: sdpOffer }); + + // Transform certificate hash into fingerprint (upper-hex; each byte separated by ":"). + const fingerprint = Array.from(remoteCertSha256Hash).map((n) => ("0" + n.toString(16)).slice(-2).toUpperCase()).join(':'); + + // Note that the trailing line feed is important, as otherwise Chrome + // fails to parse the payload. + const remoteSdp = + // Version of the SDP protocol. Always 0. (RFC8866) + "v=0" + "\n" + + // Identifies the creator of the SDP document. We are allowed to use dummy values + // (`-` and `0.0.0.0`) to remain anonymous, which we do. Note that "IN" means + // "Internet". (RFC8866) + "o=- 0 0 IN IP" + ipVersion + " " + targetIp + "\n" + + // Name for the session. We are allowed to pass a dummy `-`. (RFC8866) + "s=-" + "\n" + + // Start and end of the validity of the session. `0 0` means that the session never + // expires. (RFC8866) + "t=0 0" + "\n" + + // A lite implementation is only appropriate for devices that will + // *always* be connected to the public Internet and have a public + // IP address at which it can receive packets from any + // correspondent. ICE will not function when a lite implementation + // is placed behind a NAT (RFC8445). + "a=ice-lite" + "\n" + + // A `m=` line describes a request to establish a certain protocol. + // The protocol in this line (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be + // the same as the one in the offer. We know that this is true because we tweak the + // offer to match the protocol. + // The `` component must always be `pc-datachannel` for WebRTC. + // The rest of the SDP payload adds attributes to this specific media stream. + // RFCs: 8839, 8866, 8841 + "m=application " + targetPort + " " + "UDP/DTLS/SCTP webrtc-datachannel" + "\n" + + // Indicates the IP address of the remote. + // Note that "IN" means "Internet". + "c=IN IP" + ipVersion + " " + targetIp + "\n" + + // Media ID - uniquely identifies this media stream (RFC9143). + "a=mid:0" + "\n" + + // Indicates that we are complying with RFC8839 (as oppposed to the legacy RFC5245). + "a=ice-options:ice2" + "\n" + + // ICE username and password, which are used for establishing and + // maintaining the ICE connection. (RFC8839) + // MUST match ones used by the answerer (server). + "a=ice-ufrag:" + remoteCertMultibase + "\n" + + "a=ice-pwd:" + remoteCertMultibase + "\n" + + // Fingerprint of the certificate that the server will use during the TLS + // handshake. (RFC8122) + // MUST be derived from the certificate used by the answerer (server). + "a=fingerprint:sha-256 " + fingerprint + "\n" + + // Indicates that the remote DTLS server will only listen for incoming + // connections. (RFC5763) + // The answerer (server) MUST not be located behind a NAT (RFC6135). + "a=setup:passive" + "\n" + + // The SCTP port (RFC8841) + // Note it's different from the "m=" line port value, which + // indicates the port of the underlying transport-layer protocol + // (UDP or TCP) + "a=sctp-port:5000" + "\n" + + // The maximum SCTP user message size (in bytes) (RFC8841) + "a=max-message-size:100000" + "\n" + + // A transport address for a candidate that can be used for connectivity checks (RFC8839). + "a=candidate:1 1 UDP 1 " + targetIp + " " + targetPort + " typ host" + "\n"; + + await pc!.setRemoteDescription({ type: "answer", sdp: remoteSdp }); + }; + + pc.ondatachannel = ({ channel }) => { + addChannel(channel, 'inbound') + }; + + // Creating a `RTCPeerConnection` doesn't actually do anything before a channel is created. + // The connection is therefore immediately reported as opened to smoldot so that it starts + // opening substreams. + // One concern might be that smoldot will think that the remote is reachable at this address + // (because we report the connection as being open) even when it might not be the case. + // However, WebRTC has a handshake to perform, and smoldot will only consider a connection + // as "actually open" once the handshake has finished. + config.onOpen({ + type: 'multi-stream', + handshake: 'webrtc', + localTlsCertificateMultihash, + remoteTlsCertificateMultihash: remoteCertMultihash + }); + }); + + return { + close: (streamId: number | undefined): void => { + // If `streamId` is undefined, then the whole connection must be destroyed. + if (streamId === undefined) { + // The `RTCPeerConnection` is created at the same time as we report the connection as + // being open. It is however possible for smoldot to cancel the opening, in which case + // `pc` will still be undefined. + if (!pc) { + cancelOpening = true; + return; + } + + pc.onconnectionstatechange = null; + pc.onnegotiationneeded = null; + pc.ondatachannel = null; + + for (const channel of Array.from(dataChannels.values())) { + channel.onopen = null; + channel.onerror = null; + channel.onclose = null; + channel.onmessage = null; + } + + pc.close(); + dataChannels.clear(); + + } else { + const channel = dataChannels.get(streamId)!; + channel.onopen = null; + channel.onerror = null; + channel.onclose = null; + channel.onmessage = null; + channel.close(); + dataChannels.delete(streamId); + } + }, + + send: (data: Uint8Array, streamId: number): void => { + dataChannels.get(streamId)!.send(data); + }, + + openOutSubstream: () => { + // `openOutSubstream` can only be called after we have called `config.onOpen`, therefore + // `pc` is guaranteed to be non-null. + if (isFirstSubstream) { + isFirstSubstream = false; + addChannel(pc!.createDataChannel("data", { id: 1, negotiated: true }), 'outbound') + } else { + addChannel(pc!.createDataChannel("data"), 'outbound') + } + } + }; } else { throw new ConnectionError('Unrecognized multiaddr format'); } +} - return { - close: (): void => { - connection.onopen = null; - connection.onclose = null; - connection.onmessage = null; - connection.onerror = null; - connection.close(); - }, - - send: (data: Uint8Array): void => { - connection.send(data); - }, +/// Parses a multihash-multibase-encoded string into a SHA256 hash. +/// +/// Throws an exception if the multihash algorithm isn't SHA256. +const multihashToSha256 = (certMultihash: Uint8Array): Uint8Array => { + if (certMultihash.length != 34 || certMultihash[0] != 0x12 || certMultihash[1] != 32) { + throw new Error('Certificate multihash is not SHA-256'); + } - openOutSubstream: () => { throw new Error('Wrong connection type') } - }; + return new Uint8Array(certMultihash.slice(2)); } diff --git a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts index efd832729a..3519258f93 100644 --- a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts +++ b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts @@ -334,12 +334,12 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, break } case 'multi-stream': { - const bufferLen = 2 + info.localTlsCertificateMultihash.length + info.remoteTlsCertificateMultihash.length; + const bufferLen = 1 + info.localTlsCertificateMultihash.length + info.remoteTlsCertificateMultihash.length; const ptr = instance.exports.alloc(bufferLen) >>> 0; const mem = new Uint8Array(instance.exports.memory.buffer); buffer.writeUInt8(mem, ptr, 0); mem.set(info.localTlsCertificateMultihash, ptr + 1) - mem.set(info.remoteTlsCertificateMultihash, ptr + info.localTlsCertificateMultihash.length) + mem.set(info.remoteTlsCertificateMultihash, ptr + 1 + info.localTlsCertificateMultihash.length) instance.exports.connection_open_multi_stream(connectionId, ptr, bufferLen); break } diff --git a/bin/wasm-node/rust/src/bindings.rs b/bin/wasm-node/rust/src/bindings.rs index eb26bec329..7475a7d208 100644 --- a/bin/wasm-node/rust/src/bindings.rs +++ b/bin/wasm-node/rust/src/bindings.rs @@ -512,6 +512,7 @@ pub extern "C" fn stream_message(connection_id: u32, stream_id: u32, ptr: u32, l /// For the `outbound` parameter, pass `0` if the substream has been opened by the remote, and any /// value other than `0` if the substream has been opened in response to a call to /// [`connection_stream_open`]. +#[no_mangle] pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { crate::platform::connection_stream_opened(connection_id, stream_id, outbound) } diff --git a/bin/wasm-node/rust/src/platform.rs b/bin/wasm-node/rust/src/platform.rs index 3898083c38..628192de3b 100644 --- a/bin/wasm-node/rust/src/platform.rs +++ b/bin/wasm-node/rust/src/platform.rs @@ -571,7 +571,7 @@ pub(crate) fn connection_open_multi_stream( }, )), )(&handshake_ty[..]) - .unwrap(); + .expect("invalid handshake type provided to connection_open_multi_stream"); let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); diff --git a/src/libp2p/collection.rs b/src/libp2p/collection.rs index f4171ebd19..7b55b84e60 100644 --- a/src/libp2p/collection.rs +++ b/src/libp2p/collection.rs @@ -428,7 +428,7 @@ where let _previous_value = self.connections.insert( connection_id, Connection { - state: InnerConnectionState::Established, + state: InnerConnectionState::Handshaking, user_data, }, ); diff --git a/src/libp2p/collection/multi_stream.rs b/src/libp2p/collection/multi_stream.rs index ae7711ddf4..bfadab6824 100644 --- a/src/libp2p/collection/multi_stream.rs +++ b/src/libp2p/collection/multi_stream.rs @@ -560,29 +560,29 @@ where /// Notifies the state machine that a new substream has been opened. /// - /// `inbound` indicates whether the substream has been opened by the remote (`true`) or - /// locally (`false`). + /// `outbound` indicates whether the substream has been opened by the remote (`false`) or + /// locally (`true`). /// - /// If `inbound` is `false`, then the value returned by + /// If `outbound` is `true`, then the value returned by /// [`MultiStreamConnectionTask::desired_outbound_substreams`] will decrease by one. /// /// # Panic /// /// Panics if there already exists a substream with an identical identifier. /// - pub fn add_substream(&mut self, id: TSubId, inbound: bool) { + pub fn add_substream(&mut self, id: TSubId, outbound: bool) { match &mut self.connection { MultiStreamConnectionTaskInner::Handshake { opened_substream: ref mut opened_substream @ None, .. - } if !inbound => { + } if outbound => { *opened_substream = Some(id); } MultiStreamConnectionTaskInner::Handshake { .. } => { // TODO: protocol has been violated, reset the connection? } MultiStreamConnectionTaskInner::Established { established, .. } => { - established.add_substream(id, inbound) + established.add_substream(id, outbound) } MultiStreamConnectionTaskInner::ShutdownAcked { .. } | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => { @@ -604,26 +604,23 @@ where /// > **Note**: An example situation is: a notification is queued, which leads to a message /// > being sent to a connection task, which, once injected, leads to a notifications /// > substream being "ready" because it needs to send more data. + // TODO: this function really should be more precise as to what a ready substream means pub fn ready_substreams(&self) -> impl Iterator { match &self.connection { MultiStreamConnectionTaskInner::Handshake { opened_substream: Some(opened_substream), - handshake, .. - } => { - let iter = if handshake.as_ref().unwrap().ready_to_write() { - Some(opened_substream) - } else { - None - } - .into_iter(); - either::Right(either::Left(iter)) - } - MultiStreamConnectionTaskInner::Established { established, .. } => { - // Note that the handshake substream is never ready as it never has anything - // to write after the end of the handshake. - either::Left(established.ready_substreams()) - } + } => either::Right(either::Left(iter::once(opened_substream))), + MultiStreamConnectionTaskInner::Established { + established, + handshake_substream, + .. + } => either::Left( + handshake_substream + .as_ref() + .into_iter() + .chain(established.ready_substreams()), + ), MultiStreamConnectionTaskInner::Handshake { opened_substream: None, .. diff --git a/src/libp2p/connection/established/multi_stream.rs b/src/libp2p/connection/established/multi_stream.rs index 70811092d6..c5487d0030 100644 --- a/src/libp2p/connection/established/multi_stream.rs +++ b/src/libp2p/connection/established/multi_stream.rs @@ -178,18 +178,18 @@ where /// Notifies the state machine that a new substream has been opened. /// - /// `inbound` indicates whether the substream has been opened by the remote (`true`) or - /// locally (`false`). + /// `outbound` indicates whether the substream has been opened by the remote (`false`) or + /// locally (`true`). /// - /// If `inbound` is `false`, then the value returned by + /// If `outbound` is `true`, then the value returned by /// [`MultiStream::desired_outbound_substreams`] will decrease by one. /// /// # Panic /// /// Panics if there already exists a substream with an identical identifier. /// - pub fn add_substream(&mut self, id: TSubId, inbound: bool) { - let (substream, out_substream_id) = if inbound { + pub fn add_substream(&mut self, id: TSubId, outbound: bool) { + let (substream, out_substream_id) = if !outbound { let out_substream_id = self.next_out_substream_id; self.next_out_substream_id += 1; diff --git a/src/libp2p/connection/noise.rs b/src/libp2p/connection/noise.rs index 6f8763d8fa..fd2e37c58d 100644 --- a/src/libp2p/connection/noise.rs +++ b/src/libp2p/connection/noise.rs @@ -647,12 +647,6 @@ impl HandshakeInProgress { } } - /// Returns `true` if the Noise handshake is waiting to write out data. Returns `false` if - /// instead it is blocked on incoming data. - pub fn ready_to_write(&self) -> bool { - !self.tx_buffer_encrypted.is_empty() - } - /// Feeds data coming from a socket and outputs data to write to the socket. /// /// On success, returns the new state of the negotiation. diff --git a/src/network/service.rs b/src/network/service.rs index e62344a10f..590cba1def 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -1084,13 +1084,6 @@ where } } - // Because multi-stream connections are considered as having immediately finished their - // handshake, we mark the address as connected. - if let Some(KBucketsPeer { addresses, .. }) = self.kbuckets_peers.get_mut(expected_peer_id) - { - addresses.set_connected(multiaddr); - } - self.pending_ids.remove(id.0); (connection_id, connection_task)