Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Pin APIs and PR feedback #810

Merged
merged 13 commits into from
Jan 13, 2022
1 change: 1 addition & 0 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"publish": "wrangler publish --env $(whoami)",
"build": "WEBPACK_CLI_FORCE_LOAD_ESM_CONFIG=true webpack",
"test": "npm-run-all -p -r mock:cluster mock:backup test:e2e -s test:size",
"test:debug": "npm-run-all -p -r mock:cluster mock:backup test:e2e:debug -s test:size",
"test:e2e": "playwright-test \"test/**/*.spec.js\" --sw src/index.js -b webkit",
"test:e2e:debug": "npm run test:e2e -- --debug",
"test:size": "bundlesize",
Expand Down
52 changes: 9 additions & 43 deletions packages/api/src/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import * as pb from '@ipld/dag-pb'
import retry from 'p-retry'
import { GATEWAY, LOCAL_ADD_THRESHOLD, MAX_BLOCK_SIZE } from './constants.js'
import { JSONResponse } from './utils/json-response.js'
import { toPinStatusEnum } from './utils/pin.js'
import { getPins, PIN_OK_STATUS, waitAndUpdateOkPins } from './utils/pin.js'
import { normalizeCid } from './utils/normalize-cid.js'

/**
Expand All @@ -19,12 +19,6 @@ import { normalizeCid } from './utils/normalize-cid.js'

const decoders = [pb, raw, cbor]

// Duration between status check polls in ms.
const PIN_STATUS_CHECK_INTERVAL = 5000
// Max time in ms to spend polling for an OK status.
const MAX_PIN_STATUS_CHECK_TIME = 30000
// Pin statuses considered OK.
const PIN_OK_STATUS = ['Pinned', 'Pinning', 'PinQueued']
// Times to retry the transaction after the first failure.
const CREATE_UPLOAD_RETRIES = 4
// Time in ms before starting the first retry.
Expand Down Expand Up @@ -166,25 +160,12 @@ export async function handleCarUpload (request, env, ctx, car, uploadType = 'Car
// Keep querying Cluster until one of the nodes reports something other than
// Unpinned i.e. PinQueued or Pinning or Pinned.
if (!pins.some(p => PIN_OK_STATUS.includes(p.status))) {
tasks.push(async () => {
const start = Date.now()
while (Date.now() - start > MAX_PIN_STATUS_CHECK_TIME) {
await new Promise(resolve => setTimeout(resolve, PIN_STATUS_CHECK_INTERVAL))
const { peerMap } = await env.cluster.status(cid)
const pins = toPins(peerMap)
if (!pins.length) { // should not happen
throw new Error('not pinning on any node')
}

const okPins = pins.filter(p => PIN_OK_STATUS.includes(p.status))
if (!okPins.length) continue

for (const pin of okPins) {
await env.db.upsertPin(normalizedCid, pin)
}
return
}
})
tasks.push(waitAndUpdateOkPins.bind(
null,
normalizeCid,
env.cluster,
env.db)
)
}

if (ctx.waitUntil) {
Expand Down Expand Up @@ -230,12 +211,7 @@ async function addToCluster (car, env) {
// will be done async by bitswap instead.
local: car.size > LOCAL_ADD_THRESHOLD
})

const { peerMap } = await env.cluster.status(cid)
const pins = toPins(peerMap)
if (!pins.length) { // should not happen
throw new Error('not pinning on any node')
}
const pins = await getPins(cid, env.cluster)

return { cid, pins }
}
Expand All @@ -245,7 +221,7 @@ async function addToCluster (car, env) {
* @param {Blob} blob
* @param {CID} rootCid
* @param {string} userId
* @param {import('../env').Env} env
* @param {import('./env').Env} env
*/
async function backup (blob, rootCid, userId, env) {
if (!env.s3Client) {
Expand Down Expand Up @@ -338,13 +314,3 @@ function cumulativeSize (pbNodeBytes, pbNode) {
// This logic is the same as used by go/js-ipfs to display the cumulative size of a dag-pb dag.
return pbNodeBytes.byteLength + pbNode.Links.reduce((acc, curr) => acc + (curr.Tsize || 0), 0)
}

/**
* @param {import('@nftstorage/ipfs-cluster').StatusResponse['peerMap']} peerMap
*/
function toPins (peerMap) {
return Object.entries(peerMap).map(([peerId, { peerName, status }]) => ({
status: toPinStatusEnum(status),
location: { peerId, peerName }
}))
}
115 changes: 50 additions & 65 deletions packages/api/src/pins.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { JSONResponse, notFound } from './utils/json-response.js'
import { normalizeCid } from './utils/normalize-cid.js'
import { waitToGetOkPins } from './utils/pin.js'
import { getPins, PIN_OK_STATUS, waitAndUpdateOkPins } from './utils/pin.js'

/**
* @typedef {'queued' | 'pinning' | 'failed' | 'pinned'} apiPinStatus
Expand All @@ -9,7 +9,7 @@ import { waitToGetOkPins } from './utils/pin.js'
/**
*
* Service API Pin object definition
* @typedef {Object} ServiceApiPin
* @typedef {Object} PsaPin
* @property {string} cid
* @property {string} [name]
* @property {Array.<string>} [origins]
Expand All @@ -19,14 +19,14 @@ import { waitToGetOkPins } from './utils/pin.js'
/**
*
* Service API Pin Status definition
* @typedef {Object} ServiceApiPinStatus
* @typedef {Object} PsaPinStatusResponse
* @property {string} requestId
* @property {apiPinStatus} status
* @property {string} created
* @property {Array<string>} delegates
* @property {string} [info]
*
* @property {ServiceApiPin} pin
* @property {PsaPin} pin
*/

/**
Expand All @@ -38,7 +38,7 @@ import { waitToGetOkPins } from './utils/pin.js'
* @param {import('../../db/db-client-types.js').PinItemOutput[]} pins
* @return {apiPinStatus} status
*/
export const getPinningAPIStatus = (pins) => {
export const getEffectivePinStatus = (pins) => {
GaryHomewood marked this conversation as resolved.
Show resolved Hide resolved
const pinStatuses = pins.map((p) => p.status)

// TODO what happens with Sharded? I'd assumed is pinned?
Expand All @@ -56,11 +56,8 @@ export const getPinningAPIStatus = (pins) => {
return 'queued'
}

if (pinStatuses.length === 0) {
return 'queued'
// TODO after some time if there are no pins we should give up and return a failed
// status instead
}
// TODO after some time if there are no pins we should give up and return a failed
// status instead
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that is what we do now, straight away. Maybe this can be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this relates to the fact that it'd be nice to have some kind of background task to check after a given defined threshold (an hour, a day, a week I don't know) to tell the cluster to stop trying to pin the cid (I know you have priority queues now but I still feel it should be good to programmatically stop the cluster trying).
At the same time, we should update the DB and therefore the request status accordingly.

I don't think this is a super high priority but still worth tracking?
If you agree I can update the comment to be more verbose and tie it with a github issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping 😇


return 'failed'
}
Expand Down Expand Up @@ -106,7 +103,7 @@ export async function pinPost (request, env, ctx) {

// Validate cid
try {
normalizeCid(cid)
pinData.normalizedCid = normalizeCid(cid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I'd not augment this user provided data with anything but instead pull out the bits you want and combine them any derived/additional data in a different object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You caught me, I was being lazy 😛. On it.

} catch (err) {
return new JSONResponse(
{ error: { reason: ERROR_STATUS, details: INVALID_CID } },
Expand Down Expand Up @@ -165,61 +162,46 @@ export async function pinPost (request, env, ctx) {
* @return {Promise<JSONResponse>}
*/
async function createPin (pinData, authToken, env, ctx) {
const { cid, name, origins, meta } = pinData
const normalizedCid = normalizeCid(cid)

const pinRequestData = {
requestedCid: cid,
cid: normalizedCid,
authKey: authToken
}
const pinOptions = {}
const { cid, origins, meta, normalizedCid } = pinData

if (name) {
pinRequestData.name = name
pinOptions.name = name
}
const pinName = pinData.name || undefined // deal with empty strings

if (origins) {
pinOptions.origins = origins
}
await env.cluster.pin(cid, {
name: pinName,
origins,
metadata: meta
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't pass user provided metadata to cluster. Cluster is not multi-tenant so two users pinning the same CID will ovewrite the meta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good catch, I'll update.

})
const pins = await getPins(cid, env.cluster)

if (meta) {
pinOptions.meta = meta
const pinRequestData = {
sourceCid: cid,
contentCid: normalizedCid,
authKey: authToken,
name: pinName,
pins
}

// Pin CID to Cluster
// TODO: Look into when the returned Promised is resolved to understand if we should be awaiting this call.
env.cluster.pin(normalizedCid, pinOptions)
const pinRequest = await env.db.createPsaPinRequest(pinRequestData)

// Create Pin request in db (not creating any content at this stage if it doesn't already exists)
const pinRequest = await env.db.createPAPinRequest(pinRequestData)

/** @type {ServiceApiPinStatus} */
/** @type {PsaPinStatusResponse} */
const pinStatus = getPinStatus(pinRequest)

/** @type {(() => Promise<any>)[]} */
const tasks = []

// If we're pinning content that is currently not in the cluster, it might take a while to
// get the cid from the network. We check pinning status asyncrounosly.
if (pinRequest.pins.length === 0) {
tasks.push(async () => {
const okPins = await waitToGetOkPins(cid, env.cluster)
// Create the content row
// TODO: Get dagSize
env.db.createContent({ cid: normalizedCid, pins: okPins })
for (const pin of okPins) {
await env.db.upsertPin(normalizedCid, pin)
}
})
if (!pins.some(p => PIN_OK_STATUS.includes(p.status))) {
tasks.push(
waitAndUpdateOkPins.bind(
null,
normalizeCid,
env.cluster,
env.db)
)
}

// TODO: Backups. At the moment backups are related to uploads so
// TODO(https://github.com/web3-storage/web3.storage/issues/794)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove this todo. This is not where the code for backups of pin requests will live.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack 👍

// Backups. At the moment backups are related to uploads so
// they' re currently not taken care of in respect of a pin request.
// We should look into this. There's an argument where backups should be related to content rather than upload, at the moment we're
// backing up content multiple times if uploaded multiple times.
// If we refactor that it will naturally work for merge requests as well.

if (ctx.waitUntil) {
tasks.forEach(t => ctx.waitUntil(t()))
Expand All @@ -234,7 +216,7 @@ async function createPin (pinData, authToken, env, ctx) {
* @param {import('./index').Ctx} ctx
*/
export async function pinGet (request, env, ctx) {
// Check if requestId contains other charachers than digits
// Ensure requestId only contains digits
if (!(/^\d+$/.test(request.params.requestId))) {
return new JSONResponse(
{ error: { reason: ERROR_STATUS, details: INVALID_REQUEST_ID } },
Expand All @@ -243,18 +225,20 @@ export async function pinGet (request, env, ctx) {
}

const requestId = parseInt(request.params.requestId, 10)

/** @type { import('../../db/db-client-types.js').PsaPinRequestUpsertOutput } */
let pinRequest

try {
pinRequest = await env.db.getPAPinRequest(requestId)
pinRequest = await env.db.getPsaPinRequest(requestId)
} catch (e) {
console.error(e)
// TODO catch different exceptions
// TODO notFound error paylod does not strictly comply to spec.
return notFound()
}

/** @type { ServiceApiPinStatus } */
/** @type { PsaPinStatusResponse } */
const pin = getPinStatus(pinRequest)
return new JSONResponse(pin)
}
Expand All @@ -281,7 +265,7 @@ export async function pinsGet (request, env, ctx) {
const opts = result.data

try {
pinRequests = await env.db.listPAPinRequests(request.auth.authToken._id, opts)
pinRequests = await env.db.listPsaPinRequests(request.auth.authToken._id, opts)
} catch (e) {
console.error(e)
return notFound()
Expand Down Expand Up @@ -407,19 +391,20 @@ function parseSearchParams (params) {
* Transform a PinRequest into a PinStatus
*
* @param { Object } pinRequest
* @returns { ServiceApiPinStatus }
* @returns { PsaPinStatusResponse }
*/
function getPinStatus (pinRequest) {
return {
requestId: pinRequest._id,
status: getPinningAPIStatus(pinRequest.pins),
requestId: pinRequest._id.toString(),
status: getEffectivePinStatus(pinRequest.pins),
created: pinRequest.created,
pin: {
cid: pinRequest.requestedCid,
cid: pinRequest.sourceCid,
name: pinRequest.name,
origins: [],
meta: {}
},
// TODO(https://github.com/web3-storage/web3.storage/issues/792)
delegates: []
}
}
Expand Down Expand Up @@ -453,15 +438,15 @@ export async function pinDelete (request, env, ctx) {
let res
try {
// Update deleted_at (and updated_at) timestamp for the pin request.
res = await env.db.deletePAPinRequest(requestId, authToken._id)
res = await env.db.deletePsaPinRequest(requestId, authToken._id)
} catch (e) {
console.error(e)
// TODO catch different exceptions
// TODO notFound error paylod does not strictly comply to spec.
return notFound()
}

return new JSONResponse(res)
return new JSONResponse({}, { status: 202 })
}

/**
Expand All @@ -474,12 +459,12 @@ export async function pinDelete (request, env, ctx) {
async function replacePin (newPinData, requestId, authToken, env, ctx) {
let existingPinRequest
try {
existingPinRequest = await env.db.getPAPinRequest(requestId)
existingPinRequest = await env.db.getPsaPinRequest(requestId)
} catch (e) {
return notFound()
}

const existingCid = existingPinRequest.requestedCid
const existingCid = existingPinRequest.sourceCid
if (newPinData.cid === existingCid) {
return new JSONResponse(
{ error: { reason: ERROR_STATUS, details: INVALID_REPLACE } },
Expand All @@ -498,7 +483,7 @@ async function replacePin (newPinData, requestId, authToken, env, ctx) {
}

try {
await env.db.deletePAPinRequest(requestId, authToken)
await env.db.deletePsaPinRequest(requestId, authToken)
} catch (e) {
return new JSONResponse(
{ error: { reason: `DB Error: ${e}` } },
Expand Down
Loading