From 59a4399ab21b5041c7ae6e4f7ac96a827f2db4de Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 25 Jun 2021 15:24:07 -0700 Subject: [PATCH 1/3] fix(gatsby-source-drupal): getNext no longer returns a value --- packages/gatsby-source-drupal/src/gatsby-node.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/gatsby-source-drupal/src/gatsby-node.js b/packages/gatsby-source-drupal/src/gatsby-node.js index 242a47c5d759f..3096a552d7307 100644 --- a/packages/gatsby-source-drupal/src/gatsby-node.js +++ b/packages/gatsby-source-drupal/src/gatsby-node.js @@ -373,9 +373,8 @@ exports.sourceNodes = async ( apiBase, urlPath ) - const dataForLanguage = await getNext(joinedUrl) - dataArray.push(...dataForLanguage) + await getNext(joinedUrl) } } From bcaf46eacf4ddebde60ad19b16685594b449ff3f Mon Sep 17 00:00:00 2001 From: David Disch Date: Sun, 27 Jun 2021 21:34:01 +1000 Subject: [PATCH 2/3] Use content-list endpoint to fetch content entities individually --- .../gatsby-source-drupal/src/gatsby-node.js | 210 ++++++++---------- 1 file changed, 87 insertions(+), 123 deletions(-) diff --git a/packages/gatsby-source-drupal/src/gatsby-node.js b/packages/gatsby-source-drupal/src/gatsby-node.js index 3096a552d7307..282357789f1e6 100644 --- a/packages/gatsby-source-drupal/src/gatsby-node.js +++ b/packages/gatsby-source-drupal/src/gatsby-node.js @@ -37,6 +37,8 @@ const requestQueue = require(`fastq`).promise(worker, 20) const asyncPool = require(`tiny-async-pool`) const bodyParser = require(`body-parser`) +const REPORTING_RATE_MS = 10000 + function gracefullyRethrow(activity, error) { // activity.panicOnBuild was implemented at some point in gatsby@2 // but plugin can still be used with older version of gatsby core @@ -77,7 +79,7 @@ exports.sourceNodes = async ( apiBase = `jsonapi`, basicAuth = {}, filters, - headers, + headers = {}, params = {}, concurrentFileRequests = 20, concurrentAPIRequests = 20, @@ -95,12 +97,17 @@ exports.sourceNodes = async ( 
enabledLanguages: [`und`], translatableEntities: [], }, + useAuthOn = [], } = pluginOptions const { createNode, setPluginStatus, touchNode } = actions // Update the concurrency limit from the plugin options requestQueue.concurrency = concurrentAPIRequests + if (typeof basicAuth.username === 'string' && typeof basicAuth.password === 'string') { + headers['Authorization'] = `Basic ${Buffer.from(`${basicAuth.username}:${basicAuth.password}`).toString('base64')}` + } + if (webhookBody && Object.keys(webhookBody).length) { const changesActivity = reporter.activityTimer( `loading Drupal content changes`, @@ -173,8 +180,6 @@ exports.sourceNodes = async ( const res = await requestQueue.push([ urlJoin(baseUrl, `gatsby-fastbuilds/sync/`, lastFetched.toString()), { - username: basicAuth.username, - password: basicAuth.password, headers, searchParams: params, responseType: `json`, @@ -268,142 +273,101 @@ exports.sourceNodes = async ( drupalFetchActivity.start() - let allData - try { - const res = await requestQueue.push([ - urlJoin(baseUrl, apiBase), - { - username: basicAuth.username, - password: basicAuth.password, - headers, - searchParams: params, - responseType: `json`, - }, - ]) - allData = await Promise.all( - _.map(res.body.links, async (url, type) => { - const dataArray = [] - if (disallowedLinkTypes.includes(type)) return - if (!url) return - if (!type) return - - // Lookup this type in our list of language alterable entities. - const isTranslatable = languageConfig.translatableEntities.some( - entityType => entityType === type - ) + const listResponse = await worker([ + urlJoin(baseUrl, 'gatsby/content-list'), + { + headers, + responseType: `json`, + } + ]) - const getNext = async url => { - if (typeof url === `object`) { - // url can be string or object containing href field - url = url.href - - // Apply any filters configured in gatsby-config.js. Filters - // can be any valid JSON API filter query string. 
- // See https://www.drupal.org/docs/8/modules/jsonapi/filtering - if (typeof filters === `object`) { - if (filters.hasOwnProperty(type)) { - url = new URL(url) - const filterParams = new URLSearchParams(filters[type]) - const filterKeys = Array.from(filterParams.keys()) - filterKeys.forEach(filterKey => { - // Only add filter params to url if it has not already been - // added. - if (!url.searchParams.has(filterKey)) { - url.searchParams.set(filterKey, filterParams.get(filterKey)) - } - }) - url = url.toString() - } - } - } + const requestUrls = [] + for (let entityTypeAndBundle in listResponse.body) { + if (disallowedLinkTypes.indexOf(entityTypeAndBundle) !== -1) continue - let d - try { - d = await requestQueue.push([ - url, - { - username: basicAuth.username, - password: basicAuth.password, - headers, - responseType: `json`, - }, - ]) - } catch (error) { - if (error.response && error.response.statusCode == 405) { - // The endpoint doesn't support the GET method, so just skip it. - return - } else { - console.error(`Failed to fetch ${url}`, error.message) - console.log(error) - throw error - } - } - dataArray.push(...d.body.data) - // Add support for includes. Includes allow entity data to be expanded - // based on relationships. The expanded data is exposed as `included` - // in the JSON API response. - // See https://www.drupal.org/docs/8/modules/jsonapi/includes - if (d.body.included) { - dataArray.push(...d.body.included) - } - if (d.body.links && d.body.links.next) { - await getNext(d.body.links.next) - } - } + const [entityType, entityBundle] = entityTypeAndBundle.split('--') - if (isTranslatable === false) { - await getNext(url) - } else { - for (let i = 0; i < languageConfig.enabledLanguages.length; i++) { - let currentLanguage = languageConfig.enabledLanguages[i] - const urlPath = url.href.split(`${apiBase}/`).pop() - const baseUrlWithoutTrailingSlash = baseUrl.replace(/\/$/, ``) - // The default language's JSON API is at the root. 
- if ( - currentLanguage === getOptions().languageConfig.defaultLanguage || - baseUrlWithoutTrailingSlash.slice(-currentLanguage.length) == - currentLanguage - ) { - currentLanguage = `` - } + for (let entityUuid of listResponse.body[entityTypeAndBundle]) { + const isTranslatable = languageConfig.translatableEntities.some( + translatableEntityType => translatableEntityType === entityTypeAndBundle + ) - const joinedUrl = urlJoin( - baseUrlWithoutTrailingSlash, - currentLanguage, - apiBase, - urlPath - ) + requestUrls.push({ + language: '', + entityType, + entityBundle, + entityUuid + }) - await getNext(joinedUrl) + if (isTranslatable) { + for (let language of languageConfig.enabledLanguages) { + if (language !== languageConfig.defaultLanguage) { + requestUrls.push({ + language, + entityType, + entityBundle, + entityUuid + }) } } + } + } + } - const result = { - type, - data: dataArray, + let allData = [] + let nRequests = 0 + let nCachedRequests = 0 + let lastReportRequests = 0 + let lastReportTime = Date.now() + let lastFetchedContentType = '' + await asyncPool(concurrentAPIRequests, requestUrls, async ({ language, entityType, entityBundle, entityUuid }) => { + nRequests++ + + if (lastFetchedContentType !== `${entityType}--${entityBundle}`) { + lastFetchedContentType = `${entityType}--${entityBundle}` + const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2) + const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2) + reporter.info(`Starting ${lastFetchedContentType} (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`) + } + + if ((Date.now() - lastReportTime) >= REPORTING_RATE_MS) { + const nRequestsDoneThisPeriod = nRequests - lastReportRequests + const requestsPerSecond = Math.floor(nRequestsDoneThisPeriod / (REPORTING_RATE_MS / 1000)) + const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2) + const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2) + reporter.info(`${requestsPerSecond}rps 
(${nRequests}/${requestUrls.length}) (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`) + + lastReportTime = Date.now() + lastReportRequests = nRequests + } + + const shouldUseAuth = useAuthOn.indexOf(`${entityType}--${entityBundle}`) !== -1 + try { + const entityResponse = await worker([ + urlJoin(baseUrl, language, apiBase, `/${entityType}/${entityBundle}/${entityUuid}`), + { + headers: shouldUseAuth ? headers : undefined, + responseType: `json`, } + ]) - // eslint-disable-next-line consistent-return - return result - }) - ) - } catch (e) { - gracefullyRethrow(drupalFetchActivity, e) - return - } + if (parseInt(entityResponse.headers.age) > 0) { + nCachedRequests++ + } + + allData.push(entityResponse.body.data) + } catch (err) {} + }) drupalFetchActivity.end() const nodes = new Map() // first pass - create basic nodes - _.each(allData, contentType => { - if (!contentType) return - _.each(contentType.data, datum => { - if (!datum) return - const node = nodeFromData(datum, createNodeId, entityReferenceRevisions) - nodes.set(node.id, node) - }) + _.each(allData, datum => { + if (!datum) return + const node = nodeFromData(datum, createNodeId, entityReferenceRevisions) + nodes.set(node.id, node) }) // second pass - handle relationships and back references From e28d4e481981d309a8fce29d2a783d37f7440cb9 Mon Sep 17 00:00:00 2001 From: David Disch Date: Mon, 28 Jun 2021 12:54:30 +1000 Subject: [PATCH 3/3] Change from tiny-async-pool to fastq queues --- .../gatsby-source-drupal/src/gatsby-node.js | 103 ++++++------------ 1 file changed, 33 insertions(+), 70 deletions(-) diff --git a/packages/gatsby-source-drupal/src/gatsby-node.js b/packages/gatsby-source-drupal/src/gatsby-node.js index 282357789f1e6..63dc7d7d6a5cf 100644 --- a/packages/gatsby-source-drupal/src/gatsby-node.js +++ b/packages/gatsby-source-drupal/src/gatsby-node.js @@ -22,23 +22,28 @@ const agent = { // http2: new http2wrapper.Agent(), } +let lastReport = 0 +const REPORT_EVERY_N = 1000 async function 
worker([url, options]) { - return got(url, { + const result = await got(url, { agent, cache: false, // request: http2wrapper.auto, // http2: true, ...options, }) + const remainingRequests = requestQueue.length() + if (Math.abs(lastReport - remainingRequests) >= REPORT_EVERY_N) { + console.log(`Fetching: ${url} (${remainingRequests} requests remaining)`) + lastReport = remainingRequests + } + return result } const requestQueue = require(`fastq`).promise(worker, 20) - const asyncPool = require(`tiny-async-pool`) const bodyParser = require(`body-parser`) -const REPORTING_RATE_MS = 10000 - function gracefullyRethrow(activity, error) { // activity.panicOnBuild was implemented at some point in gatsby@2 // but plugin can still be used with older version of gatsby core @@ -273,91 +278,49 @@ exports.sourceNodes = async ( drupalFetchActivity.start() - const listResponse = await worker([ + const listResponse = await requestQueue.push([ urlJoin(baseUrl, 'gatsby/content-list'), { headers, - responseType: `json`, } ]) - - const requestUrls = [] - for (let entityTypeAndBundle in listResponse.body) { + const listResponseBody = JSON.parse(listResponse.body) + + const requestPromises = [] + for (let entityTypeAndBundle in listResponseBody) { if (disallowedLinkTypes.indexOf(entityTypeAndBundle) !== -1) continue - + const isTranslatable = languageConfig.translatableEntities.indexOf(entityTypeAndBundle) !== -1 + const shouldUseAuth = useAuthOn.indexOf(entityTypeAndBundle) !== -1 const [entityType, entityBundle] = entityTypeAndBundle.split('--') - for (let entityUuid of listResponse.body[entityTypeAndBundle]) { - const isTranslatable = languageConfig.translatableEntities.some( - translatableEntityType => translatableEntityType === entityTypeAndBundle + for (let entityUuid of listResponseBody[entityTypeAndBundle]) { + requestPromises.push( + requestQueue.push([ + urlJoin(baseUrl, apiBase, `/${entityType}/${entityBundle}/${entityUuid}`), + { + headers: shouldUseAuth ? 
headers : undefined, + } + ]).then(response => JSON.parse(response.body).data).catch(() => {}) ) - requestUrls.push({ - language: '', - entityType, - entityBundle, - entityUuid - }) - if (isTranslatable) { for (let language of languageConfig.enabledLanguages) { if (language !== languageConfig.defaultLanguage) { - requestUrls.push({ - language, - entityType, - entityBundle, - entityUuid - }) + requestPromises.push( + requestQueue.push([ + urlJoin(baseUrl, language, apiBase, `/${entityType}/${entityBundle}/${entityUuid}`), + { + headers: shouldUseAuth ? headers : undefined + } + ]).then(response => JSON.parse(response.body).data).catch(() => {}) + ) } } } } } - let allData = [] - let nRequests = 0 - let nCachedRequests = 0 - let lastReportRequests = 0 - let lastReportTime = Date.now() - let lastFetchedContentType = '' - await asyncPool(concurrentAPIRequests, requestUrls, async ({ language, entityType, entityBundle, entityUuid }) => { - nRequests++ - - if (lastFetchedContentType !== `${entityType}--${entityBundle}`) { - lastFetchedContentType = `${entityType}--${entityBundle}` - const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2) - const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2) - reporter.info(`Starting ${lastFetchedContentType} (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`) - } - - if ((Date.now() - lastReportTime) >= REPORTING_RATE_MS) { - const nRequestsDoneThisPeriod = nRequests - lastReportRequests - const requestsPerSecond = Math.floor(nRequestsDoneThisPeriod / (REPORTING_RATE_MS / 1000)) - const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2) - const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2) - reporter.info(`${requestsPerSecond}rps (${nRequests}/${requestUrls.length}) (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`) - - lastReportTime = Date.now() - lastReportRequests = nRequests - } - - const shouldUseAuth = 
useAuthOn.indexOf(`${entityType}--${entityBundle}`) !== -1 - try { - const entityResponse = await worker([ - urlJoin(baseUrl, language, apiBase, `/${entityType}/${entityBundle}/${entityUuid}`), - { - headers: shouldUseAuth ? headers : undefined, - responseType: `json`, - } - ]) - - if (parseInt(entityResponse.headers.age) > 0) { - nCachedRequests++ - } - - allData.push(entityResponse.body.data) - } catch (err) {} - }) + const allData = await Promise.all(requestPromises) drupalFetchActivity.end()