Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

callbacks -> async / await #102

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ logs
*.log

coverage
.nyc_output

# Runtime data
pids
Expand Down
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
os: linux
script:
- npx aegir build --bundlesize
- npx aegir commitlint --travis
- npx aegir dep-check
- npm run lint

Expand Down
44 changes: 20 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
[![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection)


> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other different transports.
> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other transports.

## Lead Maintainer

Expand Down Expand Up @@ -41,37 +41,33 @@
```js
const TCP = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const pipe = require('it-pipe')
const { collect } = require('streaming-iterables')

const mh = multiaddr('/ip4/127.0.0.1/tcp/9090')
const addr = multiaddr('/ip4/127.0.0.1/tcp/9090')

const tcp = new TCP()

const listener = tcp.createListener((socket) => {
console.log('new connection opened')
pull(
pull.values(['hello']),
pipe(
['hello'],
socket
)
})

listener.listen(mh, () => {
console.log('listening')

pull(
tcp.dial(mh),
pull.collect((err, values) => {
if (!err) {
console.log(`Value: ${values.toString()}`)
} else {
console.log(`Error: ${err}`)
}

// Close connection after reading
listener.close()
}),
)
})
await listener.listen(addr)
console.log('listening')

const socket = await tcp.dial(addr)
const values = await pipe(
socket,
collect
)
console.log(`Value: ${values.toString()}`)

// Close connection after reading
await listener.close()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
```

Outputs:
Expand All @@ -88,12 +84,12 @@ Value: hello

[![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport)

`libp2p-tcp` accepts TCP addresses both IPFS and non IPFS encapsulated addresses, i.e:
`libp2p-tcp` accepts TCP addresses as both IPFS and non IPFS encapsulated addresses, i.e:

`/ip4/127.0.0.1/tcp/4001`
`/ip4/127.0.0.1/tcp/4001/ipfs/QmHash`

Both for dialing and listening.
(both for dialing and listening)

### Connection

Expand Down
21 changes: 12 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
"lint": "aegir lint",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"build": "aegir build",
"docs": "aegir docs",
"release": "aegir release -t node --no-build",
"release-minor": "aegir release -t node --type minor --no-build",
"release-major": "aegir-release -t node --type major --no-build",
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage --provider coveralls"
"coverage": "nyc --reporter=text --reporter=lcov npm run test:node"
},
"pre-push": [
"lint",
"test"
"lint"
],
"repository": {
"type": "git",
Expand All @@ -38,20 +38,23 @@
"aegir": "^20.0.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-transport": "~0.3.6",
"pull-stream": "^3.6.14"
"pull-stream": "^3.6.9",
"sinon": "^7.4.1"
},
"dependencies": {
"abortable-iterator": "^2.1.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"interface-connection": "~0.3.3",
"interface-transport": "~0.5.2",
"ip-address": "^6.1.0",
"it-pipe": "^1.0.1",
"lodash.includes": "^4.3.0",
"lodash.isfunction": "^3.0.9",
"mafmt": "^6.0.7",
"mafmt": "^6.0.8",
"multiaddr": "^6.1.0",
"once": "^1.4.0",
"stream-to-pull-stream": "^1.7.3"
"streaming-iterables": "^4.1.0"
},
"contributors": [
"Alan Shaw <alan@tableflip.io>",
Expand Down
17 changes: 17 additions & 0 deletions src/adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

const { Adapter } = require('interface-transport')
const withIs = require('class-is')
const TCP = require('.')

// Legacy adapter to old transport & connection interface
class TcpAdapter extends Adapter {
constructor () {
super(new TCP())
}
}

module.exports = withIs(TcpAdapter, {
className: 'TCP',
symbolName: '@libp2p/js-libp2p-tcp/tcp'
})
8 changes: 8 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

// IPFS multi-address code
module.exports.IPFS_MA_CODE = 421

// Time to wait for a connection to close gracefully before destroying it
// manually
module.exports.CLOSE_TIMEOUT = 2000
76 changes: 48 additions & 28 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,55 +1,76 @@
'use strict'

const net = require('net')
const toPull = require('stream-to-pull-stream')
const mafmt = require('mafmt')
const withIs = require('class-is')
const includes = require('lodash.includes')
const isFunction = require('lodash.isfunction')
const Connection = require('interface-connection').Connection
const once = require('once')
const errcode = require('err-code')
const debug = require('debug')
const log = debug('libp2p:tcp:dial')

const Libp2pSocket = require('./socket')
const createListener = require('./listener')
const { AbortError } = require('interface-transport')

function noop () {}

class TCP {
dial (ma, options, callback) {
if (isFunction(options)) {
callback = options
options = {}
}
async dial (ma, options) {
const cOpts = ma.toOptions()
log('Dialing %s:%s', cOpts.host, cOpts.port)

callback = once(callback || noop)
const rawSocket = await this._connect(cOpts, options)
return new Libp2pSocket(rawSocket, ma, options)
}

const cOpts = ma.toOptions()
log('Connecting to %s %s', cOpts.port, cOpts.host)
_connect (cOpts, options = {}) {
return new Promise((resolve, reject) => {
if ((options.signal || {}).aborted) {
return reject(new AbortError())
}

const rawSocket = net.connect(cOpts)
const start = Date.now()
const rawSocket = net.connect(cOpts)

rawSocket.once('timeout', () => {
log('timeout')
rawSocket.emit('error', new Error('Timeout'))
})
const onError = (err) => {
const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}`
done(errcode(msg, err.code))
}

rawSocket.once('error', callback)
const onTimeout = () => {
log('Timeout dialing %s:%s', cOpts.host, cOpts.port)
const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err)
}

rawSocket.once('connect', () => {
rawSocket.removeListener('error', callback)
callback()
})
const onConnect = () => {
log('Connected to %s:%s', cOpts.host, cOpts.port)
done(null, rawSocket)
}

const socket = toPull.duplex(rawSocket)
const onAbort = () => {
log('Dial to %s:%s aborted', cOpts.host, cOpts.port)
rawSocket.destroy()
done(new AbortError())
}

const conn = new Connection(socket)
const done = (err, res) => {
rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect)

conn.getObservedAddrs = (callback) => {
return callback(null, [ma])
}
options.signal && options.signal.removeEventListener('abort', onAbort)

return conn
err ? reject(err) : resolve(res)
}

rawSocket.once('error', onError)
rawSocket.once('timeout', onTimeout)
rawSocket.once('connect', onConnect)
options.signal && options.signal.addEventListener('abort', onAbort)
})
}

createListener (options, handler) {
Expand All @@ -59,7 +80,6 @@ class TCP {
}

handler = handler || noop

return createListener(handler)
}

Expand Down
Loading