Skip to content

Commit

Permalink
fix: move to cluster3 (#962)
Browse files Browse the repository at this point in the history
- [x] set cf secret `CLUSTER_SERVICE` to `IpfsCluster3`
- [x] needs `alter type service_type add value 'IpfsCluster3';` in production and staging
- [x] cron jobs need to support the nft3 cluster, but we need to figure out whether we are moving everything. If so, we can just have a single cluster.
  • Loading branch information
hugomrdias authored Jan 17, 2022
1 parent 24df1f6 commit b6fc2c3
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 24 deletions.
2 changes: 1 addition & 1 deletion packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ wrangler secret put PINATA_JWT --env production # Get from Pinata
wrangler secret put SENTRY_DSN --env USER # Get from Sentry
wrangler secret put DATABASE_TOKEN --env production # Get from database account
wrangler secret put CLUSTER_BASIC_AUTH_TOKEN --env production # Get from nft.storage vault in 1password
wrangler secret put CLUSTER_SERVICE --env production # Which cluster should be used. Options 'IpfsCluster' or 'IpfsCluster2'
wrangler secret put CLUSTER_SERVICE --env production # Which cluster should be used. Options 'IpfsCluster' / 'IpfsCluster2' / 'IpfsCluster3'
wrangler secret put MAILCHIMP_API_KEY --env production # Get from mailchimp
wrangler secret put LOGTAIL_TOKEN --env production # Get from Logtail
wrangler secret put PSA_ALLOW --env production # CSV user ID list, get from 1password vault
Expand Down
4 changes: 3 additions & 1 deletion packages/api/db/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ CREATE TYPE service_type AS ENUM (
-- The original NFT.Storage cluster.
'IpfsCluster',
-- The current cluster, originally commissioned for niftysave.
'IpfsCluster2'
'IpfsCluster2',
-- New cluster with flatfs and better DHT
'IpfsCluster3'
);

-- Upload type is the type of received upload data.
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Logging } from './utils/logs.js'
declare global {
const SALT: string
const DEBUG: string
const CLUSTER_SERVICE: 'IpfsCluster' | 'IpfsCluster2'
const CLUSTER_SERVICE: 'IpfsCluster' | 'IpfsCluster2' | 'IpfsCluster3'
const CLUSTER_API_URL: string
const CLUSTER_BASIC_AUTH_TOKEN: string
const MAGIC_SECRET_KEY: string
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const secrets = {

const CLUSTER1 = 'https://nft.storage.ipfscluster.io/api/'
const CLUSTER2 = 'https://nft2.storage.ipfscluster.io/api/'
const CLUSTER3 = 'https://nft3.storage.ipfscluster.io/api/'
let clusterUrl

switch (CLUSTER_SERVICE) {
Expand All @@ -21,6 +22,9 @@ switch (CLUSTER_SERVICE) {
case 'IpfsCluster2':
clusterUrl = CLUSTER2
break
case 'IpfsCluster3':
clusterUrl = CLUSTER3
break
default:
clusterUrl = CLUSTER_API_URL
break
Expand Down
27 changes: 20 additions & 7 deletions packages/api/src/utils/db-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import { PostgrestClient, PostgrestQueryBuilder } from '@supabase/postgrest-js'
// Accepted upload source formats.
/** @type {Array<definitions['upload']['type']>} */
export const UPLOAD_TYPES = ['Car', 'Blob', 'Multipart', 'Remote', 'Nft']

// Known pinning backends, listed newest cluster first.
/** @type {Array<definitions['pin']['service']>} */
export const PIN_SERVICES = ['IpfsCluster3', 'IpfsCluster2', 'IpfsCluster', 'Pinata']

// Lifecycle states a pin record can be in.
/** @type {Array<definitions['pin']['status']>} */
export const PIN_STATUSES = ['PinQueued', 'Pinning', 'Pinned', 'PinError']

Expand Down Expand Up @@ -74,7 +79,7 @@ export class DBClient {
const defaultPins = [
{
status: 'PinQueued',
service: 'IpfsCluster2',
service: 'IpfsCluster3',
},
{
status: 'PinQueued',
Expand Down Expand Up @@ -128,8 +133,12 @@ export class DBClient {
.eq('source_cid', cid)
.eq('user_id', userId)
.is('deleted_at', null)
// @ts-ignore
.filter('content.pin.service', 'in', '(IpfsCluster,IpfsCluster2)')
.filter(
// @ts-ignore
'content.pin.service',
'in',
'(IpfsCluster,IpfsCluster2,IpfsCluster3)'
)
.single()

if (status === 406 || !upload) {
Expand All @@ -156,8 +165,12 @@ export class DBClient {
.select(this.uploadQuery)
.eq('user_id', userId)
.is('deleted_at', null)
// @ts-ignore
.filter('content.pin.service', 'in', '(IpfsCluster,IpfsCluster2)')
.filter(
// @ts-ignore
'content.pin.service',
'in',
'(IpfsCluster,IpfsCluster2,IpfsCluster3)'
)
.limit(opts.limit || 10)
.order('inserted_at', { ascending: false })

Expand Down Expand Up @@ -263,7 +276,7 @@ export class DBClient {
pins:pin(status, service, inserted_at)`
)
// @ts-ignore
.filter('pins.service', 'in', '(IpfsCluster,IpfsCluster2)')
.filter('pins.service', 'in', '(IpfsCluster,IpfsCluster2,IpfsCluster3)')
.eq('cid', cid)
.single()

Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/utils/db-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ export interface definitions {
* This is a Foreign Key to `content.cid`.<fk table='content' column='cid'/>
*/
content_cid: string
service: 'Pinata' | 'IpfsCluster' | 'IpfsCluster2'
service: 'Pinata' | 'IpfsCluster' | 'IpfsCluster2' | 'IpfsCluster3'
inserted_at: string
updated_at: string
}
Expand Down
10 changes: 6 additions & 4 deletions packages/api/test/nfts-store.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ describe('NFT store', () => {
result,
{
ipnft: 'bafyreicnwbboevx6g6fykitf4nebz2kqgkqz35qvlnlcgfulhrris66m6i',
url: 'ipfs://bafyreicnwbboevx6g6fykitf4nebz2kqgkqz35qvlnlcgfulhrris66m6i/metadata.json',
url:
'ipfs://bafyreicnwbboevx6g6fykitf4nebz2kqgkqz35qvlnlcgfulhrris66m6i/metadata.json',
data: {
name: 'name',
description: 'stuff',
Expand Down Expand Up @@ -78,7 +79,7 @@ describe('NFT store', () => {
content_cid:
'bafyreicnwbboevx6g6fykitf4nebz2kqgkqz35qvlnlcgfulhrris66m6i',
status: 'PinQueued',
service: 'IpfsCluster2',
service: 'IpfsCluster3',
},
{
content_cid:
Expand Down Expand Up @@ -115,7 +116,8 @@ describe('NFT store', () => {
result,
{
ipnft: 'bafyreihihnpztkkjaegm2cldghihv4otaa23v2lf6uhmbm2avoolgkfynm',
url: 'ipfs://bafyreihihnpztkkjaegm2cldghihv4otaa23v2lf6uhmbm2avoolgkfynm/metadata.json',
url:
'ipfs://bafyreihihnpztkkjaegm2cldghihv4otaa23v2lf6uhmbm2avoolgkfynm/metadata.json',
data: {
name: 'name',
description: 'stuff',
Expand Down Expand Up @@ -143,7 +145,7 @@ describe('NFT store', () => {
content_cid:
'bafyreihihnpztkkjaegm2cldghihv4otaa23v2lf6uhmbm2avoolgkfynm',
status: 'PinQueued',
service: 'IpfsCluster2',
service: 'IpfsCluster3',
},
{
content_cid:
Expand Down
8 changes: 6 additions & 2 deletions packages/api/test/nfts-upload.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,13 @@ describe('NFT Upload ', () => {
.from('upload')
.select('*,content(dag_size, pin(status, service, inserted_at))')
.match({ source_cid: cid, user_id: client.userId })
.filter('content.pin.service', 'in', '(IpfsCluster,IpfsCluster2)')
.filter(
'content.pin.service',
'in',
'(IpfsCluster,IpfsCluster2,IpfsCluster3)'
)
.single()

assert.equal(data.content.pin[0].service, 'IpfsCluster2')
assert.equal(data.content.pin[0].service, 'IpfsCluster3')
})
})
10 changes: 8 additions & 2 deletions packages/cron/src/bin/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import { fileURLToPath } from 'url'
import dotenv from 'dotenv'
import fetch from 'node-fetch'
import { updatePendingPinStatuses } from '../jobs/pins.js'
import { getDBClient, getCluster1, getCluster2 } from '../lib/utils.js'
import {
getDBClient,
getCluster1,
getCluster2,
getCluster3,
} from '../lib/utils.js'

const __dirname = path.dirname(fileURLToPath(import.meta.url))
/** @ts-ignore */
Expand All @@ -15,8 +20,9 @@ async function main() {
const db = getDBClient(process.env)
const cluster1 = getCluster1(process.env)
const cluster2 = getCluster2(process.env)
const cluster3 = getCluster3(process.env)

await updatePendingPinStatuses({ db, cluster1, cluster2 })
await updatePendingPinStatuses({ db, cluster1, cluster2, cluster3 })
}

dotenv.config({ path: path.join(__dirname, '../../../../.env') })
Expand Down
2 changes: 1 addition & 1 deletion packages/cron/src/jobs/pinata.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function pinToPinata({ db, pinata }) {
break
}

/** @type {PinUpdate[]} */
/** @type {Pin[]} */
const updatedPins = []
for (const pin of pins) {
try {
Expand Down
12 changes: 8 additions & 4 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const log = debug('pins:updatePinStatuses')
* db: import('../../../api/src/utils/db-client').DBClient
* cluster1: import('@nftstorage/ipfs-cluster').Cluster
* cluster2: import('@nftstorage/ipfs-cluster').Cluster
* cluster3: import('@nftstorage/ipfs-cluster').Cluster
* }} Config
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @typedef {Pick<definitions['pin'], 'id'|'status'|'content_cid'|'service'|'inserted_at'|'updated_at'>} Pin
Expand All @@ -24,7 +25,7 @@ export async function updatePendingPinStatuses(conf) {
const { count, error: countError } = await conf.db.client
.from('pin')
.select('*', { count: 'exact', head: true })
.in('service', ['IpfsCluster', 'IpfsCluster2'])
.in('service', ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3'])
.neq('status', 'Pinned')
.neq('status', 'PinError')
.range(0, 1)
Expand Down Expand Up @@ -75,7 +76,7 @@ export async function checkFailedPinStatuses(config) {
const { count, error: countError } = await db.client
.from('pin')
.select('*', { count: 'exact', head: true })
.in('service', ['IpfsCluster', 'IpfsCluster2'])
.in('service', ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3'])
.eq('status', 'PinError')
.gt('inserted_at', after.toISOString())
.range(0, 1)
Expand All @@ -97,7 +98,7 @@ export async function checkFailedPinStatuses(config) {
const query = db.client.from('pin')
const { data: pins, error } = await query
.select('id,status,content_cid,service')
.in('service', ['IpfsCluster', 'IpfsCluster2'])
.in('service', ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3'])
.eq('status', 'PinError')
.gt('inserted_at', after.toISOString())
.range(offset, offset + limit - 1)
Expand All @@ -123,7 +124,7 @@ export async function checkFailedPinStatuses(config) {
* }} config
*/
async function updatePinStatuses(config) {
const { countPins, fetchPins, db, cluster1, cluster2 } = config
const { countPins, fetchPins, db, cluster1, cluster2, cluster3 } = config
if (!log.enabled) {
console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses')
}
Expand Down Expand Up @@ -152,6 +153,9 @@ async function updatePinStatuses(config) {
case 'IpfsCluster2':
statusRes = await cluster2.status(pin.content_cid)
break
case 'IpfsCluster3':
statusRes = await cluster3.status(pin.content_cid)
break
default:
throw new Error(`Service ${pin.service} not supported.`)
}
Expand Down
14 changes: 14 additions & 0 deletions packages/cron/src/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ export function getCluster2(env) {
})
}

/**
 * Create a new IPFS Cluster instance for cluster 3 from the passed
 * environment variables.
 *
 * @param {Record<string, string|undefined>} env - process environment;
 *   must provide `CLUSTER3_API_URL` and `CLUSTER3_BASIC_AUTH_TOKEN`.
 * @returns a configured `Cluster` client for the nft3 cluster.
 * @throws {Error} if either required environment variable is missing.
 *   The messages name the exact variable so failures of getCluster1/2/3
 *   are distinguishable in logs.
 */
export function getCluster3(env) {
  const clusterApiUrl = env.CLUSTER3_API_URL
  if (!clusterApiUrl) {
    throw new Error('missing IPFS Cluster API URL: CLUSTER3_API_URL')
  }
  const basicAuthToken = env.CLUSTER3_BASIC_AUTH_TOKEN
  if (!basicAuthToken) {
    throw new Error('missing IPFS Cluster credentials: CLUSTER3_BASIC_AUTH_TOKEN')
  }
  return new Cluster(clusterApiUrl, {
    headers: { Authorization: `Basic ${basicAuthToken}` },
  })
}

/**
* Create a new IPFS client instance from the passed environment variables.
* @param {Record<string, string|undefined>} env
Expand Down

0 comments on commit b6fc2c3

Please sign in to comment.