Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor: async iterators (#94)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await while pull-streams are replaced with async iterators. The API has also been updated according to the latest `interface-stream-muxer` version, https://github.com/libp2p/interface-stream-muxer/tree/v0.7.0.

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw authored and vasco-santos committed Sep 18, 2019
1 parent 567bddf commit c9bede5
Show file tree
Hide file tree
Showing 28 changed files with 1,384 additions and 1,627 deletions.
35 changes: 0 additions & 35 deletions .aegir.js

This file was deleted.

12 changes: 3 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
node_modules
coverage
.nyc_output
package-lock.json
yarn.lock
docs

**/node_modules
**/*.log
test/setup/tmp-disposable-nodes-addrs.json
dist
coverage
**/*.swp
examples/sub-module/**/bundle.js
examples/sub-module/**/*-minified.js
examples/sub-module/*-bundle.js
187 changes: 116 additions & 71 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
js-libp2p-mplex
===================
# js-libp2p-mplex

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
Expand All @@ -12,100 +11,146 @@ js-libp2p-mplex
![](https://img.shields.io/badge/npm-%3E%3D6.0.0-orange.svg?style=flat-square)
![](https://img.shields.io/badge/Node.js-%3E%3D10.0.0-orange.svg?style=flat-square)

> JavaScript implementation of https://github.com/libp2p/mplex
> JavaScript implementation of [mplex](https://github.com/libp2p/specs/tree/master/mplex).
[![](https://github.com/libp2p/interface-stream-muxer/raw/master/img/badge.png)](https://github.com/libp2p/interface-stream-muxer)

## Lead Maintainer

[Vasco Santos](https://github.com/vasco-santos).
[Vasco Santos](https://github.com/vasco-santos)

## Install

```sh
npm install libp2p-mplex
```

## Usage

Let's define a `listener.js`, which starts a TCP server on port 9999 and waits for a connection. Once we get a connection, we wait for a stream. And finally, once we have the stream, we pull the data from that stream, and printing it to the console.

```JavaScript
const mplex = require('libp2p-mplex')
const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')

const listener = tcp.createServer((socket) => {
console.log('[listener] Got connection!')

const muxer = mplex.listener(toPull(socket))

muxer.on('stream', (stream) => {
console.log('[listener] Got stream!')
pull(
stream,
pull.drain((data) => {
console.log('[listener] Received:')
console.log(data.toString())
})
)
})
})
```js
const Mplex = require('libp2p-mplex')
const pipe = require('it-pipe')

listener.listen(9999, () => {
console.log('[listener] listening on 9999')
const muxer = new Mplex({
onStream: stream => { // Receive a duplex stream from the remote
// ...receive data from the remote and optionally send data back
}
})

pipe(conn, muxer, conn) // conn is duplex connection to another peer

const stream = muxer.newStream() // Create a new duplex stream to the remote

// Use the duplex stream to send some data to the remote...
pipe([1, 2, 3], stream)
```

Now, let's define `dialer.js` who will connect to our `listener` over a TCP socket. Once we have that, we'll put a message in the stream for our `listener`.
## API

```JavaScript
const mplex = require('libp2p-mplex')
const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
### `const muxer = new Mplex([options])`

const socket = tcp.connect(9999)
Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications.

const muxer = mplex.dialer(toPull(socket))
e.g.

console.log('[dialer] opening stream')
const stream = muxer.newStream((err) => {
console.log('[dialer] opened stream')
if (err) throw err
})
```js
const Mplex = require('libp2p-mplex')
const pipe = require('it-pipe')

pull(
pull.values(['hey, how is it going. I am the dialer']),
stream
)
// Create a duplex muxer
const muxer = new Mplex()

// Use the muxer in a pipeline
pipe(conn, muxer, conn) // conn is duplex connection to another peer
```

Now we can first run `listener.js` and then `dialer.js` to see the
following output:
`options` is an optional `Object` that may have the following properties:

* `onStream` - A function called when receiving a new stream from the remote. e.g.
```js
// Receive a new stream on the muxed connection
const onStream = stream => {
// Read from this stream and write back to it (echo server)
pipe(
stream,
source => (async function * () {
for await (const data of source) yield data
})(),
stream
)
}
const muxer = new Mplex({ onStream })
// ...
```
**Note:** The `onStream` function can be passed in place of the `options` object. i.e.
```js
new Mplex(stream => { /* ... */ })
```
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
```js
const controller = new AbortController()
const muxer = new Mplex({ signal: controller.signal })

