From 968b94a7977397804113d54b519afd1f1aaa03bd Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 18 Jan 2022 13:03:16 +0000 Subject: [PATCH] perf: concurrent pin status sync (#1104) This PR updates the pin status sync cron job to add concurrency and speed up processing. --- packages/cron/package.json | 3 +- packages/cron/src/jobs/pins.js | 130 +++++++++++++++++---------------- 2 files changed, 71 insertions(+), 62 deletions(-) diff --git a/packages/cron/package.json b/packages/cron/package.json index 70e18eae27..e3c6e919a3 100644 --- a/packages/cron/package.json +++ b/packages/cron/package.json @@ -30,7 +30,8 @@ "node-fetch": "^2.6.1", "p-retry": "^4.6.1", "p-settle": "^5.0.0", - "pg": "^8.7.1" + "pg": "^8.7.1", + "streaming-iterables": "^6.0.0" }, "devDependencies": { "@types/node-fetch": "^2.5.10", diff --git a/packages/cron/src/jobs/pins.js b/packages/cron/src/jobs/pins.js index fdbc2e0209..4c8784841e 100644 --- a/packages/cron/src/jobs/pins.js +++ b/packages/cron/src/jobs/pins.js @@ -1,7 +1,9 @@ import debug from 'debug' +import { consume, transform, pipeline } from 'streaming-iterables' import { DBError } from '../../../api/src/utils/db-client.js' const log = debug('pins:updatePinStatuses') +const CONCURRENCY = 5 /** * @typedef {{ @@ -124,7 +126,7 @@ export async function checkFailedPinStatuses(config) { * }} config */ async function updatePinStatuses(config) { - const { countPins, fetchPins, db, cluster1, cluster2, cluster3 } = config + const { countPins, fetchPins, db, cluster2, cluster3 } = config if (!log.enabled) { console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses') } @@ -132,72 +134,78 @@ async function updatePinStatuses(config) { const count = await countPins() log(`🎯 Updating ${count} pin statuses`) - let offset = 0 - const limit = 1000 - while (true) { - const pins = await fetchPins(offset, limit) - if (!pins.length) { - break - } - - /** @type {Pin[]} */ - const updatedPins = [] - for (const pin of pins) { - /** @type 
{import('@nftstorage/ipfs-cluster/dist/src/interface').StatusResponse} */ - let statusRes - - switch (pin.service) { - case 'IpfsCluster': - statusRes = await cluster3.status(pin.content_cid) - break - case 'IpfsCluster2': - statusRes = await cluster2.status(pin.content_cid) - break - case 'IpfsCluster3': - statusRes = await cluster3.status(pin.content_cid) - break - default: - throw new Error(`Service ${pin.service} not supported.`) - } - - const pinInfos = Object.values(statusRes.peerMap) - - /** @type {Pin['status']} */ - let status = 'PinError' - if (pinInfos.some((i) => i.status === 'pinned')) { - status = 'Pinned' - } else if (pinInfos.some((i) => i.status === 'pinning')) { - status = 'Pinning' - } else if (pinInfos.some((i) => i.status === 'pin_queued')) { - status = 'PinQueued' + await pipeline( + async function* () { + let offset = 0 + const limit = 1000 + while (true) { + log(`🐶 fetching pins ${offset} -> ${offset + limit} of ${count}`) + const pins = await fetchPins(offset, limit) + if (!pins.length) { + return + } + yield pins + offset += limit } - - if (status !== pin.status) { - log(`📌 ${pin.content_cid} ${pin.status} => ${status}`) - updatedPins.push({ - ...pin, - status, - updated_at: new Date().toISOString(), - }) + }, + transform(CONCURRENCY, async (pins) => { + /** @type {Pin[]} */ + const updatedPins = [] + for (const pin of pins) { + /** @type {import('@nftstorage/ipfs-cluster/dist/src/interface').StatusResponse} */ + let statusRes + + switch (pin.service) { + case 'IpfsCluster': + statusRes = await cluster3.status(pin.content_cid) + break + case 'IpfsCluster2': + statusRes = await cluster2.status(pin.content_cid) + break + case 'IpfsCluster3': + statusRes = await cluster3.status(pin.content_cid) + break + default: + throw new Error(`Service ${pin.service} not supported.`) + } + + const pinInfos = Object.values(statusRes.peerMap) + + /** @type {Pin['status']} */ + let status = 'PinError' + if (pinInfos.some((i) => i.status === 'pinned')) { + 
status = 'Pinned' + } else if (pinInfos.some((i) => i.status === 'pinning')) { + status = 'Pinning' + } else if (pinInfos.some((i) => i.status === 'pin_queued')) { + status = 'PinQueued' + } + + if (status !== pin.status) { + log(`📌 ${pin.content_cid} ${pin.status} => ${status}`) + updatedPins.push({ + ...pin, + status, + updated_at: new Date().toISOString(), + }) + } } - } - if (updatedPins.length) { - // bulk upsert - const { error: updateError } = await db.client - .from('pin') - .upsert(updatedPins, { count: 'exact', returning: 'minimal' }) + if (updatedPins.length) { + // bulk upsert + const { error: updateError } = await db.client + .from('pin') + .upsert(updatedPins, { count: 'exact', returning: 'minimal' }) - if (updateError) { - throw Object.assign(new Error(), updateError) + if (updateError) { + throw Object.assign(new Error(), updateError) + } } - } - log(`🗂 ${pins.length} processed, ${updatedPins.length} updated`) - log(`ℹ️ ${offset + pins.length} of ${count} processed in total`) - - offset += limit - } + log(`🗂 ${pins.length} processed, ${updatedPins.length} updated`) + }), + consume + ) log('✅ Done') }