Skip to content

Commit

Permalink
fix: use source_cid in pin cron job (#1757)
Browse files Browse the repository at this point in the history
TL;DR: we pin to Cluster using `source_cid`, but the pins cron job checks status by `content_cid`, so the pin is not found. For most data this is not a problem, but when v0 CIDs are pinned their status never gets updated to `Pinned`.

This is a long-standing bug that was masked by a regression which was fixed recently in #1634.

In Web3.Storage this query is harder, so we instead downgrade `content_cid` to v0 when it is found to be unpinned everywhere and re-check with Cluster: https://github.com/web3-storage/web3.storage/blob/3231807f24190de3e5be810ab25ed9567fda41cb/packages/cron/src/jobs/pins.js#L73-L75

Fixes #1745
  • Loading branch information
Alan Shaw authored Apr 5, 2022
1 parent 21e8eb0 commit c49727c
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
* cluster3: import('@nftstorage/ipfs-cluster').Cluster
* }} Config
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @typedef {Pick<definitions['pin'], 'id'|'status'|'content_cid'|'service'|'inserted_at'|'updated_at'>} Pin
* @typedef {Pick<definitions['pin'], 'id'|'status'|'service'|'inserted_at'|'updated_at'> & { source_cid: string }} Pin
* @typedef {import('@supabase/postgrest-js').PostgrestQueryBuilder<Pin>} PinQuery
*/

// Comma-separated, single-quoted service names, ready to be interpolated into
// a SQL `IN (...)` list.
const CLUSTER_LIST = CLUSTERS.map((name) => `'${name}'`).join(',')
// Counts pins on the cluster services whose status is neither 'Pinned' nor
// 'PinError'.
const COUNT_PENDING_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError'`
const FETCH_PENDING_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError' OFFSET $1 LIMIT $2`
// Fetches one page ($1 = offset, $2 = limit) of pins on the cluster services
// whose status is neither 'Pinned' nor 'PinError'. Each pin is joined to the
// upload rows sharing its content_cid so that the result carries `source_cid`
// instead of `content_cid`.
// NOTE(review): there is no ORDER BY, so OFFSET/LIMIT paging relies on the
// database returning rows in a stable order — confirm this is acceptable.
// NOTE(review): a pin presumably yields one row per matching upload; verify
// whether several uploads can share a content_cid.
const FETCH_PENDING_PINS = `
SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at
FROM pin p
JOIN upload u
ON p.content_cid = u.content_cid
WHERE p.service IN (${CLUSTER_LIST})
AND p.status != 'Pinned'
AND p.status != 'PinError'
OFFSET $1
LIMIT $2
`

/**
* Updates pin status and size by retrieving updated status from cluster.
Expand Down Expand Up @@ -56,7 +66,16 @@ export async function updatePendingPinStatuses(config) {
}

const COUNT_FAILED_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1`
const FETCH_FAILED_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1 OFFSET $2 LIMIT $3`
// Fetches one page of pins on the cluster services that are in 'PinError'
// status and were inserted after $1 ($2 = offset, $3 = limit). Each pin is
// joined to the upload rows sharing its content_cid so that the result carries
// `source_cid` instead of `content_cid`.
//
// Every column is qualified with its table alias. The query reads from both
// `pin` and `upload`, so a bare `inserted_at` would be rejected as ambiguous
// if `upload` has a column of the same name; qualifying it also keeps the
// filter consistent with the selected `p.inserted_at`.
const FETCH_FAILED_PINS = `
SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at
FROM pin p
JOIN upload u
ON p.content_cid = u.content_cid
WHERE p.service IN (${CLUSTER_LIST})
AND p.status = 'PinError'
AND p.inserted_at > $1
OFFSET $2
LIMIT $3`

// If Cluster reports PinError, set the status to Pinning when it is still a new
// pin i.e. less than 24 hours old. Cluster will continue trying to pin the data
Expand Down Expand Up @@ -162,12 +181,12 @@ async function updatePinStatuses(config) {
transform(CONCURRENCY, async (pins) => {
/** @type {Pin[]} */
const updatedPins = []
const cids = pins.map((p) => p.content_cid)
const cids = pins.map((p) => p.source_cid)
const statuses = await cluster3.statusAll({ cids })
const statusByCid = Object.fromEntries(statuses.map((s) => [s.cid, s]))

for (const pin of pins) {
const statusRes = statusByCid[pin.content_cid]
const statusRes = statusByCid[pin.source_cid]
const pinInfos = Object.values(statusRes.peerMap)

/** @type {Pin['status']} */
Expand All @@ -184,12 +203,12 @@ async function updatePinStatuses(config) {
// the PIN_ERROR_GRACE_PERIOD.
const pinAge = Date.now() - new Date(pin.inserted_at).getTime()
if (status === 'PinError' && pinAge < PIN_ERROR_GRACE_PERIOD) {
log(`ℹ️ ${pin.content_cid} is ${status} in grace period`)
log(`ℹ️ ${pin.source_cid} is ${status} in grace period`)
status = 'Pinning'
}

if (status !== pin.status) {
log(`📌 ${pin.content_cid} ${pin.status} => ${status}`)
log(`📌 ${pin.source_cid} ${pin.status} => ${status}`)
updatedPins.push({
...pin,
status,
Expand Down

0 comments on commit c49727c

Please sign in to comment.