pipe(conn, muxer, conn)

controller.abort()
```
* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB)

### `muxer.onStream`

Use this property as an alternative to passing `onStream` as an option to the `Mplex` constructor.

### `const stream = muxer.newStream([options])`

Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).

e.g.

*listener.js*
```js
// Create a new stream on the muxed connection
const stream = muxer.newStream()

```
$ node listener.js
[listener] listening on 9999
[listener] Got connection!
[listener] Got stream!
[listener] Received:
hey, how is it going. I am the dialer
// Use this new stream like any other duplex stream:
pipe([1, 2, 3], stream, consume)
```

*dialer.js*
In addition to `sink` and `source` properties, this stream also has the following API, that will **normally _not_ be used by stream consumers**.

```
$ node dialer.js
[dialer] opening stream
[dialer] opened stream
```
#### `stream.close()`

## Install
Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

```sh
> npm install libp2p-mplex
```
This function is called automatically by the muxer when it receives a `CLOSE` message from the remote.

## API
The source will return normally, the sink will continue to consume.

```js
const mplex = require('libp2p-mplex')
```
#### `stream.abort([err])`

Closes the stream for **reading** _and_ **writing**. This should be called when a _local error_ has occurred.

Note, if called without an error any buffered data in the source can still be consumed and the stream will end normally.

This will cause a `RESET` message to be sent to the remote, _unless_ the sink has already ended.

The sink will return and the source will throw if an error is passed or return normally if not.

#### `stream.reset()`

Closes the stream _immediately_ for **reading** _and_ **writing**. This should be called when a _remote error_ has occurred.

This function is called automatically by the muxer when it receives a `RESET` message from the remote.

The sink will return and the source will throw.

## Contribute

The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out:

- Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically.
- **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs.
- **Add tests**. There can never be enough tests.

## License

[MIT](LICENSE) © Protocol Labs
49 changes: 35 additions & 14 deletions examples/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,42 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const multiplex = require('../src')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')
const { toIterable } = require('./util')
const Mplex = require('../src')

const socket = tcp.connect(9999)
const socket = toIterable(tcp.connect(9999))
console.log('[dialer] socket stream opened')

const muxer = multiplex.dialer(toPull(socket))
const controller = new AbortController()

console.log('[dialer] opening stream')
const stream = muxer.newStream((err) => {
console.log('[dialer] opened stream')
if (err) throw err
})
const muxer = new Mplex({ signal: controller.signal })

pull(
pull.values(['hey, how is it going. I am the dialer']),
stream
)
const pipeMuxerToSocket = async () => {
await pipe(muxer, socket, muxer)
console.log('[dialer] socket stream closed')
}

const sendAndReceive = async () => {
const muxedStream = muxer.newStream()
console.log('[dialer] muxed stream opened')

await pipe(
['hey, how is it going. I am the dialer'],
muxedStream,
async source => {
for await (const chunk of source) {
console.log('[dialer] received:')
console.log(chunk.toString())
}
}
)
console.log('[dialer] muxed stream closed')

// Close the socket stream after 1s
setTimeout(() => controller.abort(), 1000)
}

pipeMuxerToSocket()
sendAndReceive()
43 changes: 25 additions & 18 deletions examples/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,34 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const multiplex = require('../src')
const pipe = require('it-pipe')
const { toIterable } = require('./util')
const Mplex = require('../src')

const listener = tcp.createServer((socket) => {
const listener = tcp.createServer(async socket => {
console.log('[listener] Got connection!')

const muxer = multiplex.listener(toPull(socket))

muxer.on('stream', (stream) => {
console.log('[listener] Got stream!')
pull(
stream,
pull.drain((data) => {
console.log('[listener] Received:')
console.log(data.toString())
})
)
const muxer = new Mplex({
async onStream (stream) {
console.log('[listener] muxed stream opened')
await pipe(
stream,
source => (async function * () {
for await (const chunk of source) {
console.log('[listener] received:')
console.log(chunk.toString())
yield 'thanks for the message, I am the listener'
}
})(),
stream
)
console.log('[listener] muxed stream closed')
}
})
})

listener.listen(9999, () => {
console.log('[listener] listening on 9999')
socket = toIterable(socket)
await pipe(socket, muxer, socket)
console.log('[listener] socket stream closed')
})

listener.listen(9999, () => console.log('[listener] listening on 9999'))
Loading

0 comments on commit c9bede5

Please sign in to comment.