Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial implementation #2

Merged
merged 35 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5764e5d
minimal implementation
wemeetagain Jun 7, 2022
22da7f7
Add keepalive
wemeetagain Jun 8, 2022
768fb43
Implement maxIncomingStreams functionality
wemeetagain Jun 8, 2022
7d5199a
maxMessageSize, initialWindowSize functionality
wemeetagain Jun 8, 2022
c4330e7
Add a comment
wemeetagain Jun 8, 2022
1cce27d
Clean up config object
wemeetagain Jun 8, 2022
af28760
Clean up close
wemeetagain Jun 8, 2022
5280899
More tweaks
wemeetagain Jun 8, 2022
233c7fa
Add readme
wemeetagain Jun 9, 2022
d2f54cd
remove todos
wemeetagain Jun 9, 2022
ddeb046
Add some tests
wemeetagain Jun 16, 2022
5d38188
Add missing dev dependency
wemeetagain Jun 16, 2022
6e25085
Fix window overflow test
wemeetagain Jun 16, 2022
708c5ef
Update interfaces
wemeetagain Jun 16, 2022
c18e25c
Fix some tests
wemeetagain Jun 16, 2022
d5b3259
Tweak keepalive test
wemeetagain Jun 16, 2022
0a067e6
No keepalive in compliance tests
wemeetagain Jun 17, 2022
305fea9
Remove unneeded parts of the package.json
wemeetagain Jun 17, 2022
bb0e4b5
Add maxIncomingStreams config check
wemeetagain Jun 17, 2022
764fefa
Add maxOutgoingStreams
wemeetagain Jun 17, 2022
43e2256
Update config property names
wemeetagain Jun 17, 2022
85824c5
Add more comments
wemeetagain Jun 17, 2022
cc3e29f
Add to gitignore
wemeetagain Jun 17, 2022
1324aa1
Add sanity check to decoder
wemeetagain Jun 18, 2022
4444b24
Add decode unit tests
wemeetagain Jun 18, 2022
ca68cc9
More comments
wemeetagain Jun 18, 2022
1f043b8
Tweak test
wemeetagain Jun 18, 2022
60d41e8
Tweak muxer factory
wemeetagain Jun 19, 2022
9766631
Update muxer.close
wemeetagain Jun 20, 2022
ccdbd95
Add `direction` to YamuxMuxerInit
wemeetagain Jun 20, 2022
23ef45a
Review tweaks
wemeetagain Jul 4, 2022
db592b9
Update libp2p dependencies
wemeetagain Jul 4, 2022
b351a09
Add some benchmarks
wemeetagain Jul 4, 2022
e0b6579
tweak benchmark
wemeetagain Jul 6, 2022
a3c2d37
chore: update libp2p dependencies
wemeetagain Sep 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lib-cov

# Coverage directory used by tools like istanbul
coverage
.coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
Expand Down
101 changes: 101 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# js-libp2p-yamux

