From 472e14f2b46c9303dd7464825f4b171c91da1a13 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Wed, 27 Nov 2019 06:43:17 -0500 Subject: [PATCH] refactor(docs): async await version of examples/chat (#482) * fix: performance bottleneck in stat.js (#463) Array.shift seems to be very slow, perhaps linear, on some engines, resulting in _update consuming a lot of CPU. * docs(fix): correct docs and example for pnet (#464) * docs(fix): correct docs and example for pnet * docs(fix): correct pnet docs * docs(fix): update README.md language (#468) * docs: reciprocate (#474) * docs(example): fix ipfs cat (#475) `ipfs.files.cat` is incorrect. the correct function is `ipfs.cat` * fix: async-await example chat * fix: move handler before start * fix: examples readme typos (#481) * fix: simplify libp2p bundle for echo example * chore: remove unused vars --- examples/chat/src/dialer.js | 92 ++++++++++-------------------- examples/chat/src/libp2p-bundle.js | 54 +----------------- examples/chat/src/listener.js | 63 ++++++++------------ examples/chat/src/stream.js | 40 +++++++++++++ 4 files changed, 96 insertions(+), 153 deletions(-) create mode 100644 examples/chat/src/stream.js diff --git a/examples/chat/src/dialer.js b/examples/chat/src/dialer.js index d4e9ca7aee..ae6d8d05b5 100644 --- a/examples/chat/src/dialer.js +++ b/examples/chat/src/dialer.js @@ -4,76 +4,44 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const Node = require('./libp2p-bundle') -const pull = require('pull-stream') -const async = require('async') -const Pushable = require('pull-pushable') -const p = Pushable() -let idListener +const { stdinToStream, streamToConsole } = require('./stream') -async.parallel([ - (callback) => { - PeerId.createFromJSON(require('./peer-id-dialer'), (err, idDialer) => { - if (err) { - throw err - } - callback(null, idDialer) - }) - }, - (callback) => { - PeerId.createFromJSON(require('./peer-id-listener'), (err, idListener) => { - if (err) { - throw err - } - callback(null, idListener) - }) - } -], (err, ids) => { - if (err) throw err - const peerDialer = new PeerInfo(ids[0]) +async function run() { + const [idDialer, idListener] = await Promise.all([ + PeerId.createFromJSON(require('./peer-id-dialer')), + PeerId.createFromJSON(require('./peer-id-listener')) + ]) + + // Create a new libp2p node on localhost with a randomly chosen port + const peerDialer = new PeerInfo(idDialer) peerDialer.multiaddrs.add('/ip4/0.0.0.0/tcp/0') const nodeDialer = new Node({ peerInfo: peerDialer }) - const peerListener = new PeerInfo(ids[1]) - idListener = ids[1] + // Create a PeerInfo with the listening peer's address + const peerListener = new PeerInfo(idListener) peerListener.multiaddrs.add('/ip4/127.0.0.1/tcp/10333') - nodeDialer.start((err) => { - if (err) { - throw err - } - console.log('Dialer ready, listening on:') + // Start the libp2p host + await nodeDialer.start() - peerListener.multiaddrs.forEach((ma) => { - console.log(ma.toString() + '/p2p/' + idListener.toB58String()) - }) + // Output this node's address + console.log('Dialer ready, listening on:') + peerListener.multiaddrs.forEach((ma) => { + console.log(ma.toString() + '/p2p/' + idListener.toB58String()) + }) - nodeDialer.dialProtocol(peerListener, '/chat/1.0.0', (err, conn) => { - if (err) { - throw err - } - console.log('nodeA dialed to nodeB on protocol: /chat/1.0.0') - console.log('Type a message and see what happens') - // Write operation. Data sent as a buffer - pull( - p, - conn - ) - // Sink, data converted from buffer to utf8 string - pull( - conn, - pull.map((data) => { - return data.toString('utf8').replace('\n', '') - }), - pull.drain(console.log) - ) + // Dial to the remote peer (the "listener") + const { stream } = await nodeDialer.dialProtocol(peerListener, '/chat/1.0.0') - process.stdin.setEncoding('utf8') - process.openStdin().on('data', (chunk) => { - var data = chunk.toString() - p.push(data) - }) - }) - }) -}) + console.log('Dialer dialed to listener on protocol: /chat/1.0.0') + console.log('Type a message and see what happens') + + // Send stdin to the stream + stdinToStream(stream) + // Read the stream and output to console + streamToConsole(stream) +} + +run() diff --git a/examples/chat/src/libp2p-bundle.js b/examples/chat/src/libp2p-bundle.js index c45b4cfaf5..cca65f8f95 100644 --- a/examples/chat/src/libp2p-bundle.js +++ b/examples/chat/src/libp2p-bundle.js @@ -1,41 +1,12 @@ 'use strict' const TCP = require('libp2p-tcp') -const MulticastDNS = require('libp2p-mdns') const WS = require('libp2p-websockets') -const Bootstrap = require('libp2p-bootstrap') -const spdy = require('libp2p-spdy') -const KadDHT = require('libp2p-kad-dht') const mplex = require('libp2p-mplex') const secio = require('libp2p-secio') const defaultsDeep = require('@nodeutils/defaults-deep') const libp2p = require('../../..') -function mapMuxers (list) { - return list.map((pref) => { - if (typeof pref !== 'string') { - return pref - } - switch (pref.trim().toLowerCase()) { - case 'spdy': return spdy - case 'mplex': return mplex - default: - throw new Error(pref + ' muxer not available') - } - }) -} - -function getMuxers (muxers) { - const muxerPrefs = process.env.LIBP2P_MUXER - if (muxerPrefs && !muxers) { - return mapMuxers(muxerPrefs.split(',')) - } else if (muxers) { - return mapMuxers(muxers) - } else { - return [mplex, spdy] - } -} - class Node extends libp2p { constructor (_options) { const defaults = { @@ -44,29 +15,8 @@ class Node extends libp2p { TCP, WS ], - streamMuxer: getMuxers(_options.muxer), - connEncryption: [ secio ], - peerDiscovery: [ - MulticastDNS, - Bootstrap - ], - dht: KadDHT - }, - config: { - peerDiscovery: { - mdns: { - interval: 10000, - enabled: false - }, - bootstrap: { - interval: 10000, - enabled: false, - list: _options.bootstrapList - } - }, - dht: { - kBucketSize: 20 - } + streamMuxer: [ mplex ], + connEncryption: [ secio ] } } diff --git a/examples/chat/src/listener.js b/examples/chat/src/listener.js index 1dc3233d73..52c69edeb2 100644 --- a/examples/chat/src/listener.js +++ b/examples/chat/src/listener.js @@ -4,53 +4,38 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const Node = require('./libp2p-bundle.js') -const pull = require('pull-stream') -const Pushable = require('pull-pushable') -const p = Pushable() +const { stdinToStream, streamToConsole } = require('./stream') -PeerId.createFromJSON(require('./peer-id-listener'), (err, idListener) => { - if (err) { - throw err - } +async function run() { + // Create a new libp2p node with the given multi-address + const idListener = await PeerId.createFromJSON(require('./peer-id-listener')) const peerListener = new PeerInfo(idListener) peerListener.multiaddrs.add('/ip4/0.0.0.0/tcp/10333') const nodeListener = new Node({ peerInfo: peerListener }) - nodeListener.start((err) => { - if (err) { - throw err - } - - nodeListener.on('peer:connect', (peerInfo) => { - console.log(peerInfo.id.toB58String()) - }) - - nodeListener.handle('/chat/1.0.0', (protocol, conn) => { - pull( - p, - conn - ) + // Log a message when a remote peer connects to us + nodeListener.on('peer:connect', (peerInfo) => { + console.log(peerInfo.id.toB58String()) + }) - pull( - conn, - pull.map((data) => { - return data.toString('utf8').replace('\n', '') - }), - pull.drain(console.log) - ) + // Handle messages for the protocol + await nodeListener.handle('/chat/1.0.0', async ({ stream }) => { + // Send stdin to the stream + stdinToStream(stream) + // Read the stream and output to console + streamToConsole(stream) + }) - process.stdin.setEncoding('utf8') - process.openStdin().on('data', (chunk) => { - var data = chunk.toString() - p.push(data) - }) - }) + // Start listening + await nodeListener.start() - console.log('Listener ready, listening on:') - peerListener.multiaddrs.forEach((ma) => { - console.log(ma.toString() + '/p2p/' + idListener.toB58String()) - }) + // Output listen addresses to the console + console.log('Listener ready, listening on:') + peerListener.multiaddrs.forEach((ma) => { + console.log(ma.toString() + '/p2p/' + idListener.toB58String()) }) -}) +} + +run() diff --git a/examples/chat/src/stream.js b/examples/chat/src/stream.js new file mode 100644 index 0000000000..30385b9718 --- /dev/null +++ b/examples/chat/src/stream.js @@ -0,0 +1,40 @@ +'use strict' +/* eslint-disable no-console */ + +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') + +function stdinToStream(stream) { + // Read utf-8 from stdin + process.stdin.setEncoding('utf8') + pipe( + // Read from stdin (the source) + process.stdin, + // Encode with length prefix (so receiving side knows how much data is coming) + lp.encode(), + // Write to the stream (the sink) + stream.sink + ) +} + +function streamToConsole(stream) { + pipe( + // Read from the stream (the source) + stream.source, + // Decode length-prefixed data + lp.decode(), + // Sink function + async function (source) { + // For each chunk of data + for await (const msg of source) { + // Output the data as a utf8 string + console.log('> ' + msg.toString('utf8').replace('\n', '')) + } + } + ) +} + +module.exports = { + stdinToStream, + streamToConsole +}