Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat(gateway): add streaming, conditional and range requests (#1989)
Browse files Browse the repository at this point in the history
This change simplifies code responsible for streaming response
and makes the streaming actually work by telling the payload compression
stream to flush its content on every read().
(previous version was buffering entire thing in Hapi's compressor memory)

We also do content-type detection based on the beginning of the stream
by peeking at first `fileType.minimumBytes` bytes.

- Switched from deprecated `hapi` and `joi` to `@hapi/hapi` and `@hapi/joi`
- Added support for Conditional Requests (RFC7232)
  - Returning `304 Not Modified` if `If-None-Match` is a CID matching `Etag`
  - Added `Last-Modified` to `/ipfs/` responses (improves client-side caching)
  - Always returning `304 Not Modified`
    when `If-Modified-Since` is present for immutable `/ipfs/`
- Added support for Byte Range requests (RFC7233, Section-2.1)
- Added support for `?filename=` parameter (improves downloads of raw cids)

License: MIT
Signed-off-by: Marcin Rataj <lidel@lidel.org>
  • Loading branch information
lidel authored and Alan Shaw committed May 8, 2019
1 parent bd3ade6 commit 48a8e75
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 49 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"stream-to-promise": "^2.2.0"
},
"dependencies": {
"@hapi/ammo": "^3.1.0",
"@hapi/hapi": "^18.3.1",
"@hapi/joi": "^15.0.1",
"async": "^2.6.1",
Expand All @@ -90,6 +91,7 @@
"bl": "^3.0.0",
"boom": "^7.2.0",
"bs58": "^4.0.1",
"buffer-peek-stream": "^1.0.1",
"byteman": "^1.3.5",
"cid-tool": "~0.2.0",
"cids": "~0.5.8",
Expand Down
158 changes: 110 additions & 48 deletions src/http/gateway/resources/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
const debug = require('debug')
const log = debug('ipfs:http-gateway')
log.error = debug('ipfs:http-gateway:error')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const toStream = require('pull-stream-to-stream')

const fileType = require('file-type')
const mime = require('mime-types')
const { PassThrough } = require('readable-stream')
const Boom = require('boom')
const Ammo = require('@hapi/ammo') // HTTP Range processing utilities
const peek = require('buffer-peek-stream')

const { resolver } = require('ipfs-http-response')
const PathUtils = require('../utils/path')
Expand All @@ -30,6 +30,20 @@ function detectContentType (ref, chunk) {
return mime.contentType(mimeType)
}

// Enable streaming of compressed payload
// https://github.com/hapijs/hapi/issues/3599
class ResponseStream extends PassThrough {
_read (size) {
super._read(size)
if (this._compressor) {
this._compressor.flush()
}
}
setCompressor (compressor) {
this._compressor = compressor
}
}

module.exports = {
checkCID (request, h) {
if (!request.params.cid) {
Expand Down Expand Up @@ -85,66 +99,114 @@ module.exports = {
return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true)
}

return new Promise((resolve, reject) => {
let pusher
let started = false
// Support If-None-Match & Etag (Conditional Requests from RFC7232)
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
const etag = `"${data.cid}"`
const cachedEtag = request.headers['if-none-match']
if (cachedEtag === etag || cachedEtag === `W/${etag}`) {
return h.response().code(304) // Not Modified
}

pull(
ipfs.catPullStream(data.cid),
pull.drain(
chunk => {
if (!started) {
started = true
pusher = pushable()
const res = h.response(toStream.source(pusher).pipe(new PassThrough()))
// Immutable content produces 304 Not Modified for all values of If-Modified-Since
if (ref.startsWith('/ipfs/') && request.headers['if-modified-since']) {
return h.response().code(304) // Not Modified
}

// Etag maps directly to an identifier for a specific version of a resource
res.header('Etag', `"${data.cid}"`)
// This necessary to set correct Content-Length and validate Range requests
// Note: we need `size` (raw data), not `cumulativeSize` (data + DAGNodes)
const { size } = await ipfs.files.stat(`/ipfs/${data.cid}`)

// Handle Byte Range requests (https://tools.ietf.org/html/rfc7233#section-2.1)
const catOptions = {}
let rangeResponse = false
if (request.headers.range) {
// If-Range is respected (when present), but we compare it only against Etag
// (Last-Modified date is too weak for IPFS use cases)
if (!request.headers['if-range'] || request.headers['if-range'] === etag) {
const ranges = Ammo.header(request.headers.range, size)
if (!ranges) {
const error = Boom.rangeNotSatisfiable()
error.output.headers['content-range'] = `bytes */${size}`
throw error
}

if (ranges.length === 1) { // Ignore requests for multiple ranges (hard to map to ipfs.cat and not used in practice)
rangeResponse = true
const range = ranges[0]
catOptions.offset = range.from
catOptions.length = (range.to - range.from + 1)
}
}
}

// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}
const rawStream = ipfs.catReadableStream(data.cid, catOptions)
const responseStream = new ResponseStream()

const contentType = detectContentType(ref, chunk)
// Pass-through Content-Type sniffing over initial bytes
const { peekedStream, contentType } = await new Promise((resolve, reject) => {
const peekBytes = fileType.minimumBytes
peek(rawStream, peekBytes, (err, streamHead, peekedStream) => {
if (err) {
log.error(err)
return reject(err)
}
resolve({ peekedStream, contentType: detectContentType(ref, streamHead) })
})
})

log('ref ', ref)
log('mime-type ', contentType)
peekedStream.pipe(responseStream)

if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}
const res = h.response(responseStream).code(rangeResponse ? 206 : 200)

resolve(res)
}
pusher.push(chunk)
},
err => {
if (err) {
log.error(err)

// We already started flowing, abort the stream
if (started) {
return pusher.end(err)
}

return reject(err)
}
// Etag maps directly to an identifier for a specific version of a resource
// and enables smart client-side caching thanks to If-None-Match
res.header('etag', etag)

pusher.end()
}
)
)
})
// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}

log('ref ', ref)
log('content-type ', contentType)

if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}

if (rangeResponse) {
const from = catOptions.offset
const to = catOptions.offset + catOptions.length - 1
res.header('Content-Range', `bytes ${from}-${to}/${size}`)
res.header('Content-Length', catOptions.length)
} else {
// Announce support for Range requests
res.header('Accept-Ranges', 'bytes')
res.header('Content-Length', size)
}

// Support Content-Disposition via ?filename=foo parameter
// (useful for browser vendor to download raw CID into custom filename)
// Source: https://github.com/ipfs/go-ipfs/blob/v0.4.20/core/corehttp/gateway_handler.go#L232-L236
if (request.query.filename) {
res.header('Content-Disposition', `inline; filename*=UTF-8''${encodeURIComponent(request.query.filename)}`)
}

return res
},

afterHandler (request, h) {
const { response } = request
if (response.statusCode === 200) {
// Add headers to successfult responses (regular or range)
if (response.statusCode === 200 || response.statusCode === 206) {
const { ref } = request.pre.args
response.header('X-Ipfs-Path', ref)
if (ref.startsWith('/ipfs/')) {
// "set modtime to a really long time ago, since files are immutable and should stay cached"
// Source: https://github.com/ipfs/go-ipfs/blob/v0.4.20/core/corehttp/gateway_handler.go#L228-L229
response.header('Last-Modified', 'Thu, 01 Jan 1970 00:00:01 GMT')
// Suborigins: https://github.com/ipfs/in-web-browsers/issues/66
const rootCid = ref.split('/')[2]
const ipfsOrigin = cidToString(rootCid, { base: 'base32' })
response.header('Suborigin', 'ipfs000' + ipfsOrigin)
Expand Down
3 changes: 3 additions & 0 deletions src/http/gateway/routes/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ module.exports = {
pre: [
{ method: resources.gateway.checkCID, assign: 'args' }
],
response: {
ranges: false // disable built-in support, we do it manually
},
ext: {
onPostHandler: { method: resources.gateway.afterHandler }
}
Expand Down
Loading

0 comments on commit 48a8e75

Please sign in to comment.