Skip to content

Commit

Permalink
feat: pinata sync job
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed May 21, 2021
1 parent 604e1b0 commit 2e8efa2
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 59 deletions.
16 changes: 16 additions & 0 deletions packages/cron/src/bin/pinata.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env node

import dotenv from 'dotenv'
import { syncPinata } from '../jobs/pinata.js'
import { getCloudflare, getPinata } from '../lib/utils.js'

async function main() {
const env = process.env.ENV || 'dev'
const cf = getCloudflare(process.env)
const pinata = getPinata(process.env)
const hostNodes = (process.env.CLUSTER_ADDRS || '').split(',').filter(Boolean)
await syncPinata({ cf, pinata, env, hostNodes })
}

dotenv.config()
main()
56 changes: 56 additions & 0 deletions packages/cron/src/jobs/pinata.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import debug from 'debug'
import { findNs } from '../lib/utils.js'

const log = debug('pinata:syncPinata')

/**
* Syncs our pinset with Pinata.
*
* @param {import('../types').Config & {
* pinata: import('../lib/pinata').Pinata
* hostNodes: string[]
* }} config
*/
export async function syncPinata({ cf, env, pinata, hostNodes }) {
const namespaces = await cf.fetchKVNamespaces()
const pinsNs = findNs(namespaces, env, 'PINS')
log(`🎯 Syncing ${pinsNs.title} to Pinata`)

let i = 0
for await (const keys of cf.fetchKVKeys(pinsNs.id)) {
log(`📥 Processing ${i} -> ${i + keys.length}`)

/** @type {import('../lib/cloudflare.js').BulkWritePair[]} */
const bulkWrites = []

await Promise.all(
keys.map(async (k) => {
const { name: cid, metadata: pin } = k

// if not pinned by us or already pinned on Pinata
if (pin.status !== 'pinned' || pin.pinataStatus === 'pinned') {
return
}

const pinned = await pinata.isPinned(cid)
// if pinata has finally pinned it then update status in our KV
if (pinned) {
log(`📌 ${cid} became pinned on Pinata!`)
const metadata = { ...pin, pinataStatus: 'pinned' }
return bulkWrites.push({ key: cid, value: '', metadata })
}

// submit to Pinata
log(`🙏 asking Pinata to pin ${cid}`)
return pinata.pinByHash(cid, { pinataOptions: { hostNodes } })
})
)

if (bulkWrites.length) {
log(`🗂 updating pinata status for ${bulkWrites.length} pins`)
await cf.writeKVMulti(pinsNs.id, bulkWrites)
}

i += keys.length
}
}
84 changes: 36 additions & 48 deletions packages/cron/src/lib/cloudflare.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import FormData from 'form-data'
import { RateLimiter } from 'limiter'
import fetch from 'node-fetch'
import retry from 'p-retry'
import { URL } from 'url'
import { fetchJSON } from './fetch.js'

/**
* @typedef {{ id: string, title: string }} Namespace
Expand All @@ -28,43 +27,6 @@ export class Cloudflare {
})
}

/**
* @private
* @param {string} url
* @param {import('node-fetch').RequestInit} [init]
* @returns {Promise<any>}
*/
async fetchJSON(url, init) {
await this.limiter.removeTokens(1)
const res = await retry(
async () => {
const controller = new AbortController()
const abortID = setTimeout(() => controller.abort(), 60000)
init = init || {}
init.headers = init.headers || {}
init.headers.Authorization = `Bearer ${this.apiToken}`
// @ts-ignore
init.signal = controller.signal
try {
const res = await fetch(url, init)
const text = await res.text()
if (!res.ok) {
throw new Error(`${res.status} ${res.statusText}: ${text}`)
}
return text === '' ? null : JSON.parse(text)
} finally {
clearTimeout(abortID)
}
},
{
onFailedAttempt: (err) => console.warn(`💥 fetch ${url}`, err),
retries: 5,
minTimeout: 60000,
}
)
return res
}

