Skip to content

Commit

Permalink
perf: concurrent pin status sync (#1104)
Browse files Browse the repository at this point in the history
This PR updates the pin status sync cron job to add concurrency and speed up processing.
  • Loading branch information
Alan Shaw authored Jan 18, 2022
1 parent eab0411 commit 968b94a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 62 deletions.
3 changes: 2 additions & 1 deletion packages/cron/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"node-fetch": "^2.6.1",
"p-retry": "^4.6.1",
"p-settle": "^5.0.0",
"pg": "^8.7.1"
"pg": "^8.7.1",
"streaming-iterables": "^6.0.0"
},
"devDependencies": {
"@types/node-fetch": "^2.5.10",
Expand Down
130 changes: 69 additions & 61 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import debug from 'debug'
import { consume, transform, pipeline } from 'streaming-iterables'
import { DBError } from '../../../api/src/utils/db-client.js'

// Namespaced debug logger for this job; its output is shown when the DEBUG
// environment variable includes `pins:updatePinStatuses`.
const log = debug('pins:updatePinStatuses')
// Concurrency limit passed to streaming-iterables' `transform()` in
// updatePinStatuses, where each unit of work is one fetched batch of pins.
const CONCURRENCY = 5

/**
* @typedef {{
Expand Down Expand Up @@ -124,80 +126,86 @@ export async function checkFailedPinStatuses(config) {
* }} config
*/
async function updatePinStatuses(config) {
  // Syncs the stored status of pins with what the IPFS Cluster services
  // report. Pins are fetched in pages, each page is checked against its
  // cluster, and only the rows whose status changed are bulk-upserted.
  // Rejects if a pin names an unsupported service or if the upsert reports
  // an error.
  const { countPins, fetchPins, db, cluster2, cluster3 } = config
  if (!log.enabled) {
    console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses')
  }

  const count = await countPins()
  log(`🎯 Updating ${count} pin statuses`)

  await pipeline(
    // Source stage: yields one page (array) of pins at a time until
    // fetchPins returns an empty page.
    async function* () {
      let offset = 0
      const limit = 1000
      while (true) {
        log(`🐶 fetching pins ${offset} -> ${offset + limit} of ${count}`)
        const pins = await fetchPins(offset, limit)
        if (!pins.length) {
          return
        }
        yield pins
        offset += limit
      }
    },
    // Processing stage: up to CONCURRENCY pages are in flight at once. Within
    // a page the pins are still checked one after another.
    transform(CONCURRENCY, async (pins) => {
      /** @type {Pin[]} */
      const updatedPins = []
      for (const pin of pins) {
        /** @type {import('@nftstorage/ipfs-cluster/dist/src/interface').StatusResponse} */
        let statusRes

        switch (pin.service) {
          // NOTE(review): 'IpfsCluster' pins are looked up on cluster3 and
          // config.cluster1 is not read here - presumably intentional; confirm.
          case 'IpfsCluster':
            statusRes = await cluster3.status(pin.content_cid)
            break
          case 'IpfsCluster2':
            statusRes = await cluster2.status(pin.content_cid)
            break
          case 'IpfsCluster3':
            statusRes = await cluster3.status(pin.content_cid)
            break
          default:
            throw new Error(`Service ${pin.service} not supported.`)
        }

        const pinInfos = Object.values(statusRes.peerMap)

        // Collapse the per-peer statuses into a single status. The best state
        // reported by any peer wins: pinned > pinning > pin_queued, and
        // anything else is treated as an error.
        /** @type {Pin['status']} */
        let status = 'PinError'
        if (pinInfos.some((i) => i.status === 'pinned')) {
          status = 'Pinned'
        } else if (pinInfos.some((i) => i.status === 'pinning')) {
          status = 'Pinning'
        } else if (pinInfos.some((i) => i.status === 'pin_queued')) {
          status = 'PinQueued'
        }

        // Only rows whose status actually changed are written back.
        if (status !== pin.status) {
          log(`📌 ${pin.content_cid} ${pin.status} => ${status}`)
          updatedPins.push({
            ...pin,
            status,
            updated_at: new Date().toISOString(),
          })
        }
      }

      if (updatedPins.length) {
        // bulk upsert
        const { error: updateError } = await db.client
          .from('pin')
          .upsert(updatedPins, { count: 'exact', returning: 'minimal' })

        if (updateError) {
          // Copy the fields of the returned error object onto a real Error so
          // callers get a throwable that still carries those fields.
          throw Object.assign(new Error(), updateError)
        }
      }

      log(`🗂 ${pins.length} processed, ${updatedPins.length} updated`)
    }),
    // Sink stage: drains the pipeline. The transform stage is run for its
    // side effects only.
    consume
  )

  log('✅ Done')
}

0 comments on commit 968b94a

Please sign in to comment.