Skip to content

Commit

Permalink
feat(gatsby): enable external jobs with ipc (#20835)
Browse files Browse the repository at this point in the history
* feat(gatsby): enable external jobs with ipc

* Update jobs-manager.js

* Update jobs-manager.js

* add jsdoc

* reset process.send/on

* bump ipc version

* don't export local jobs
  • Loading branch information
wardpeet authored and GatsbyJS Bot committed Jan 24, 2020
1 parent 132606b commit b4c5bfb
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 4 deletions.
2 changes: 1 addition & 1 deletion packages/gatsby/ipc.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": 1
"version": 2
}
176 changes: 175 additions & 1 deletion packages/gatsby/src/utils/__tests__/jobs-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jest.mock(`p-defer`, () =>
jest.mock(`gatsby-cli/lib/reporter`, () => {
return {
phantomActivity: jest.fn(),
warn: jest.fn(),
}
})

Expand All @@ -24,6 +25,16 @@ jest.mock(
{ virtual: true }
)

jest.mock(
`/gatsby-plugin-local/gatsby-worker.js`,
() => {
return {
TEST_JOB: jest.fn(),
}
},
{ virtual: true }
)

jest.mock(`uuid/v4`, () =>
jest.fn().mockImplementation(jest.requireActual(`uuid/v4`))
)
Expand Down Expand Up @@ -216,7 +227,9 @@ describe(`Jobs manager`, () => {
try {
await enqueueJob(jobArgs)
} catch (err) {
expect(err).toMatchInlineSnapshot(`[WorkerError: An error occured]`)
expect(err).toMatchInlineSnapshot(
`[WorkerError: Error: An error occured]`
)
}
try {
await enqueueJob(jobArgs2)
Expand Down Expand Up @@ -334,4 +347,165 @@ describe(`Jobs manager`, () => {
expect(isJobStale({ inputPaths })).toBe(false)
})
})

describe(`IPC jobs`, () => {
let listeners = []
beforeAll(() => {
jest.useFakeTimers()
})

let originalProcessOn
let originalSend
beforeEach(() => {
process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `true`
listeners = []
originalProcessOn = process.on
originalSend = process.send
process.on = (type, cb) => {
listeners.push(cb)
}

process.send = jest.fn()
})

afterAll(() => {
delete process.env.ENABLE_GATSBY_EXTERNAL_JOBS
jest.useRealTimers()
process.on = originalProcessOn
process.send = originalSend
})

it(`should schedule a remote job when ipc and env variable are enabled`, async () => {
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()

enqueueJob(jobArgs)

jest.runAllTimers()

expect(process.send).toHaveBeenCalled()
expect(process.send).toHaveBeenCalledWith({
type: `JOB_CREATED`,
payload: jobArgs,
})

expect(listeners.length).toBe(1)
expect(worker.TEST_JOB).not.toHaveBeenCalled()
})

it(`should resolve a job when complete message is received`, async () => {
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()

const promise = enqueueJob(jobArgs)
jest.runAllTimers()

listeners[0]({
type: `JOB_COMPLETED`,
payload: {
id: jobArgs.id,
result: {
output: `hello`,
},
},
})

jest.runAllTimers()

await expect(promise).resolves.toStrictEqual({
output: `hello`,
})
expect(worker.TEST_JOB).not.toHaveBeenCalled()
})

it(`should reject a job when failed message is received`, async () => {
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()

const promise = enqueueJob(jobArgs)

jest.runAllTimers()

listeners[0]({
type: `JOB_FAILED`,
payload: {
id: jobArgs.id,
error: `JOB failed...`,
},
})

jest.runAllTimers()

await expect(promise).rejects.toStrictEqual(
new jobManager.WorkerError(`JOB failed...`)
)
expect(worker.TEST_JOB).not.toHaveBeenCalled()
})

it(`should run the worker locally when it's not available externally`, async () => {
worker.TEST_JOB.mockReturnValue({ output: `myresult` })
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()

const promise = enqueueJob(jobArgs)

jest.runAllTimers()

listeners[0]({
type: `JOB_NOT_WHITELISTED`,
payload: {
id: jobArgs.id,
},
})

jest.runAllTimers()

await expect(promise).resolves.toStrictEqual({ output: `myresult` })
expect(worker.TEST_JOB).toHaveBeenCalledTimes(1)
})

it(`should run the worker locally when it's a local plugin`, async () => {
jest.useRealTimers()
const worker = require(`/gatsby-plugin-local/gatsby-worker.js`)
const { enqueueJob, createInternalJob } = jobManager
const jobArgs = createInternalJob(createMockJob(), {
name: `gatsby-plugin-local`,
version: `1.0.0`,
resolve: `/gatsby-plugin-local`,
})

await expect(enqueueJob(jobArgs)).resolves.toBeUndefined()
expect(process.send).not.toHaveBeenCalled()
expect(worker.TEST_JOB).toHaveBeenCalledTimes(1)
})

it(`shouldn't schedule a remote job when ipc is enabled and env variable is false`, async () => {
process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `false`
jest.useRealTimers()
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()

await enqueueJob(jobArgs)

expect(process.send).not.toHaveBeenCalled()
expect(worker.TEST_JOB).toHaveBeenCalled()
})

it(`should warn when external jobs are enabled but ipc isn't used`, async () => {
process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `true`
process.send = null
jest.useRealTimers()
const { enqueueJob } = jobManager
const jobArgs = createInternalMockJob()
const jobArgs2 = createInternalMockJob({
args: { key: `val` },
})

await enqueueJob(jobArgs)
await enqueueJob(jobArgs2)

expect(reporter.warn).toHaveBeenCalledTimes(1)
expect(worker.TEST_JOB).toHaveBeenCalled()
})
})
})
85 changes: 83 additions & 2 deletions packages/gatsby/src/utils/jobs-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,22 @@ const _ = require(`lodash`)
const { createContentDigest, slash } = require(`gatsby-core-utils`)
const reporter = require(`gatsby-cli/lib/reporter`)

