diff --git a/packages/cron/src/bin/pinata.js b/packages/cron/src/bin/pinata.js
index e69de29bb2d..1c8ff068077 100644
--- a/packages/cron/src/bin/pinata.js
+++ b/packages/cron/src/bin/pinata.js
@@ -0,0 +1,16 @@
+#!/usr/bin/env node
+
+import dotenv from 'dotenv'
+import { syncPinata } from '../jobs/pinata.js'
+import { getCloudflare, getPinata } from '../lib/utils.js'
+
+async function main() {
+  const env = process.env.ENV || 'dev'
+  const cf = getCloudflare(process.env)
+  const pinata = getPinata(process.env)
+  const hostNodes = (process.env.CLUSTER_ADDRS || '').split(',').filter(Boolean)
+  await syncPinata({ cf, pinata, env, hostNodes })
+}
+
+dotenv.config()
+main()
diff --git a/packages/cron/src/jobs/pinata.js b/packages/cron/src/jobs/pinata.js
index e69de29bb2d..32aedc0d4dd 100644
--- a/packages/cron/src/jobs/pinata.js
+++ b/packages/cron/src/jobs/pinata.js
@@ -0,0 +1,56 @@
+import debug from 'debug'
+import { findNs } from '../lib/utils.js'
+
+const log = debug('pinata:syncPinata')
+
+/**
+ * Syncs our pinset with Pinata.
+ *
+ * @param {import('../types').Config & {
+ *   pinata: import('../lib/pinata').Pinata
+ *   hostNodes: string[]
+ * }} config
+ */
+export async function syncPinata({ cf, env, pinata, hostNodes }) {
+  const namespaces = await cf.fetchKVNamespaces()
+  const pinsNs = findNs(namespaces, env, 'PINS')
+  log(`🎯 Syncing ${pinsNs.title} to Pinata`)
+
+  let i = 0
+  for await (const keys of cf.fetchKVKeys(pinsNs.id)) {
+    log(`📥 Processing ${i} -> ${i + keys.length}`)
+
+    /** @type {import('../lib/cloudflare.js').BulkWritePair[]} */
+    const bulkWrites = []
+
+    await Promise.all(
+      keys.map(async (k) => {
+        const { name: cid, metadata: pin } = k
+
+        // if not pinned by us or already pinned on Pinata
+        if (pin.status !== 'pinned' || pin.pinataStatus === 'pinned') {
+          return
+        }
+
+        const pinned = await pinata.isPinned(cid)
+        // if pinata has finally pinned it then update status in our KV
+        if (pinned) {
+          log(`📌 ${cid} became pinned on Pinata!`)
+          const metadata = { ...pin, pinataStatus: 'pinned' }
+          return bulkWrites.push({ key: cid, value: '', metadata })
+        }
+
+        // submit to Pinata
+        log(`🙏 asking Pinata to pin ${cid}`)
+        return pinata.pinByHash(cid, { pinataOptions: { hostNodes } })
+      })
+    )
+
+    if (bulkWrites.length) {
+      log(`🗂 updating pinata status for ${bulkWrites.length} pins`)
+      await cf.writeKVMulti(pinsNs.id, bulkWrites)
+    }
+
+    i += keys.length
+  }
+}
diff --git a/packages/cron/src/lib/cloudflare.js b/packages/cron/src/lib/cloudflare.js
index 124e447831f..dced93ada23 100644
--- a/packages/cron/src/lib/cloudflare.js
+++ b/packages/cron/src/lib/cloudflare.js
@@ -1,8 +1,7 @@
 import FormData from 'form-data'
 import { RateLimiter } from 'limiter'
-import fetch from 'node-fetch'
-import retry from 'p-retry'
 import { URL } from 'url'
+import { fetchJSON } from './fetch.js'
 
 /**
  * @typedef {{ id: string, title: string }} Namespace
@@ -28,43 +27,6 @@ export class Cloudflare {
     })
   }
 
-  /**
-   * @private
-   * @param {string} url
-   * @param {import('node-fetch').RequestInit} [init]
-   * @returns {Promise<any>}
-   */
-  async fetchJSON(url, init) {
-    await this.limiter.removeTokens(1)
-    const res = await retry(
-      async () => {
-        const controller = new AbortController()
-        const abortID = setTimeout(() => controller.abort(), 60000)
-        init = init || {}
-        init.headers = init.headers || {}
-        init.headers.Authorization = `Bearer ${this.apiToken}`
-        // @ts-ignore
-        init.signal = controller.signal
-        try {
-          const res = await fetch(url, init)
-          const text = await res.text()
-          if (!res.ok) {
-            throw new Error(`${res.status} ${res.statusText}: ${text}`)
-          }
-          return text === '' ? null : JSON.parse(text)
-        } finally {
-          clearTimeout(abortID)
-        }
-      },
-      {
-        onFailedAttempt: (err) => console.warn(`💥 fetch ${url}`, err),
-        retries: 5,
-        minTimeout: 60000,
-      }
-    )
-    return res
-  }
-
   /**
    * @returns {Promise<Namespace[]>}
    */
@@ -76,7 +38,13 @@
         `${this.kvNsPath}?page=${page}&per_page=100`,
         endpoint
       )
-      const { result_info, result } = await this.fetchJSON(url.toString())
+      const { result_info, result } = await fetchJSON(
+        this.limiter,
+        url.toString(),
+        {
+          headers: { Authorization: `Bearer ${this.apiToken}` },
+        }
+      )
       namespaces.push(...result)
       if (result_info.page === result_info.total_pages) {
         break
@@ -97,7 +65,13 @@
         `${this.kvNsPath}/${nsId}/keys?cursor=${cursor}&limit=1000`,
         endpoint
       )
-      const { result_info, result } = await this.fetchJSON(url.toString())
+      const { result_info, result } = await fetchJSON(
+        this.limiter,
+        url.toString(),
+        {
+          headers: { Authorization: `Bearer ${this.apiToken}` },
+        }
+      )
       yield result
       cursor = result_info.cursor
       if (!cursor) {
@@ -120,7 +94,11 @@
     const body = new FormData()
     body.append('value', JSON.stringify(value))
     body.append('metadata', JSON.stringify(metadata || {}))
-    await this.fetchJSON(url.toString(), { method: 'PUT', body })
+    await fetchJSON(this.limiter, url.toString(), {
+      method: 'PUT',
+      headers: { Authorization: `Bearer ${this.apiToken}` },
+      body,
+    })
   }
 
   /**
@@ -134,9 +112,12 @@
       ...kv,
       value: JSON.stringify(kv.value),
     }))
-    return this.fetchJSON(url.toString(), {
+    return fetchJSON(this.limiter, url.toString(), {
       method: 'PUT',
-      headers: { 'Content-Type': 'application/json' },
+      headers: {
+        Authorization: `Bearer ${this.apiToken}`,
+        'Content-Type': 'application/json',
+      },
       body: JSON.stringify(kvs),
     })
   }
@@ -150,7 +131,9 @@
       `${this.kvNsPath}/${nsId}/values/${encodeURIComponent(key)}`,
       endpoint
     )
-    return this.fetchJSON(url.toString())
+    return fetchJSON(this.limiter, url.toString(), {
+      headers: { Authorization: `Bearer ${this.apiToken}` },
+    })
   }
 
   /**
@@ -164,7 +147,9 @@
       )}`,
       endpoint
     )
-    const { result } = await this.fetchJSON(url.toString())
+    const { result } = await fetchJSON(this.limiter, url.toString(), {
+      headers: { Authorization: `Bearer ${this.apiToken}` },
+    })
     return result.length ? result[0].metadata : null
   }
 
@@ -174,9 +159,12 @@
    */
   async deleteKVMulti(nsId, keys) {
     const url = new URL(`${this.kvNsPath}/${nsId}/bulk`, endpoint)
-    return this.fetchJSON(url.toString(), {
+    return fetchJSON(this.limiter, url.toString(), {
       method: 'DELETE',
-      headers: { 'Content-Type': 'application/json' },
+      headers: {
+        'Content-Type': 'application/json',
+        Authorization: `Bearer ${this.apiToken}`,
+      },
       body: JSON.stringify(keys),
     })
   }
diff --git a/packages/cron/src/lib/fetch.js b/packages/cron/src/lib/fetch.js
new file mode 100644
index 00000000000..47bf701651f
--- /dev/null
+++ b/packages/cron/src/lib/fetch.js
@@ -0,0 +1,55 @@
+import fetch from 'node-fetch'
+import retry from 'p-retry'
+import debug from 'debug'
+
+const log = debug('fetchJSON')
+
+const REQUEST_TIMEOUT = 60000
+const RETRY_INTERVAL = 60000
+const RETRY_ATTEMPTS = 5
+
+/**
+ * @param {import('limiter').RateLimiter} limiter
+ * @param {string} url
+ * @param {import('node-fetch').RequestInit} [init]
+ * @returns {Promise<any>}
+ */
+export async function fetchJSON(limiter, url, init) {
+  await limiter.removeTokens(1)
+  const res = await retry(
+    async () => {
+      const controller = new AbortController()
+      const abortID = setTimeout(() => controller.abort(), REQUEST_TIMEOUT)
+      init = init || {}
+      // @ts-ignore
+      init.signal = controller.signal
+      try {
+        const res = await fetch(url, init)
+        const text = await res.text()
+        if (!res.ok) {
+          throw Object.assign(
+            new Error(`${res.status} ${res.statusText}: ${text}`),
+            { response: res }
+          )
+        }
+        return text === '' ? null : JSON.parse(text)
+      } finally {
+        clearTimeout(abortID)
+      }
+    },
+    {
+      onFailedAttempt: async (err) => {
+        // @ts-ignore
+        if (err.response && err.response.status === 429) {
+          log(`🚦 rate limited ${url}`)
+        } else {
+          log(`💥 fetch ${url}`, err)
+        }
+        await limiter.removeTokens(1)
+      },
+      retries: RETRY_ATTEMPTS,
+      minTimeout: RETRY_INTERVAL,
+    }
+  )
+  return res
+}
diff --git a/packages/cron/src/lib/pinata.js b/packages/cron/src/lib/pinata.js
index b7f1317532a..aca74326ef6 100644
--- a/packages/cron/src/lib/pinata.js
+++ b/packages/cron/src/lib/pinata.js
@@ -1,5 +1,6 @@
 import { URL } from 'url'
-import fetch from 'node-fetch'
+import { RateLimiter } from 'limiter'
+import { fetchJSON } from './fetch.js'
 
 const endpoint = 'https://api.pinata.cloud'
 
@@ -9,6 +10,7 @@ export class Pinata {
    */
   constructor({ apiToken }) {
     this.apiToken = apiToken
+    this.limiter = new RateLimiter({ tokensPerInterval: 2, interval: 'second' })
   }
 
   /**
@@ -18,8 +20,7 @@ export class Pinata {
    */
   async pinByHash(cid, options) {
     const url = new URL('/pinning/pinByHash', endpoint)
-
-    const res = await fetch(url.toString(), {
+    return fetchJSON(this.limiter, url.toString(), {
       method: 'POST',
       headers: {
         Authorization: `Bearer ${this.apiToken}`,
@@ -27,15 +28,22 @@
       },
       body: JSON.stringify({ hashToPin: cid, ...(options || {}) }),
     })
+  }
 
-    const text = await res.text()
-    if (!res.ok) {
-      throw Object.assign(
-        new Error(`${res.status} ${res.statusText}: ${text}`),
-        { response: res }
-      )
-    }
+  /**
+   * @param {string} cid
+   * @returns {Promise<boolean>}
+   */
+  async isPinned(cid) {
+    const url = new URL(
+      `/data/pinList?status=pinned&hashContains=${encodeURIComponent(cid)}`,
+      endpoint
+    )
+    const { count, rows } = await fetchJSON(this.limiter, url.toString(), {
+      method: 'GET',
+      headers: { Authorization: `Bearer ${this.apiToken}` },
+    })
 
-    return JSON.parse(text)
+    return Boolean(count)
   }
 }
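
For reviewers, a minimal standalone sketch (not part of the patch) of how the extracted fetchJSON helper is meant to be driven: the caller supplies a RateLimiter and a node-fetch init object, and gets back the parsed JSON body after rate limiting and retries. The one-request-per-second limit, the example CID, and the PINATA_API_TOKEN variable name are illustrative assumptions, not values taken from this diff.

// sketch.mjs — illustrative usage only; paths and values below are assumptions
import { RateLimiter } from 'limiter'
import { fetchJSON } from './packages/cron/src/lib/fetch.js'

// Assumed limit of 1 request/second; the patch's own clients configure their own limiters.
const limiter = new RateLimiter({ tokensPerInterval: 1, interval: 'second' })

const cid = 'QmExampleCid' // hypothetical CID, for illustration only
// Same pinList query the new Pinata.isPinned method issues; count comes from Pinata's response.
const { count } = await fetchJSON(
  limiter,
  `https://api.pinata.cloud/data/pinList?status=pinned&hashContains=${encodeURIComponent(cid)}`,
  { headers: { Authorization: `Bearer ${process.env.PINATA_API_TOKEN}` } } // assumed env var name
)
console.log(count ? `${cid} is pinned on Pinata` : `${cid} is not pinned on Pinata`)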