Skip to content

Commit

Permalink
deps!: update all deps to support no-copy operations (#1335)
Browse files Browse the repository at this point in the history
Updates all deps needed to support passing lists of byte arrays where they have been created from multiple input buffers.

When reading multiplexed data, all messages arrive in length-prefixed buffers, which means the first few bytes tell the consumer how many bytes long next chunk will be.

One length prefixed chunk can be delivered in several payloads from the underlying network transport. The first payload can also include the length prefix and some or all of the data, so we stitch these together in a `Uint8ArrayList` to avoid having to concatenate `Uint8Array`s together.

Previously once we'd received enough bytes to satisfy the length prefix we'd concatenate the bytes together, but this is a potentially expensive operation where transports have small message sizes so instead just pass the `Uint8ArrayList` to the consumer and let them decide wether to concatenate or not as some consumers will be smart enough to operate on lists of `Uint8Array`s instead of always requiring a contiguous block of memory.

BREAKING CHANGE: Streams are now `Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>`
  • Loading branch information
achingbrain authored Aug 11, 2022
1 parent 564f4b8 commit f439d9b
Show file tree
Hide file tree
Showing 40 changed files with 626 additions and 200 deletions.
4 changes: 2 additions & 2 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WebSockets } from '@libp2p/websockets'
import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise'
import { Noise } from '@chainsafe/libp2p-noise'
import { pipe } from 'it-pipe'
import { createFromJSON } from '@libp2p/peer-id-factory'

