diff --git a/packages/gatsby-parallel-runner/.gitignore b/packages/gatsby-parallel-runner/.gitignore new file mode 100644 index 0000000000000..c2658d7d1b318 --- /dev/null +++ b/packages/gatsby-parallel-runner/.gitignore @@ -0,0 +1 @@ +node_modules/ diff --git a/packages/gatsby-parallel-runner/.npmignore b/packages/gatsby-parallel-runner/.npmignore new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/packages/gatsby-parallel-runner/README.md b/packages/gatsby-parallel-runner/README.md new file mode 100644 index 0000000000000..658a1e2b7bbf9 --- /dev/null +++ b/packages/gatsby-parallel-runner/README.md @@ -0,0 +1,76 @@ +# Gatsby Parallel Runner + +This is an early version of a parallel runner for Gatsby that allows plugins and core parts of Gatsby to parallelize +suitable tasks such as image processing. + +When Gatsby is started from a parent process with the environment variable `ENABLE_GATSBY_EXTERNAL_JOBS` set, +it will communicate some jobs up to the parent process via IPC, instead of running them in its own internal +queue. + +This allows a parent process to orchestrate certain tasks across multiple workers for better parallelization +through autoscaling cloud functions or the like. + +Currently this plugin includes a processing queue implementation based on Google Cloud Functions, but the +general abstractions in place should make it easy to add similar runtimes for other cloud providers or via +different approaches to parallelization. + +## Installation and usage + +Install in your Gatsby project: + +``` +npm i gatsby-parallel-runner +``` + +To use with Google Cloud, set relevant env variables in your shell: + +``` +export GOOGLE_APPLICATION_CREDENTIALS=~/path/to/your/google-credentials.json +export TOPIC=parallel-runner-topic +``` + +Deploy the cloud function: + +``` +npx gatsby-parallel-runner deploy + +``` + +Then run your Gatsby build with the parallel runner instead of the default `gatsby build` command. 
+ +``` +npx gatsby-parallel-runner +``` + +## Processor Queues, Processors and Implementations + +Gatsby Parallel Runner comes with a set of core abstractions for parallelizing jobs. + +The main orchestrator is the Processor Queue that gives individual processors a simple interface for +sending jobs to cloud functions and getting back results: + +```js +const result = await queue.process(job) +``` + +To do its job, the ProcessorQueue needs a `pubSubImplementation` that must provide +`publish(id, msg)` and `subscribe(handler)` methods for enqueuing new jobs and receiving +results. + +Implementations are defined in `src/processor-queue/implementations` and there's currently +just one of them based on Google's Cloud Functions. + +The `src/processors` folder has the different processors that can be triggered via Gatsby's +external job feature. + +The processor folder must be named after the Redux event that should trigger it. I.e., the +Image Processing processor gets triggered by the sharp plugin via an `IMAGE_PROCESSING` job, +so the folder is called `image-processing`. + +Each processor can have a set of implementations based on the Processor Queue implementations +available. + +There's currently just one processor (image-processing), with an implementation for `google-functions`. + +When running `npx gatsby-parallel-runner deploy`, the active processor queue implementation will +make sure to deploy all the cloud functions needed for the available processors. 
diff --git a/packages/gatsby-parallel-runner/bin/run.js b/packages/gatsby-parallel-runner/bin/run.js new file mode 100755 index 0000000000000..f69f9b069d0f1 --- /dev/null +++ b/packages/gatsby-parallel-runner/bin/run.js @@ -0,0 +1,47 @@ +#!/usr/bin/env node + +const { build } = require("../src/build") +const { deploy } = require("../src/deploy") +const { writeFileSync } = require("fs") + +// Fail fast when a required environment variable is missing. +const requiredEnvVars = ["TOPIC", "GOOGLE_APPLICATION_CREDENTIALS"] +requiredEnvVars.forEach(key => { + if (!process.env[key]) { + console.error(`You must set a ${key} environment variable`) + process.exit(1) + } +}) + +// A value that does not end in .json is treated as base64 encoded JSON credentials +// and written to a file that the variable is then pointed at. +if (!process.env.GOOGLE_APPLICATION_CREDENTIALS.match(/\.json$/)) { + const credentialsFile = "/tmp/credentials.json" + let credentials = null + try { + let buff = Buffer.from(process.env.GOOGLE_APPLICATION_CREDENTIALS, "base64") + credentials = buff.toString("utf8") + // Buffer.from does not throw on invalid base64, so parsing is what validates the input. + JSON.parse(credentials) + } catch (err) { + console.error( + "GOOGLE_APPLICATION_CREDENTIALS must either be a path to a .json file or base 64 encoded json credentials", + err + ) + process.exit(1) + } + // Credentials are secrets: a newly created file is readable by its owner only. + writeFileSync(credentialsFile, credentials, { mode: 0o600 }) + process.env.GOOGLE_APPLICATION_CREDENTIALS = credentialsFile +} + +if (process.argv.length === 3 && process.argv[2] === "deploy") { + console.log("Deploying Cloud Worker") + deploy().catch(error => console.error(error)) +} else { + // build() is async: report a rejection and exit non-zero instead of leaving it unhandled. + build().catch(error => { + console.error(error) + process.exit(1) + }) +} diff --git a/packages/gatsby-parallel-runner/package.json b/packages/gatsby-parallel-runner/package.json new file mode 100644 index 0000000000000..c8b6dfb4a0516 --- /dev/null +++ b/packages/gatsby-parallel-runner/package.json @@ -0,0 +1,33 @@ +{ + "name": "gatsby-parallel-runner", + "version": "1.2.3", + "description": "Gatsby plugin that allows paralellization of external tasks", + "keywords": [ + "gatsby" + ], + "main": "index.js", + "bin": { + "gatsby-parallel-runner": "./bin/run.js" + }, + "author": "Mathias Biilmann ", + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/gatsbyjs/gatsby.git", + "directory": 
"packages/gatsby-parallel-runner" + }, + "homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-parallel-runner#readme", + "dependencies": { + "@google-cloud/pubsub": "^0.29.1", + "@google-cloud/storage": "^4.3.0", + "fs-extra": "^8.1.0", + "grpc": "^1.24.2", + "loglevel": "^1.6.6" + }, + "devDependencies": { + "jest": "^25.1.0" + }, + "scripts": { + "test": "jest" + } +} diff --git a/packages/gatsby-parallel-runner/src/__tests__/index.js b/packages/gatsby-parallel-runner/src/__tests__/index.js new file mode 100644 index 0000000000000..8b06237aaf34b --- /dev/null +++ b/packages/gatsby-parallel-runner/src/__tests__/index.js @@ -0,0 +1,127 @@ +const path = require(`path`) +const { messageHandler } = require(`../build`) +const { resolveProcessors } = require(`../utils`) + +test(`test message handler with image processor`, async () => { + expect.assertions(2) + const fakeGatsby = { + send: jest.fn(msg => { + expect(msg).toEqual({ + type: `JOB_COMPLETED`, + payload: { + id: `1234`, + result: { outputs: [] }, + }, + }) + }), + } + + const processors = { + IMAGE_PROCESSING: { + process: jest.fn(async msg => { + expect(msg).toEqual({ + id: `1234`, + name: `IMAGE_PROCESSING`, + args: [], + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }) + return { outputs: [] } + }), + }, + } + + const handler = messageHandler(fakeGatsby, processors) + await handler({ + type: `JOB_CREATED`, + payload: { + id: `1234`, + name: `IMAGE_PROCESSING`, + args: [], + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }, + }) +}) + +test(`test message handler with failing image processor`, async () => { + expect.assertions(2) + const fakeGatsby = { + send: jest.fn(msg => { + expect(msg).toEqual({ + type: `JOB_FAILED`, + payload: { + id: `1234`, + error: `Error during processing...`, + }, + }) + }), + } + + const processors = { + IMAGE_PROCESSING: { + process: jest.fn(async msg => { + 
expect(msg).toEqual({ + id: `1234`, + name: `IMAGE_PROCESSING`, + args: [], + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }) + return Promise.reject(`Error during processing...`) + }), + }, + } + + const handler = messageHandler(fakeGatsby, processors) + await handler({ + type: `JOB_CREATED`, + payload: { + id: `1234`, + name: `IMAGE_PROCESSING`, + args: [], + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }, + }) +}) + +test(`test message handler with unkown processor`, async () => { + expect.assertions(1) + const fakeGatsby = { + send: jest.fn(msg => { + expect(msg).toEqual({ + type: `JOB_NOT_WHITELISTED`, + payload: { id: `1234` }, + }) + }), + } + + const handler = messageHandler(fakeGatsby, {}) + await handler({ + type: `JOB_CREATED`, + payload: { + id: `1234`, + name: `UNKOWN_PROCESSOR`, + args: [], + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }, + }) +}) + +test(`resolve processors`, async () => { + const processors = await resolveProcessors() + expect(processors).toEqual([ + { + name: `image-processing`, + key: `IMAGE_PROCESSING`, + path: path.join(__dirname, `../processors/image-processing`), + }, + ]) +}) diff --git a/packages/gatsby-parallel-runner/src/build.js b/packages/gatsby-parallel-runner/src/build.js new file mode 100644 index 0000000000000..82a9eccd85173 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/build.js @@ -0,0 +1,98 @@ +#!/usr/bin/env node + +const cp = require(`child_process`) +const log = require(`loglevel`) +const path = require(`path`) +const { ProcessorQueue } = require(`./processor-queue`) +const { + GoogleFunctions, +} = require(`./processor-queue/implementations/google-functions`) +const { resolveProcessors } = require(`./utils`) + +const MESSAGE_TYPES = { + LOG_ACTION: `LOG_ACTION`, + JOB_CREATED: `JOB_CREATED`, + JOB_COMPLETED: `JOB_COMPLETED`, + JOB_FAILED: `JOB_FAILED`, + ACTIVITY_START: 
`ACTIVITY_START`, + ACTIVITY_END: `ACTIVITY_END`, + ACTIVITY_SUCCESS: `ACTIVITY_SUCCESS`, + ACTIVITY_ERROR: `ACTIVITY_ERROR`, +} + +function messageHandler(gatsbyProcess, processors = {}) { + return async function(msg) { + if ( + log.getLevel() <= log.levels.TRACE && + msg.type !== MESSAGE_TYPES.LOG_ACTION + ) { + log.trace(`Got gatsby message`, JSON.stringify(msg)) + } + switch (msg.type) { + case MESSAGE_TYPES.JOB_CREATED: { + const processor = processors[msg.payload.name] + if (!processor) { + gatsbyProcess.send({ + type: `JOB_NOT_WHITELISTED`, + payload: { id: msg.payload.id }, + }) + return + } + try { + const result = await processor.process(msg.payload) + gatsbyProcess.send({ + type: `JOB_COMPLETED`, + payload: { + id: msg.payload.id, + result, + }, + }) + } catch (error) { + log.error(`Processing failed`, msg.payload.id, ` error:`, error) + gatsbyProcess.send({ + type: `JOB_FAILED`, + payload: { id: msg.payload.id, error: error.toString() }, + }) + } + break + } + case MESSAGE_TYPES.LOG_ACTION: + // msg.action.payload.text && console.log(msg.action.payload.text) + break + default: + log.warn(`Ignoring message: `, msg) + } + } +} + +exports.build = async function(cmd = `node_modules/.bin/gatsby build`) { + log.setLevel(process.env.PARALLEL_RUNNER_LOG_LEVEL || `warn`) + + process.env.ENABLE_GATSBY_EXTERNAL_JOBS = true + + const processors = {} + const processorList = await resolveProcessors() + await Promise.all( + processorList.map(async processorSettings => { + const klass = require(processorSettings.path).Processor + const pubSubImplementation = await new GoogleFunctions({ + processorSettings, + }) + const processorQueue = new ProcessorQueue({ pubSubImplementation }) + + processors[processorSettings.key] = new klass(processorQueue) + }) + ) + + const [bin, ...args] = cmd.split(` `) + const gatsbyProcess = cp.fork(path.join(process.cwd(), bin), args) + gatsbyProcess.on(`exit`, async code => { + log.debug(`Gatsby existed with`, code) + process.exit(code) + 
}) + + const handler = messageHandler(gatsbyProcess, processors) + gatsbyProcess.on(`message`, handler) +} + +exports.messageHandler = messageHandler diff --git a/packages/gatsby-parallel-runner/src/deploy.js b/packages/gatsby-parallel-runner/src/deploy.js new file mode 100644 index 0000000000000..0f0599b7ce750 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/deploy.js @@ -0,0 +1,10 @@ +const GooglePubSub = require(`./processor-queue/implementations/google-functions/deploy`) + +exports.deploy = async function() { + try { + await GooglePubSub.deploy() + } catch (err) { + console.error(`Failed to deploy parallel functions`, err) + process.exit(1) + } +} diff --git a/packages/gatsby-parallel-runner/src/processor-queue/__tests__/hello.txt b/packages/gatsby-parallel-runner/src/processor-queue/__tests__/hello.txt new file mode 100644 index 0000000000000..8ab686eafeb1f --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/__tests__/hello.txt @@ -0,0 +1 @@ +Hello, World! 
diff --git a/packages/gatsby-parallel-runner/src/processor-queue/__tests__/index.js b/packages/gatsby-parallel-runner/src/processor-queue/__tests__/index.js new file mode 100644 index 0000000000000..909024a15c851 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/__tests__/index.js @@ -0,0 +1,113 @@ +"use strict" + +const path = require(`path`) +const { ProcessorQueue } = require(`../index`) +const { Job } = require(`../queue`) + +process.env.TOPIC = `test` + +test(`Job size calculation for string`, async () => { + const job = await new Job({ + id: `1234`, + args: [], + file: Buffer.from(`Hello, World`), + }) + expect(job.fileSize).toBe(12) +}) + +test(`Job size calculation for file`, async () => { + const job = await new Job({ + id: `1234`, + args: [], + file: path.join(__dirname, `hello.txt`), + }) + expect(job.fileSize).toBe(14) +}) + +test(`Job size calculation for missing file`, async () => { + const file = path.join(__dirname, `nopes.txt`) + expect.assertions(1) + await expect(new Job({ id: `1234`, args: [], file })).rejects.toThrow() +}) + +test(`job message for string`, async () => { + const job = await new Job({ + id: `1234`, + args: [], + file: Buffer.from(`Hello, World`), + }) + const msg = await job.msg() + expect(msg).toBeInstanceOf(Buffer) + expect(JSON.parse(msg.toString())).toEqual({ + id: `1234`, + action: [], + file: Buffer.from(`Hello, World`).toString(`base64`), + topic: `test`, + }) +}) + +test(`job message for file`, async () => { + const file = path.join(__dirname, `hello.txt`) + const job = await new Job({ id: `1234`, args: [], file }) + const msg = await job.msg() + expect(msg).toBeInstanceOf(Buffer) + expect(JSON.parse(msg.toString())).toEqual({ + id: `1234`, + action: [], + file: Buffer.from(`Hello, World!\n`).toString(`base64`), + topic: `test`, + }) +}) + +test(`process should push a job unto the queue`, async () => { + expect.assertions(2) + const pubSubImplementation = { + publish: msg => { + 
expect(msg).toBeDefined() + }, + subscribe: handler => { + setTimeout(() => + handler({ + id: `2345`, + type: `JOB_COMPLETED`, + payload: { id: `1234`, output: `done` }, + }) + ) + }, + } + const processor = new ProcessorQueue({ pubSubImplementation }) + const result = await processor.process({ + id: `1234`, + args: [], + file: Buffer.from(`Hello`), + }) + expect(result).toEqual({ id: `1234`, output: `done` }) +}) + +test(`failure message should cancel processing`, async () => { + expect.assertions(2) + const pubSubImplementation = { + publish: msg => { + expect(msg).toBeDefined() + }, + subscribe: handler => { + setTimeout(() => + handler({ + id: `2345`, + type: `JOB_FAILED`, + payload: { id: `1234`, error: `Error` }, + }) + ) + }, + } + const processor = new ProcessorQueue({ pubSubImplementation }) + try { + await processor.process({ + id: `1234`, + args: [], + file: Buffer.from(`Hello`), + }) + } catch (err) { + expect(err).toBeDefined() + } +}) diff --git a/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/__tests__/index.js b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/__tests__/index.js new file mode 100644 index 0000000000000..5120cc5846b34 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/__tests__/index.js @@ -0,0 +1,53 @@ +const { GoogleFunctions } = require(`..`) + +const googleConfig = { project_id: `test-project` } +const processorSettings = { name: `image-processor`, key: `IMAGE_PROCESSOR` } + +test(`instantiate google pubsub`, async () => { + const pubSub = await new GoogleFunctions({ + noSubscription: true, + processorSettings, + googleConfig, + }) + expect(pubSub).toBeInstanceOf(GoogleFunctions) +}) + +test(`size check for google publish`, async () => { + const pubSub = await new GoogleFunctions({ + noSubscription: true, + processorSettings, + googleConfig, + }) + const msg = Buffer.from(`Hello, World!`) + 
pubSub.maxPubSubSize = 10000 + pubSub.pubSubClient = { + topic: () => { + return { + publish: async pubSubMsg => { + expect(pubSubMsg).toBe(msg) + }, + } + }, + } + pubSub.storageClient = { + bucket: () => { + return { + file: path => { + expect(path).toEqual(`event-2345`) + return { + save: async (data, options) => { + expect(Buffer.from(data, `base64`).toString()).toEqual( + `Hello, World!` + ) + expect(options).toEqual({ resumable: false }) + }, + } + }, + } + }, + } + await pubSub.publish(`1234`, msg) + + pubSub.maxPubSubSize = 2 + await pubSub.publish(`2345`, msg) +}) diff --git a/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/deploy.js b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/deploy.js new file mode 100644 index 0000000000000..d5bbf1023662c --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/deploy.js @@ -0,0 +1,124 @@ +#!/usr/bin/env node + +const { spawn } = require(`child_process`) +const { readFile, pathExists } = require(`fs-extra`) +const path = require(`path`) +const { PubSub } = require(`@google-cloud/pubsub`) +const { Storage } = require(`@google-cloud/storage`) +const { topicFor, bucketFor } = require(`./utils`) +const { resolveProcessors } = require(`../../../utils`) + +// Builds the function name for a processor/trigger type pair: lowercased, with every character +// outside [a-z0-9-] replaced by a dash and every run of dashes collapsed into a single dash. +function functionName(processor, type) { + return `processor-${processor.name}-${type}` + .toLocaleLowerCase() + .replace(/[^a-z0-9-]/g, `-`) + .replace(/-+/g, `-`) +} + +function deployType(type, processor, cwd, config) { + return new Promise((resolve, reject) => { + const args = [ + `functions`, + `deploy`, + functionName(processor, type), + `--entry-point`, + `processor`, + `--memory`, + `1024MB`, + `--service-account`, + config.client_email, + `--project`, + config.project_id, + `--runtime`, + `nodejs10`, + ] + if (type === `PubSub`) { + args.push(`--trigger-topic`) + args.push(topicFor(processor)) + } else { + args.push(`--trigger-resource`) + 
args.push(bucketFor(processor)) + args.push(`--trigger-event google.storage.object.finalize`) + } + + const ps = spawn(`gcloud`, args, { shell: true, cwd, stdio: `inherit` }) + + ps.on(`close`, code => { + if (code === 0) { + return resolve(code) + } + return reject(code) + }) + }) +} + +exports.deploy = async function() { + const creds = await readFile(process.env.GOOGLE_APPLICATION_CREDENTIALS) + const config = JSON.parse(creds) + + const pubSubClient = new PubSub({ + projectId: config.project_id, + }) + const storage = new Storage({ + projectId: config.project_id, + }) + + const processors = await resolveProcessors() + try { + await Promise.all( + processors.map(async processor => { + const cwd = path.join( + processor.path, + `implementations`, + `google-functions` + ) + const exists = await pathExists(cwd) + if (!exists) { + console.warn(`No google-functions implementation for`, processor.path) + return null + } + + try { + await pubSubClient.createTopic(topicFor(processor)) + } catch (err) { + console.log(`Create topic failed`, err) + } + + try { + const lifeCycle = ` + + + + + + + 30 + + + ` + const [bucket] = await storage.createBucket(bucketFor(processor)) + await bucket.setMetadata({ lifeCycle }) + } catch (err) { + console.log(`Create bucket failed`, err) + } + + try { + console.log(`Deploying as pubsub handler`) + await deployType(`PubSub`, processor, cwd, config) + + console.log(`Deploying as storage handler`) + await deployType(`Storage`, processor, cwd, config) + } catch (err) { + console.log(`Error: `, err) + return Promise.reject(err) + } + return null + }) + ) + } catch (err) { + return Promise.reject(err) + } + return null +} diff --git a/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/index.js b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/index.js new file mode 100644 index 0000000000000..fac0ce911452c --- /dev/null +++ 
b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/index.js @@ -0,0 +1,141 @@ +const { PubSub } = require(`@google-cloud/pubsub`) +const { Storage } = require(`@google-cloud/storage`) +const fs = require(`fs-extra`) +const path = require(`path`) +const log = require(`loglevel`) +const { topicFor, bucketFor } = require(`./utils`) + +const DEFAULT_MAX_PUB_SUB_SIZE = 1024 * 1024 * 5 // 5 Megabyte + +class GoogleFunctions { + constructor({ + processorSettings, + maxPubSubSize, + noSubscription, + googleConfig, + }) { + this.maxPubSubSize = maxPubSubSize || DEFAULT_MAX_PUB_SUB_SIZE + const config = + googleConfig || + JSON.parse(fs.readFileSync(process.env.GOOGLE_APPLICATION_CREDENTIALS)) + this.subName = `gatsby-sub-${Date.now()}` + this.workerBucket = bucketFor(processorSettings) + this.workerTopic = topicFor(processorSettings) + this.resultBucketName = `event-results-${process.env.TOPIC}` + this.resultTopic = process.env.TOPIC + this.pubSubClient = new PubSub({ projectId: config.project_id }) + this.storageClient = new Storage({ projectId: config.project_id }) + this.subscribers = [] + + return (async () => { + const topicCreatedFile = path.join( + `.cache`, + `topic-created-${process.env.TOPIC}` + ) + const exists = await fs.pathExists(topicCreatedFile) + if (exists) { + return this + } + + try { + if (!noSubscription) { + await this._createSubscription() + await this._createBucket() + } + } catch (err) { + return Promise.reject( + `Failed to start Google PubSub subscriptionn: ${err}` + ) + } + + await fs.ensureFile(topicCreatedFile) + + return this + })() + } + + subscribe(handler) { + this.subscribers.push(handler) + } + + async publish(id, msg) { + if (msg.byteLength < this.maxPubSubSize) { + log.debug(`Publishing ${id} to pubsub ${this.workerTopic}`) + await this.pubSubClient.topic(this.workerTopic).publish(msg) + } else { + log.debug(`Publishing ${id} to storage ${this.workerBucket}`) + await this.storageClient + 
.bucket(this.workerBucket) + .file(`event-${id}`) + .save(msg.toString(`base64`), { resumable: false }) + } + } + + async _messageHandler(msg) { + msg.ack() + const pubSubMessage = JSON.parse(Buffer.from(msg.data, `base64`).toString()) + if (pubSubMessage.storedPayload) { + const payload = await this._downloadFromStorage( + msg.id, + pubSubMessage.storedPayload + ) + pubSubMessage.payload = payload + delete pubSubMessage.storedPayload + } + this.subscribers.forEach(handler => handler(pubSubMessage)) + } + + async _createSubscription() { + // Creates a new subscription + try { + await this.pubSubClient.createTopic(this.resultTopic) + } catch (err) { + log.trace(`Create result topic failed`, err) + } + + const [subscription] = await this.pubSubClient + .topic(this.resultTopic) + .createSubscription(this.subName) + + subscription.on(`message`, this._messageHandler.bind(this)) + subscription.on(`error`, err => log.error(`Error from subscription: `, err)) + subscription.on(`close`, err => + log.error(`Subscription closed unexpectedly`, err) + ) + } + + async _createBucket() { + try { + const lifeCycle = ` + + + + + + + 30 + + + ` + const [bucket] = await this.storageClient.createBucket( + this.resultBucketName + ) + await bucket.setMetadata({ lifeCycle }) + } catch (err) { + log.trace(`Create result bucket failed`, err) + } + } + + async _downloadFromStorage(id, storedPayload) { + const file = this.storageClient + .bucket(this.resultBucketName) + .file(storedPayload) + await file.download({ destination: `/tmp/result-${id}` }) + const data = (await fs.readFile(`/tmp/result-${id}`)).toString() + const payload = JSON.parse(data) + await fs.remove(`/tmp/result-${id}`) + return payload + } +} + +exports.GoogleFunctions = GoogleFunctions diff --git a/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/utils.js b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/utils.js new file mode 100644 index 
0000000000000..04c6415512cbb --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/implementations/google-functions/utils.js @@ -0,0 +1,7 @@ +exports.topicFor = function(processor) { + return `${process.env.WORKER_TOPIC || `gatsby_worker`}_${processor.name}` +} + +exports.bucketFor = function(processor) { + return `${process.env.WORKER_TOPIC || `gatsby_worker`}_${processor.name}` +} diff --git a/packages/gatsby-parallel-runner/src/processor-queue/index.js b/packages/gatsby-parallel-runner/src/processor-queue/index.js new file mode 100644 index 0000000000000..49449ff963e66 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/index.js @@ -0,0 +1,44 @@ +"use strict" +const { Queue, Job } = require(`./queue`) + +const DEFAULT_MAX_MESSAGE_MEM = 1024 * 1024 * 5 * 10 // 50 megabytes + +class ProcessorQueue { + constructor({ maxJobTime, maxMessageMem, pubSubImplementation }) { + this._mem = 0 + this.maxMessageMem = maxMessageMem || DEFAULT_MAX_MESSAGE_MEM + + this.queue = new Queue({ maxJobTime, maxMessageMem, pubSubImplementation }) + } + + async process(payload) { + let size = 0 + try { + const job = await new Job(payload) + size = job.fileSize + await this._waitForFreeMessageMem() + this._mem += size + const msg = await job.msg() + const result = await this.queue.push(job.id, msg) + this._mem -= size + return result + } catch (err) { + this._mem -= size + return Promise.reject(err) + } + } + + async _waitForFreeMessageMem() { + return new Promise((resolve, reject) => { + const check = () => { + if (this._mem <= this.maxMessageMem) { + return resolve() + } + return setTimeout(check, 100) + } + check() + }) + } +} + +exports.ProcessorQueue = ProcessorQueue diff --git a/packages/gatsby-parallel-runner/src/processor-queue/queue.js b/packages/gatsby-parallel-runner/src/processor-queue/queue.js new file mode 100644 index 0000000000000..335c216cf592c --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processor-queue/queue.js @@ -0,0 
+1,125 @@ +const fs = require(`fs-extra`) +const log = require(`loglevel`) + +const DEFAULT_MAX_JOB_TIME = process.env.PARALLEL_RUNNER_TIMEOUT + ? parseInt(process.env.PARALLEL_RUNNER_TIMEOUT, 10) + : 5 * 60 * 1000 + +const MESSAGE_TYPES = { + JOB_COMPLETED: `JOB_COMPLETED`, + JOB_FAILED: `JOB_FAILED`, +} + +class Job { + constructor({ id, args, file }) { + this.id = id + this.args = args + this.file = file + + return (async () => { + try { + await this._calculateSize() + } catch (err) { + return Promise.reject(err) + } + return this + })() + } + + async msg() { + const data = await this._readData() + return Buffer.from( + JSON.stringify({ + id: this.id, + file: data.toString(`base64`), + action: this.args, + topic: process.env.TOPIC, + }) + ) + } + + async _calculateSize() { + if (this.file instanceof Buffer) { + return (this.fileSize = this.file.byteLength) + } + try { + const stat = await fs.stat(this.file) + return (this.fileSize = stat.size) + } catch (err) { + return Promise.reject(err) + } + } + + async _readData() { + if (this.file instanceof Buffer) { + return this.file + } + return await fs.readFile(this.file) + } +} + +class Queue { + constructor({ maxJobTime, pubSubImplementation }) { + this._jobs = new Map() + this.maxJobTime = maxJobTime || DEFAULT_MAX_JOB_TIME + this.pubSubImplementation = pubSubImplementation + if (pubSubImplementation) { + pubSubImplementation.subscribe(this._onMessage.bind(this)) + } + } + + // Publishes msg for job id and returns a promise that settles when _onMessage reports the + // job as completed or failed, when publishing throws, or when maxJobTime elapses. + async push(id, msg) { + return new Promise(async (resolve, reject) => { + // Settling through the stored callbacks clears the timer so it cannot outlive the job. + const timer = setTimeout(() => { + if (this._jobs.delete(id)) { + reject(`Job timed out ${id}`) + } + }, this.maxJobTime) + this._jobs.set(id, { + resolve: value => { + clearTimeout(timer) + resolve(value) + }, + reject: reason => { + clearTimeout(timer) + reject(reason) + }, + }) + try { + await this.pubSubImplementation.publish(id, msg) + } catch (err) { + clearTimeout(timer) + this._jobs.delete(id) + reject(err) + } + }) + } + + _onMessage(pubSubMessage) { + const { type, payload } = pubSubMessage + log.debug(`Got worker message`, type, payload && payload.id) + + switch (type) { + case MESSAGE_TYPES.JOB_COMPLETED: + if 
(this._jobs.has(payload.id)) { + this._jobs.get(payload.id).resolve(payload) + this._jobs.delete(payload.id) + } + return + case MESSAGE_TYPES.JOB_FAILED: + if (this._jobs.has(payload.id)) { + this._jobs.get(payload.id).reject(payload.error) + this._jobs.delete(payload.id) + } + return + default: + log.error(`Unkown worker message: `, pubSubMessage) + } + } +} + +exports.Job = Job +exports.Queue = Queue diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/__tests__/index.js b/packages/gatsby-parallel-runner/src/processors/image-processing/__tests__/index.js new file mode 100644 index 0000000000000..b747cbbd1d4cd --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processors/image-processing/__tests__/index.js @@ -0,0 +1,42 @@ +const path = require(`path`) +const fs = require(`fs-extra`) +const { Processor } = require(`../index`) + +test(`test image processor`, async () => { + await fs.remove(`/tmp/gatsby-parallel-transformed-image.png`) + const mockProcessorQueue = { + process: jest.fn(async msg => { + expect(msg).toEqual({ + id: `1234`, + args: [], + file: path.join(__dirname, `images`, `gatsby-astronaut.png`), + }) + return { + files: { + "gatsby-parallel-transformed-image.png": Buffer.from( + `bogus data` + ).toString(`base64`), + }, + output: [ + { + outputPath: `gatsby-parallel-transformed-image.png`, + args: [], + }, + ], + } + }), + } + const imageProcessor = new Processor(mockProcessorQueue) + await imageProcessor.process({ + id: `1234`, + name: `IMAGE_PROCESSING`, + args: [], + outputDir: `/tmp`, + inputPaths: [ + { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) }, + ], + }) + + const data = await fs.readFile(`/tmp/gatsby-parallel-transformed-image.png`) + expect(data.toString()).toEqual(`bogus data`) +}) diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/.gcloudignore 
b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/.gcloudignore new file mode 100644 index 0000000000000..ccc4eb240e1c3 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/.gcloudignore @@ -0,0 +1,16 @@ +# This file specifies files that are *not* uploaded to Google Cloud Platform +# using gcloud. It follows the same syntax as .gitignore, with the addition of +# "#!include" directives (which insert the entries of the given .gitignore-style +# file at that point). +# +# For more information, run: +# $ gcloud topic gcloudignore +# +.gcloudignore +# If you would like to upload your .git directory, .gitignore file or files +# from your .gitignore file, remove the corresponding line +# below: +.git +.gitignore + +node_modules diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/cloud-function-task-runner.js b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/cloud-function-task-runner.js new file mode 100644 index 0000000000000..69ead58383efe --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/cloud-function-task-runner.js @@ -0,0 +1,83 @@ +const fs = require(`fs-extra`) +const sizeof = require(`object-sizeof`) +const klaw = require(`klaw`) +const { PubSub } = require(`@google-cloud/pubsub`) +const { Storage } = require(`@google-cloud/storage`) + +const MAX_PUBSUB_RESPONSE_SIZE = 1024 * 1024 // 1mb + +const pubSubClient = new PubSub() +const storageClient = new Storage() + +async function processPubSubMessageOrStorageObject(msg) { + let data = null + + if (msg.bucket && msg.name) { + const bucket = storageClient.bucket(msg.bucket) + const file = bucket.file(msg.name) + await file.download({ destination: `/tmp/${msg.name}` }) + data = (await fs.readFile(`/tmp/${msg.name}`)).toString() + } else 
{ + data = msg.data + } + + return JSON.parse(Buffer.from(data, `base64`).toString()) +} + +exports.runTask = async (msg, handler) => { + await fs.mkdirp(`/tmp/output`) + process.chdir(`/tmp/output`) + + const event = await processPubSubMessageOrStorageObject(msg) + console.log(`Processing`, event.id) + try { + const file = Buffer.from(event.file, `base64`) + const output = await handler(file, event) + const result = { type: `JOB_COMPLETED` } + const payload = { id: event.id, files: {}, output } + for await (const file of klaw(`/tmp/output`)) { + if (file.stats.isFile()) { + const data = await fs.readFile(file.path) + payload.files[ + file.path.replace(/^\/tmp\/output\//, ``) + ] = data.toString(`base64`) + } + } + + const size = sizeof(payload) + if (size > MAX_PUBSUB_RESPONSE_SIZE) { + await storageClient + .bucket(`event-results-${event.topic}`) + .file(`result-${event.id}`) + .save(Buffer.from(JSON.stringify(payload)), { resumable: false }) + result.storedPayload = `result-${event.id}` + } else { + result.payload = payload + } + + const resultMsg = Buffer.from(JSON.stringify(result)) + const messageId = await pubSubClient.topic(event.topic).publish(resultMsg) + console.log( + `Published message `, + event.id, + messageId, + resultMsg.length, + result.storedPayload + ) + await fs.emptyDir(`/tmp`) + } catch (err) { + console.error(`Failed to process message:`, event.id, err) + await pubSubClient.topic(event.topic).publish( + Buffer.from( + JSON.stringify({ + type: `JOB_FAILED`, + payload: { + id: event.id, + error: err.toString(), + }, + }) + ) + ) + await fs.emptyDir(`/tmp`) + } +} diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/index.js b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/index.js new file mode 100644 index 0000000000000..fd06d1ac90936 --- /dev/null +++ 
b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/index.js @@ -0,0 +1,13 @@ +const { runTask } = require(`./cloud-function-task-runner`) +const { processFile } = require(`gatsby-plugin-sharp/process-file`) + +exports.processor = async (msg, context) => { + await runTask(msg, async (file, event) => { + const results = processFile( + file, + event.action.operations, + event.action.pluginOptions + ) + return Promise.all(results) + }) +} diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/package.json b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/package.json new file mode 100644 index 0000000000000..b7ffbc11bca04 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processors/image-processing/implementations/google-functions/package.json @@ -0,0 +1,19 @@ +{ + "name": "cloud_sharp_processor", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "ISC", + "dependencies": { + "@google-cloud/pubsub": "^1.4.1", + "@google-cloud/storage": "^4.3.0", + "fs-extra": "^8.1.0", + "gatsby-plugin-sharp": "^2.4.5", + "klaw": "^3.0.0", + "object-sizeof": "^1.5.2" + } +} diff --git a/packages/gatsby-parallel-runner/src/processors/image-processing/index.js b/packages/gatsby-parallel-runner/src/processors/image-processing/index.js new file mode 100644 index 0000000000000..bc74713a0ed6a --- /dev/null +++ b/packages/gatsby-parallel-runner/src/processors/image-processing/index.js @@ -0,0 +1,50 @@ +const fs = require(`fs-extra`) +const path = require(`path`) +const log = require(`loglevel`) + +class ImageProcessingProcessor { + constructor(queue) { + this.queue = queue + } + + async process(msg) { + if (!msg.inputPaths || msg.inputPaths.length > 1) { + log.error(`Wrong number of input paths in msg: `, msg) + return 
Promise.reject(`Wrong number of input paths`) + } + + const file = msg.inputPaths[0].path + + try { + log.debug(`Processing image`, file) + const result = await this.queue.process({ + id: msg.id, + args: msg.args, + file, + }) + log.debug(`Got output from processing`) + await Promise.all( + result.output.map(async transform => { + const filePath = path.join(msg.outputDir, transform.outputPath) + try { + await fs.mkdirp(path.dirname(filePath)) + } catch (err) { + return Promise.reject(`Failed making output directory: ${err}`) + } + log.debug(`Writing tranform to file`) + await fs.writeFile( + filePath, + Buffer.from(result.files[transform.outputPath], `base64`) + ) + return null + }) + ) + return { output: result.output } + } catch (err) { + log.error(`Error during processing`, err) + return Promise.reject(err) + } + } +} + +exports.Processor = ImageProcessingProcessor diff --git a/packages/gatsby-parallel-runner/src/utils.js b/packages/gatsby-parallel-runner/src/utils.js new file mode 100644 index 0000000000000..33719b63f92c1 --- /dev/null +++ b/packages/gatsby-parallel-runner/src/utils.js @@ -0,0 +1,22 @@ +const path = require(`path`) +const fs = require(`fs-extra`) + +exports.resolveProcessors = async function() { + const processorDir = path.join(__dirname, `processors`) + const pathNames = await fs.readdir(processorDir) + const dirs = await Promise.all( + pathNames.map(async name => { + const stat = await fs.stat(path.join(processorDir, name)) + return { stat, name } + }) + ) + return dirs + .filter(dir => dir.stat.isDirectory()) + .map(dir => { + return { + path: path.join(processorDir, dir.name), + name: dir.name, + key: dir.name.toUpperCase().replace(/-/g, `_`), + } + }) +}