feat: s3 backups (#899)
This uploads data sent to the `/upload` endpoint to S3 for disaster recovery. All data is converted to a CAR (if it is not a CAR already) before being uploaded to S3.

Note: this does not back up pinned data or IPNFTs uploaded to the `/store` endpoint.
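
A rough sketch of the flow this adds, for orientation only (the helpers `toCar` and `carRootCid` are illustrative names, not identifiers from this PR):

```js
// Rough sketch, not the PR's actual code: `toCar` and `carRootCid` stand in
// for CAR conversion and root CID extraction.
async function handleUpload(blob, user, ctx) {
  // Convert the upload to a CAR if it is not one already.
  const car = blob.type === 'application/car' ? blob : await toCar(blob)
  const rootCid = await carRootCid(car)

  // ctx.backup is only configured when the S3 secrets are present.
  const backupUrl = ctx.backup
    ? await ctx.backup.backupCar(user.id, rootCid, car)
    : undefined

  // ...then pin via IPFS Cluster and record the upload (and its backup URL).
  return { rootCid, backupUrl }
}
```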

resolves #390
resolves #637
resolves #355
Alan Shaw authored Jan 21, 2022
1 parent bfeea0c commit 60f17c0
Showing 21 changed files with 2,522 additions and 1,978 deletions.
10 changes: 9 additions & 1 deletion packages/api/README.md
@@ -109,7 +109,11 @@ wrangler secret put CLUSTER_SERVICE --env production # Which cluster should be used
wrangler secret put MAILCHIMP_API_KEY --env production # Get from mailchimp
wrangler secret put LOGTAIL_TOKEN --env production # Get from Logtail
wrangler secret put PSA_ALLOW --env production # CSV user ID list, get from 1password vault

wrangler secret put METAPLEX_AUTH_TOKEN --env production # User ID the Metaplex endpoint should use (not required for dev)
wrangler secret put S3_REGION --env production # e.g. us-east-2 (not required for dev)
wrangler secret put S3_ACCESS_KEY_ID --env production # Get from Amazon S3 (not required for dev)
wrangler secret put S3_SECRET_ACCESS_KEY --env production # Get from Amazon S3 (not required for dev)
wrangler secret put S3_BUCKET_NAME --env production # e.g. nft.storage-staging-us-east-2 (not required for dev)
wrangler publish --env production
```

@@ -138,3 +142,7 @@ When prompted for a value enter one of the following permission combinations:
- `--` = no reading or writing
- `r-` = read only mode
- `rw` = read and write (normal operation)

## S3 Setup

We use [S3](https://aws.amazon.com/s3/) for backup and disaster recovery. For production deployments, an AWS account is required.
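
As a rough sketch (not necessarily the exact wiring in this codebase), the worker can build its S3 client from the secrets above using `@aws-sdk/client-s3`:

```js
// Sketch only: constructing an S3 client from the secrets configured above.
import { S3Client } from '@aws-sdk/client-s3'

const s3Client = new S3Client({
  region: S3_REGION,
  credentials: {
    accessKeyId: S3_ACCESS_KEY_ID,
    secretAccessKey: S3_SECRET_ACCESS_KEY,
  },
})
```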
18 changes: 17 additions & 1 deletion packages/api/db/functions.sql
@@ -1,5 +1,6 @@
DROP FUNCTION IF EXISTS create_upload;
DROP FUNCTION IF EXISTS find_deals_by_content_cids;
DROP FUNCTION IF EXISTS json_arr_to_text_arr;

DROP TYPE IF EXISTS upload_pin_type;

@@ -9,12 +10,20 @@ CREATE TYPE upload_pin_type AS
service service_type
);

-- transform a JSON array property into an array of SQL text elements
CREATE OR REPLACE FUNCTION json_arr_to_text_arr(_json json)
RETURNS text[] LANGUAGE sql IMMUTABLE PARALLEL SAFE AS
'SELECT ARRAY(SELECT json_array_elements_text(_json))';

CREATE OR REPLACE FUNCTION create_upload(data json) RETURNS void
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
inserted_upload_id BIGINT;
backup_url TEXT;
BEGIN
SET LOCAL statement_timeout = '30s';

@@ -63,8 +72,15 @@ BEGIN
meta = (data ->> 'meta')::jsonb,
origins = (data ->> 'origins')::jsonb,
mime_type = data ->> 'mime_type',
type = (data ->> 'type')::upload_type;
type = (data ->> 'type')::upload_type
RETURNING id INTO inserted_upload_id;

FOREACH backup_url IN ARRAY json_arr_to_text_arr(data -> 'backup_urls')
LOOP
INSERT INTO backup (upload_id, url, inserted_at)
VALUES (inserted_upload_id, backup_url, (data ->> 'inserted_at')::TIMESTAMPTZ)
ON CONFLICT (upload_id, url) DO NOTHING;
END LOOP;
END
$$;
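
An illustrative call (the JSON is abbreviated and the values invented; real payloads carry more fields than shown) demonstrating how `backup_urls` is unpacked into the `backup` table:

```sql
-- Illustrative only: abbreviated payload with made-up values.
SELECT create_upload(json_build_object(
  'type', 'Car',
  'mime_type', 'application/car',
  'inserted_at', '2022-01-21T00:00:00Z',
  'backup_urls', json_build_array(
    'https://example-bucket.s3.us-east-2.amazonaws.com/example.car'
  )
  -- ...plus the user, content and pin fields the function also reads
));
```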

12 changes: 12 additions & 0 deletions packages/api/db/tables.sql
@@ -147,3 +147,15 @@ CREATE TABLE IF NOT EXISTS metric
inserted_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL
);

-- URLs of backups of user uploads
CREATE TABLE IF NOT EXISTS backup
(
id BIGSERIAL PRIMARY KEY,
upload_id BIGINT NOT NULL REFERENCES public.upload (id),
url TEXT NOT NULL,
inserted_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (upload_id, url)
);

CREATE INDEX IF NOT EXISTS backup_upload_id_idx ON backup (upload_id);
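
For example, the backup locations recorded for a given upload can be listed like so (the upload id is invented):

```sql
-- List where upload 42 has been backed up (42 is an invented id).
SELECT url, inserted_at
FROM backup
WHERE upload_id = 42
ORDER BY inserted_at;
```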
5 changes: 4 additions & 1 deletion packages/api/package.json
@@ -17,17 +17,20 @@
"author": "Hugo Dias <hugomrdias@gmail.com> (hugodias.me)",
"license": "(Apache-2.0 OR MIT)",
"dependencies": {
"@aws-sdk/client-s3": "^3.37.0",
"@cfworker/json-schema": "^1.8.3",
"@ipld/car": "^3.1.20",
"@ipld/dag-cbor": "^6.0.13",
"@magic-sdk/admin": "^1.3.0",
"@nftstorage/ipfs-cluster": "^3.4.3",
"@supabase/postgrest-js": "^0.34.1",
"ipfs-car": "^0.6.1",
"merge-options": "^3.0.4",
"multiformats": "^9.4.10",
"nanoid": "^3.1.30",
"regexparam": "^2.0.0",
"toucan-js": "^2.4.1"
"toucan-js": "^2.4.1",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@cloudflare/workers-types": "^3.1.1",
86 changes: 52 additions & 34 deletions packages/api/pw-test.config.cjs
@@ -3,6 +3,8 @@ const dotenv = require('dotenv')
const execa = require('execa')
const { once } = require('events')

/** @typedef {{ proc: execa.ExecaChildProcess<string> }} ProcessObject */

dotenv.config({
path: path.join(__dirname, '../../.env'),
})
@@ -46,45 +48,61 @@ module.exports = {
},
beforeTests: async () => {
const project = `nft-storage-db-${Date.now()}`
const proc = execa('smoke', ['-p', '9094', 'test/mocks/cluster'], {
preferLocal: true,
})

if (proc.stdout) {
const stdout = await Promise.race([
once(proc.stdout, 'data'),
// Make sure that we fail if process crashes. However if it exits without
// producing stdout just resolve to ''.
proc.then(() => ''),
])

if (
stdout.toString().includes('Server started on: http://localhost:9094')
) {
console.log('⚡️ Mock IPFS Cluster started.')

await execa(cli, ['db', '--start', '--project', project])
console.log('⚡️ Postgres started.')

await execa(cli, ['db-sql', '--cargo', '--testing'])
console.log('⚡️ SQL schema loaded.')

proc.stdout.on('data', (line) => console.log(line.toString()))
return { proc, project }
} else {
throw new Error('Could not start smoke server')
}
} else {
throw new Error('Could not start smoke server')
}
const mockServers = [
await startMockServer('IPFS Cluster', 9094, 'test/mocks/cluster'),
await startMockServer('AWS S3', 9095, 'test/mocks/aws-s3'),
]

await execa(cli, ['db', '--start', '--project', project])
console.log('⚡️ Postgres started.')

await execa(cli, ['db-sql', '--cargo', '--testing'])
console.log('⚡️ SQL schema loaded.')

return { mockServers, project }
},
afterTests: async (ctx, /** @type{any} */ beforeTests) => {
afterTests: async (
ctx,
/** @type {{ project: string, mockServers: ProcessObject[] }} */ beforeTests
) => {
console.log('⚡️ Shutting down mock servers.')

await execa(cli, ['db', '--clean', '--project', beforeTests.project])

/** @type {import('execa').ExecaChildProcess} */
const proc = beforeTests.proc
const killed = proc.kill()
beforeTests.mockServers.forEach(({ proc }) => proc.kill())
},
}

/**
* @param {string} name
* @param {number} port
* @param {string} handlerPath
* @returns {Promise<ProcessObject>}
*/
async function startMockServer(name, port, handlerPath) {
const proc = execa('smoke', ['-p', String(port), handlerPath], {
preferLocal: true,
})
if (!proc.stdout || !proc.stderr) {
throw new Error('missing process stdio stream(s)')
}

const stdout = await Promise.race([
once(proc.stdout, 'data'),
// Make sure that we fail if the process crashes. However, if it exits
// without producing stdout just resolve to ''.
proc.then(() => ''),
])

proc.stdout.on('data', (line) => console.log(line.toString()))
proc.stderr.on('data', (line) => console.error(line.toString()))

const startMsg = `Server started on: http://localhost:${port}`
if (!stdout.toString().includes(startMsg)) {
throw new Error(`Failed to start ${name} mock server`)
}

console.log(`⚡️ Mock ${name} started.`)
return { proc }
}
28 changes: 28 additions & 0 deletions packages/api/src/bindings.d.ts
@@ -24,12 +24,18 @@ declare global {
const MAINTENANCE_MODE: Mode
const METAPLEX_AUTH_TOKEN: string
const PSA_ALLOW: string
const S3_ENDPOINT: string
const S3_REGION: string
const S3_ACCESS_KEY_ID: string
const S3_SECRET_ACCESS_KEY: string
const S3_BUCKET_NAME: string
}

export interface RouteContext {
params: Record<string, string>
db: DBClient
log: Logging
backup?: BackupClient
}

export type Handler = (
@@ -146,3 +152,25 @@ export interface PinsResponse {
}
delegates: string[]
}

/**
* The known structural completeness of a given DAG. "Unknown" structure means
* it could be a partial or a complete DAG, i.e. we haven't walked the graph
* to verify whether we have all the blocks.
*/
export type DagStructure = 'Unknown' | 'Partial' | 'Complete'

/**
* A client for a service that backs up CAR files.
*/
export interface BackupClient {
/**
* Uploads the CAR file to the service and returns the URL.
*/
backupCar(
userId: number,
rootCid: CID,
car: Blob,
structure?: DagStructure
): Promise<URL>
}
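
The S3-backed implementation itself sits in a part of the diff not shown here; as a minimal sketch against this interface (the class name and object key layout are assumptions, not taken from the PR):

```ts
// Minimal sketch of a BackupClient backed by S3. The class name and key
// layout are assumptions; only the interface above comes from this PR.
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { CID } from 'multiformats'

export class S3BackupClient implements BackupClient {
  constructor(
    private s3: S3Client,
    private bucketName: string,
    private publicUrl: string // base URL the bucket is readable from
  ) {}

  async backupCar(
    userId: number,
    rootCid: CID,
    car: Blob,
    structure: DagStructure = 'Unknown'
  ): Promise<URL> {
    // Assumed layout: group keys by DAG structure, root CID and user.
    const key = `${structure.toLowerCase()}/${rootCid}/${userId}.car`
    await this.s3.send(
      new PutObjectCommand({
        Bucket: this.bucketName,
        Key: key,
        Body: new Uint8Array(await car.arrayBuffer()),
      })
    )
    return new URL(key, this.publicUrl)
  }
}
```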
35 changes: 23 additions & 12 deletions packages/api/src/constants.js
@@ -7,27 +7,29 @@ export const secrets = {
database: DATABASE_TOKEN,
mailchimp: MAILCHIMP_API_KEY,
logtail: LOGTAIL_TOKEN,
metaplexAuth: METAPLEX_AUTH_TOKEN,
metaplexAuth:
typeof METAPLEX_AUTH_TOKEN !== 'undefined'
? METAPLEX_AUTH_TOKEN
: undefined,
}

const CLUSTER1 = 'https://nft.storage.ipfscluster.io/api/'
const CLUSTER2 = 'https://nft2.storage.ipfscluster.io/api/'
const CLUSTER3 = 'https://nft3.storage.ipfscluster.io/api/'
let clusterUrl

switch (CLUSTER_SERVICE) {
case 'IpfsCluster':
let clusterUrl
if (typeof CLUSTER_SERVICE !== 'undefined' && CLUSTER_SERVICE) {
if (CLUSTER_SERVICE === 'IpfsCluster') {
clusterUrl = CLUSTER1
break
case 'IpfsCluster2':
} else if (CLUSTER_SERVICE === 'IpfsCluster2') {
clusterUrl = CLUSTER2
break
case 'IpfsCluster3':
} else if (CLUSTER_SERVICE === 'IpfsCluster3') {
clusterUrl = CLUSTER3
break
default:
clusterUrl = CLUSTER_API_URL
break
} else {
throw new Error(`unknown cluster service: ${CLUSTER_SERVICE}`)
}
} else {
clusterUrl = CLUSTER_API_URL
}

export const cluster = {
@@ -57,3 +59,12 @@ export const isDebug = DEBUG === 'true'
*/
export const psaAllow =
typeof PSA_ALLOW !== 'undefined' ? PSA_ALLOW.split(',') : ['*']

export const s3 = {
endpoint: typeof S3_ENDPOINT !== 'undefined' ? S3_ENDPOINT : '',
region: typeof S3_REGION !== 'undefined' ? S3_REGION : '',
accessKeyId: typeof S3_ACCESS_KEY_ID !== 'undefined' ? S3_ACCESS_KEY_ID : '',
secretAccessKey:
typeof S3_SECRET_ACCESS_KEY !== 'undefined' ? S3_SECRET_ACCESS_KEY : '',
bucketName: typeof S3_BUCKET_NAME !== 'undefined' ? S3_BUCKET_NAME : '',
}
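
Since `RouteContext.backup` is optional, these constants can gate backup support entirely; a hedged sketch of that wiring (`S3BackupClient` is an assumed name, per the bindings sketch above, and the import path is invented):

```js
// Sketch: only enable backups when all required S3 secrets are configured.
import { s3 } from './constants.js'
import { S3BackupClient } from './utils/s3.js' // assumed module path

const backup =
  s3.region && s3.accessKeyId && s3.secretAccessKey && s3.bucketName
    ? new S3BackupClient(s3)
    : undefined // routes treat a missing client as "backups disabled"
```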