/**
* @returns {Promise<Namespace[]>}
*/
Expand All @@ -76,7 +38,13 @@ export class Cloudflare {
`${this.kvNsPath}?page=${page}&per_page=100`,
endpoint
)
const { result_info, result } = await this.fetchJSON(url.toString())
const { result_info, result } = await fetchJSON(
this.limiter,
url.toString(),
{
headers: { Authorization: `Bearer ${this.apiToken}` },
}
)
namespaces.push(...result)
if (result_info.page === result_info.total_pages) {
break
Expand All @@ -97,7 +65,13 @@ export class Cloudflare {
`${this.kvNsPath}/${nsId}/keys?cursor=${cursor}&limit=1000`,
endpoint
)
const { result_info, result } = await this.fetchJSON(url.toString())
const { result_info, result } = await fetchJSON(
this.limiter,
url.toString(),
{
headers: { Authorization: `Bearer ${this.apiToken}` },
}
)
yield result
cursor = result_info.cursor
if (!cursor) {
Expand All @@ -120,7 +94,11 @@ export class Cloudflare {
const body = new FormData()
body.append('value', JSON.stringify(value))
body.append('metadata', JSON.stringify(metadata || {}))
await this.fetchJSON(url.toString(), { method: 'PUT', body })
await fetchJSON(this.limiter, url.toString(), {
method: 'PUT',
headers: { Authorization: `Bearer ${this.apiToken}` },
body,
})
}

/**
Expand All @@ -134,9 +112,12 @@ export class Cloudflare {
...kv,
value: JSON.stringify(kv.value),
}))
return this.fetchJSON(url.toString(), {
return fetchJSON(this.limiter, url.toString(), {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
headers: {
Authorization: `Bearer ${this.apiToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(kvs),
})
}
Expand All @@ -150,7 +131,9 @@ export class Cloudflare {
`${this.kvNsPath}/${nsId}/values/${encodeURIComponent(key)}`,
endpoint
)
return this.fetchJSON(url.toString())
return fetchJSON(this.limiter, url.toString(), {
headers: { Authorization: `Bearer ${this.apiToken}` },
})
}

/**
Expand All @@ -164,7 +147,9 @@ export class Cloudflare {
)}`,
endpoint
)
const { result } = await this.fetchJSON(url.toString())
const { result } = await fetchJSON(this.limiter, url.toString(), {
headers: { Authorization: `Bearer ${this.apiToken}` },
})
return result.length ? result[0].metadata : null
}

Expand All @@ -174,9 +159,12 @@ export class Cloudflare {
*/
async deleteKVMulti(nsId, keys) {
const url = new URL(`${this.kvNsPath}/${nsId}/bulk`, endpoint)
return this.fetchJSON(url.toString(), {
return fetchJSON(this.limiter, url.toString(), {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.apiToken}`,
},
body: JSON.stringify(keys),
})
}
Expand Down
55 changes: 55 additions & 0 deletions packages/cron/src/lib/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import fetch from 'node-fetch'
import retry from 'p-retry'
import debug from 'debug'

const log = debug('fetchJSON')

const REQUEST_TIMEOUT = 60000
const RETRY_INTERVAL = 60000
const RETRY_ATTEMPTS = 5

/**
* @param {import('limiter').RateLimiter} limiter
* @param {string} url
* @param {import('node-fetch').RequestInit} [init]
* @returns {Promise<any>}
*/
export async function fetchJSON(limiter, url, init) {
await limiter.removeTokens(1)
const res = await retry(
async () => {
const controller = new AbortController()
const abortID = setTimeout(() => controller.abort(), REQUEST_TIMEOUT)
init = init || {}
// @ts-ignore
init.signal = controller.signal
try {
const res = await fetch(url, init)
const text = await res.text()
if (!res.ok) {
throw Object.assign(
new Error(`${res.status} ${res.statusText}: ${text}`),
{ response: res }
)
}
return text === '' ? null : JSON.parse(text)
} finally {
clearTimeout(abortID)
}
},
{
onFailedAttempt: async (err) => {
// @ts-ignore
if (err.response && err.response.status === 429) {
log(`🚦 rate limited ${url}`)
} else {
log(`💥 fetch ${url}`, err)
}
await limiter.removeTokens(1)
},
retries: RETRY_ATTEMPTS,
minTimeout: RETRY_INTERVAL,
}
)
return res
}
30 changes: 19 additions & 11 deletions packages/cron/src/lib/pinata.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { URL } from 'url'
import fetch from 'node-fetch'
import { RateLimiter } from 'limiter'
import { fetchJSON } from './fetch.js'

const endpoint = 'https://api.pinata.cloud'

Expand All @@ -9,6 +10,7 @@ export class Pinata {
*/
constructor({ apiToken }) {
this.apiToken = apiToken
this.limiter = new RateLimiter({ tokensPerInterval: 2, interval: 'second' })
}

/**
Expand All @@ -18,24 +20,30 @@ export class Pinata {
*/
async pinByHash(cid, options) {
const url = new URL('/pinning/pinByHash', endpoint)

const res = await fetch(url.toString(), {
return fetchJSON(this.limiter, url.toString(), {
method: 'POST',
headers: {
Authorization: `Bearer ${this.apiToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ hashToPin: cid, ...(options || {}) }),
})
}

const text = await res.text()
if (!res.ok) {
throw Object.assign(
new Error(`${res.status} ${res.statusText}: ${text}`),
{ response: res }
)
}
/**
* @param {string} cid
* @returns {Promise<boolean>}
*/
async isPinned(cid) {
const url = new URL(
`/data/pinList?status=pinned&hashContains=${encodeURIComponent(cid)}`,
endpoint
)
const { count, rows } = await fetchJSON(this.limiter, url.toString(), {
method: 'GET',
headers: { Authorization: `Bearer ${this.apiToken}` },
})

return JSON.parse(text)
return Boolean(count)
}
}

0 comments on commit 2e8efa2

Please sign in to comment.