Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use source_cid in pin cron job #1757

Merged
merged 1 commit into from
Apr 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
* cluster3: import('@nftstorage/ipfs-cluster').Cluster
* }} Config
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @typedef {Pick<definitions['pin'], 'id'|'status'|'content_cid'|'service'|'inserted_at'|'updated_at'>} Pin
* @typedef {Pick<definitions['pin'], 'id'|'status'|'service'|'inserted_at'|'updated_at'> & { source_cid: string }} Pin
* @typedef {import('@supabase/postgrest-js').PostgrestQueryBuilder<Pin>} PinQuery
*/

const CLUSTER_LIST = CLUSTERS.map((c) => `'${c}'`).toString()
const COUNT_PENDING_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError'`
const FETCH_PENDING_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status != 'Pinned' AND status != 'PinError' OFFSET $1 LIMIT $2`
const FETCH_PENDING_PINS = `
SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at
FROM pin p
JOIN upload u
ON p.content_cid = u.content_cid
WHERE p.service IN (${CLUSTER_LIST})
AND p.status != 'Pinned'
AND p.status != 'PinError'
OFFSET $1
LIMIT $2
`

/**
* Updates pin status and size by retrieving updated status from cluster.
Expand Down Expand Up @@ -56,7 +66,16 @@ export async function updatePendingPinStatuses(config) {
}

const COUNT_FAILED_PINS = `SELECT COUNT(*) FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1`
const FETCH_FAILED_PINS = `SELECT * FROM pin WHERE service IN (${CLUSTER_LIST}) AND status = 'PinError' AND inserted_at > $1 OFFSET $2 LIMIT $3`
const FETCH_FAILED_PINS = `
SELECT p.id, p.service, p.status, u.source_cid, p.inserted_at, p.updated_at
FROM pin p
JOIN upload u
ON p.content_cid = u.content_cid
WHERE p.service IN (${CLUSTER_LIST})
AND p.status = 'PinError'
AND inserted_at > $1
OFFSET $2
LIMIT $3`

// If Cluster reports PinError, set the status to Pinning when it is still a new
// pin i.e. less than 24 hours old. Cluster will continue trying to pin the data
Expand Down Expand Up @@ -162,12 +181,12 @@ async function updatePinStatuses(config) {
transform(CONCURRENCY, async (pins) => {
/** @type {Pin[]} */
const updatedPins = []
const cids = pins.map((p) => p.content_cid)
const cids = pins.map((p) => p.source_cid)
const statuses = await cluster3.statusAll({ cids })
const statusByCid = Object.fromEntries(statuses.map((s) => [s.cid, s]))

for (const pin of pins) {
const statusRes = statusByCid[pin.content_cid]
const statusRes = statusByCid[pin.source_cid]
const pinInfos = Object.values(statusRes.peerMap)

/** @type {Pin['status']} */
Expand All @@ -184,12 +203,12 @@ async function updatePinStatuses(config) {
// the PIN_ERROR_GRACE_PERIOD.
const pinAge = Date.now() - new Date(pin.inserted_at).getTime()
if (status === 'PinError' && pinAge < PIN_ERROR_GRACE_PERIOD) {
log(`ℹ️ ${pin.content_cid} is ${status} in grace period`)
log(`ℹ️ ${pin.source_cid} is ${status} in grace period`)
status = 'Pinning'
}

if (status !== pin.status) {
log(`📌 ${pin.content_cid} ${pin.status} => ${status}`)
log(`📌 ${pin.source_cid} ${pin.status} => ${status}`)
updatedPins.push({
...pin,
status,
Expand Down