Skip to content

Commit

Permalink
CON-402 CN Remove all synchronous disk ops (#3908)
Browse files Browse the repository at this point in the history
  • Loading branch information
SidSethi authored Sep 23, 2022
1 parent e438b87 commit dc2ec68
Show file tree
Hide file tree
Showing 28 changed files with 333 additions and 301 deletions.
2 changes: 1 addition & 1 deletion creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
const enqueueSync = async (params) => {
const { serviceRegistry } = params
// eslint-disable-next-line node/no-sync
await serviceRegistry.syncQueue.enqueueSync(params)
}

Expand All @@ -13,6 +14,7 @@ const enqueueSync = async (params) => {
*/
const processManualImmediateSync = async (params) => {
const { serviceRegistry } = params
// eslint-disable-next-line node/no-sync
await serviceRegistry.syncImmediateQueue.processManualImmediateSync(params)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const axios = require('axios')
const fs = require('fs')
const fsExtra = require('fs-extra')
const fs = require('fs-extra')
const FormData = require('form-data')
const _ = require('lodash')

Expand Down Expand Up @@ -380,7 +379,7 @@ class TrackTranscodeHandoffManager {
* @returns formData object passed in axios to send a transcode and segment request
*/
static async createFormData(pathToFile) {
const fileExists = await fsExtra.pathExists(pathToFile)
const fileExists = await fs.pathExists(pathToFile)
if (!fileExists) {
throw new Error(`File does not exist at path=${pathToFile}`)
}
Expand Down
2 changes: 1 addition & 1 deletion creator-node/src/config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const axios = require('axios')
const convict = require('convict')
const fs = require('fs')
const fs = require('fs-extra')
const path = require('path')
const os = require('os')
const _ = require('lodash')
Expand Down
18 changes: 9 additions & 9 deletions creator-node/src/diskManager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const path = require('path')
const fs = require('fs')
const fs = require('fs-extra')
const config = require('./config')
const { logger: genericLogger } = require('./logging')
const CID = require('cids')
Expand Down Expand Up @@ -27,14 +27,14 @@ class DiskManager {
* is we should be able to delete the contents of this folder without scanning through other folders with the
* naming scheme.
*/
static getTmpTrackUploadArtifactsPath() {
static async getTmpTrackUploadArtifactsPath() {
const dirPath = path.join(
config.get('storagePath'),
'files',
'tmp_track_artifacts'
)
if (!TMP_TRACK_ARTIFACTS_CREATED) {
this.ensureDirPathExists(dirPath)
await this.ensureDirPathExists(dirPath)
TMP_TRACK_ARTIFACTS_CREATED = true
}
return dirPath
Expand All @@ -51,7 +51,7 @@ class DiskManager {
* eg QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6 will be eg /file_storage/muU/QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6
* @param {String} cid file system destination, either filename or directory
*/
static computeFilePath(cid, ensureDirPathExists = true) {
static async computeFilePath(cid, ensureDirPathExists = true) {
try {
CID.isCID(new CID(cid))
} catch (e) {
Expand All @@ -76,7 +76,7 @@ class DiskManager {

// create the subdirectories in parentDirHash if they don't exist
if (ensureDirPathExists) {
this.ensureDirPathExists(parentDirPath)
await this.ensureDirPathExists(parentDirPath)
}

return path.join(parentDirPath, cid)
Expand Down Expand Up @@ -115,7 +115,7 @@ class DiskManager {
* @param {String} dirName directory name
* @param {String} fileName file name
*/
static computeFilePathInDir(dirName, fileName) {
static async computeFilePathInDir(dirName, fileName) {
if (!dirName || !fileName) {
genericLogger.error(
`Invalid dirName and/or fileName, dirName=${dirName}, fileName=${fileName}`
Expand All @@ -135,7 +135,7 @@ class DiskManager {
)
}

const parentDirPath = this.computeFilePath(dirName)
const parentDirPath = await this.computeFilePath(dirName)
const absolutePath = path.join(parentDirPath, fileName)
genericLogger.info(`File path computed, absolutePath=${absolutePath}`)
return absolutePath
Expand All @@ -146,10 +146,10 @@ class DiskManager {
* If it does exist, it will not overwrite, effectively a no-op
* @param {*} dirPath fs directory path to create if it does not exist
*/
static ensureDirPathExists(dirPath) {
static async ensureDirPathExists(dirPath) {
try {
// the mkdir recursive option is equivalent to `mkdir -p` and should created nested folders several levels deep
fs.mkdirSync(dirPath, { recursive: true })
await fs.mkdir(dirPath, { recursive: true })
} catch (e) {
genericLogger.error(
`Error making directory, dirName=${dirPath}, error=${e.toString()}`
Expand Down
32 changes: 16 additions & 16 deletions creator-node/src/diskManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ describe('Test DiskManager', function () {
/**
* getTmpTrackUploadArtifactsPath
*/
it('Should pass if storagePath is correctly set', function () {
it('Should pass if storagePath is correctly set', async function () {
const tmpTrackArtifactPath = path.join(
DiskManager.getConfigStoragePath(),
'files',
'tmp_track_artifacts'
)
assert.deepStrictEqual(
tmpTrackArtifactPath,
DiskManager.getTmpTrackUploadArtifactsPath()
await DiskManager.getTmpTrackUploadArtifactsPath()
)
})

/**
* computeFilePath
*/
it('Should pass if computeFilePath returns the correct path', function () {
const fullPath = DiskManager.computeFilePath(
it('Should pass if computeFilePath returns the correct path', async function () {
const fullPath = await DiskManager.computeFilePath(
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
const validPath = path.join(
Expand All @@ -50,29 +50,29 @@ describe('Test DiskManager', function () {
assert.deepStrictEqual(fullPath, validPath)
})

it('Should fail if fileName is not passed into computeFilePath', function () {
it('Should fail if fileName is not passed into computeFilePath', async function () {
try {
DiskManager.computeFilePath()
await DiskManager.computeFilePath()
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
)
}
})

it(`Should fail if fileName doesn't contain the appropriate amount of characters`, function () {
it(`Should fail if fileName doesn't contain the appropriate amount of characters`, async function () {
try {
DiskManager.computeFilePath('asd')
await DiskManager.computeFilePath('asd')
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
)
}
})

it(`Should fail if fileName contains a slash`, function () {
it(`Should fail if fileName contains a slash`, async function () {
try {
DiskManager.computeFilePath('/file_storage/asdf')
await DiskManager.computeFilePath('/file_storage/asdf')
} catch (e) {
assert.ok(
e.message.includes('Please pass in a valid cid to computeFilePath')
Expand All @@ -83,8 +83,8 @@ describe('Test DiskManager', function () {
/**
* computeFilePathInDir
*/
it('Should pass if computeFilePathInDir returns the correct path', function () {
const fullPath = DiskManager.computeFilePathInDir(
it('Should pass if computeFilePathInDir returns the correct path', async function () {
const fullPath = await DiskManager.computeFilePathInDir(
'QmRSvU8NtadxPPrP4M72wUPBiTqykqziWDuGr6q2arsYW4',
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
Expand All @@ -98,17 +98,17 @@ describe('Test DiskManager', function () {
assert.deepStrictEqual(fullPath, validPath)
})

it('Should fail if dirName and fileName are not passed into computeFilePathInDir', function () {
it('Should fail if dirName and fileName are not passed into computeFilePathInDir', async function () {
try {
DiskManager.computeFilePathInDir()
await DiskManager.computeFilePathInDir()
} catch (e) {
assert.ok(e.message.includes('Must pass in valid dirName and fileName'))
}
})

it('Should fail if dirName or fileName are not a CID passed into computeFilePathInDir', function () {
it('Should fail if dirName or fileName are not a CID passed into computeFilePathInDir', async function () {
try {
DiskManager.computeFilePathInDir(
await DiskManager.computeFilePathInDir(
'Qmdirhash',
'QmYfSQCgCwhxwYcdEwCkFJHicDe6rzCAb7AtLz3GrHmuU6'
)
Expand Down
82 changes: 45 additions & 37 deletions creator-node/src/ffmpeg.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const config = require('./config')
const fs = require('fs')
const fs = require('fs-extra')
const path = require('path')
const ffmpeg = require('ffmpeg-static').path
const spawn = require('child_process').spawn
Expand All @@ -13,7 +13,7 @@ const { logger: genericLogger } = require('./logging')
* @param {string} params.fileDir the directory of the uploaded track artifact
* @param {string} params.fileName the uploaded track artifact filename
* @param {Object} params.logContext the log context used to instantiate a logger
* @returns {Object} response in the structure
* @returns {Promise<Object>} response in the structure
{
segments: {
fileNames: segmentFileNames {string[]}: the segment file names only,
Expand All @@ -24,6 +24,7 @@ const { logger: genericLogger } = require('./logging')
*/
function segmentFile(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)

return new Promise((resolve, reject) => {
const absolutePath = path.resolve(fileDir, fileName)
logger.info(`Segmenting file ${absolutePath}...`)
Expand Down Expand Up @@ -63,24 +64,27 @@ function segmentFile(fileDir, fileName, { logContext }) {
proc.stderr.on('data', (data) => (stderr += data.toString()))

proc.on('close', (code) => {
if (code === 0) {
const segmentFileNames = fs.readdirSync(fileDir + '/segments')
const segmentFilePaths = segmentFileNames.map((filename) =>
path.resolve(fileDir, 'segments', filename)
)
async function asyncFn() {
if (code === 0) {
const segmentFileNames = await fs.readdir(fileDir + '/segments')
const segmentFilePaths = segmentFileNames.map((filename) =>
path.resolve(fileDir, 'segments', filename)
)

resolve({
segments: {
fileNames: segmentFileNames,
filePaths: segmentFilePaths
},
m3u8FilePath
})
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
resolve({
segments: {
fileNames: segmentFileNames,
filePaths: segmentFilePaths
},
m3u8FilePath
})
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
}
asyncFn()
})
})
}
Expand All @@ -92,21 +96,22 @@ function segmentFile(fileDir, fileName, { logContext }) {
* @param {string} params.fileDir the directory of the uploaded track artifact
* @param {string} params.fileName the uploaded track artifact filename
* @param {Object} params.logContext the log context used to instantiate a logger
* @returns {string} the path to the transcode
* @returns {Promise<string>} the path to the transcode
*/
function transcodeFileTo320(fileDir, fileName, { logContext }) {
async function transcodeFileTo320(fileDir, fileName, { logContext }) {
const logger = genericLogger.child(logContext)
return new Promise((resolve, reject) => {
const sourcePath = path.resolve(fileDir, fileName)
const targetPath = path.resolve(fileDir, fileName.split('.')[0] + '-dl.mp3')
logger.info(`Transcoding file ${sourcePath}...`)

// Exit if dl-copy file already exists at target path.
if (fs.existsSync(targetPath)) {
logger.info(`Downloadable copy already exists at ${targetPath}.`)
resolve(targetPath)
}
const sourcePath = path.resolve(fileDir, fileName)
const targetPath = path.resolve(fileDir, fileName.split('.')[0] + '-dl.mp3')
logger.info(`Transcoding file ${sourcePath}...`)

// Exit if dl-copy file already exists at target path.
if (await fs.pathExists(targetPath)) {
logger.info(`Downloadable copy already exists at ${targetPath}.`)
return targetPath
}

return new Promise((resolve, reject) => {
// https://ffmpeg.org/ffmpeg-formats.html#hls-2
const args = [
'-i',
Expand All @@ -130,20 +135,23 @@ function transcodeFileTo320(fileDir, fileName, { logContext }) {
proc.stderr.on('data', (data) => (stderr += data.toString()))

proc.on('close', (code) => {
if (code === 0) {
if (fs.existsSync(targetPath)) {
logger.info(`Transcoded file ${targetPath}`)
resolve(targetPath)
async function asyncFn() {
if (code === 0) {
if (await fs.pathExists(targetPath)) {
logger.info(`Transcoded file ${targetPath}`)
resolve(targetPath)
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
} else {
logger.error('Error when processing file with ffmpeg')
logger.error('Command stdout:', stdout, '\nCommand stderr:', stderr)
reject(new Error('FFMPEG Error'))
}
asyncFn()
})
})
}
Expand Down
Loading

0 comments on commit dc2ec68

Please sign in to comment.