const MESSAGE_TYPES = {
JOB_CREATED: `JOB_CREATED`,
JOB_COMPLETED: `JOB_COMPLETED`,
JOB_FAILED: `JOB_FAILED`,
JOB_NOT_WHITELISTED: `JOB_NOT_WHITELISTED`,
}

let activityForJobs = null
let activeJobs = 0
let isListeningForMessages = false
let hasShownIPCDisabledWarning = false

/** @type {Map<string, {id: string, deferred: pDefer.DeferredPromise<any>}>} */
const jobsInProcess = new Map()
/** @type {Map<string, {job: InternalJob, deferred: pDefer.DeferredPromise<any>}>} */
const externalJobsMap = new Map()

/**
* We want to use absolute paths to make sure they are on the filesystem
Expand Down Expand Up @@ -57,6 +68,10 @@ const createFileHash = path => hasha.fromFileSync(path, { algorithm: `sha1` })
/** @type {pDefer.DeferredPromise<void>|null} */
let hasActiveJobs = null

const hasExternalJobsEnabled = () =>
process.env.ENABLE_GATSBY_EXTERNAL_JOBS === `true` ||
process.env.ENABLE_GATSBY_EXTERNAL_JOBS === `1`

/**
* Get the local worker function and execute it on the user's machine
*
Expand All @@ -81,12 +96,60 @@ const runLocalWorker = async (workerFn, job) => {
})
)
} catch (err) {
reject(err)
reject(new WorkerError(err))
}
})
})
}

const listenForJobMessages = () => {
process.on(`message`, msg => {
if (
msg &&
msg.type &&
msg.payload &&
msg.payload.id &&
externalJobsMap.has(msg.payload.id)
) {
const { job, deferred } = externalJobsMap.get(msg.payload.id)
switch (msg.type) {
case MESSAGE_TYPES.JOB_COMPLETED: {
deferred.resolve(msg.payload.result)
break
}
case MESSAGE_TYPES.JOB_FAILED: {
deferred.reject(new WorkerError(msg.payload.error))
break
}
case MESSAGE_TYPES.JOB_NOT_WHITELISTED: {
deferred.resolve(runJob(job, true))
break
}
}

externalJobsMap.delete(msg.payload.id)
}
})
}

/**
* @param {InternalJob} job
*/
const runExternalWorker = job => {
const deferred = pDefer()
externalJobsMap.set(job.id, {
job,
deferred,
})

process.send({
type: MESSAGE_TYPES.JOB_CREATED,
payload: job,
})

return deferred.promise
}

/**
* Make sure we have everything we need to run a job
* If we do, run it locally.
Expand All @@ -95,14 +158,32 @@ const runLocalWorker = async (workerFn, job) => {
* @param {InternalJob} job
* @return {Promise<object>}
*/
const runJob = job => {
const runJob = (job, forceLocal = false) => {
const { plugin } = job
try {
const worker = require(path.posix.join(plugin.resolve, `gatsby-worker.js`))
if (!worker[job.name]) {
throw new Error(`No worker function found for ${job.name}`)
}

if (!forceLocal && !job.plugin.isLocal && hasExternalJobsEnabled()) {
if (process.send) {
if (!isListeningForMessages) {
isListeningForMessages = true
listenForJobMessages()
}

return runExternalWorker(job)
} else {
// only show the offloading warning once
if (!hasShownIPCDisabledWarning) {
hasShownIPCDisabledWarning = true
reporter.warn(
`Offloading of a job failed as IPC could not be detected. Running job locally.`
)
}
}
}
return runLocalWorker(worker[job.name], job)
} catch (err) {
throw new Error(
Expand Down

0 comments on commit b4c5bfb

Please sign in to comment.