Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Gateway Improvements: Streaming, Conditional and Range Requests #1989

Merged
merged 3 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"stream-to-promise": "^2.2.0"
},
"dependencies": {
"@hapi/ammo": "^3.1.0",
"@hapi/hapi": "^18.3.1",
"@hapi/joi": "^15.0.1",
"async": "^2.6.1",
Expand All @@ -90,6 +91,7 @@
"bl": "^3.0.0",
"boom": "^7.2.0",
"bs58": "^4.0.1",
"buffer-peek-stream": "^1.0.1",
"byteman": "^1.3.5",
"cid-tool": "~0.2.0",
"cids": "~0.5.8",
Expand Down
158 changes: 110 additions & 48 deletions src/http/gateway/resources/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
const debug = require('debug')
const log = debug('ipfs:http-gateway')
log.error = debug('ipfs:http-gateway:error')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const toStream = require('pull-stream-to-stream')

const fileType = require('file-type')
const mime = require('mime-types')
const { PassThrough } = require('readable-stream')
const Boom = require('boom')
const Ammo = require('@hapi/ammo') // HTTP Range processing utilities
const peek = require('buffer-peek-stream')

const { resolver } = require('ipfs-http-response')
const PathUtils = require('../utils/path')
Expand All @@ -30,6 +30,20 @@ function detectContentType (ref, chunk) {
return mime.contentType(mimeType)
}

// Enable streaming of compressed payload
// https://github.com/hapijs/hapi/issues/3599
class ResponseStream extends PassThrough {
_read (size) {
super._read(size)
if (this._compressor) {
this._compressor.flush()
}
}
setCompressor (compressor) {
this._compressor = compressor
}
}

module.exports = {
checkCID (request, h) {
if (!request.params.cid) {
Expand Down Expand Up @@ -85,66 +99,114 @@ module.exports = {
return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true)
}

return new Promise((resolve, reject) => {
let pusher
let started = false
// Support If-None-Match & Etag (Conditional Requests from RFC7232)
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
const etag = `"${data.cid}"`
const cachedEtag = request.headers['if-none-match']
if (cachedEtag === etag || cachedEtag === `W/${etag}`) {
return h.response().code(304) // Not Modified
}

pull(
ipfs.catPullStream(data.cid),
pull.drain(
chunk => {
if (!started) {
started = true
pusher = pushable()
const res = h.response(toStream.source(pusher).pipe(new PassThrough()))
// Immutable content produces 304 Not Modified for all values of If-Modified-Since
if (ref.startsWith('/ipfs/') && request.headers['if-modified-since']) {
return h.response().code(304) // Not Modified
}

// Etag maps directly to an identifier for a specific version of a resource
res.header('Etag', `"${data.cid}"`)
// This necessary to set correct Content-Length and validate Range requests
// Note: we need `size` (raw data), not `cumulativeSize` (data + DAGNodes)
const { size } = await ipfs.files.stat(`/ipfs/${data.cid}`)

// Handle Byte Range requests (https://tools.ietf.org/html/rfc7233#section-2.1)
const catOptions = {}
let rangeResponse = false
if (request.headers.range) {
// If-Range is respected (when present), but we compare it only against Etag
// (Last-Modified date is too weak for IPFS use cases)
if (!request.headers['if-range'] || request.headers['if-range'] === etag) {
const ranges = Ammo.header(request.headers.range, size)
if (!ranges) {
const error = Boom.rangeNotSatisfiable()
error.output.headers['content-range'] = `bytes */${size}`
throw error
}

if (ranges.length === 1) { // Ignore requests for multiple ranges (hard to map to ipfs.cat and not used in practice)
rangeResponse = true
const range = ranges[0]
catOptions.offset = range.from
catOptions.length = (range.to - range.from + 1)
}
}
}

// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}
const rawStream = ipfs.catReadableStream(data.cid, catOptions)
const responseStream = new ResponseStream()

const contentType = detectContentType(ref, chunk)
// Pass-through Content-Type sniffing over initial bytes
const { peekedStream, contentType } = await new Promise((resolve, reject) => {
const peekBytes = fileType.minimumBytes
peek(rawStream, peekBytes, (err, streamHead, peekedStream) => {
if (err) {
log.error(err)
return reject(err)
}
resolve({ peekedStream, contentType: detectContentType(ref, streamHead) })
})
})

log('ref ', ref)
log('mime-type ', contentType)
peekedStream.pipe(responseStream)

if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}
const res = h.response(responseStream).code(rangeResponse ? 206 : 200)

resolve(res)
}
pusher.push(chunk)
},
err => {
if (err) {
log.error(err)

// We already started flowing, abort the stream
if (started) {
return pusher.end(err)
}

return reject(err)
}
// Etag maps directly to an identifier for a specific version of a resource
// and enables smart client-side caching thanks to If-None-Match
res.header('etag', etag)

pusher.end()
}
)
)
})
// Set headers specific to the immutable namespace
if (ref.startsWith('/ipfs/')) {
res.header('Cache-Control', 'public, max-age=29030400, immutable')
}

log('ref ', ref)
log('content-type ', contentType)

if (contentType) {
log('writing content-type header')
res.header('Content-Type', contentType)
}

if (rangeResponse) {
const from = catOptions.offset
const to = catOptions.offset + catOptions.length - 1
res.header('Content-Range', `bytes ${from}-${to}/${size}`)
res.header('Content-Length', catOptions.length)
} else {
// Announce support for Range requests
res.header('Accept-Ranges', 'bytes')
res.header('Content-Length', size)
}

// Support Content-Disposition via ?filename=foo parameter
// (useful for browser vendor to download raw CID into custom filename)
// Source: https://github.com/ipfs/go-ipfs/blob/v0.4.20/core/corehttp/gateway_handler.go#L232-L236
if (request.query.filename) {
res.header('Content-Disposition', `inline; filename*=UTF-8''${encodeURIComponent(request.query.filename)}`)
}

return res
},

afterHandler (request, h) {
const { response } = request
if (response.statusCode === 200) {
// Add headers to successfult responses (regular or range)
if (response.statusCode === 200 || response.statusCode === 206) {
const { ref } = request.pre.args
response.header('X-Ipfs-Path', ref)
if (ref.startsWith('/ipfs/')) {
// "set modtime to a really long time ago, since files are immutable and should stay cached"
// Source: https://github.com/ipfs/go-ipfs/blob/v0.4.20/core/corehttp/gateway_handler.go#L228-L229
response.header('Last-Modified', 'Thu, 01 Jan 1970 00:00:01 GMT')
// Suborigins: https://github.com/ipfs/in-web-browsers/issues/66
const rootCid = ref.split('/')[2]
const ipfsOrigin = cidToString(rootCid, { base: 'base32' })
response.header('Suborigin', 'ipfs000' + ipfsOrigin)
Expand Down
3 changes: 3 additions & 0 deletions src/http/gateway/routes/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ module.exports = {
pre: [
{ method: resources.gateway.checkCID, assign: 'args' }
],
response: {
ranges: false // disable built-in support, we do it manually
},
ext: {
onPostHandler: { method: resources.gateway.afterHandler }
}
Expand Down
Loading