Skip to content

Commit

Permalink
Creator Node Track Upload Perf Improvements
Browse files Browse the repository at this point in the history
~65% faster track uploading
  • Loading branch information
SidSethi authored Aug 13, 2019
2 parents 44d4844 + c945574 commit 8cebead
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 652 deletions.
903 changes: 335 additions & 568 deletions creator-node/package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion creator-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"ffmpeg-static": "^2.4.0",
"ffprobe-static": "^3.0.0",
"ioredis": "^4.9.3",
"ipfs-api": "^26.1.2",
"ipfs-http-client": "^33.1.1",
"multer": "^1.4.0",
"pg": "^7.6.1",
"rate-limit-redis": "^1.6.0",
Expand Down
7 changes: 4 additions & 3 deletions creator-node/src/ffprobe.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ var spawn = require('child_process').spawn

async function getTrackDuration (fileDir) {
try {
const info = await getInfo(fileDir, { path: ffprobeStatic.path })
// TODO - data validation
return info.streams[0].duration
const resp = await getInfo(fileDir, { path: ffprobeStatic.path })
const duration = Number(resp.streams[0].duration)
if (isNaN(duration)) throw new Error(`Invalid return value from FFProbe: ${duration}`)
return duration
} catch (e) {
// If the error is the text below, it means the segment doesn't have any
// data. In that case, just return null so we skip adding the segment
Expand Down
62 changes: 49 additions & 13 deletions creator-node/src/fileManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,27 @@ const maxMemoryFileSize = parseInt(config.get('maxMemoryFileSizeBytes')) // Defa
const ALLOWED_UPLOAD_FILE_EXTENSIONS = config.get('allowedUploadFileExtensions') // default set in config.json
const AUDIO_MIME_TYPE_REGEX = /audio\/(.*)/

/** (1) Add file to IPFS; (2) save file to disk;
* (3) pin file via IPFS; (4) save file ref to DB
/**
* (1) Add file to IPFS; (2) save file to disk;
* (3) pin file via IPFS; (4) save file ref to DB
* @dev - only call this function when file is not already stored to disk
* - if it is, then use saveFileToIPFSFromFS()
*/
async function saveFile (req, buffer) {
async function saveFileFromBuffer (req, buffer) {
// make sure user has authenticated before saving file
if (!req.userId) {
throw new Error('User must be authenticated to save a file')
}

const ipfs = req.app.get('ipfsAPI')

let multihash = await ipfs.files.add(buffer, { onlyHash: true })
multihash = multihash[0].hash
const multihash = (await ipfs.add(buffer))[0].hash

const fileLocation = path.join(req.app.get('storagePath'), '/' + multihash)
await writeFile(fileLocation, buffer)
const dstPath = path.join(req.app.get('storagePath'), multihash)

// TODO(roneilr): switch to using the IPFS filestore below to avoid duplicating content
const filesAdded = await ipfs.files.add(buffer)
assert.strictEqual(multihash, filesAdded[0].hash)
await writeFile(dstPath, buffer)

// TODO: switch to using the IPFS filestore below to avoid duplicating content
await ipfs.pin.add(multihash)

// add reference to file to database
Expand All @@ -43,7 +44,7 @@ async function saveFile (req, buffer) {
cnodeUserUUID: req.userId,
multihash: multihash,
sourceFile: req.fileName,
storagePath: fileLocation
storagePath: dstPath
}
})

Expand All @@ -53,6 +54,41 @@ async function saveFile (req, buffer) {
return { multihash: multihash, fileUUID: file.fileUUID }
}

/**
* Save file to IPFS given file path.
* - Add and pin file to IPFS.
* - Re-save file to disk under multihash.
* - Save reference to file in DB.
*/
async function saveFileToIPFSFromFS (req, srcPath) {
// make sure user has authenticated before saving file
if (!req.userId) throw new Error('User must be authenticated to save a file')

const ipfs = req.app.get('ipfsAPI')

const multihash = (await ipfs.addFromFs(srcPath))[0].hash
const dstPath = path.join(req.app.get('storagePath'), multihash)

// store segment file copy under multihash for easy future retrieval
fs.copyFileSync(srcPath, dstPath)

// TODO: switch to using the IPFS filestore below to avoid duplicating content
await ipfs.pin.add(multihash)

// add reference to file to database
const file = (await models.File.findOrCreate({ where:
{
cnodeUserUUID: req.userId,
multihash: multihash,
sourceFile: req.fileName,
storagePath: dstPath
}
}))[0].dataValues

req.logger.info(`\nAdded file: ${multihash} for fileUUID ${file.fileUUID} from sourceFile ${req.fileName}`)
return { multihash: multihash, fileUUID: file.fileUUID }
}

/** Save file to disk given IPFS multihash, and ensure is pinned.
* Steps:
* - If file already stored on disk, return immediately.
Expand Down Expand Up @@ -168,7 +204,7 @@ const trackDiskStorage = multer.diskStorage({
destination: function (req, file, cb) {
// save file under randomly named folders to avoid collisions
const randomFileName = getUuid()
const fileDir = req.app.get('storagePath') + '/' + randomFileName
const fileDir = path.join(req.app.get('storagePath'), randomFileName)

// create directories for original file and segments
fs.mkdirSync(fileDir)
Expand Down Expand Up @@ -207,4 +243,4 @@ function getFileExtension (fileName) {
return (fileName.lastIndexOf('.') >= 0) ? fileName.substr(fileName.lastIndexOf('.')) : ''
}

module.exports = { saveFile, saveFileForMultihash, removeTrackFolder, upload, trackFileUpload }
module.exports = { saveFileFromBuffer, saveFileToIPFSFromFS, saveFileForMultihash, removeTrackFolder, upload, trackFileUpload }
4 changes: 2 additions & 2 deletions creator-node/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const S3 = require('aws-sdk').S3
const ON_DEATH = require('death')
const ipfsAPI = require('ipfs-api')
const ipfsClient = require('ipfs-http-client')
const path = require('path')

const initializeApp = require('./app')
Expand Down Expand Up @@ -53,7 +53,7 @@ if (!ipfsAddr) {
logger.error('Must set ipfsAddr')
process.exit(1)
}
let ipfs = ipfsAPI(ipfsAddr, config.get('ipfsPort'))
let ipfs = ipfsClient(ipfsAddr, config.get('ipfsPort'))

// run all migrations
logger.info('Executing database migrations...')
Expand Down
2 changes: 0 additions & 2 deletions creator-node/src/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ module.exports.lock = RedisLock

/** Ensure resource write access */
async function nodeSyncMiddleware (req, res, next) {
req.logger.info('before nodeysnc middleware')
if (req.session && req.session.wallet) {
const redisKey = getNodeSyncRedisKey(req.session.wallet)
const lockHeld = await RedisLock.getLock(redisKey)
Expand All @@ -38,7 +37,6 @@ async function nodeSyncMiddleware (req, res, next) {
))
}
}
req.logger.info('after nodeysnc middleware')
next()
}