Expand Down Expand Up @@ -31,7 +31,7 @@ export default {
new Mplex()
],
connectionEncryption: [
NOISE,
new Noise(),
new Plaintext()
],
relay: {
Expand Down
2 changes: 1 addition & 1 deletion examples/connection-encryption/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const createNode = async () => {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
6 changes: 3 additions & 3 deletions examples/delegated-routing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
"version": "0.1.0",
"private": true,
"dependencies": {
"@chainsafe/libp2p-noise": "^7.0.1",
"ipfs-core": "^0.14.1",
"@chainsafe/libp2p-noise": "^8.0.0",
"ipfs-core": "^0.15.4",
"libp2p": "../../",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mplex": "^4.0.2",
"@libp2p/mplex": "^5.0.0",
"@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0",
"react": "^17.0.2",
Expand Down
2 changes: 1 addition & 1 deletion examples/echo/src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async function run() {
// For each chunk of data
for await (const data of source) {
// Output the data
console.log('received echo:', uint8ArrayToString(data))
console.log('received echo:', uint8ArrayToString(data.subarray()))
}
}
)
Expand Down
4 changes: 2 additions & 2 deletions examples/libp2p-in-the-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
},
"license": "ISC",
"dependencies": {
"@chainsafe/libp2p-noise": "^7.0.1",
"@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^4.0.2",
"@libp2p/mplex": "^5.0.0",
"@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0",
"libp2p": "../../"
Expand Down
2 changes: 1 addition & 1 deletion examples/pnet/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ generateKey(otherSwarmKey)
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
2 changes: 1 addition & 1 deletion examples/protocol-and-stream-muxing/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const createNode = async () => {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
2 changes: 1 addition & 1 deletion examples/protocol-and-stream-muxing/2.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const createNode = async () => {
stream,
async function (source) {
for await (const msg of source) {
console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`)
console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
}
}
).finally(() => {
Expand Down
4 changes: 2 additions & 2 deletions examples/protocol-and-stream-muxing/3.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const createNode = async () => {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand All @@ -48,7 +48,7 @@ const createNode = async () => {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
5 changes: 5 additions & 0 deletions examples/transports/2.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ function printAddrs (node, number) {
node2.handle('/print', async ({ stream }) => {
const result = await pipe(
stream,
async function * (source) {
for await (const list of source) {
yield list.subarray()
}
},
toBuffer
)
console.log(uint8ArrayToString(result))
Expand Down
2 changes: 1 addition & 1 deletion examples/transports/3.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ function print ({ stream }) {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
2 changes: 1 addition & 1 deletion examples/transports/4.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function print ({ stream }) {
stream,
async function (source) {
for await (const msg of source) {
console.log(uint8ArrayToString(msg))
console.log(uint8ArrayToString(msg.subarray()))
}
}
)
Expand Down
4 changes: 2 additions & 2 deletions examples/webrtc-direct/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
"license": "ISC",
"dependencies": {
"@libp2p/webrtc-direct": "^2.0.0",
"@chainsafe/libp2p-noise": "^7.0.3",
"@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^4.0.3",
"@libp2p/mplex": "^5.0.0",
"libp2p": "../../",
"wrtc": "^0.4.7"
},
Expand Down
74 changes: 37 additions & 37 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,36 +98,36 @@
},
"dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/components": "^2.0.1",
"@libp2p/connection": "^4.0.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-address-manager": "^1.0.1",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-connection-encrypter": "^1.0.2",
"@libp2p/interface-content-routing": "^1.0.1",
"@libp2p/interface-dht": "^1.0.0",
"@libp2p/interface-metrics": "^2.0.0",
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-routing": "^1.0.0",
"@libp2p/interface-peer-store": "^1.2.0",
"@libp2p/interface-pubsub": "^2.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-stream-muxer": "^2.0.1",
"@libp2p/interface-transport": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/components": "^2.0.3",
"@libp2p/connection": "^4.0.1",
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-address-manager": "^1.0.2",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-connection-encrypter": "^2.0.1",
"@libp2p/interface-content-routing": "^1.0.2",
"@libp2p/interface-dht": "^1.0.1",
"@libp2p/interface-metrics": "^3.0.0",
"@libp2p/interface-peer-discovery": "^1.0.1",
"@libp2p/interface-peer-id": "^1.0.4",
"@libp2p/interface-peer-info": "^1.0.2",
"@libp2p/interface-peer-routing": "^1.0.1",
"@libp2p/interface-peer-store": "^1.2.1",
"@libp2p/interface-pubsub": "^2.0.1",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interface-stream-muxer": "^2.0.2",
"@libp2p/interface-transport": "^1.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^2.0.1",
"@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-collections": "^2.0.0",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/peer-record": "^4.0.0",
"@libp2p/peer-store": "^3.0.0",
"@libp2p/peer-id": "^1.1.15",
"@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-record": "^4.0.1",
"@libp2p/peer-store": "^3.1.2",
"@libp2p/tracked-map": "^2.0.1",
"@libp2p/utils": "^3.0.0",
"@libp2p/utils": "^3.0.1",
"@multiformats/mafmt": "^11.0.2",
"@multiformats/multiaddr": "^10.1.8",
"@multiformats/multiaddr": "^10.3.3",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"datastore-core": "^7.0.0",
Expand All @@ -140,7 +140,7 @@
"it-filter": "^1.0.3",
"it-first": "^1.0.6",
"it-foreach": "^0.1.1",
"it-handshake": "^4.0.0",
"it-handshake": "^4.1.2",
"it-length-prefixed": "^8.0.2",
"it-map": "^1.0.6",
"it-merge": "^1.0.3",
Expand All @@ -156,31 +156,31 @@
"p-retry": "^5.0.0",
"p-settle": "^5.0.0",
"private-ip": "^2.3.3",
"protons-runtime": "^2.0.2",
"protons-runtime": "^3.0.1",
"retimer": "^3.0.0",
"sanitize-filename": "^1.6.3",
"set-delayed-interval": "^1.0.0",
"timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.0.0",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^3.0.0",
"wherearewe": "^1.0.0",
"xsalsa20": "^1.1.0"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^7.0.2",
"@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/daemon-client": "^2.0.0",
"@libp2p/daemon-server": "^2.0.0",
"@libp2p/daemon-client": "^2.0.4",
"@libp2p/daemon-server": "^2.0.4",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/floodsub": "^3.0.0",
"@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0",
"@libp2p/interface-mocks": "^3.0.1",
"@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.1",
"@libp2p/interface-mocks": "^4.0.1",
"@libp2p/interop": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/kad-dht": "^3.0.1",
"@libp2p/mdns": "^3.0.0",
"@libp2p/mplex": "^4.0.2",
"@libp2p/mplex": "^5.0.0",
"@libp2p/pubsub": "^3.0.1",
"@libp2p/tcp": "^3.0.0",
"@libp2p/topology": "^3.0.0",
Expand All @@ -205,7 +205,7 @@
"p-event": "^5.0.1",
"p-times": "^4.0.0",
"p-wait-for": "^5.0.0",
"protons": "^4.0.1",
"protons": "^5.0.0",
"rimraf": "^3.0.2",
"sinon": "^14.0.0",
"ts-sinon": "^2.0.2"
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import { Multiaddr } from '@multiformats/multiaddr'
import Libp2p from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise'
import { Noise } from '@chainsafe/libp2p-noise'

const relayAddr = ...

Expand All @@ -56,7 +56,7 @@ const node = await createLibp2p({
new Mplex()
],
connectionEncryption: [
NOISE
new Noise()
]
},
config: {
Expand Down
8 changes: 5 additions & 3 deletions src/circuit/circuit/hop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { Duplex } from 'it-stream-types'
import type { Circuit } from '../transport.js'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:circuit:hop')

Expand All @@ -24,7 +25,7 @@ export interface HopRequest {
connectionManager: ConnectionManager
}

export async function handleHop (hopRequest: HopRequest) {
export async function handleHop (hopRequest: HopRequest): Promise<void> {
const {
connection,
request,
Expand Down Expand Up @@ -84,7 +85,7 @@ export async function handleHop (hopRequest: HopRequest) {
srcPeer: request.srcPeer
}

let destinationStream: Duplex<Uint8Array>
let destinationStream: Duplex<Uint8ArrayList>
try {
log('performing STOP request')
const result = await stop({
Expand Down Expand Up @@ -128,7 +129,7 @@ export interface HopConfig extends AbortOptions {
* Performs a HOP request to a relay peer, to request a connection to another
* peer. A new, virtual, connection will be created between the two via the relay.
*/
export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
export async function hop (options: HopConfig): Promise<Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>> {
const {
connection,
request,
Expand All @@ -151,6 +152,7 @@ export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {

if (response.code === CircuitPB.Status.SUCCESS) {
log('hop request was successful')

return streamHandler.rest()
}

Expand Down
3 changes: 2 additions & 1 deletion src/circuit/circuit/stop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { validateAddrs } from './utils.js'
import type { Connection } from '@libp2p/interface-connection'
import type { Duplex } from 'it-stream-types'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:circuit:stop')

Expand All @@ -18,7 +19,7 @@ export interface HandleStopOptions {
/**
* Handles incoming STOP requests
*/
export function handleStop (options: HandleStopOptions): Duplex<Uint8Array> | undefined {
export function handleStop (options: HandleStopOptions): Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> | undefined {
const {
connection,
request,
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/circuit/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface StreamHandlerOptions {

export class StreamHandler {
private readonly stream: Stream
private readonly shake: Handshake
private readonly shake: Handshake<Uint8ArrayList | Uint8Array>
private readonly decoder: Source<Uint8ArrayList>

constructor (options: StreamHandlerOptions) {
Expand Down Expand Up @@ -56,7 +56,7 @@ export class StreamHandler {
*/
write (msg: CircuitRelay) {
log('write message type %s', msg.type)
this.shake.write(lp.encode.single(CircuitRelay.encode(msg)).slice())
this.shake.write(lp.encode.single(CircuitRelay.encode(msg)))
}

/**
Expand Down
Loading

0 comments on commit f439d9b

Please sign in to comment.