Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: allow passing a http.Agent to ipfs-http-client in node (#3474)
Browse files Browse the repository at this point in the history
Right now no `http.Agent` is used for requests made using the http client in node, which means each request opens a new connection which can end up hitting process resource limits which means connections get dropped.

The change here sets a default `http.Agent` with a `keepAlive: true` and `maxSockets` of 6 which is consistent with [browsers](https://tools.ietf.org/html/rfc2616#section-8.1.4) and [native apps](https://developer.apple.com/documentation/foundation/nsurlsessionconfiguration/1407597-httpmaximumconnectionsperhost?language=objc).

The user can override the agent passed to the `ipfs-http-client` constructor to restore the previous functionality:

```js
const http = require('http')
const createClient = require('ipfs-http-client')

const client = createClient({
  url: 'http://127.0.0.1:5002',
  agent: new http.Agent({
    keepAlive: false,
    maxSockets: Infinity
  })
})
```

Refs: #3464
  • Loading branch information
achingbrain authored Jan 13, 2021
1 parent 7b48f14 commit fe93ba0
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 19 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ stages:

node_js:
- 'lts/*'
- 'node'
# node 15.5.1 broke Hapi - https://github.com/hapijs/hapi/issues/4208
# The change has been reverted - https://github.com/nodejs/node/pull/36647
# Will be released in 15.6.x - https://github.com/nodejs/node/pull/36889
# Disable testing on node 15.x.x until that is released
# - 'node'

os:
- linux
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-daemon/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Daemon {
/**
* Starts the IPFS HTTP server
*
* @returns {Promise<Daemon>}
* @returns {Promise<Daemon>} - A promise that resolves to a Daemon instance
*/
async start () {
log('starting')
Expand Down
50 changes: 37 additions & 13 deletions packages/ipfs-daemon/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,74 @@ const { expect } = require('aegir/utils/chai')
const Daemon = require('../')
const fetch = require('node-fetch')
const WebSocket = require('ws')
const os = require('os')

function createDaemon () {
return new Daemon({
init: {
bits: 512
},
repo: `${os.tmpdir()}/ipfs-test-${Math.random()}`,
config: {
Addresses: {
Swarm: [
'/ip4/0.0.0.0/tcp/0',
'/ip4/127.0.0.1/tcp/0/ws'
],
API: '/ip4/127.0.0.1/tcp/0',
Gateway: '/ip4/127.0.0.1/tcp/0',
RPC: '/ip4/127.0.0.1/tcp/0'
}
}
})
}

describe('daemon', function () {
// slow ci is slow
this.timeout(60 * 1000)

describe('daemon', () => {
let daemon

it('should start a http api server', async () => {
daemon = new Daemon({})
daemon = createDaemon()

await daemon.start()

const {
uri
} = daemon._httpApi._apiServers[0].info

const httpId = (await fetch(`${uri}/api/v0/id`, {
method: 'POST'
})).json()
const idFromCore = await daemon._ipfs.id()

const apiId = await daemon._ipfs.id()
const httpId = await fetch(`${uri}/api/v0/id`, {
method: 'POST'
})

await expect(httpId).to.eventually.have.property('PublicKey', apiId.publicKey)
await expect(httpId.json()).to.eventually.have.property('PublicKey', idFromCore.publicKey)

await daemon.stop()
})

it('should start a http gateway server', async () => {
daemon = new Daemon({})
daemon = createDaemon()

await daemon.start()

const {
uri
} = daemon._httpGateway._gatewayServers[0].info

const result = await (await fetch(`${uri}/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn`, {
const result = await fetch(`${uri}/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn`, {
method: 'POST'
})).text()
})

expect(result).to.include('Index of /ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn/')
await expect(result.text()).to.eventually.include('Index of /ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn/')

await daemon.stop()
})

it('should start a gRPC server', async () => {
daemon = new Daemon({})
daemon = createDaemon()

await daemon.start()

Expand Down Expand Up @@ -83,7 +107,7 @@ describe('daemon', () => {
})

it('should stop', async () => {
daemon = new Daemon({})
daemon = createDaemon()

await daemon.start()
await daemon.stop()
Expand Down
1 change: 1 addition & 0 deletions packages/ipfs-http-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ All core API methods take _additional_ `options` specific to the HTTP API:

* `headers` - An object or [Headers](https://developer.mozilla.org/en-US/docs/Web/API/Headers) instance that can be used to set custom HTTP headers. Note that this option can also be [configured globally](#custom-headers) via the constructor options.
* `searchParams` - An object or [`URLSearchParams`](https://developer.mozilla.org/en-US/docs/Web/API/URLSearchParams) instance that can be used to add additional query parameters to the query string sent with each request.
* `agent` - A node [http.Agent](https://nodejs.org/api/http.html#http_class_http_agent) used to configure connection persistence and reuse (only supported in node.js)

### Instance Utils

Expand Down
4 changes: 3 additions & 1 deletion packages/ipfs-http-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"./src/lib/multipart-request.js": "./src/lib/multipart-request.browser.js",
"ipfs-utils/src/files/glob-source": false,
"go-ipfs": false,
"ipfs-core-utils/src/files/normalise-input": "ipfs-core-utils/src/files/normalise-input/index.browser.js"
"ipfs-core-utils/src/files/normalise-input": "ipfs-core-utils/src/files/normalise-input/index.browser.js",
"http": false
},
"typesVersions": {
"*": {
Expand Down Expand Up @@ -79,6 +80,7 @@
},
"devDependencies": {
"aegir": "^29.2.2",
"delay": "^4.4.0",
"go-ipfs": "^0.7.0",
"ipfs-core": "^0.3.1",
"ipfsd-ctl": "^7.2.0",
Expand Down
21 changes: 18 additions & 3 deletions packages/ipfs-http-client/src/lib/core.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'use strict'
/* eslint-env browser */
const Multiaddr = require('multiaddr')
const { isBrowser, isWebWorker } = require('ipfs-utils/src/env')
const { isBrowser, isWebWorker, isNode } = require('ipfs-utils/src/env')
const parseDuration = require('parse-duration').default
const log = require('debug')('ipfs-http-client:lib:error-handler')
const HTTP = require('ipfs-utils/src/http')
const merge = require('merge-options')
const toUrlString = require('ipfs-core-utils/src/to-url-string')
const http = require('http')

const DEFAULT_PROTOCOL = isBrowser || isWebWorker ? location.protocol : 'http'
const DEFAULT_HOST = isBrowser || isWebWorker ? location.hostname : 'localhost'
Expand All @@ -19,6 +20,7 @@ const DEFAULT_PORT = isBrowser || isWebWorker ? location.port : '5001'
const normalizeOptions = (options = {}) => {
let url
let opts = {}
let agent

if (typeof options === 'string' || Multiaddr.isMultiaddr(options)) {
url = new URL(toUrlString(options))
Expand Down Expand Up @@ -46,13 +48,22 @@ const normalizeOptions = (options = {}) => {
url.pathname = 'api/v0'
}

if (isNode) {
agent = opts.agent || new http.Agent({
keepAlive: true,
// Similar to browsers which limit connections to six per host
maxSockets: 6
})
}

return {
...opts,
host: url.host,
protocol: url.protocol.replace(':', ''),
port: Number(url.port),
apiPath: url.pathname,
url
url,
agent
}
}

Expand Down Expand Up @@ -105,6 +116,8 @@ const parseTimeout = (value) => {
}

/**
* @typedef {import('http').Agent} Agent
*
* @typedef {Object} ClientOptions
* @property {string} [host]
* @property {number} [port]
Expand All @@ -116,6 +129,7 @@ const parseTimeout = (value) => {
* @property {object} [ipld]
* @property {any[]} [ipld.formats] - An array of additional [IPLD formats](https://github.com/ipld/interface-ipld-format) to support
* @property {(format: string) => Promise<any>} [ipld.loadFormat] - an async function that takes the name of an [IPLD format](https://github.com/ipld/interface-ipld-format) as a string and should return the implementation of that codec
* @property {Agent} [agent] - A [http.Agent](https://nodejs.org/api/http.html#http_class_http_agent) used to control connection persistence and reuse for HTTP clients (only supported in node.js)
*/
class Client extends HTTP {
/**
Expand Down Expand Up @@ -149,7 +163,8 @@ class Client extends HTTP {
}

return out
}
},
agent: opts.agent
})

delete this.get
Expand Down
1 change: 1 addition & 0 deletions packages/ipfs-http-client/test/node.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

require('./node/agent')
require('./node/swarm')
require('./node/request-api')
require('./node/custom-headers')
111 changes: 111 additions & 0 deletions packages/ipfs-http-client/test/node/agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')
const ipfsClient = require('../../src')
const delay = require('delay')

function startServer (handler) {
return new Promise((resolve) => {
// spin up a test http server to inspect the requests made by the library
const server = require('http').createServer((req, res) => {
req.on('data', () => {})
req.on('end', async () => {
const out = await handler(req)

res.writeHead(200)
res.write(JSON.stringify(out))
res.end()
})
})

server.listen(0, () => {
resolve({
port: server.address().port,
close: () => server.close()
})
})
})
}

describe('agent', function () {
let agent

before(() => {
const { Agent } = require('http')

agent = new Agent({
maxSockets: 2
})
})

it('restricts the number of concurrent connections', async () => {
const responses = []

const server = await startServer(() => {
const p = new Promise((resolve) => {
responses.push(resolve)
})

return p
})

const ipfs = ipfsClient({
url: `http://localhost:${server.port}`,
agent
})

// make three requests
const requests = Promise.all([
ipfs.id(),
ipfs.id(),
ipfs.id()
])

// wait for the first two to arrive
for (let i = 0; i < 5; i++) {
await delay(100)

if (responses.length === 2) {
// wait a little longer, the third should not arrive
await delay(1000)

expect(responses).to.have.lengthOf(2)

// respond to the in-flight requests
responses[0]({
res: 0
})
responses[1]({
res: 1
})

break
}
}

// wait for the final request to arrive
for (let i = 0; i < 5; i++) {
await delay(100)

if (responses.length === 3) {
// respond to it
responses[2]({
res: 2
})
}
}

const results = await requests

expect(results).to.deep.equal([{
res: 0
}, {
res: 1
}, {
res: 2
}])

server.close()
})
})

0 comments on commit fe93ba0

Please sign in to comment.