diff --git a/packages/cron/src/jobs/pins.js b/packages/cron/src/jobs/pins.js index 2298ecf889..e1272fcc74 100644 --- a/packages/cron/src/jobs/pins.js +++ b/packages/cron/src/jobs/pins.js @@ -19,13 +19,23 @@ const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3'] * cluster3: import('@nftstorage/ipfs-cluster').Cluster * }} Config * @typedef {import('../../../api/src/utils/db-types').definitions} definitions - * @typedef {Pick} Pin + * @typedef {Pick & { source_cid: string }} Pin * @typedef {import('@supabase/postgrest-js').PostgrestQueryBuilder} PinQuery */ const CLUSTER_LIST = CLUSTERS.map((c) => `'${c}'`).toString() const COUNT_PENDING_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError'` -const FETCH_PENDING_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError' OFFSET $1 LIMIT $2` +const FETCH_PENDING_PINS = ` +SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at + FROM pin p + JOIN upload u + ON p.content_cid = u.content_cid + WHERE p.service IN (${CLUSTER_LIST}) + AND p.status != 'Pinned' + AND p.status != 'PinError' +OFFSET $1 + LIMIT $2 +` /** * Updates pin status and size by retrieving updated status from cluster. @@ -56,7 +66,16 @@ export async function updatePendingPinStatuses(config) { } const COUNT_FAILED_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1` -const FETCH_FAILED_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1 OFFSET $2 LIMIT $3` +const FETCH_FAILED_PINS = ` +SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at + FROM pin p + JOIN upload u + ON p.content_cid = u.content_cid + WHERE p.service IN (${CLUSTER_LIST}) + AND p.status = 'PinError' + AND inserted_at > $1 +OFFSET $2 + LIMIT $3` // If Cluster reports PinError, set the status to Pinning when it is still a new // pin i.e. less than 24 hours old. Cluster will continue trying to pin the data @@ -162,12 +181,12 @@ async function updatePinStatuses(config) { transform(CONCURRENCY, async (pins) => { /** @type {Pin[]} */ const updatedPins = [] - const cids = pins.map((p) => p.content_cid) + const cids = pins.map((p) => p.source_cid) const statuses = await cluster3.statusAll({ cids }) const statusByCid = Object.fromEntries(statuses.map((s) => [s.cid, s])) for (const pin of pins) { - const statusRes = statusByCid[pin.content_cid] + const statusRes = statusByCid[pin.source_cid] const pinInfos = Object.values(statusRes.peerMap) /** @type {Pin['status']} */ @@ -184,12 +203,12 @@ async function updatePinStatuses(config) { // the PIN_ERROR_GRACE_PERIOD. const pinAge = Date.now() - new Date(pin.inserted_at).getTime() if (status === 'PinError' && pinAge < PIN_ERROR_GRACE_PERIOD) { - log(`ℹī¸ ${pin.content_cid} is ${status} in grace period`) + log(`ℹī¸ ${pin.source_cid} is ${status} in grace period`) status = 'Pinning' } if (status !== pin.status) { - log(`📌 ${pin.content_cid} ${pin.status} => ${status}`) + log(`📌 ${pin.source_cid} ${pin.status} => ${status}`) updatedPins.push({ ...pin, status,