Expand Down
20 changes: 9 additions & 11 deletions creator-node/src/routes/audiusUsers.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
const { Buffer } = require('ipfs-http-client')

const models = require('../models')
const authMiddleware = require('../authMiddleware')
const nodeSyncMiddleware = require('../redis').nodeSyncMiddleware
const { saveFile } = require('../fileManager')
const { saveFileFromBuffer } = require('../fileManager')
const { handleResponse, successResponse, errorResponseBadRequest } = require('../apiHelpers')

module.exports = function (app) {
// create AudiusUser from provided metadata, and make metadata available to network
app.post('/audius_users', authMiddleware, nodeSyncMiddleware, handleResponse(async (req, res) => {
const ipfs = req.app.get('ipfsAPI')

// TODO(roneilr): do some validation on metadata given
// TODO: do some validation on metadata given
const metadataJSON = req.body

const metadataBuffer = ipfs.types.Buffer.from(JSON.stringify(metadataJSON))
const { multihash, fileUUID } = await saveFile(req, metadataBuffer)
const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON))
const { multihash, fileUUID } = await saveFileFromBuffer(req, metadataBuffer)

const audiusUserObj = {
cnodeUserUUID: req.userId,
Expand Down Expand Up @@ -58,7 +58,6 @@ module.exports = function (app) {

// update a AudiusUser
app.put('/audius_users/:blockchainId', authMiddleware, nodeSyncMiddleware, handleResponse(async (req, res) => {
const ipfs = req.app.get('ipfsAPI')
const blockchainId = req.params.blockchainId
const audiusUser = await models.AudiusUser.findOne({ where: { blockchainId, cnodeUserUUID: req.userId } })

Expand All @@ -67,13 +66,12 @@ module.exports = function (app) {
return errorResponseBadRequest(`Audius User doesn't exist for that blockchainId`)
}

// TODO(roneilr, dmanjunath): do some validation on metadata given
// TODO: do some validation on metadata given
const metadataJSON = req.body

const metadataBuffer = ipfs.types.Buffer.from(JSON.stringify(metadataJSON))
const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON))

// write to a new file so there's still a record of the old file
const { multihash, fileUUID } = await saveFile(req, metadataBuffer)
const { multihash, fileUUID } = await saveFileFromBuffer(req, metadataBuffer)

// Update the file to the new fileId and write the metadata blob in the json field
let updateObj = {
Expand Down
18 changes: 10 additions & 8 deletions creator-node/src/routes/files.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const { saveFile, upload } = require('../fileManager')
const { Buffer } = require('ipfs-http-client')

const { saveFileFromBuffer, upload } = require('../fileManager')
const { handleResponse, sendResponse, successResponse, errorResponseBadRequest, errorResponseServerError, errorResponseNotFound } = require('../apiHelpers')

const models = require('../models')
Expand All @@ -10,19 +12,19 @@ let Redis = require('ioredis')
let client = new Redis(config.get('redisPort'), config.get('redisHost'))

module.exports = function (app) {
// upload image file and make avail
// TODO(ss) - input validation
/** Store image on disk + DB and make available via IPFS */
app.post('/image_upload', authMiddleware, nodeSyncMiddleware, upload.single('file'), handleResponse(async (req, res) => {
const { multihash } = await saveFile(req, req.file.buffer)
// TODO: input validation
// TODO: switch to saveFileToIPFSFromFS
const { multihash } = await saveFileFromBuffer(req, req.file.buffer)
return successResponse({ 'image_file_multihash': multihash })
}))

// upload metadata to IPFS and save in Files table
/** upload metadata to IPFS and save in Files table */
app.post('/metadata', authMiddleware, nodeSyncMiddleware, handleResponse(async (req, res) => {
const ipfs = req.app.get('ipfsAPI')
const metadataJSON = req.body
const metadataBuffer = ipfs.types.Buffer.from(JSON.stringify(metadataJSON))
const { multihash } = await saveFile(req, metadataBuffer)
const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON))
const { multihash } = await saveFileFromBuffer(req, metadataBuffer)
return successResponse({ 'metadataMultihash': multihash })
}))

Expand Down
64 changes: 37 additions & 27 deletions creator-node/src/routes/tracks.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
const fs = require('fs')
const path = require('path')
const { Buffer } = require('ipfs-http-client')

const ffmpeg = require('../ffmpeg')
const ffprobe = require('../ffprobe')

const models = require('../models')
const authMiddleware = require('../authMiddleware')
const nodeSyncMiddleware = require('../redis').nodeSyncMiddleware
const { saveFile, removeTrackFolder, trackFileUpload } = require('../fileManager')
const { saveFileFromBuffer, saveFileToIPFSFromFS, removeTrackFolder, trackFileUpload } = require('../fileManager')
const { handleResponse, successResponse, errorResponseBadRequest, errorResponseServerError } = require('../apiHelpers')

module.exports = function (app) {
// upload track segment files and make avail - will later be associated with Audius track
/**
* upload track segment files and make avail - will later be associated with Audius track
* @dev - currently stores each segment twice, once under random file UUID & once under IPFS multihash
* - this should be addressed eventually
*/
app.post('/track_content', authMiddleware, nodeSyncMiddleware, trackFileUpload.single('file'), handleResponse(async (req, res) => {
if (req.fileFilterError) {
// POST body is not a valid file type
return errorResponseBadRequest(req.fileFilterError)
}
if (req.fileFilterError) return errorResponseBadRequest(req.fileFilterError)

// create and save segments to disk
// create and save track file segments to disk
let segmentFilePaths
try {
segmentFilePaths = await ffmpeg.segmentFile(req, req.fileDir, req.fileName)
Expand All @@ -25,26 +28,35 @@ module.exports = function (app) {
return errorResponseServerError(err)
}

// for each path, read file into buffer and pass to saveFile
const files = []
for (let path of segmentFilePaths) {
let absolutePath = req.fileDir + '/segments/' + path
let fileBuffer = fs.readFileSync(absolutePath)
let { multihash } = await saveFile(req, fileBuffer)
const duration = await ffprobe.getTrackDuration(absolutePath)
if (duration) files.push({ 'multihash': multihash, duration: duration })
// for each path, call saveFile and get back multihash; return multihash + segment duration
// run all async ops in parallel as they are not independent
let saveFileProms = []
let durationProms = []
for (let filePath of segmentFilePaths) {
const absolutePath = path.join(req.fileDir, 'segments', filePath)
const saveFileProm = saveFileToIPFSFromFS(req, absolutePath)
const durationProm = ffprobe.getTrackDuration(absolutePath)
saveFileProms.push(saveFileProm)
durationProms.push(durationProm)
}
// Resolve all promises + process responses
const [saveFilePromResps, durationPromResps] = await Promise.all(
[saveFileProms, durationProms].map(promiseArray => Promise.all(promiseArray))
)
let trackSegments = saveFilePromResps.map((saveFileResp, i) => {
return { 'multihash': saveFileResp.multihash, 'duration': durationPromResps[i] }
})
// exclude 0-length segments that are sometimes outputted by ffmpeg segmentation
trackSegments = trackSegments.filter(trackSegment => trackSegment.duration)

return successResponse({ 'track_segments': files })
return successResponse({ 'track_segments': trackSegments })
}))

/** given track metadata object, create track and share track metadata with network
* - return on success: temporary ID of track
* - return on failure: error if linked segments have not already been created via POST /track_content
*/
app.post('/tracks', authMiddleware, nodeSyncMiddleware, handleResponse(async (req, res) => {
const ipfs = req.app.get('ipfsAPI')

// TODO - input validation
const metadataJSON = req.body

Expand Down Expand Up @@ -78,8 +90,8 @@ module.exports = function (app) {
}

// store metadata multihash
const metadataBuffer = ipfs.types.Buffer.from(JSON.stringify(metadataJSON))
const { multihash, fileUUID } = await saveFile(req, metadataBuffer)
const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON))
const { multihash, fileUUID } = await saveFileFromBuffer(req, metadataBuffer)

// build track object for db storage
const trackObj = {
Expand Down Expand Up @@ -131,7 +143,7 @@ module.exports = function (app) {
return errorResponseBadRequest('Invalid track ID')
}

// TODO(roneilr): validate that provided blockchain ID is indeed associated with
// TODO: validate that provided blockchain ID is indeed associated with
// user wallet and metadata CID
await track.update({
blockchainId: blockchainId
Expand All @@ -152,21 +164,19 @@ module.exports = function (app) {

// update a track
app.put('/tracks/:blockchainId', authMiddleware, nodeSyncMiddleware, handleResponse(async (req, res) => {
const ipfs = req.app.get('ipfsAPI')
const blockchainId = req.params.blockchainId
const cnodeUserUUID = req.userId

const track = await models.Track.findOne({ where: { blockchainId, cnodeUserUUID } })

if (!track) return errorResponseBadRequest(`Could not find track with id ${blockchainId} owned by calling user`)

// TODO(roneilr, dmanjunath): do some validation on metadata given
// TODO: do some validation on metadata given
const metadataJSON = req.body

const metadataBuffer = ipfs.types.Buffer.from(JSON.stringify(metadataJSON))
const metadataBuffer = Buffer.from(JSON.stringify(metadataJSON))

// write to a new file so there's still a record of the old file
const { multihash, fileUUID } = await saveFile(req, metadataBuffer)
const { multihash, fileUUID } = await saveFileFromBuffer(req, metadataBuffer)

const coverArtFileMultihash = metadataJSON.cover_art
let coverArtFileUUID = null
Expand Down
4 changes: 2 additions & 2 deletions creator-node/test/audiusUsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('test AudiusUsers', function () {
const metadata = {
test: 'field1'
}
ipfsMock.files.add.twice().withArgs(Buffer.from(JSON.stringify(metadata)))
ipfsMock.add.twice().withArgs(Buffer.from(JSON.stringify(metadata)))
ipfsMock.pin.add.once().withArgs('testCIDLink')

request(app)
Expand All @@ -44,7 +44,7 @@ describe('test AudiusUsers', function () {
const metadata = {
test: 'field1'
}
ipfsMock.files.add.twice().withArgs(Buffer.from(JSON.stringify(metadata)))
ipfsMock.add.twice().withArgs(Buffer.from(JSON.stringify(metadata)))
ipfsMock.pin.add.once().withArgs('testCIDLink')

request(app)
Expand Down
Loading

0 comments on commit 8cebead

Please sign in to comment.