[![](https://img.shields.io/badge/made%20by-ChainSafe%20Systems-blue.svg?style=flat-square)](http://chainsafe.io)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/codecov/c/github/chainsafe/js-libp2p-yamux.svg?style=flat-square)](https://codecov.io/gh/chainsafe/js-libp2p-yamux)
[![Build Status](https://github.com/chainsafe/js-libp2p-yamux/actions/workflows/js-test-and-release.yml/badge.svg?branch=master)](https://github.com/chainsafe/js-libp2p-yamux/actions/workflows/js-test-and-release.yml)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
![](https://img.shields.io/badge/npm-%3E%3D7.0.0-orange.svg?style=flat-square)
![](https://img.shields.io/badge/Node.js-%3E%3D16.0.0-orange.svg?style=flat-square)

> JavaScript implementation of [yamux](https://github.com/hashicorp/yamux/blob/master/spec.md).

[![](https://github.com/libp2p/js-libp2p-interfaces/raw/master/packages/libp2p-interfaces/src/stream-muxer/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/stream-muxer)

## Install

```sh
npm install @chainsafe/libp2p-yamux
```

## Usage

```js
import { YamuxMuxer } from '@chainsafe/libp2p-yamux'
import { Components } from '@libp2p/interfaces/components'
import { pipe } from 'it-pipe'
import { duplexPair } from 'it-pair/duplex'
import all from 'it-all'

// Connect two yamux muxers to demo basic stream multiplexing functionality

const clientMuxer = new YamuxMuxer(new Components(), {
client: true,
onIncomingStream: stream => {
// echo data on incoming streams
pipe(stream, stream)
},
onStreamEnd: stream => {
// do nothing
}
})

const serverMuxer = new YamuxMuxer(new Components(), {
client: false,
onIncomingStream: stream => {
// echo data on incoming streams
pipe(stream, stream)
},
onStreamEnd: stream => {
// do nothing
}
})

// `p` is our "connections", what we use to connect the two sides
// In a real application, a connection is usually to a remote computer
const p = duplexPair()

// connect the muxers together
pipe(p[0], clientMuxer, p[0])
pipe(p[1], serverMuxer, p[1])

// now either side can open streams
const stream0 = clientMuxer.newStream()
const stream1 = serverMuxer.newStream()

// Send some data to the other side
const encoder = new TextEncoder()
const data = [encoder.encode('hello'), encoder.encode('world')]
pipe(data, stream0)

// Receive data back
const result = await pipe(stream0, all)

// close a stream
stream1.close()

// close the muxer
clientMuxer.close()
```

## API

This library implements the `StreamMuxerFactory`, `StreamMuxer` and `Stream` interfaces defined in [`@libp2p/interfaces/stream-muxer`](https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/stream-muxer).

## Contribute

The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out:

- Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically.
- **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs.

## License

Licensed under either of

* Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / http://www.apache.org/licenses/LICENSE-2.0)
* MIT ([LICENSE-MIT](LICENSE-MIT) / http://opensource.org/licenses/MIT)

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
109 changes: 109 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
{
"name": "@chainsafe/libp2p-yamux",
"version": "0.1.0",
"description": "Yamux stream multiplexer for libp2p",
"license": "Apache-2.0 OR MIT",
"homepage": "https://github.com/ChainSafe/js-libp2p-yamux#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/ChainSafe/js-libp2p-yamux.git"
},
"bugs": {
"url": "https://github.com/ChainSafe/js-libp2p-yamux/issues"
},
"keywords": [
"IPFS",
"libp2p",
"stream",
"multiplexer",
"muxer"
],
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
},
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist/src",
"!dist/test",
"!**/*.tsbuildinfo"
],
"exports": {
".": {
"import": "./dist/src/index.js"
},
"./config": {
"import": "./dist/src/config.js"
},
"./stream": {
"import": "./dist/src/stream.js"
}
},
"eslintConfig": {
"extends": "ipfs",
"parserOptions": {
"sourceType": "module"
},
"ignorePatterns": [
"src/*.d.ts"
]
},
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"benchmark": "benchmark dist/test/bench/*.bench.js --timeout 400000",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser",
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main"
},
"dependencies": {
"@libp2p/components": "^2.0.1",
"@libp2p/interface-connection": "^2.1.1",
"@libp2p/interface-stream-muxer": "^2.0.1",
"@libp2p/logger": "^2.0.0",
"@libp2p/tracked-map": "^2.0.1",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.1",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.0",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.5",
"uint8arraylist": "^1.5.1",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.2",
"@libp2p/interface-stream-muxer-compliance-tests": "^3.0.1",
"@libp2p/mplex": "^4.0.0",
"aegir": "^37.3.0",
"it-drain": "^1.0.5",
"it-pair": "2.0.2",
"it-stream-types": "^1.0.4"
},
"browser": {}
}
90 changes: 90 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { logger, Logger } from '@libp2p/logger'
import errcode from 'err-code'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

// TOOD use config items or delete them
export interface Config {
/**
* Used to control the log destination
*
* It can be disabled by explicitly setting to `undefined`
*/
log?: Logger

/**
* Used to do periodic keep alive messages using a ping.
*/
enableKeepAlive: boolean

/**
* How often to perform the keep alive
*
* measured in milliseconds
*/
keepAliveInterval: number

/**
* Maximum number of concurrent inbound streams that we accept.
* If the peer tries to open more streams, those will be reset immediately.
*/
maxInboundStreams: number

/**
* Maximum number of concurrent outbound streams that we accept.
* If the application tries to open more streams, the call to `newStream` will throw
*/
maxOutboundStreams: number

/**
* Used to control the initial window size that we allow for a stream.
*
* measured in bytes
*/
initialStreamWindowSize: number

/**
* Used to control the maximum window size that we allow for a stream.
*/
maxStreamWindowSize: number

/**
* Maximum size of a message that we'll send on a stream.
* This ensures that a single stream doesn't hog a connection.
*/
maxMessageSize: number
}

export const defaultConfig: Config = {
log: logger('libp2p:yamux'),
enableKeepAlive: true,
keepAliveInterval: 30_000,
maxInboundStreams: 1_000,
maxOutboundStreams: 1_000,
initialStreamWindowSize: INITIAL_STREAM_WINDOW,
maxStreamWindowSize: MAX_STREAM_WINDOW,
maxMessageSize: 64 * 1024
}

export function verifyConfig (config: Config): void {
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
if (config.keepAliveInterval <= 0) {
throw errcode(new Error('keep-alive interval must be positive'), ERR_INVALID_CONFIG)
}
if (config.maxInboundStreams < 0) {
throw errcode(new Error('max inbound streams must be larger or equal 0'), ERR_INVALID_CONFIG)
}
if (config.maxOutboundStreams < 0) {
throw errcode(new Error('max outbound streams must be larger or equal 0'), ERR_INVALID_CONFIG)
}
if (config.initialStreamWindowSize < INITIAL_STREAM_WINDOW) {
throw errcode(new Error('InitialStreamWindowSize must be larger or equal 256 kB'), ERR_INVALID_CONFIG)
}
if (config.maxStreamWindowSize < config.initialStreamWindowSize) {
throw errcode(new Error('MaxStreamWindowSize must be larger than the InitialStreamWindowSize'), ERR_INVALID_CONFIG)
}
if (config.maxStreamWindowSize > 2 ** 32 - 1) {
throw errcode(new Error('MaxStreamWindowSize must be less than equal MAX_UINT32'), ERR_INVALID_CONFIG)
}
if (config.maxMessageSize < 1024) {
throw errcode(new Error('MaxMessageSize must be greater than a kilobyte'), ERR_INVALID_CONFIG)
}
}
41 changes: 41 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Protocol violation errors

export const ERR_INVALID_FRAME = 'ERR_INVALID_FRAME'
export const ERR_UNREQUESTED_PING = 'ERR_UNREQUESTED_PING'
export const ERR_NOT_MATCHING_PING = 'ERR_NOT_MATCHING_PING'
export const ERR_STREAM_ALREADY_EXISTS = 'ERR_STREAM_ALREADY_EXISTS'
export const ERR_DECODE_INVALID_VERSION = 'ERR_DECODE_INVALID_VERSION'
export const ERR_BOTH_CLIENTS = 'ERR_BOTH_CLIENTS'
export const ERR_RECV_WINDOW_EXCEEDED = 'ERR_RECV_WINDOW_EXCEEDED'

export const PROTOCOL_ERRORS = new Set([
ERR_INVALID_FRAME,
ERR_UNREQUESTED_PING,
ERR_NOT_MATCHING_PING,
ERR_STREAM_ALREADY_EXISTS,
ERR_DECODE_INVALID_VERSION,
ERR_BOTH_CLIENTS,
ERR_RECV_WINDOW_EXCEEDED
])

// local errors

export const ERR_INVALID_CONFIG = 'ERR_INVALID_CONFIG'
export const ERR_MUXER_LOCAL_CLOSED = 'ERR_MUXER_LOCAL_CLOSED'
export const ERR_MUXER_REMOTE_CLOSED = 'ERR_MUXER_REMOTE_CLOSED'
export const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
export const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
export const ERR_MAX_OUTBOUND_STREAMS_EXCEEDED = 'ERROR_MAX_OUTBOUND_STREAMS_EXCEEDED'
export const ERR_DECODE_IN_PROGRESS = 'ERR_DECODE_IN_PROGRESS'

/**
* INITIAL_STREAM_WINDOW is the initial stream window size.
*
* Not an implementation choice, this is defined in the specification
*/
export const INITIAL_STREAM_WINDOW = 256 * 1024

/**
* Default max stream window
*/
export const MAX_STREAM_WINDOW = 16 * 1024 * 1024
Loading